fix: remature context cancellation during graceful shutdown in PartitionedFanoutWriter#1225
Conversation
…ionedFanoutWriter
tanmayrauth
left a comment
There was a problem hiding this comment.
Thanks for the PR, Two things before this lands:
-
Moving cancel() to the end of closeAndWait means it's skipped on the error return at rolling_data_writer.go:427-429, so the context leaks whenever a writer errors. defer r.cancel() near the top keeps the drain-before-cancel ordering (it still runs after wg.Wait()) while always firing.
-
I don't think removing r.cancel() from close() prevents cancellation during the drain. r.ctx is a child of the errgroup context (rolling_data_writer.go:266, ctx from errgroup.WithContext at partitioned_fanout_writer.go:83), and fanoutWorkers.Wait() at partitioned_fanout_writer.go:187 cancels that errgroup ctx as soon as it returns — before closeAll() runs at :188. So r.ctx is already Done during the drain, with or without this change. Could you share a repro so I can see the mechanism you're fixing?
| return fmt.Errorf("error in rolling data writer: %w", err) | ||
| } | ||
|
|
||
| r.cancel() |
There was a problem hiding this comment.
Moving cancel() to the end of closeAndWait means it's skipped on the error return, so the context leaks whenever a writer errors. defer r.cancel() near the top keeps the drain-before-cancel ordering (it still runs after wg.Wait()) while always firing.
laskoviymishka
left a comment
There was a problem hiding this comment.
Diagnosis is right, cancelling before recordCh drains is what makes the in-flight Arrow calls (openFileWriter / ToRequestedSchema / SortRecordBatch) fail with context.Canceled and drop buffered records, so moving cancel past wg.Wait() is the right shape.
I'd hold before merge though: pulling cancel() out of close() opens two leaks: the error branch of closeAndWait() now returns without cancelling, and the unpartitionedWrite sites in arrow_utils.go that call close() directly lose the cancel they used to get as a side effect. defer r.cancel() inside the stream goroutine closes both at once.
A regression test for the drain would be needed too. Details inline.
| return fmt.Errorf("error in rolling data writer: %w", err) | ||
| } | ||
|
|
||
| r.cancel() |
There was a problem hiding this comment.
placing cancel() only on the success return means the error branch above (return fmt.Errorf("error in rolling data writer: %w", err)) now exits without ever cancelling — so every writer that hits a stream error leaks its derived context until the parent is cancelled, which in a long-lived process (compaction daemon, streaming ingest) may be never. closeAll() keeps iterating after the first error, so a batch of failing writers leaks one each.
I'd move this further out: defer r.cancel() inside the stream goroutine. It owns the derived ctx and every caller already wg.Wait()s on it, so cancel fires once when streaming ends on any path — that also covers the unpartitionedWrite sites in arrow_utils.go and closeCurrentWriter in clustered_writer.go, which call close() directly and lose the cancel they used to get from it. A defer r.cancel() here in closeAndWait() only fixes this branch and leaves those two leaking. wdyt?
| @@ -429,6 +428,8 @@ func (r *RollingDataWriter) closeAndWait() error { | |||
| return fmt.Errorf("error in rolling data writer: %w", err) | |||
There was a problem hiding this comment.
minor: if cancel() ends up staying here rather than moving into the goroutine, nlreturn will want a blank line before this return too — the new return nil below got one but this branch didn't. CI runs gofumpt + nlreturn, so worth a quick make lint before pushing.
zeroshade
left a comment
There was a problem hiding this comment.
Requesting changes because the current patch still leaves the graceful-drain bug unresolved. PR #1368 already implements the correct fix for this same bug; I recommend consolidating/closing #1225 in favor of #1368, or adopting #1368's approach here. Also, laskoviymishka's earlier change request remains unaddressed and there is no regression test covering this path.
| return fmt.Errorf("error in rolling data writer: %w", err) | ||
| } | ||
|
|
||
| r.cancel() |
There was a problem hiding this comment.
Blocking: moving cancel() here does not fix the fanout drain path. RollingDataWriter instances are created with the errgroup.WithContext context in table/partitioned_fanout_writer.go around line 160, and fanoutWorkers.Wait() cancels that context before writerFactory.closeAll() drains writers around line 188. That means closeAndWait() can still drain buffered records under a canceled context and hit context.Canceled in stream() / openFileWriter / ToRequestedSchema / SortRecordBatch. Please decouple the per-writer drain context from the fanout errgroup context on success, and cancel writers only on the abort/error path. This is the approach already implemented in PR #1368; please consolidate with or adopt that fix.
Problem Statement
Currently, when
partitionedFanoutWriter.Writecompletes successfully, it waits for all upstream producers to finish (fanoutWorkers.Wait()), and then attempts a graceful shutdown by callingwriterFactory.closeAll().However,
writerFactory.closeAll()iterates through theRollingDataWriters and callscloseAndWait()on each. Theclose()method does the following:By immediately calling
r.cancel(), it actively cancels the context passed to the backgroundstreamgoroutine. SincerecordChis buffered (size 64), it may still contain unprocessedRecordBatchitems. If the underlying Arrow functions (e.g.,compute.SortRecordBatch) or Parquet writers checkctx.Done()during this draining phase, they will abort immediately with acontext.Cancelederror. This converts what should be a successful, graceful shutdown into an error, and silently drops the remaining buffered records.Proposed Solution
To support proper channel draining,
RollingDataWritershould rely onclose(r.recordCh)as the primary signal for thestreamgoroutine to process the remaining buffered records and exit naturally.In this PR:
r.cancel()fromclose().r.cancel()to the end ofcloseAndWait(), afterr.wg.Wait()has returned and all residual batches have been successfully processed and flushed to Parquet.This ensures that the memory buffer is correctly fully drained, preventing silent data loss or unexpected
context.Canceledpanics during the finalization phase. The overall timeout/abort mechanism is still safely controlled by the parent context passed into theWritefunction.