Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions clients/destination.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package clients

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -124,12 +125,23 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) (
c.wg.Add(1)
go func() {
defer c.wg.Done()
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
lr := newLogReader(reader)
for {
line, err := lr.NextLine()
if errors.Is(err, io.EOF) {
break
}
if errors.Is(err, errLogLineToLong) {
c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line")
continue
}
if err != nil {
c.logger.Err(err).Msg("failed to read log line from plugin")
break
}
var structuredLogLine map[string]interface{}
b := scanner.Bytes()
if err := json.Unmarshal(b, &structuredLogLine); err != nil {
c.logger.Err(err).Str("line", string(b)).Msg("failed to unmarshal log line from plugin")
if err := json.Unmarshal(line, &structuredLogLine); err != nil {
c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin")
} else {
jsonToLog(c.logger, structuredLogLine)
}
Expand Down
56 changes: 56 additions & 0 deletions clients/log_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package clients

import (
"bufio"
"errors"
"io"
)

// logReaderPrefixLen is the maximum number of bytes of an over-long line that NextLine returns
// as context together with errLogLineToLong.
const logReaderPrefixLen = 1000

// errLogLineToLong is returned by NextLine when a line did not fit into the read buffer and was
// discarded. The identifier keeps its historical spelling because callers match on it.
var errLogLineToLong = errors.New("log line too long, discarding")

// logReader is a custom implementation similar to bufio.Scanner, but provides a way to handle lines
// (or tokens) that exceed the buffer size instead of stopping at them.
type logReader struct {
	bufferedReader *bufio.Reader
	reader         io.ReadCloser // reader provided by the client
}

// newLogReader creates a new logReader that reads log lines from the given io.ReadCloser through a
// bufio.Reader with the default buffer size.
func newLogReader(reader io.ReadCloser) *logReader {
	return &logReader{
		reader:         reader,
		bufferedReader: bufio.NewReader(reader),
	}
}

// NextLine reads and returns the next log line from the reader, without the line terminator.
//
// Return values:
//   - (line, nil) for a line that fits into the buffer. The slice aliases the internal buffer and
//     is only valid until the next call to NextLine.
//   - (prefix, errLogLineToLong) for a line that is too long to fit into the buffer. The rest of
//     the line is consumed and discarded; prefix is a copy of at most logReaderPrefixLen leading
//     bytes of the line, intended for identification in logs.
//   - (nil, io.EOF) once the end of the stream has been reached.
//   - (nil, err) for any other read error, including one hit while discarding an over-long line.
func (r *logReader) NextLine() ([]byte, error) {
	line, isPrefix, err := r.bufferedReader.ReadLine()
	if !isPrefix || err != nil {
		return line, err
	}

	// The line did not fit into the buffer. Copy the leading bytes now: the slice returned by
	// ReadLine is overwritten by the next read. The copy is sized to the data actually available
	// so the prefix is never padded with zero bytes.
	prefixLen := len(line)
	if prefixLen > logReaderPrefixLen {
		prefixLen = logReaderPrefixLen
	}
	prefix := make([]byte, prefixLen)
	copy(prefix, line)

	// Drain the remainder of the over-long line.
	for isPrefix {
		_, isPrefix, err = r.bufferedReader.ReadLine()
		if errors.Is(err, io.EOF) {
			// The stream ended exactly on a buffer boundary, without a trailing newline. The
			// over-long line must still be reported; the next call will return io.EOF.
			break
		}
		if err != nil {
			return nil, err
		}
	}
	return prefix, errLogLineToLong
}
114 changes: 114 additions & 0 deletions clients/log_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package clients

import (
"bufio"
"github.com/google/go-cmp/cmp"
"io"
"strings"
"testing"
)

// longStr returns a string of the requested length made of the upper-case letters A to Z,
// repeated cyclically starting at 'A'.
func longStr(len int) string {
	letters := make([]byte, len)
	for pos := range letters {
		letters[pos] = byte('A' + pos%26)
	}
	return string(letters)
}

// genLogs builds a synthetic log stream: num lines, each produced by longStr(lineLen), separated
// by newline characters (there is no trailing newline).
func genLogs(num, lineLen int) string {
	lines := make([]string, 0, num)
	for len(lines) < num {
		lines = append(lines, longStr(lineLen))
	}
	return strings.Join(lines, "\n")
}

// Test_LogReader feeds fixed inputs through a logReader and checks both the lines returned by
// NextLine and whether an error was (or was not) reported, as each case expects.
func Test_LogReader(t *testing.T) {
	cases := []struct {
		name      string
		text      string
		wantLines []string
		wantErr   bool
	}{
		{
			name:    "basic case",
			text:    `{"k": "v"}` + "\n" + `{"k2": "v2"}`,
			wantErr: false,
			wantLines: []string{
				`{"k": "v"}`,
				`{"k2": "v2"}`,
			},
		},
		{
			name: "very long line",
			text: longStr(10000000),
			wantLines: []string{
				longStr(logReaderPrefixLen),
			},
			wantErr: true,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			r := io.NopCloser(strings.NewReader(tc.text))
			lr := newLogReader(r)
			var gotErr error
			gotLines := make([]string, 0, len(tc.wantLines))
			// One call more than the number of expected lines, so that a surplus line shows up
			// as a length mismatch below instead of going unnoticed.
			for i := 0; i < len(tc.wantLines)+1; i++ {
				line, err := lr.NextLine()
				if err == io.EOF {
					break
				}
				if err != nil {
					gotErr = err
				}
				gotLines = append(gotLines, string(line))
			}
			if gotErr == nil && tc.wantErr {
				t.Fatal("NextLine() was expected to return error, but didn't")
			}
			// An error in a case that expects none must fail the test as well.
			if gotErr != nil && !tc.wantErr {
				t.Fatalf("NextLine() returned unexpected error: %v", gotErr)
			}
			if len(gotLines) != len(tc.wantLines) {
				t.Fatalf("NextLine() calls got %d lines, want %d", len(gotLines), len(tc.wantLines))
			}
			if diff := cmp.Diff(gotLines, tc.wantLines); diff != "" {
				t.Errorf("NextLine() lines differ from expected. Diff (-got, +want): %s", diff)
			}
		})
	}
}

// Package-level sinks for the benchmarks below: each benchmark stores its last result in one of
// these variables so that the compiler cannot eliminate the benchmarked work as dead code.
var (
bufScannerResult []byte
logReaderResult []byte
)

// Benchmark_BufferedScanner measures reading a 10-line synthetic log (10000 bytes per line) with
// bufio.Scanner, as a baseline for Benchmark_LogReader.
func Benchmark_BufferedScanner(b *testing.B) {
	logs := genLogs(10, 10000)
	b.ResetTimer()
	var got []byte
	for n := 0; n < b.N; n++ {
		// A scanner cannot be rewound, so build a fresh one per iteration. Reusing a single
		// scanner would leave every iteration after the first with nothing to read.
		bs := bufio.NewScanner(io.NopCloser(strings.NewReader(logs)))
		for bs.Scan() {
			got = bs.Bytes()
		}
	}
	bufScannerResult = got
}

// Benchmark_LogReader measures reading a 10-line synthetic log (10000 bytes per line) with
// logReader, for comparison with Benchmark_BufferedScanner.
//
// NOTE(review): each generated line is longer than bufio.Reader's default 4096-byte buffer, so
// NextLine takes its "line too long" path for every line here — confirm this is the intended
// workload for the comparison.
func Benchmark_LogReader(b *testing.B) {
	logs := genLogs(10, 10000)
	b.ResetTimer()
	var got []byte
	for n := 0; n < b.N; n++ {
		// Build a fresh reader per iteration. Reusing a single reader would leave every
		// iteration after the first with an exhausted stream that immediately returns io.EOF.
		lr := newLogReader(io.NopCloser(strings.NewReader(logs)))
		for {
			line, err := lr.NextLine()
			if err == io.EOF {
				break
			}
			got = line
		}
	}
	logReaderResult = got
}
23 changes: 17 additions & 6 deletions clients/source.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package clients

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -129,12 +129,23 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*Sour
c.wg.Add(1)
go func() {
defer c.wg.Done()
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
lr := newLogReader(reader)
for {
line, err := lr.NextLine()
if errors.Is(err, io.EOF) {
break
}
if errors.Is(err, errLogLineToLong) {
c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line")
continue
}
if err != nil {
c.logger.Err(err).Msg("failed to read log line from plugin")
break
}
var structuredLogLine map[string]interface{}
b := scanner.Bytes()
if err := json.Unmarshal(b, &structuredLogLine); err != nil {
c.logger.Err(err).Str("line", string(b)).Msg("failed to unmarshal log line from plugin")
if err := json.Unmarshal(line, &structuredLogLine); err != nil {
c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin")
} else {
jsonToLog(c.logger, structuredLogLine)
}
Expand Down