Skip to content

Commit

Permalink
Fix memory leak on corrupt unicode character
Browse files Browse the repository at this point in the history
See the added test.
When a corrupt unicode character is received, mtail did not process
the following logs but kept them in memory.

With this commit RuneErrors on parsing unicode characters are ignored
when there are enough characters after it in the buffer.
This keeps the functionality intact that bytes that gave a RuneError are
being reparsed, but with a larger buffer, to not break unicode
characters which were split in half from the buffered reading on the
logfile being read.

The issue was introduced in v3.0.0-rc50
  • Loading branch information
rreuvekamp committed Aug 17, 2022
1 parent 060a830 commit 6d127f3
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
12 changes: 7 additions & 5 deletions internal/tailer/logstream/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname
var (
r rune
width int
count int
)
for i := 0; i < len(b) && i < n; i += width {
var i int
for ; i < len(b) && i < n; i += width {
r, width = utf8.DecodeRune(b[i:])
if r == utf8.RuneError {
return count
if len(b)-i > 10 {
continue
}
return i
}
// Most file-based log sources will end with \n on Unixlike systems.
// On Windows they appear to be both \r\n. syslog disallows \r (and \t
Expand All @@ -43,9 +46,8 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname
default:
partial.WriteRune(r)
}
count += width
}
return count
return i
}

func sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) {
Expand Down
46 changes: 46 additions & 0 deletions internal/tailer/logstream/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,52 @@ func TestFileStreamReadNonSingleByteEnd(t *testing.T) {
wg.Wait()
}

func TestStreamDoesntBreakOnCorruptRune(t *testing.T) {
var wg sync.WaitGroup

tmpDir := testutil.TestTempDir(t)

name := filepath.Join(tmpDir, "log")
f := testutil.TestOpenFile(t, name)
defer f.Close()

lines := make(chan *logline.LogLine, 1)
ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1)
fs, err := logstream.New(ctx, &wg, waker, name, lines, true)
testutil.FatalIfErr(t, err)
awaken(1)

s := string([]byte{0xF1})
// 0xF1 = 11110001 , a byte signaling the start of a unicode character that
// is 4 bytes long.
// The following characters are regular single-byte
// characters, which don't start with a 1 bit to signal it's part of a
// unicode character. This will result in a RuneError.
for i := 0; i < 100; i++ {
s += "a"
}

testutil.WriteString(t, f, s+"\n")
awaken(1)

fs.Stop()
wg.Wait()
close(lines)
received := testutil.LinesReceived(lines)
expected := []*logline.LogLine{
{context.TODO(), name, s[1:]},
}
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))

if !fs.IsComplete() {
t.Errorf("expecting filestream to be complete because stopped")
}
cancel()
wg.Wait()

}

func TestFileStreamTruncation(t *testing.T) {
var wg sync.WaitGroup

Expand Down

0 comments on commit 6d127f3

Please sign in to comment.