diff --git a/clients/destination.go b/clients/destination.go index 57b9213f1d..80d94d4905 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -1,10 +1,11 @@ package clients import ( - "bufio" "context" "encoding/json" + "errors" "fmt" + "io" "net" "os" "os/exec" @@ -124,12 +125,23 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) ( c.wg.Add(1) go func() { defer c.wg.Done() - scanner := bufio.NewScanner(reader) - for scanner.Scan() { + lr := newLogReader(reader) + for { + line, err := lr.NextLine() + if errors.Is(err, io.EOF) { + break + } + if errors.Is(err, errLogLineToLong) { + c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line") + continue + } + if err != nil { + c.logger.Err(err).Msg("failed to read log line from plugin") + break + } var structuredLogLine map[string]interface{} - b := scanner.Bytes() - if err := json.Unmarshal(b, &structuredLogLine); err != nil { - c.logger.Err(err).Str("line", string(b)).Msg("failed to unmarshal log line from plugin") + if err := json.Unmarshal(line, &structuredLogLine); err != nil { + c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin") } else { jsonToLog(c.logger, structuredLogLine) } diff --git a/clients/log_reader.go b/clients/log_reader.go new file mode 100644 index 0000000000..b1b344cfb3 --- /dev/null +++ b/clients/log_reader.go @@ -0,0 +1,56 @@ +package clients + +import ( + "bufio" + "errors" + "io" +) + +// logReaderPrefixLen is used when returning a partial line as context in NextLine +const logReaderPrefixLen = 1000 + +var errLogLineToLong = errors.New("log line too long, discarding") + +// logReader is a custom implementation similar to bufio.Scanner, but provides a way to handle lines +// (or tokens) that exceed the buffer size. +type logReader struct { + bufferedReader *bufio.Reader + reader io.ReadCloser // reader provided by the client +} + +// newLogReader creates a new logReader to read log lines from an io.ReadCloser +func newLogReader(reader io.ReadCloser) *logReader { + return &logReader{ + reader: reader, + bufferedReader: bufio.NewReader(reader), + } +} + +// NextLine reads and returns the next log line from the reader. An io.EOF error is returned +// if the end of the stream has been reached. This implementation is different from bufio.Scanner as it +// also returns an error if a line is too long to fit into the buffer. In this case, an error is returned +// together with a limited prefix of the line. +func (r *logReader) NextLine() ([]byte, error) { + line, isPrefix, err := r.bufferedReader.ReadLine() + if !isPrefix || err != nil { + return line, err + } + prefix := make([]byte, logReaderPrefixLen) + for i := 0; isPrefix; i++ { + // this loop is entered if a log line is too long to fit into the buffer. We discard it by + // iterating until isPrefix becomes false. We only log the first few bytes of the line to help with + // identification. + if i == 0 { + prefixLen := logReaderPrefixLen + if len(line) < prefixLen { + prefixLen = len(line) + } + copy(prefix, line[:prefixLen]) + } + line, isPrefix, err = r.bufferedReader.ReadLine() + if err != nil { + return nil, err + } + } + return prefix, errLogLineToLong +} diff --git a/clients/log_reader_test.go b/clients/log_reader_test.go new file mode 100644 index 0000000000..674d8ea10f --- /dev/null +++ b/clients/log_reader_test.go @@ -0,0 +1,114 @@ +package clients + +import ( + "bufio" + "github.com/google/go-cmp/cmp" + "io" + "strings" + "testing" +) + +func longStr(len int) string { + b := make([]byte, len) + for i := 0; i < len; i++ { + b[i] = byte(65 + (i % 26)) // cycle through letters A to Z + } + return string(b) +} + +func genLogs(num, lineLen int) string { + s := make([]string, num) + for i := 0; i < num; i++ { + s[i] = longStr(lineLen) + } + return strings.Join(s, "\n") +} + +func Test_LogReader(t *testing.T) { + cases := []struct { + name string + text string + wantLines []string + wantErr bool + }{ + { + name: "basic case", + text: `{"k": "v"} +{"k2": "v2"}`, + wantErr: false, + wantLines: []string{ + `{"k": "v"}`, + `{"k2": "v2"}`, + }}, + { + name: "very long line", + text: longStr(10000000), + wantLines: []string{ + longStr(logReaderPrefixLen), + }, + wantErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + r := io.NopCloser(strings.NewReader(tc.text)) + lr := newLogReader(r) + var gotErr error + gotLines := make([]string, 0) + for i := 0; i < len(tc.wantLines)+1; i++ { + line, err := lr.NextLine() + if err == io.EOF { + break + } else if err != nil { + gotErr = err + } + gotLines = append(gotLines, string(line)) + } + if gotErr == nil && tc.wantErr { + t.Fatal("NextLine() was expected to return error, but didn't") + } + if len(gotLines) != len(tc.wantLines) { + t.Fatalf("NextLine() calls got %d lines, want %d", len(gotLines), len(tc.wantLines)) + } + if diff := cmp.Diff(gotLines, tc.wantLines); diff != "" { + t.Errorf("NextLine() lines differ from expected. Diff (-got, +want): %s", diff) + } + }) + } +} + +// we store these package-level variables so that the compiler cannot eliminate the Benchmarks themselves +var ( + bufScannerResult []byte + logReaderResult []byte +) + +func Benchmark_BufferedScanner(b *testing.B) { + logs := genLogs(10, 10000) + bs := bufio.NewScanner(io.NopCloser(strings.NewReader(logs))) + b.ResetTimer() + var got []byte + for n := 0; n < b.N; n++ { + for bs.Scan() { + got = bs.Bytes() + } + } + bufScannerResult = got +} + +func Benchmark_LogReader(b *testing.B) { + logs := genLogs(10, 10000) + lr := newLogReader(io.NopCloser(strings.NewReader(logs))) + b.ResetTimer() + var got []byte + for n := 0; n < b.N; n++ { + for { + line, err := lr.NextLine() + if err == io.EOF { + break + } + got = line + } + } + logReaderResult = got +} diff --git a/clients/source.go b/clients/source.go index 4ffd3f2aac..a135684d36 100644 --- a/clients/source.go +++ b/clients/source.go @@ -1,9 +1,9 @@ package clients import ( - "bufio" "context" "encoding/json" + "errors" "fmt" "io" "net" @@ -129,12 +129,23 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*Sour c.wg.Add(1) go func() { defer c.wg.Done() - scanner := bufio.NewScanner(reader) - for scanner.Scan() { + lr := newLogReader(reader) + for { + line, err := lr.NextLine() + if errors.Is(err, io.EOF) { + break + } + if errors.Is(err, errLogLineToLong) { + c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line") + continue + } + if err != nil { + c.logger.Err(err).Msg("failed to read log line from plugin") + break + } var structuredLogLine map[string]interface{} - b := scanner.Bytes() - if err := json.Unmarshal(b, &structuredLogLine); err != nil { - c.logger.Err(err).Str("line", string(b)).Msg("failed to unmarshal log line from plugin") + if err := json.Unmarshal(line, &structuredLogLine); err != nil { + c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin") } else { jsonToLog(c.logger, structuredLogLine) }