Skip to content

Commit

Permalink
[iomisc.ReadUntilMatch] check context cancellation between Reads
Browse files Browse the repository at this point in the history
Previously if the context was cancelled, we leaked a Go routine that
would continue running io.Copy() even after ReadUntilMatch returned.
This is problematic since it would consume bytes from the underlying
reader.

Also:
* Delete output argument to ReadUntilMatch. This was only used by
  one caller. That caller has been changed to tee the reader themselves.
* Delete the ring package as it's no longer needed.

Note this solution isn't perfect. If the underlying call to m.Read()
blocks, ReadUntilMatch will not respect the context cancellation.
See here for discussion of related issues:
golang/go#20280.

Bug: 66731
Change-Id: I09feffb098247927eb87b94da6e9143e163dfda0
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/470197
Reviewed-by: Ina Huh <ihuh@google.com>
Reviewed-by: Oliver Newman <olivernewman@google.com>
Fuchsia-Auto-Submit: Gary Miguel <garymm@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
  • Loading branch information
Gary Miguel authored and CQ Bot committed Jan 13, 2021
1 parent 35d36e2 commit b774793
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 334 deletions.
1 change: 0 additions & 1 deletion tools/botanist/BUILD.gn
Expand Up @@ -52,7 +52,6 @@ go_library("target") {
"//tools/lib/logger",
"//tools/lib/osmisc",
"//tools/lib/retry",
"//tools/lib/ring",
"//tools/net/netutil",
"//tools/net/sshutil",
"//tools/qemu",
Expand Down
2 changes: 1 addition & 1 deletion tools/botanist/target/device.go
Expand Up @@ -200,7 +200,7 @@ func (t *DeviceTarget) Start(ctx context.Context, images []bootserver.Image, arg
return
}
defer socket.Close()
_, err = iomisc.ReadUntilMatch(ctx, iomisc.NewMatchingReader(socket, [][]byte{[]byte(bootedLogSignature)}), nil)
_, err = iomisc.ReadUntilMatch(ctx, iomisc.NewMatchingReader(socket, [][]byte{[]byte(bootedLogSignature)}))
bootedLogChan <- err
}()
}
Expand Down
1 change: 0 additions & 1 deletion tools/lib/BUILD.gn
Expand Up @@ -16,7 +16,6 @@ group("tests") {
"//tools/lib/logger:logger_tests",
"//tools/lib/osmisc:tests",
"//tools/lib/retry:retry_tests",
"//tools/lib/ring:tests",
"//tools/lib/runner:runner_tests",
"//tools/lib/syslog:syslog_tests",
"//tools/lib/tarutil:tarutil_tests",
Expand Down
5 changes: 1 addition & 4 deletions tools/lib/iomisc/BUILD.gn
Expand Up @@ -14,10 +14,7 @@ go_library("iomisc") {
"reader.go",
"reader_test.go",
]
deps = [
"//tools/lib/logger",
"//tools/lib/ring",
]
deps = [ "//tools/lib/logger" ]
}

go_test("tests") {
Expand Down
55 changes: 20 additions & 35 deletions tools/lib/iomisc/match.go
Expand Up @@ -11,7 +11,6 @@ import (
"io"

"go.fuchsia.dev/fuchsia/tools/lib/logger"
"go.fuchsia.dev/fuchsia/tools/lib/ring"
)

// MatchingReader is an io.Reader implementation that wraps another such
Expand Down Expand Up @@ -76,48 +75,34 @@ type byteTracker interface {
Bytes() []byte
}

// ReadUntilMatch reads from a MatchingReader until a match has been read,
// and ultimately tries to return those matches.
func ReadUntilMatch(ctx context.Context, m *MatchingReader, output io.Writer) ([]byte, error) {
matchWindowSize := 0
for _, tm := range m.toMatch {
if len(tm) > matchWindowSize {
matchWindowSize = len(tm)
// ReadUntilMatch reads from a MatchingReader until a match has been read
// and returns the match.
// Checks ctx for cancellation only between calls to m.Read(), so cancellation
// will not be noticed if m.Read() blocks.
// See https://github.com/golang/go/issues/20280 for discussion of similar issues.
func ReadUntilMatch(ctx context.Context, m *MatchingReader) ([]byte, error) {
// buf size considerations: smaller => more responsive to ctx cancellation,
// larger => less CPU overhead.
buf := make([]byte, 1024)
lastReadSize := 0
for ctx.Err() == nil {
var err error
lastReadSize, err = m.Read(buf)
if err == nil {
continue
}
}

if output == nil {
output = ring.NewBuffer(matchWindowSize)
}

errs := make(chan error)
go func() {
if _, err := io.Copy(output, m); err != nil {
errs <- err
}
errs <- io.EOF
}()

select {
case <-ctx.Done():
// If we time out, it is helpful to see what the last bytes processed
// were: dump these bytes if we can.
if bt, ok := output.(byteTracker); ok {
lastBytes := bt.Bytes()
numBytes := min(len(lastBytes), matchWindowSize)
lastBytes = lastBytes[len(lastBytes)-numBytes:]
logger.Debugf(ctx, "ReadUntilMatch: last %d bytes read before cancellation: %q", numBytes, lastBytes)
}
// TODO(garymm): We should really interrupt the io.Copy() goroutine here.
return nil, ctx.Err()
case err := <-errs:
if errors.Is(err, io.EOF) {
if match := m.Match(); match != nil {
return match, nil
}
}
return nil, err
}

// If we time out, it is helpful to see the last bytes processed.
logger.Debugf(ctx, "ReadUntilMatch: last %d bytes read before cancellation: %q", lastReadSize, buf[:lastReadSize])

return nil, ctx.Err()
}

func min(a, b int) int {
Expand Down
29 changes: 26 additions & 3 deletions tools/lib/iomisc/match_test.go
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"io"
"testing"
"time"
)

func TestMatchingReader(t *testing.T) {
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestReadUntilMatch(t *testing.T) {
w.Write([]byte("EFGH"))
}()

match, err := ReadUntilMatch(context.Background(), m, nil)
match, err := ReadUntilMatch(context.Background(), m)

if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand All @@ -153,10 +154,32 @@ func TestReadUntilMatch(t *testing.T) {
w.Close()
}()

_, err := ReadUntilMatch(context.Background(), m, nil)
_, err := ReadUntilMatch(context.Background(), m)

if !errors.Is(err, io.EOF) {
t.Errorf("ReadUntilMatch() returned %v, expected io.EOF", err)
t.Errorf("ReadUntilMatch() returned %v, want io.EOF", err)
}
})

t.Run("cancellation", func(t *testing.T) {
r, w := io.Pipe()
defer r.Close()
defer w.Close()

m := NewMatchingReader(r, [][]byte{[]byte("A")})

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
defer cancel()

go func() {
b := []byte("B")
for {
w.Write(b)
}
}()

if _, err := ReadUntilMatch(ctx, m); err == nil || !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("ReadUntilMatch() returned %v, want DeadlineExceeded ", err)
}
})
}
19 changes: 0 additions & 19 deletions tools/lib/ring/BUILD.gn

This file was deleted.

134 changes: 0 additions & 134 deletions tools/lib/ring/buffer.go

This file was deleted.

0 comments on commit b774793

Please sign in to comment.