Skip to content

Commit

Permalink
Merge pull request #658 from zhengtianbao/fix637
Browse files Browse the repository at this point in the history
Fix Unicode replacement characters appearing in parse results
  • Loading branch information
jaqx0r authored Jul 12, 2022
2 parents c4802e1 + 4ba57b0 commit 13d1e22
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
8 changes: 7 additions & 1 deletion internal/tailer/logstream/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ import (
var logLines = expvar.NewMap("log_lines_total")

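// decodeAndSend transforms the byte array `b` into unicode in `partial`, sending to the llp as each newline is decoded. It returns the number of bytes of `b` that were decoded, stopping early at the first rune that decodes as utf8.RuneError so the caller can retry the remaining bytes.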
func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) {
func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) int {
var (
r rune
width int
count int
)
for i := 0; i < len(b) && i < n; i += width {
r, width = utf8.DecodeRune(b[i:])
if r == utf8.RuneError {
return count
}
// Most file-based log sources will end with \n on Unixlike systems.
// On Windows they appear to be both \r\n. syslog disallows \r (and \t
// and others) and writes them escaped, per syslog(7). [RFC
Expand All @@ -39,7 +43,9 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname
default:
partial.WriteRune(r)
}
count += width
}
return count
}

func sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) {
Expand Down
10 changes: 9 additions & 1 deletion internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.V(2).Infof("%v: seeked to end", fd)
}
b := make([]byte, defaultReadBufferSize)
var lastBytes []byte
partial := bytes.NewBufferString("")
started := make(chan struct{})
var total int
Expand All @@ -106,7 +107,14 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
if count > 0 {
total += count
glog.V(2).Infof("%v: decode and send", fd)
decodeAndSend(ctx, fs.lines, fs.pathname, count, b[:count], partial)
needSend := lastBytes
needSend = append(needSend, b[:count]...)
sendCount := decodeAndSend(ctx, fs.lines, fs.pathname, len(needSend), needSend, partial)
if sendCount < len(needSend) {
lastBytes = append([]byte{}, needSend[sendCount:]...)
} else {
lastBytes = []byte{}
}
fs.mu.Lock()
fs.lastReadTime = time.Now()
fs.mu.Unlock()
Expand Down
41 changes: 41 additions & 0 deletions internal/tailer/logstream/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,47 @@ func TestFileStreamRead(t *testing.T) {
wg.Wait()
}

// TestFileStreamReadNonSingleByteEnd writes a single log line made of 4095
// ASCII bytes followed by one multi-byte rune, and checks that the file
// stream delivers the line unchanged.
//
// NOTE(review): the payload length looks chosen so that the trailing rune
// straddles the stream's read buffer boundary; this assumes the read buffer
// is 4096 bytes, which is not visible here. Confirm against
// defaultReadBufferSize.
func TestFileStreamReadNonSingleByteEnd(t *testing.T) {
	var wg sync.WaitGroup

	dir := testutil.TestTempDir(t)
	name := filepath.Join(dir, "log")

	f := testutil.TestOpenFile(t, name)
	defer f.Close()

	lines := make(chan *logline.LogLine, 1)
	ctx, cancel := context.WithCancel(context.Background())
	wk, awaken := waker.NewTest(ctx, 1)

	fs, err := logstream.New(ctx, &wg, wk, name, lines, true)
	testutil.FatalIfErr(t, err)
	awaken(1)

	// 4095 single-byte characters, then one three-byte UTF-8 rune.
	padding := make([]byte, 4095)
	for i := range padding {
		padding[i] = 'a'
	}
	content := string(padding) + "中"

	testutil.WriteString(t, f, content+"\n")
	awaken(1)

	// Stop the stream and wait for it before closing the channel it sends on.
	fs.Stop()
	wg.Wait()
	close(lines)

	got := testutil.LinesReceived(lines)
	want := []*logline.LogLine{
		{context.TODO(), name, content},
	}
	testutil.ExpectNoDiff(t, want, got, testutil.IgnoreFields(logline.LogLine{}, "Context"))

	if !fs.IsComplete() {
		t.Errorf("expecting filestream to be complete because stopped")
	}

	cancel()
	wg.Wait()
}

func TestFileStreamTruncation(t *testing.T) {
var wg sync.WaitGroup

Expand Down

0 comments on commit 13d1e22

Please sign in to comment.