Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
(unexported; no API change). (I-03)

### Fixed
- `TestCopyCtx_MidStreamCancel` was racy and failed intermittently on
`main` once #39 and #40 landed together: the test's blocking reader
returned `io.EOF` after `close(br.release)`, but `copyCtx` returns
`nil` on EOF before its next `ctx.Err()` check, so the cancel could
be missed. The reader now blocks on `<-ctx.Done()` and returns
`ctx.Err()` instead, making the test deterministic without changing
production code. Test-only.
- `stderrProgress` `stop()` is now idempotent. The first call closes the
channel and waits for the drain goroutine; subsequent calls are no-ops.
Latent — all current callers defer `stop` exactly once; this guards
Expand Down
33 changes: 14 additions & 19 deletions image/copyctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"errors"
"io"
"sync"
"testing"
)
Expand Down Expand Up @@ -45,12 +44,13 @@ func TestCopyCtx_PreCancelled(t *testing.T) {
}
}

// blockingReader yields one chunk, then blocks on release until the test
// signals it. Used to deterministically interleave a cancel with the copy.
// blockingReader yields one chunk, then on the second call blocks until
// ctx is done and returns ctx.Err(). This avoids the race where copyCtx
// could see io.EOF before its next ctx.Err() check and return nil.
type blockingReader struct {
ctx context.Context
first []byte
yielded bool
release chan struct{}
}

func (b *blockingReader) Read(p []byte) (int, error) {
Expand All @@ -59,15 +59,14 @@ func (b *blockingReader) Read(p []byte) (int, error) {
n := copy(p, b.first)
return n, nil
}
<-b.release
return 0, io.EOF
<-b.ctx.Done()
return 0, b.ctx.Err()
}

func TestCopyCtx_MidStreamCancel(t *testing.T) {
first := bytes.Repeat([]byte("a"), 32*1024) // exactly one chunk
br := &blockingReader{first: first, release: make(chan struct{})}

ctx, cancel := context.WithCancel(context.Background())
br := &blockingReader{ctx: ctx, first: first}
var dst bytes.Buffer

var (
Expand All @@ -81,19 +80,15 @@ func TestCopyCtx_MidStreamCancel(t *testing.T) {
n, cerr = copyCtx(ctx, &dst, br)
}()

// Wait until the first chunk has been written, then cancel. closing
// br.release after cancel unblocks the goroutine deterministically:
// if it had already entered the second (blocking) Read, the close
// returns it (0, io.EOF) and the next ctx.Err() check returns the
// cancel; otherwise the cancel was observed first and the close is
// a no-op for an already-returned goroutine.
for {
if dst.Len() == len(first) {
break
}
// Wait until the first chunk has been written, then cancel.
// Whichever side observes the cancel first wins deterministically:
// copyCtx's top-of-loop ctx.Err() check returns context.Canceled,
// or the reader's <-ctx.Done() unblocks and returns ctx.Err() which
// copyCtx propagates via the rerr branch. Either way, cerr ==
// context.Canceled.
for dst.Len() != len(first) {
}
cancel()
close(br.release)
wg.Wait()

if !errors.Is(cerr, context.Canceled) {
Expand Down