From 3dd877ba1315e8da253ba1f70b1cd8bb2fd2478f Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 7 Oct 2022 15:20:20 +0100 Subject: [PATCH 1/4] Add custom log reader implementation to fix hang on long log lines --- clients/destination.go | 19 +++++++---- clients/log_reader.go | 43 ++++++++++++++++++++++++ clients/log_reader_test.go | 69 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 6 deletions(-) create mode 100644 clients/log_reader.go create mode 100644 clients/log_reader_test.go diff --git a/clients/destination.go b/clients/destination.go index 57b9213f1d..280907ae6a 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -1,10 +1,11 @@ package clients import ( - "bufio" "context" "encoding/json" + "errors" "fmt" + "io" "net" "os" "os/exec" @@ -124,12 +125,18 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) ( c.wg.Add(1) go func() { defer c.wg.Done() - scanner := bufio.NewScanner(reader) - for scanner.Scan() { + lr := newLogReader(reader) + for { + line, err := lr.NextLine() + if errors.Is(err, io.EOF) { + break + } else if err != nil { + c.logger.Err(err).Str("line", string(line)).Msg("failed to read log line from plugin") + continue + } var structuredLogLine map[string]interface{} - b := scanner.Bytes() - if err := json.Unmarshal(b, &structuredLogLine); err != nil { - c.logger.Err(err).Str("line", string(b)).Msg("failed to unmarshal log line from plugin") + if err := json.Unmarshal(line, &structuredLogLine); err != nil { + c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin") } else { jsonToLog(c.logger, structuredLogLine) } diff --git a/clients/log_reader.go b/clients/log_reader.go new file mode 100644 index 0000000000..4ff7c58398 --- /dev/null +++ b/clients/log_reader.go @@ -0,0 +1,43 @@ +package clients + +import ( + "bufio" + "errors" + "io" +) + +const logReaderPrefixLen = 1000 + +type logReader struct { + bufferedReader *bufio.Reader + reader io.ReadCloser // reader provided by the client +} + +func newLogReader(reader io.ReadCloser) *logReader { + return &logReader{ + reader: reader, + bufferedReader: bufio.NewReader(reader), + } +} + +func (r *logReader) NextLine() ([]byte, error) { + line, isPrefix, err := r.bufferedReader.ReadLine() + if !isPrefix || err != nil { + return line, err + } + prefix := make([]byte, logReaderPrefixLen) + for i := 0; isPrefix; i++ { + // this loop is entered if a log line is too long to fit into the buffer. We discard it by + // iterating until isPrefix becomes false. We only log the first few bytes of the line to help with + // identification. + if i == 0 { + prefixLen := logReaderPrefixLen + if len(line) < prefixLen { + prefixLen = len(line) + } + copy(prefix, line[:prefixLen]) + } + line, isPrefix, err = r.bufferedReader.ReadLine() + } + return prefix, errors.New("log line too long, discarding") +} diff --git a/clients/log_reader_test.go b/clients/log_reader_test.go new file mode 100644 index 0000000000..353ae3dc7e --- /dev/null +++ b/clients/log_reader_test.go @@ -0,0 +1,69 @@ +package clients + +import ( + "github.com/google/go-cmp/cmp" + "io" + "strings" + "testing" +) + +func longStr(len int) string { + b := make([]byte, len) + for i := 0; i < len; i++ { + b[i] = byte(65 + (i % 26)) // cycle through letters A to Z + } + return string(b) +} + +func Test_LogReader(t *testing.T) { + cases := []struct { + name string + text string + wantLines []string + wantErr bool + }{ + { + name: "basic case", + text: `{"k": "v"} +{"k2": "v2"}`, + wantErr: false, + wantLines: []string{ + `{"k": "v"}`, + `{"k2": "v2"}`, + }}, + { + name: "very long line", + text: longStr(10000000), + wantLines: []string{ + longStr(logReaderPrefixLen), + }, + wantErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + r := io.NopCloser(strings.NewReader(tc.text)) + lr := newLogReader(r) + var gotErr error + gotLines := make([]string, 0) + for i := 0; i < len(tc.wantLines)+1; i++ { + line, err := lr.NextLine() + if err == io.EOF { + break + } else if err != nil { + gotErr = err + } + gotLines = append(gotLines, string(line)) + } + if gotErr == nil && tc.wantErr { + t.Fatal("NextLine() was expected to return error, but didn't") + } + if len(gotLines) != len(tc.wantLines) { + t.Fatalf("NextLine() calls got %d lines, want %d", len(gotLines), len(tc.wantLines)) + } + if diff := cmp.Diff(gotLines, tc.wantLines); diff != "" { + t.Errorf("NextLine() lines differ from expected. Diff (-got, +want): %s", diff) + } + }) + } +} From af8a70d5d81c0ec5d73760f7d8d9c09abf11df4a Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 7 Oct 2022 15:28:13 +0100 Subject: [PATCH 2/4] Add to source and add comments --- clients/log_reader.go | 8 ++++++++ clients/source.go | 18 ++++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/clients/log_reader.go b/clients/log_reader.go index 4ff7c58398..13d5b436f7 100644 --- a/clients/log_reader.go +++ b/clients/log_reader.go @@ -6,13 +6,17 @@ import ( "io" ) +// logReaderPrefixLen is used when returning a partial line as context in NextLine const logReaderPrefixLen = 1000 +// logReader is a custom implementation similar to bufio.Scanner, but provides a way to handle lines +// (or tokens) that exceed the buffer size. type logReader struct { bufferedReader *bufio.Reader reader io.ReadCloser // reader provided by the client } +// newLogReader creates a new logReader to read log lines from an io.ReadCloser func newLogReader(reader io.ReadCloser) *logReader { return &logReader{ reader: reader, @@ -20,6 +24,10 @@ func newLogReader(reader io.ReadCloser) *logReader { } } +// NextLine reads and returns the next log line from the reader. An io.EOF error is returned +// if the end of the stream has been reached. This implementation is different from bufio.Scanner as it +// also returns an error if a line is too long to fit into the buffer. In this case, an error is returned +// together with a limited prefix of the line. func (r *logReader) NextLine() ([]byte, error) { line, isPrefix, err := r.bufferedReader.ReadLine() if !isPrefix || err != nil { diff --git a/clients/source.go b/clients/source.go index 4ffd3f2aac..50cb08a19e 100644 --- a/clients/source.go +++ b/clients/source.go @@ -1,9 +1,9 @@ package clients import ( - "bufio" "context" "encoding/json" + "errors" "fmt" "io" "net" @@ -129,12 +129,18 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*Sour c.wg.Add(1) go func() { defer c.wg.Done() - scanner := bufio.NewScanner(reader) - for scanner.Scan() { + lr := newLogReader(reader) + for { + line, err := lr.NextLine() + if errors.Is(err, io.EOF) { + break + } else if err != nil { + c.logger.Err(err).Str("line", string(line)).Msg("failed to read log line from plugin") + continue + } var structuredLogLine map[string]interface{} - b := scanner.Bytes() - if err := json.Unmarshal(b, &structuredLogLine); err != nil { - c.logger.Err(err).Str("line", string(b)).Msg("failed to unmarshal log line from plugin") + if err := json.Unmarshal(line, &structuredLogLine); err != nil { + c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin") } else { jsonToLog(c.logger, structuredLogLine) } From 7db68287aeb90c002c51eceb4652f98b26b18754 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 7 Oct 2022 15:59:42 +0100 Subject: [PATCH 3/4] Add benchmarks --- clients/log_reader_test.go | 45 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/clients/log_reader_test.go b/clients/log_reader_test.go index 353ae3dc7e..674d8ea10f 100644 --- a/clients/log_reader_test.go +++ b/clients/log_reader_test.go @@ -1,6 +1,7 @@ package clients import ( + "bufio" "github.com/google/go-cmp/cmp" "io" "strings" @@ -15,6 +16,14 @@ func longStr(len int) string { return string(b) } +func genLogs(num, lineLen int) string { + s := make([]string, num) + for i := 0; i < num; i++ { + s[i] = longStr(lineLen) + } + return strings.Join(s, "\n") +} + func Test_LogReader(t *testing.T) { cases := []struct { name string @@ -67,3 +76,39 @@ func Test_LogReader(t *testing.T) { }) } } + +// we store these package-level variables so that the compiler cannot eliminate the Benchmarks themselves +var ( + bufScannerResult []byte + logReaderResult []byte +) + +func Benchmark_BufferedScanner(b *testing.B) { + logs := genLogs(10, 10000) + bs := bufio.NewScanner(io.NopCloser(strings.NewReader(logs))) + b.ResetTimer() + var got []byte + for n := 0; n < b.N; n++ { + for bs.Scan() { + got = bs.Bytes() + } + } + bufScannerResult = got +} + +func Benchmark_LogReader(b *testing.B) { + logs := genLogs(10, 10000) + lr := newLogReader(io.NopCloser(strings.NewReader(logs))) + b.ResetTimer() + var got []byte + for n := 0; n < b.N; n++ { + for { + line, err := lr.NextLine() + if err == io.EOF { + break + } + got = line + } + } + logReaderResult = got +} From 361c82e3a0d4c1fb397bd0ee1af685bbbfc4dd9e Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 7 Oct 2022 15:59:49 +0100 Subject: [PATCH 4/4] Handle errors better --- clients/destination.go | 9 +++++++-- clients/log_reader.go | 7 ++++++- clients/source.go | 9 +++++++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/clients/destination.go b/clients/destination.go index 280907ae6a..80d94d4905 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -130,10 +130,15 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) ( line, err := lr.NextLine() if errors.Is(err, io.EOF) { break - } else if err != nil { - c.logger.Err(err).Str("line", string(line)).Msg("failed to read log line from plugin") + } + if errors.Is(err, errLogLineToLong) { + c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line") continue } + if err != nil { + c.logger.Err(err).Msg("failed to read log line from plugin") + break + } var structuredLogLine map[string]interface{} if err := json.Unmarshal(line, &structuredLogLine); err != nil { c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin") diff --git a/clients/log_reader.go b/clients/log_reader.go index 13d5b436f7..b1b344cfb3 100644 --- a/clients/log_reader.go +++ b/clients/log_reader.go @@ -9,6 +9,8 @@ import ( // logReaderPrefixLen is used when returning a partial line as context in NextLine const logReaderPrefixLen = 1000 +var errLogLineToLong = errors.New("log line too long, discarding") + // logReader is a custom implementation similar to bufio.Scanner, but provides a way to handle lines // (or tokens) that exceed the buffer size. type logReader struct { @@ -46,6 +48,9 @@ func (r *logReader) NextLine() ([]byte, error) { copy(prefix, line[:prefixLen]) } line, isPrefix, err = r.bufferedReader.ReadLine() + if err != nil { + return nil, err + } } - return prefix, errors.New("log line too long, discarding") + return prefix, errLogLineToLong } diff --git a/clients/source.go b/clients/source.go index 50cb08a19e..a135684d36 100644 --- a/clients/source.go +++ b/clients/source.go @@ -134,10 +134,15 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*Sour line, err := lr.NextLine() if errors.Is(err, io.EOF) { break - } else if err != nil { - c.logger.Err(err).Str("line", string(line)).Msg("failed to read log line from plugin") + } + if errors.Is(err, errLogLineToLong) { + c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line") continue } + if err != nil { + c.logger.Err(err).Msg("failed to read log line from plugin") + break + } var structuredLogLine map[string]interface{} if err := json.Unmarshal(line, &structuredLogLine); err != nil { c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin")