streaming: prevent reader Close from deadlocking on a fatal read error #78
Merged
On a fatal read error (e.g. the underlying stream key being destroyed) the reader read loop called Close synchronously. Close waits on the read goroutine via wait.Wait, so calling it from that same goroutine deadlocked and leaked the reader and its Redis connection; any external Close blocked forever too. Trigger the shutdown asynchronously so the read goroutine can return and release the wait group. Close is already idempotent via sync.Once, so this is safe alongside a concurrent external Close. Adds a regression test that drives the real read loop through a simulated fatal error (via a small xreadFn seam) and asserts the reader closes instead of hanging. The test hangs on pre-fix code.
Move the test-only read hook from a Reader struct field to a package variable so it no longer pollutes Reader. Behavior is unchanged; it defaults to (*Reader).xread and is only overridden in tests.
Summary
Follow-up to #77. That PR fixed Close deadlocking on a stalled subscriber; this fixes a sibling deadlock in the same area that I spotted while reviewing it.
On a fatal read error (handleReadError returns the error when Redis reports the stream key no longer exists), the reader's read loop calls Close() on itself.
But Close does r.wait.Wait(), which waits for the read goroutine to run its deferred cleanup() (wait.Done()). Since that goroutine is the one blocked inside Close, it deadlocks: the read goroutine and its Redis connection leak, and any external Close() blocks forever too. Same goroutine + connection leak as #77, different trigger.
Fix
Trigger the shutdown asynchronously (pulse.Go(r.logger, r.Close)) and let the read goroutine return so cleanup releases the wait group. Close is already idempotent via sync.Once (from #77), so this is safe alongside a concurrent external Close.
Only Reader is affected: Sink.read logs and continues on read errors rather than closing itself.
Note on the test
The fatal path is only reached when Redis returns "stream key no longer exists", which it does not emit deterministically for a blocking XREAD (deleting the key doesn't unblock the call, so a Redis-driven test would be timing-dependent). To get a deterministic regression test that exercises the real read() loop, I added a small package-level xreadFn seam that defaults to (*Reader).xread and is only overridden in tests to inject the fatal error. Happy to drop it if you'd rather not carry the seam.
Test plan
go test -race ./streaming -run TestReaderCloseOnFatalReadError -count=10: new regression test; hangs on pre-fix code (verified by reverting the fix), passes here.
go test -race ./streaming -count=1