Skip to content

Commit

Permalink
Fix Unicode replacement character appear in parse result
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengtianbao committed Jul 12, 2022
1 parent c4802e1 commit c12a012
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
8 changes: 7 additions & 1 deletion internal/tailer/logstream/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ import (
var logLines = expvar.NewMap("log_lines_total")

// decodeAndSend transforms the byte array `b` into unicode in `partial`, sending to the llp as each newline is decoded.
func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) {
func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) int {
var (
r rune
width int
count int
)
for i := 0; i < len(b) && i < n; i += width {
r, width = utf8.DecodeRune(b[i:])
if r == utf8.RuneError {
return count
}
// Most file-based log sources will end with \n on Unixlike systems.
// On Windows they appear to be both \r\n. syslog disallows \r (and \t
// and others) and writes them escaped, per syslog(7). [RFC
Expand All @@ -39,7 +43,9 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname
default:
partial.WriteRune(r)
}
count += width
}
return count
}

func sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) {
Expand Down
9 changes: 8 additions & 1 deletion internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.V(2).Infof("%v: seeked to end", fd)
}
b := make([]byte, defaultReadBufferSize)
var lastBytes []byte
partial := bytes.NewBufferString("")
started := make(chan struct{})
var total int
Expand All @@ -106,7 +107,13 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
if count > 0 {
total += count
glog.V(2).Infof("%v: decode and send", fd)
decodeAndSend(ctx, fs.lines, fs.pathname, count, b[:count], partial)
needSend := append(lastBytes, b[:count]...)
sendCount := decodeAndSend(ctx, fs.lines, fs.pathname, len(needSend), needSend, partial)
if sendCount < len(needSend) {
lastBytes = append([]byte{}, needSend[sendCount:]...)
} else {
lastBytes = []byte{}
}
fs.mu.Lock()
fs.lastReadTime = time.Now()
fs.mu.Unlock()
Expand Down

0 comments on commit c12a012

Please sign in to comment.