diff --git a/internal/tailer/logstream/decode.go b/internal/tailer/logstream/decode.go index d229d8cf8..49eb7c5cd 100644 --- a/internal/tailer/logstream/decode.go +++ b/internal/tailer/logstream/decode.go @@ -17,13 +17,17 @@ import ( var logLines = expvar.NewMap("log_lines_total") // decodeAndSend transforms the byte array `b` into unicode in `partial`, sending to the llp as each newline is decoded. -func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) { +func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) int { var ( r rune width int + count int ) for i := 0; i < len(b) && i < n; i += width { r, width = utf8.DecodeRune(b[i:]) + if r == utf8.RuneError { + return count + } // Most file-based log sources will end with \n on Unixlike systems. // On Windows they appear to be both \r\n. syslog disallows \r (and \t // and others) and writes them escaped, per syslog(7). [RFC @@ -39,7 +43,9 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname default: partial.WriteRune(r) } + count += width } + return count } func sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) { diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index 2729f209e..e2cf06ee0 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -82,6 +82,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.V(2).Infof("%v: seeked to end", fd) } b := make([]byte, defaultReadBufferSize) + var lastBytes []byte partial := bytes.NewBufferString("") started := make(chan struct{}) var total int @@ -106,7 +107,13 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if count > 0 { total += count glog.V(2).Infof("%v: decode and send", fd) - decodeAndSend(ctx, fs.lines, fs.pathname, count, b[:count], partial) + needSend := append(lastBytes, b[:count]...) + sendCount := decodeAndSend(ctx, fs.lines, fs.pathname, len(needSend), needSend, partial) + if sendCount < len(needSend) { + lastBytes = append([]byte{}, needSend[sendCount:]...) + } else { + lastBytes = []byte{} + } fs.mu.Lock() fs.lastReadTime = time.Now() fs.mu.Unlock()