diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 862e5b109e4..7f958076a96 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -23,7 +23,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Winlogbeat* - *Functionbeat* ==== Bugfixes @@ -34,6 +33,10 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d +*Filebeat* + +- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568] + *Auditbeat* diff --git a/libbeat/reader/readfile/bench_test.go b/libbeat/reader/readfile/bench_test.go index b1f6e7667f6..e1d61248848 100644 --- a/libbeat/reader/readfile/bench_test.go +++ b/libbeat/reader/readfile/bench_test.go @@ -20,6 +20,7 @@ package readfile import ( "bytes" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -39,7 +40,7 @@ func BenchmarkEncoderReader(b *testing.B) { b.Run(name, func(b *testing.B) { b.ReportAllocs() for bN := 0; bN < b.N; bN++ { - reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit}) + reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit, false}) if err != nil { b.Fatal("failed to initialize reader:", err) } @@ -48,7 +49,7 @@ func BenchmarkEncoderReader(b *testing.B) { for i := 0; ; i++ { msg, err := reader.Next() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { b.ReportMetric(float64(i), "processed_lines") break } else { diff --git a/libbeat/reader/readfile/encode.go b/libbeat/reader/readfile/encode.go index 13b5c2d2712..14183aa91dc 100644 --- a/libbeat/reader/readfile/encode.go +++ b/libbeat/reader/readfile/encode.go @@ -40,9 +40,15 @@ type Config struct { BufferSize int Terminator LineTerminator MaxBytes int + // If CollectOnEOF is set to true (default false) the line reader will return the buffer if EOF reached: this + // will ensure full content 
including last line with no EOL will be returned for fully retrieved content that's + // not appended anymore between reads. + // If CollectOnEOF is set to false the line reader will return 0 content and keep the buffer at the current + // state of appending data after a temporary EOF. + CollectOnEOF bool } -// New creates a new Encode reader from input reader by applying +// NewEncodeReader creates a new Encode reader from input reader by applying // the given codec. func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) { eReader, err := NewLineReader(r, config) diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index d2db172706b..b5932a233f4 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -19,6 +19,7 @@ package readfile import ( "bytes" + "errors" "fmt" "io" @@ -33,18 +34,22 @@ const unlimited = 0 // LineReader reads lines from underlying reader, decoding the input stream // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line. +// If collectOnEOF is set to true (default false) it will return the buffer if EOF is reached. +// If collectOnEOF is set to false it will return 0 content and keep the buffer at the current +// state of appending data after a temporary EOF. 
type LineReader struct { - reader io.ReadCloser - maxBytes int // max bytes per line limit to avoid OOM with malformatted files - nl []byte - decodedNl []byte - inBuffer *streambuf.Buffer - outBuffer *streambuf.Buffer - inOffset int // input buffer read offset - byteCount int // number of bytes decoded from input buffer into output buffer - decoder transform.Transformer - tempBuffer []byte - logger *logp.Logger + reader io.ReadCloser + maxBytes int // max bytes per line limit to avoid OOM with malformatted files + nl []byte + decodedNl []byte + collectOnEOF bool + inBuffer *streambuf.Buffer + outBuffer *streambuf.Buffer + inOffset int // input buffer read offset + byteCount int // number of bytes decoded from input buffer into output buffer + decoder transform.Transformer + tempBuffer []byte + logger *logp.Logger } // NewLineReader creates a new reader object @@ -63,15 +68,16 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { } return &LineReader{ - reader: input, - maxBytes: config.MaxBytes, - decoder: config.Codec.NewDecoder(), - nl: nl, - decodedNl: terminator, - inBuffer: streambuf.New(nil), - outBuffer: streambuf.New(nil), - tempBuffer: make([]byte, config.BufferSize), - logger: logp.NewLogger("reader_line"), + reader: input, + maxBytes: config.MaxBytes, + decoder: config.Codec.NewDecoder(), + nl: nl, + decodedNl: terminator, + collectOnEOF: config.CollectOnEOF, + inBuffer: streambuf.New(nil), + outBuffer: streambuf.New(nil), + tempBuffer: make([]byte, config.BufferSize), + logger: logp.NewLogger("reader_line"), }, nil } @@ -88,9 +94,46 @@ func (r *LineReader) Next() (b []byte, n int, err error) { // read next 'potential' line from input buffer/reader err := r.advance() if err != nil { - return nil, 0, err - } + if errors.Is(err, io.EOF) && r.collectOnEOF { + // Found EOF and collectOnEOF is true + // -> decode input sequence into outBuffer + // let's take whole buffer len without len(nl) if it ends with it + end := 
r.inBuffer.Len() + if bytes.HasSuffix(r.inBuffer.Bytes(), r.decodedNl) { + end -= len(r.nl) + } + + sz, err := r.decode(end) + if err != nil { + r.logger.Errorf("Error decoding line: %s", err) + // In case of error increase size by unencoded length + sz = r.inBuffer.Len() + } + + // Consume transformed bytes from input buffer + _ = r.inBuffer.Advance(sz) + r.inBuffer.Reset() + // output buffer contains content until EOF. Extract + // byte slice from buffer and reset output buffer. + bytes, err := r.outBuffer.Collect(r.outBuffer.Len()) + r.outBuffer.Reset() + if err != nil { + // This should never happen as otherwise we have a broken state + panic(err) + } + + // return and reset consumed bytes count + sz = r.byteCount + r.byteCount = 0 + return bytes, sz, io.EOF + } + + // return and reset consumed bytes count + sz := r.byteCount + r.byteCount = 0 + return nil, sz, err + } // Check last decoded byte really being newline also unencoded // if not, continue reading buf := r.outBuffer.Bytes() @@ -141,13 +184,13 @@ func (r *LineReader) advance() error { // Try to read more bytes into buffer n, err := r.reader.Read(r.tempBuffer) - if err == io.EOF && n > 0 { + if errors.Is(err, io.EOF) && n > 0 { // Continue processing the returned bytes. The next call will yield EOF with 0 bytes. 
err = nil } // Write to buffer also in case of err - r.inBuffer.Write(r.tempBuffer[:n]) + _, _ = r.inBuffer.Write(r.tempBuffer[:n]) if err != nil { return err @@ -166,7 +209,7 @@ func (r *LineReader) advance() error { // If newLine is found, drop the lines longer than maxBytes for idx != -1 && idx > r.maxBytes { r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx) - err = r.inBuffer.Advance(idx + len(r.nl)) + _ = r.inBuffer.Advance(idx + len(r.nl)) r.byteCount += idx + len(r.nl) r.inBuffer.Reset() r.inOffset = 0 @@ -234,8 +277,8 @@ func (r *LineReader) skipUntilNewLine() (int, error) { idx = bytes.Index(r.tempBuffer[:n], r.nl) if idx != -1 { - r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n]) - skipped += idx + _, _ = r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n]) + skipped += idx + len(r.nl) } else { skipped += n } @@ -264,8 +307,8 @@ func (r *LineReader) decode(end int) (int, error) { nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false) if err != nil { // Check if error is different from destination buffer too short - if err != transform.ErrShortDst { - r.outBuffer.Write(inBytes[0:end]) + if !(errors.Is(err, transform.ErrShortDst)) { + _, _ = r.outBuffer.Write(inBytes[0:end]) start = end break } @@ -275,7 +318,7 @@ func (r *LineReader) decode(end int) (int, error) { } start += nSrc - r.outBuffer.Write(r.tempBuffer[:nDst]) + _, _ = r.outBuffer.Write(r.tempBuffer[:nDst]) } r.byteCount += start diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index 17fdfcf1039..eb4989b06f7 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -23,6 +23,8 @@ package readfile import ( "bytes" "encoding/hex" + "errors" + "fmt" "io" "io/ioutil" "math/rand" @@ -39,44 +41,47 @@ import ( // Sample texts are from http://www.columbia.edu/~kermit/utf8.html type lineTestCase struct { - encoding string - strings []string + encoding string + 
strings []string + collectOnEOF bool + withEOL bool + lineTerminator LineTerminator } var tests = []lineTestCase{ - {"plain", []string{"I can", "eat glass"}}, - {"latin1", []string{"I kå Glas frässa", "ond des macht mr nix!"}}, - {"utf-16be", []string{"Pot să mănânc sticlă", "și ea nu mă rănește."}}, - {"utf-16le", []string{"काचं शक्नोम्यत्तुम् ।", "नोपहिनस्ति माम् ॥"}}, - {"big5", []string{"我能吞下玻", "璃而不傷身體。"}}, - {"gb18030", []string{"我能吞下玻璃", "而不傷身。體"}}, - {"euc-kr", []string{" 나는 유리를 먹을 수 있어요.", " 그래도 아프지 않아요"}}, - {"euc-jp", []string{"私はガラスを食べられます。", "それは私を傷つけません。"}}, - {"plain", []string{"I can", "eat glass"}}, - {"iso8859-1", []string{"Filebeat is my favourite"}}, - {"iso8859-2", []string{"Filebeat je môj obľúbený"}}, // slovak: filebeat is my favourite - {"iso8859-3", []string{"büyükannem Filebeat kullanıyor"}}, // turkish: my granmother uses filebeat - {"iso8859-4", []string{"Filebeat on mõeldud kõigile"}}, // estonian: filebeat is for everyone - {"iso8859-5", []string{"я люблю кодировки"}}, // russian: i love encodings - {"iso8859-6", []string{"أنا بحاجة إلى المزيد من الترميزات"}}, // arabic: i need more encodings - {"iso8859-7", []string{"όπου μπορώ να αγοράσω περισσότερες κωδικοποιήσεις"}}, // greek: where can i buy more encodings? - {"iso8859-8", []string{"אני צריך קידוד אישי"}}, // hebrew: i need a personal encoding - {"iso8859-9", []string{"kodlamaları pişirebilirim"}}, // turkish: i can cook encodings - {"iso8859-10", []string{"koodaukset jäädyttävät nollaan"}}, // finnish: encodings freeze below zero - {"iso8859-13", []string{"mój pies zjada kodowanie"}}, // polish: my dog eats encodings - {"iso8859-14", []string{"An féidir leat cáise a ionchódú?"}}, // irish: can you encode a cheese? 
- {"iso8859-15", []string{"bedes du kode", "for min €"}}, // danish: please encode my euro symbol - {"iso8859-16", []string{"rossz karakterkódolást", "használsz"}}, // hungarian: you use the wrong character encoding - {"koi8r", []string{"я люблю кодировки"}}, // russian: i love encodings - {"koi8u", []string{"я люблю кодировки"}}, // russian: i love encodings - {"windows1250", []string{"Filebeat je môj obľúbený"}}, // slovak: filebeat is my favourite - {"windows1251", []string{"я люблю кодировки"}}, // russian: i love encodings - {"windows1252", []string{"what is better than an encoding?", "a legacy encoding"}}, - {"windows1253", []string{"όπου μπορώ να αγοράσω", "περισσότερες κωδικοποιήσεις"}}, // greek: where can i buy more encodings? - {"windows1254", []string{"kodlamaları", "pişirebilirim"}}, // turkish: i can cook encodings - {"windows1255", []string{"אני צריך קידוד אישי"}}, // hebrew: i need a personal encoding - {"windows1256", []string{"أنا بحاجة إلى المزيد من الترميزات"}}, // arabic: i need more encodings - {"windows1257", []string{"toite", "kodeerijaid"}}, // estonian: feed the encoders + {encoding: "plain", strings: []string{"I can", "eat glass"}}, + {encoding: "latin1", strings: []string{"I kå Glas frässa", "ond des macht mr nix!"}}, + {encoding: "utf-16be", strings: []string{"Pot să mănânc sticlă", "și ea nu mă rănește."}}, + {encoding: "utf-16le", strings: []string{"काचं शक्नोम्यत्तुम् ।", "नोपहिनस्ति माम् ॥"}}, + {encoding: "big5", strings: []string{"我能吞下玻", "璃而不傷身體。"}}, + {encoding: "gb18030", strings: []string{"我能吞下玻璃", "而不傷身。體"}}, + {encoding: "euc-kr", strings: []string{" 나는 유리를 먹을 수 있어요.", " 그래도 아프지 않아요"}}, + {encoding: "euc-jp", strings: []string{"私はガラスを食べられます。", "それは私を傷つけません。"}}, + {encoding: "plain", strings: []string{"I can", "eat glass"}}, + {encoding: "iso8859-1", strings: []string{"Filebeat is my favourite"}}, + {encoding: "iso8859-2", strings: []string{"Filebeat je môj obľúbený"}}, // slovak: filebeat is my favourite + {encoding: 
"iso8859-3", strings: []string{"büyükannem Filebeat kullanıyor"}}, // turkish: my granmother uses filebeat + {encoding: "iso8859-4", strings: []string{"Filebeat on mõeldud kõigile"}}, // estonian: filebeat is for everyone + {encoding: "iso8859-5", strings: []string{"я люблю кодировки"}}, // russian: i love encodings + {encoding: "iso8859-6", strings: []string{"أنا بحاجة إلى المزيد من الترميزات"}}, // arabic: i need more encodings + {encoding: "iso8859-7", strings: []string{"όπου μπορώ να αγοράσω περισσότερες κωδικοποιήσεις"}}, // greek: where can i buy more encodings? + {encoding: "iso8859-8", strings: []string{"אני צריך קידוד אישי"}}, // hebrew: i need a personal encoding + {encoding: "iso8859-9", strings: []string{"kodlamaları pişirebilirim"}}, // turkish: i can cook encodings + {encoding: "iso8859-10", strings: []string{"koodaukset jäädyttävät nollaan"}}, // finnish: encodings freeze below zero + {encoding: "iso8859-13", strings: []string{"mój pies zjada kodowanie"}}, // polish: my dog eats encodings + {encoding: "iso8859-14", strings: []string{"An féidir leat cáise a ionchódú?"}}, // irish: can you encode a cheese? 
+ {encoding: "iso8859-15", strings: []string{"bedes du kode", "for min €"}}, // danish: please encode my euro symbol + {encoding: "iso8859-16", strings: []string{"rossz karakterkódolást", "használsz"}}, // hungarian: you use the wrong character encoding + {encoding: "koi8r", strings: []string{"я люблю кодировки"}}, // russian: i love encodings + {encoding: "koi8u", strings: []string{"я люблю кодировки"}}, // russian: i love encodings + {encoding: "windows1250", strings: []string{"Filebeat je môj obľúbený"}}, // slovak: filebeat is my favourite + {encoding: "windows1251", strings: []string{"я люблю кодировки"}}, // russian: i love encodings + {encoding: "windows1252", strings: []string{"what is better than an encoding?", "a legacy encoding"}}, + {encoding: "windows1253", strings: []string{"όπου μπορώ να αγοράσω", "περισσότερες κωδικοποιήσεις"}}, // greek: where can i buy more encodings? + {encoding: "windows1254", strings: []string{"kodlamaları", "pişirebilirim"}}, // turkish: i can cook encodings + {encoding: "windows1255", strings: []string{"אני צריך קידוד אישי"}}, // hebrew: i need a personal encoding + {encoding: "windows1256", strings: []string{"أنا بحاجة إلى المزيد من الترميزات"}}, // arabic: i need more encodings + {encoding: "windows1257", strings: []string{"toite", "kodeerijaid"}}, // estonian: feed the encoders } func TestReaderEncodings(t *testing.T) { @@ -88,19 +93,21 @@ func TestReaderEncodings(t *testing.T) { buffer := bytes.NewBuffer(nil) codec, _ := codecFactory(buffer) - nl := lineTerminatorCharacters[LineFeed] + nl := lineTerminatorCharacters[test.lineTerminator] // write with encoding to buffer writer := transform.NewWriter(buffer, codec.NewEncoder()) var expectedCount []int - for _, line := range test.strings { - writer.Write([]byte(line)) - writer.Write(nl) + for i, line := range test.strings { + _, _ = writer.Write([]byte(line)) + if !test.collectOnEOF || i < len(test.strings)-1 || test.withEOL { + _, _ = writer.Write(nl) + } expectedCount = 
append(expectedCount, buffer.Len()) } // create line reader - reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, LineFeed, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, test.lineTerminator, unlimited, test.collectOnEOF}) if err != nil { t.Fatal("failed to initialize reader:", err) } @@ -112,15 +119,19 @@ func TestReaderEncodings(t *testing.T) { for { bytes, sz, err := reader.Next() if sz > 0 { - readLines = append(readLines, string(bytes[:len(bytes)-len(nl)])) + offset := len(bytes) + if offset > 0 && (!test.collectOnEOF || !errors.Is(err, io.EOF) || test.withEOL) { + offset -= len(nl) + } + readLines = append(readLines, string(bytes[:offset])) } + current += sz + byteCounts = append(byteCounts, current) + if err != nil { break } - - current += sz - byteCounts = append(byteCounts, current) } // validate lines and byte offsets @@ -136,10 +147,65 @@ func TestReaderEncodings(t *testing.T) { } } + invalidLineTerminatorForEncoding := map[string][]LineTerminator{ + "latin1": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "big5": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "euc-kr": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "euc-jp": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-1": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-2": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-3": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-4": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-5": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-6": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-7": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-8": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-9": 
[]LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-10": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-13": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-14": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-15": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-16": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "koi8r": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "koi8u": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1250": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1251": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1252": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1253": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1254": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1255": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1256": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1257": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "utf-16be": []LineTerminator{NextLine}, // test fails: buf ends with uint8{189} instead of uint8{133} + "gb18030": []LineTerminator{NextLine}, // test fails: buf ends with uint8{189} instead of uint8{133} + "utf-16le": []LineTerminator{NextLine}, // test fails: buf ends with uint8{189} instead of uint8{133} + } for _, test := range tests { - t.Run(test.encoding, func(t *testing.T) { - runTest(t, test) - }) + for _, collectOnEOF := range []bool{false, true} { + for _, withEOL := range []bool{false, true} { + for lineTerminatorName, lineTerminator := range lineTerminators { + lineTerminatorIsInvalid := false + if invalidLineTerminatorForEncoding, ok := invalidLineTerminatorForEncoding[test.encoding]; ok { + for _, invalidLineTerminator := 
range invalidLineTerminatorForEncoding { + if invalidLineTerminator == lineTerminator { + lineTerminatorIsInvalid = true + break + } + } + } + if lineTerminatorIsInvalid { + continue + } + + test.withEOL = withEOL + test.collectOnEOF = collectOnEOF + test.lineTerminator = lineTerminator + t.Run(fmt.Sprintf("encoding: %s, collect on EOF: %t, with EOL: %t, line terminator: %s", test.encoding, test.collectOnEOF, test.withEOL, lineTerminatorName), func(t *testing.T) { + runTest(t, test) + }) + } + } + } } } @@ -160,7 +226,7 @@ func TestLineTerminators(t *testing.T) { buffer.Write([]byte("this is my second line")) buffer.Write(nl) - reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, terminator, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, terminator, unlimited, false}) if err != nil { t.Errorf("failed to initialize reader: %v", err) continue @@ -238,7 +304,7 @@ func testReadLines(t *testing.T, inputLines [][]byte, eofOnLastRead bool) { } codec, _ := encoding.Plain(r) - reader, err := NewLineReader(ioutil.NopCloser(r), Config{codec, buffer.Len(), LineFeed, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(r), Config{codec, buffer.Len(), LineFeed, unlimited, false}) if err != nil { t.Fatalf("Error initializing reader: %v", err) } @@ -261,10 +327,6 @@ func testReadLines(t *testing.T, inputLines [][]byte, eofOnLastRead bool) { } } -func testReadLine(t *testing.T, line []byte) { - testReadLines(t, [][]byte{line}, false) -} - func randomInt(r *rand.Rand, min, max int) int { return r.Intn(max+1-min) + min } @@ -358,17 +420,23 @@ func TestMaxBytesLimit(t *testing.T) { } // Create line reader - reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, LineFeed, lineMaxLimit}) + reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, LineFeed, lineMaxLimit, false}) if err != nil { t.Fatal("failed to initialize reader:", 
err) } // Read decodec lines and test - var idx int + + var ( + idx int + readLen int + ) + for i := 0; ; i++ { - b, _, err := reader.Next() + b, n, err := reader.Next() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { + readLen += n break } else { t.Fatal("unexpected error:", err) @@ -387,11 +455,16 @@ func TestMaxBytesLimit(t *testing.T) { break } + readLen += n s := string(b[:len(b)-len(nl)]) if line != s { t.Fatalf("lines do not match, expected: %s got: %s", line, s) } } + + if len(input) != readLen { + t.Fatalf("the bytes read are not equal to the bytes input, expected: %d got: %d", len(input), readLen) + } } // test_exceed_buffer from test_harvester.py @@ -408,7 +481,7 @@ func TestBufferSize(t *testing.T) { bufferSize := 10 in := ioutil.NopCloser(strings.NewReader(strings.Join(lines, ""))) - reader, err := NewLineReader(in, Config{codec, bufferSize, AutoLineTerminator, 1024}) + reader, err := NewLineReader(in, Config{codec, bufferSize, AutoLineTerminator, 1024, false}) if err != nil { t.Fatal("failed to initialize reader:", err) } @@ -416,7 +489,7 @@ func TestBufferSize(t *testing.T) { for i := 0; i < len(lines); i++ { b, n, err := reader.Next() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { break } else { t.Fatal("unexpected error:", err) diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index ba34d8e516e..364b9bd6c06 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -12,6 +12,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -20,7 +21,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -133,15 +133,15 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { // Request object (download). 
contentType, meta, body, err := p.download() if err != nil { - return errors.Wrapf(err, "failed to get s3 object (elasped_time_ns=%d)", - time.Since(start).Nanoseconds()) + return fmt.Errorf("failed to get s3 object (elapsed_time_ns=%d): %w", + time.Since(start).Nanoseconds(), err) } defer body.Close() p.s3Metadata = meta reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(body, p.metrics.s3BytesProcessedTotal)) if err != nil { - return errors.Wrap(err, "failed checking for gzip content") + return fmt.Errorf("failed checking for gzip content: %w", err) } // Overwrite with user configured Content-Type. @@ -157,8 +157,8 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { err = p.readFile(reader) } if err != nil { - return errors.Wrapf(err, "failed reading s3 object (elasped_time_ns=%d)", - time.Since(start).Nanoseconds()) + return fmt.Errorf("failed reading s3 object (elapsed_time_ns=%d): %w", + time.Since(start).Nanoseconds(), err) } return nil @@ -174,7 +174,7 @@ func (p *s3ObjectProcessor) download() (contentType string, metadata map[string] } if resp == nil { - return "", nil, nil, errors.New("empty response from s3 get object") + return "", nil, nil, fmt.Errorf("empty response from s3 get object") } meta := s3Metadata(resp, p.readerConfig.IncludeS3Metadata...) 
@@ -218,7 +218,10 @@ func (p *s3ObjectProcessor) readJSON(r io.Reader) error { } data, _ := item.MarshalJSON() - evt := createEvent(string(data), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) + evt, err := createEvent(string(data), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) + if err != nil { + return err + } p.publish(p.acker, &evt) } @@ -257,7 +260,10 @@ func (p *s3ObjectProcessor) splitEventList(key string, raw json.RawMessage, offs } data, _ := item.MarshalJSON() - evt := createEvent(string(data), offset+arrayOffset, p.s3Obj, objHash, p.s3Metadata) + evt, err := createEvent(string(data), offset+arrayOffset, p.s3Obj, objHash, p.s3Metadata) + if err != nil { + return err + } p.publish(p.acker, &evt) } @@ -277,10 +283,11 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { var reader reader.Reader reader, err = readfile.NewEncodeReader(ioutil.NopCloser(r), readfile.Config{ - Codec: enc, - BufferSize: int(p.readerConfig.BufferSize), - Terminator: p.readerConfig.LineTerminator, - MaxBytes: int(p.readerConfig.MaxBytes) * 4, + Codec: enc, + BufferSize: int(p.readerConfig.BufferSize), + Terminator: p.readerConfig.LineTerminator, + CollectOnEOF: true, + MaxBytes: int(p.readerConfig.MaxBytes) * 4, }) if err != nil { return fmt.Errorf("failed to create encode reader: %w", err) @@ -293,18 +300,25 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { var offset int64 for { message, err := reader.Next() - if err == io.EOF { + + if len(message.Content) > 0 { + event, err := createEvent(string(message.Content), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) + if err != nil { + return err + } + event.Fields.DeepUpdate(message.Fields) + offset += int64(message.Bytes) + p.publish(p.acker, &event) + } + + if errors.Is(err, io.EOF) { // No more lines break } + if err != nil { return fmt.Errorf("error reading message: %w", err) } - - event := createEvent(string(message.Content), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) - event.Fields.DeepUpdate(message.Fields) - offset 
+= int64(message.Bytes) - p.publish(p.acker, &event) } return nil @@ -317,7 +331,7 @@ func (p *s3ObjectProcessor) publish(ack *awscommon.EventACKTracker, event *beat. p.publisher.Publish(*event) } -func createEvent(message string, offset int64, obj s3EventV2, objectHash string, meta map[string]interface{}) beat.Event { +func createEvent(message string, offset int64, obj s3EventV2, objectHash string, meta map[string]interface{}) (beat.Event, error) { event := beat.Event{ Timestamp: time.Now().UTC(), Fields: common.MapStr{ @@ -347,10 +361,13 @@ func createEvent(message string, offset int64, obj s3EventV2, objectHash string, event.SetID(objectID(objectHash, offset)) if len(meta) > 0 { - event.Fields.Put("aws.s3.metadata", meta) + _, err := event.Fields.Put("aws.s3.metadata", meta) + if err != nil { + return event, fmt.Errorf("error updating/adding AWS S3 metadata: %w", err) + } } - return event + return event, nil } func objectID(objectHash string, offset int64) string { diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index a5272fa6647..c288741f844 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -57,7 +57,10 @@ func newS3GetObjectResponse(filename string, data []byte, contentType string) *s } func TestS3ObjectProcessor(t *testing.T) { - logp.TestingSetup() + errSetup := logp.TestingSetup() + if errSetup != nil { + t.Errorf("Error in setup: %v", errSetup) + } t.Run("download text/plain file", func(t *testing.T) { testProcessS3Object(t, "testdata/log.txt", "text/plain", 2) @@ -201,6 +204,14 @@ func TestS3ObjectProcessor(t *testing.T) { err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.NoError(t, err) }) + + t.Run("text file without end of line marker", func(t *testing.T) { + testProcessS3Object(t, "testdata/no-eol.txt", "text/plain", 1) + }) + + t.Run("text file without end of line marker but with 
newline", func(t *testing.T) { + testProcessS3Object(t, "testdata/no-eol-twolines.txt", "text/plain", 2) + }) } func testProcessS3Object(t testing.TB, file, contentType string, numEvents int, selectors ...fileSelectorConfig) []beat.Event { diff --git a/x-pack/filebeat/input/awss3/testdata/no-eol-twolines.txt b/x-pack/filebeat/input/awss3/testdata/no-eol-twolines.txt new file mode 100644 index 00000000000..22ab1cb28e5 --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/no-eol-twolines.txt @@ -0,0 +1,2 @@ +Line1: This file does contain a final EOL. +Line2: This file does contain a final EOL. \ No newline at end of file diff --git a/x-pack/filebeat/input/awss3/testdata/no-eol.txt b/x-pack/filebeat/input/awss3/testdata/no-eol.txt new file mode 100644 index 00000000000..0b7757db86e --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/no-eol.txt @@ -0,0 +1 @@ +This file does contain a final EOL. \ No newline at end of file diff --git a/x-pack/osquerybeat/internal/osqdcli/retry_test.go b/x-pack/osquerybeat/internal/osqdcli/retry_test.go index 19052c2e338..a2bb920b42d 100644 --- a/x-pack/osquerybeat/internal/osqdcli/retry_test.go +++ b/x-pack/osquerybeat/internal/osqdcli/retry_test.go @@ -9,18 +9,23 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/elastic/beats/v7/libbeat/logp" ) func TestRetryRun(t *testing.T) { - logp.Configure(logp.Config{ + configErr := logp.Configure(logp.Config{ Level: logp.DebugLevel, ToStderr: true, Selectors: []string{"*"}, }) + if configErr != nil { + t.Errorf("Error in configuring the test %v", configErr) + } + log := logp.NewLogger("retry_test").With("context", "osquery client connect") ctx := context.Background()