Skip to content

[ES-1934053] Detach result streaming from QueryContext cancellation#371

Closed
msrathore-db wants to merge 1 commit into
databricks:mainfrom
msrathore-db:fix/query-context-result-streaming
Closed

[ES-1934053] Detach result streaming from QueryContext cancellation#371
msrathore-db wants to merge 1 commit into
databricks:mainfrom
msrathore-db:fix/query-context-result-streaming

Conversation

@msrathore-db
Copy link
Copy Markdown
Contributor

@msrathore-db msrathore-db commented Jun 2, 2026

Summary

Fixes the Sev1 result-streaming truncation in ES-1934053. Since #295, the caller QueryContext was threaded into NewRowsResultPageIterator, so result paging (FetchResults) and CloseOperation inherited the caller's deadline. A short timeout meant to gate only statement submission + status polling then fired mid-stream and silently truncated large CloudFetch results (a query expected to return 29,232,004 rows returned only 2,159,144; the ArrowBatchIterator surfaced io.EOF rather than the deadline error).

Scope change (please re-review): this PR is now context-fix-only. The Arrow row-count cap that was previously bundled here has been split into #372 — it fixes an independent over-reporting bug and was the source of review findings F1/F4/F5/F6. See the comment below for how each review finding was addressed.

What changed

  • Detach the result context from the caller's cancellation via context.WithoutCancel, preserving its values for auth/logging.
  • Wire the detached context to a cancel func invoked from Rows.Close(), so in-flight FetchResults and CloudFetch downloads are never left uncancellable (addresses F2 — the prior total detachment).
  • GetArrowBatches/GetArrowIPCStreams now build their iterator from the detached context, so CloudFetch S3 downloads also survive the caller's deadline and remain abortable via Close (addresses F3 — previously paging was detached but downloads were not, a split-brain semantics).

Test plan

  • go test ./internal/rows/... — adds TestNewRows_DetachesResultRPCContextFromQueryContextCancellation and TestNewRows_CloseAbortsDetachedResultContext (detached-but-abortable contract).
  • go test ./... — full suite green.
  • E2E against a real SQL warehouse: reproduced the regression (a 100,000-row stream truncated to 12,288 rows under a 2s mid-stream deadline) and confirmed the fix drains the full result. The e2e regression now passes the cancelled QueryContext into GetArrowBatches (previously it used context.Background(), which masked the download-path issue F3 flagged).

This pull request and its description were written by Isaac.

@msrathore-db msrathore-db force-pushed the fix/query-context-result-streaming branch from a412542 to fd65633 Compare June 2, 2026 14:08
@msrathore-db msrathore-db changed the title Fix result streaming after query context cancellation [ES-1934053]Fix result streaming after query context cancellation Jun 2, 2026
@vikrantpuppala
Copy link
Copy Markdown
Collaborator

Code Review Squad — Review

Score: 81/100 — MODERATE RISK

This PR correctly fixes the streaming regression from #295: result-fetch RPCs are detached from QueryContext cancellation via context.WithoutCancel while preserving driver Values (conn/correlation/query id and auth credentials), confirmed by all 7 reviewers and a mutation-test of the regression test. The main concern is the second half: the new limitArrowRecords row-count cap is applied to the inline/local Arrow path too, so a server RowCount of 0 or any under-count now silently drops rows instead of returning the full result — a correctness risk worth resolving before merge. Other findings are a design call (in-flight fetches/CloudFetch S3 downloads are now uncancellable) and minor maintainability/test-coverage notes.


Could not post inline comments for: F1, F2, F3, F4, F5 — see body below. (GitHub PR review comment API returned 403 for this repository; findings are included here instead.)

Critical & High Findings

F1 — High: Row-count cap is applied to the inline Arrow path — risks silent data loss

File: internal/rows/arrowbased/batchloader.go line 596

[HIGH CONFIDENCE — flagged by devils-advocate + language, orchestrator-verified]

batchIterator.Next type-asserts to positionedIPCStreamIterator (line 583), and both localIPCStreamIterator (NextWithMetadata returns ab.RowCount, line 162) and cloudIPCStreamIterator (line 196) implement it. So this cap applies to inline/local results too — not just CloudFetch, where Arrow padding is the real concern. On main the local path returned all decoded records and never trusted RowCount.

limitArrowRecords only short-circuits on expectedRows < 0 (line 634). The caller guard here is expectedRows >= 0, so expectedRows == 0 enters the cap, hits remaining <= 0 on the first record, and Release()s every record → empty batch, no error. Reproduced empirically: with 6 decoded rows, RowCount=4 drops 2 rows; RowCount=0 drops the entire batch — all silently.

  • Suggested fix: scope the cap to the CloudFetch path only, OR cap only down when expectedRows < decoded and emit a warning (never silently), and at minimum guard the unambiguously-wrong expectedRows == 0 && len(records) > 0 case.

Medium Findings

F2 — Medium: Detachment is total — in-flight fetches and CloudFetch S3 downloads are no longer cancellable

File: internal/rows/rows.go line 142

[MODERATE CONFIDENCE — flagged by architecture + devils-advocate]

context.WithoutCancel nulls Done() entirely, so once NewRows returns there is no context path to abort an in-flight FetchResults or a stuck CloudFetch download. The Thrift client has a 900s ClientTimeout backstop, but CloudFetch S3 GETs use http.DefaultClient with no timeout unless cfg.HTTPClient is set — a hung S3 download can now hang indefinitely and uncancellably. A user calling cancel() to stop a runaway result stream will find it no longer works.

Consider deriving the results context from a context.WithCancel wired to rows.Close(), so the deadline used to gate statement submission doesn't bleed into streaming but the handle remains abortable. (Partial precedent: the direct-results CloudFetch path already used context.Background() pre-PR.)

F3 — Medium: Split-brain cancellation in GetArrowBatches/GetArrowIPCStreams

File: internal/rows/rows.go line 654

[LOW CONFIDENCE — flagged by architecture, orchestrator-verified]

GetArrowBatches/GetArrowIPCStreams build the iterator from the fresh caller-supplied ctx (lines 654, 667), while page fetches run through r.ResultPageIterator on the detached resultsCtx (line 211). So cancelling the ctx passed here cancels S3 downloads but not page fetches — half-effective semantics. The new e2e test masks this: it calls GetArrowBatches(context.Background()) with a fresh uncancelled ctx, so it never probes whether the caller ctx is consistently honored.

Decide explicitly: detach both halves (full "result handle outlives query" model), or document that this ctx governs downloads but not pagination.

Low Findings

Low findings (4) — click to expand

F4 — Low: O(n^2) offset re-summation + a second drift-prone offset source (local path)

File: internal/rows/arrowbased/batchloader.go line 153

[HIGH CONFIDENCE — flagged by performance + maintainability + language + devils-advocate]

localIPCStreamIterator.NextWithMetadata re-sums all prior batches' RowCount on every call (O(n^2) over the stream). Perf impact is negligible (local/small-result path only; CloudFetch reads offsets in O(1)). The subtler point: offsets now derive from RowCount rather than actual decoded rows, so any inexact inline RowCount drifts every subsequent batch's StartRowOffset. Carry a running consumed-rows field instead. Non-blocking.

F5 — Low: Add doc comments on the row-count-capping contract

File: internal/rows/arrowbased/batchloader.go line 633

[LOW CONFIDENCE — flagged by maintainability]

limitArrowRecords, the positionedIPCStreamIterator interface (line 34), and the -1 sentinel have no doc comments explaining the row-count-capping contract — the part future maintainers are most likely to misread or mis-"simplify". One sentence on the function ("cap a batch's records to the server-declared row count, dropping/truncating padding rows") plus a note on the NewSlice(0, remaining) + NewDelimiter(start, ...) pairing (slice is row-offset-relative, start is the absolute stream offset) would help. The detachment "why" comment in rows.go is already strong; adding "see #295/#371" would harden it against a future revert.

F6 — Low: Cap integration only covered by env-gated e2e test

[MODERATE CONFIDENCE — flagged by test + language]

The batchIterator.NextpositionedIPCStreamIteratorlimitArrowRecords integration has no unit test; the RowCount == 0 and exact-boundary (NumRows() == remaining) cases are uncovered, and the e2e test (TestE2EArrowBatchesSurviveQueryContextCancellation) won't run in CI without pecotesting creds. A unit test feeding a fake positioned IPC iterator through NewBatchIterator and asserting the batch row count is capped would both close the gap and pin the fix for finding F1 (the inline RowCount==0 data-loss case).

Comment thread internal/rows/arrowbased/batchloader.go Outdated
return nil, err
}
if expectedRows >= 0 {
records = limitArrowRecords(records, expectedRows)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code Review Squad — F1 · HIGH · flagged by devils-advocate + language, verified]

This row-count cap is applied to the inline/local Arrow path too, not just CloudFetch where Arrow padding is the real concern. batchIterator.Next type-asserts to positionedIPCStreamIterator (line 583), and both localIPCStreamIterator (NextWithMetadata returns ab.RowCount, line 162) and cloudIPCStreamIterator (line 196) implement it. On main the local path returned all decoded records and never trusted RowCount.

limitArrowRecords only short-circuits on expectedRows < 0 (line 634); the caller guard here is expectedRows >= 0, so expectedRows == 0 enters the cap, hits remaining <= 0 on the first record, and Release()s every record → empty batch, no error. Reproduced empirically: with 6 decoded rows, RowCount=4 drops 2 rows; RowCount=0 drops the entire batch — all silently. This is a correctness risk strictly worse than the over-reporting bug being fixed.

Suggested fix: scope the cap to the CloudFetch path only, OR cap only down when expectedRows < decoded and emit a warning (never silently); at minimum guard the unambiguously-wrong expectedRows == 0 && len(records) > 0 case.

Comment thread internal/rows/rows.go Outdated
// status polling. Result handles can outlive that phase, especially for
// paginated CloudFetch streams, so detach server-side result RPCs from the
// caller's cancellation while preserving context values used for auth/logging.
resultsCtx := context2.WithoutCancel(ctx)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code Review Squad — F2 · MEDIUM · flagged by architecture + devils-advocate]

context.WithoutCancel nulls Done() entirely, so once NewRows returns there is no context path to abort an in-flight FetchResults or a stuck CloudFetch download. The Thrift client has a 900s ClientTimeout backstop, but CloudFetch S3 GETs use http.DefaultClient with no timeout unless cfg.HTTPClient is set — a hung S3 download can now hang indefinitely and uncancellably. A user calling cancel() to stop a runaway result stream will find it no longer works.

Consider deriving the results context from a context.WithCancel wired to rows.Close(), so the deadline that gates statement submission doesn't bleed into streaming but the handle remains abortable. (Partial precedent: the direct-results CloudFetch path already used context.Background() pre-PR, so this is arguably the intended end state — worth an explicit maintainer decision + a comment either way.)

Comment thread internal/rows/arrowbased/batchloader.go Outdated
if bi.index < cnt {
ab := bi.batches[bi.index]
startRowOffset := bi.startRowOffset
for i := 0; i < bi.index; i++ {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code Review Squad — F4 · LOW · flagged by performance + maintainability + language + devils-advocate]

localIPCStreamIterator.NextWithMetadata re-sums all prior batches' RowCount on every call (O(n²) over the stream). Perf impact is negligible (local/small-result path only; CloudFetch reads offsets in O(1)). The subtler point: offsets now derive from RowCount rather than actual decoded rows, so any inexact inline RowCount would drift every subsequent batch's StartRowOffset. Carry a running consumed-rows field instead. Non-blocking.

Comment thread internal/rows/arrowbased/batchloader.go Outdated
return batch, nil
}

func limitArrowRecords(records []SparkArrowRecord, expectedRows int64) []SparkArrowRecord {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Code Review Squad — F6 · LOW · flagged by maintainability]

limitArrowRecords, the positionedIPCStreamIterator interface (line 34), and the -1 "unknown row count" sentinel have no doc comments explaining the row-count-capping contract — the part future maintainers are most likely to misread or mis-"simplify". One sentence on the function ("cap a batch's records to the server-declared row count, dropping/truncating padding rows") plus a note on the NewSlice(0, remaining) + NewDelimiter(start, ...) pairing (the slice index is record-relative, while start is the absolute stream offset) would prevent a future maintainer from mis-"fixing" the offset.

@msrathore-db msrathore-db force-pushed the fix/query-context-result-streaming branch from 05fd91d to 4334b29 Compare June 2, 2026 19:03
@msrathore-db msrathore-db changed the title [ES-1934053]Fix result streaming after query context cancellation [ES-1934053] Detach result streaming from QueryContext cancellation Jun 2, 2026
@msrathore-db
Copy link
Copy Markdown
Contributor Author

Thanks for the thorough review @vikrantpuppala. I've restructured this PR and addressed all findings. Summary of what changed and where:

This PR was split. It now contains only the context-detachment fix for the Sev1 truncation. The Arrow row-count cap moved to #372, because it fixes a separate bug (CloudFetch over-reporting) that's independent of the streaming truncation. I validated this empirically against a real warehouse:

  • The Sev1 itself is under-reporting from a mid-stream deadline (reproduced: 100k rows → 12,288 under a 2s deadline; full drain with the fix).
  • The over-reporting is real and independent: SELECT * ... LIMIT 300000 decodes to 301,407 rows without a cap. That's Cap CloudFetch Arrow batches to server-declared RowCount #372's concern.

Findings on the row-count cap (now in #372):

  • F1 (silent data loss): Fixed by scoping the cap to the CloudFetch path only — a new positionedIPCStreamIterator is implemented solely by cloudIPCStreamIterator, never by localIPCStreamIterator, so the inline path is byte-for-byte unchanged from main. The caller also guards expectedRows > 0, so RowCount == 0 is treated as "unknown" and never drops rows. Both are covered by TestBatchIterator_RowCountCap (incl. the RowCount==0 and inline-never-capped cases).
  • F4 (O(n²) offset re-summation): Gone — the local iterator no longer derives offsets from RowCount (it never implements the positioned interface).
  • F5 (docs): Added doc comments on positionedIPCStreamIterator, limitArrowRecords (incl. the record-relative NewSlice vs absolute Delimiter start note), and the -1 sentinel.
  • F6 (test coverage): Added the unit test above plus an env-gated e2e (TestE2ECloudFetchExactRowCount) that drains exactly 2,000,000 rows over multiple CloudFetch link pages.

I also cross-checked the official JDBC driver: ArrowResultChunkIterator.hasNextRow() stops at rowsReadByIterator >= resultChunk.numRows (= TSparkArrowResultLink.RowCount) and computes offsets from StartRowOffset + RowCount — i.e. JDBC caps to the server-declared count exactly like #372. Details in #372.

Findings on the context fix (this PR):

  • F2 (total detachment → uncancellable): The detached context is now context.WithCancel(context.WithoutCancel(ctx)), with the cancel func invoked from Rows.Close(). So the caller's deadline no longer truncates the stream, but Close() still aborts any in-flight FetchResults/CloudFetch download — no uncancellable hang. Covered by TestNewRows_CloseAbortsDetachedResultContext.
  • F3 (split-brain cancellation): GetArrowBatches/GetArrowIPCStreams now build the iterator from the detached results context instead of the caller ctx, so CloudFetch downloads are governed the same way as page fetches — both survive the submit-gating deadline and both abort on Close(). This is an explicit decision toward the "result handle outlives the query" model (the customer's pattern passes the same deadline-bound ctx to GetArrowBatches). The e2e test now passes the cancelled ctx into GetArrowBatches so this path is actually exercised.

PTAL — happy to adjust the F3 semantics call if you'd prefer the iterator ctx to remain caller-cancellable instead.

@msrathore-db msrathore-db force-pushed the fix/query-context-result-streaming branch from 4334b29 to d6d2952 Compare June 2, 2026 19:05
PR databricks#295 threaded the caller context into NewRows and the result page
iterator, so FetchResults paging and CloseOperation inherited the
caller's deadline. A short QueryContext timeout meant to gate only
statement submission and status polling then fired mid-stream, silently
truncating large CloudFetch results: a query expected to return
29,232,004 rows returned only 2,159,144. The ArrowBatchIterator surfaced
io.EOF rather than the deadline error, so the truncation was silent.

Detach the result context from the caller's cancellation via
context.WithoutCancel (preserving its values for auth/logging), but wire
it to a cancel func invoked from Rows.Close() so in-flight FetchResults
and CloudFetch downloads are never left uncancellable. GetArrowBatches /
GetArrowIPCStreams now build their iterator from this detached context so
CloudFetch S3 downloads also survive the caller's deadline and remain
abortable via Close — the previous behaviour cancelled downloads but not
paging, a half-effective split.

Adds unit tests for the detached-but-abortable contract and updates the
e2e regression to pass the cancelled QueryContext into GetArrowBatches.

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
@msrathore-db
Copy link
Copy Markdown
Contributor Author

Closing in favor of #373. This PR was opened from a fork, so the required Lint and Test and Build checks can't run (fork PRs don't receive the JFrog OIDC id-token). #373 is the identical context-only fix on a same-repo branch where CI runs green. The row-count cap that was originally bundled here is in #372. Review discussion from here is summarized in #373; please re-review there.

msrathore-db added a commit that referenced this pull request Jun 3, 2026
…373)

> Supersedes #371 (which was opened from a fork and therefore could not
run the required JFrog-dependent CI checks — forks don't receive the
OIDC `id-token`). Same context-only fix, same-repo branch so CI can run.
Review history and discussion are on #371.

## Summary
Fixes the Sev1 result-streaming truncation in **ES-1934053**. Since
#295, the caller `QueryContext` was threaded into `NewRows` →
`ResultPageIterator`, so result paging (`FetchResults`) and
`CloseOperation` inherited the caller's deadline. A short timeout meant
to gate only statement submission + status polling then fired mid-stream
and **silently truncated** large CloudFetch results (a query expected to
return 29,232,004 rows returned only 2,159,144; the `ArrowBatchIterator`
surfaced `io.EOF` rather than the deadline error).

This PR is **context-fix-only**. The Arrow row-count cap that was
originally bundled with this work is split into #372 (an independent
*over-reporting* fix).

## What changed
- Detach the result context from the caller's cancellation via
`context.WithoutCancel`, preserving its values for auth/logging.
- Wire the detached context to a cancel func invoked from
`Rows.Close()`, so in-flight `FetchResults` and CloudFetch downloads are
never left uncancellable (addresses review finding **F2**).
- `GetArrowBatches`/`GetArrowIPCStreams` now build their iterator from
the detached context, so CloudFetch S3 downloads also survive the
caller's deadline and remain abortable via `Close` (addresses **F3** —
previously paging was detached but downloads were not).

## Test plan
- `go test ./internal/rows/...` — adds
`TestNewRows_DetachesResultRPCContextFromQueryContextCancellation` and
`TestNewRows_CloseAbortsDetachedResultContext`.
- `go test ./...` — full suite green.
- E2E against a real SQL warehouse: reproduced the regression
(100,000-row stream truncated to **12,288** rows under a 2s mid-stream
deadline) and confirmed the fix drains the full result. The e2e
regression passes the **cancelled** `QueryContext` into
`GetArrowBatches` (previously `context.Background()`, which masked the
download path).

This pull request and its description were written by Isaac.

Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
msrathore-db added a commit that referenced this pull request Jun 3, 2026
## Summary
CloudFetch Arrow IPC files can contain **padding rows beyond the
`RowCount` declared on each result link**
(`TSparkArrowResultLink.RowCount`), which the driver surfaced as extra
rows. Validated against a real warehouse, `SELECT * ... LIMIT 300000`
decoded to **301,407 rows** (1,407 padding rows beyond the requested
count).

This PR caps each decoded CloudFetch batch to its link's `RowCount` and
anchors batch offsets to the link's `StartRowOffset`.

The cap is **scoped to the CloudFetch path only**, via a new
`positionedIPCStreamIterator` interface implemented solely by
`cloudIPCStreamIterator`. The inline/local Arrow path is intentionally
left unchanged: those batches are returned verbatim with no padding, and
their per-batch `RowCount` has historically been untrusted, so capping
there could silently drop rows. A `RowCount <= 0` is treated as
"unknown" and never drops rows.

## Why this matches the server contract
The official **JDBC driver** does exactly this:
`ArrowResultChunkIterator.hasNextRow()` stops once `rowsReadByIterator
>= resultChunk.numRows`, where `numRows =
TSparkArrowResultLink.RowCount`. It decodes the full Arrow file but
never exposes more than the server-declared count, and cross-chunk row
offsets are computed from `StartRowOffset + RowCount` (server values),
not decoded counts. This PR mirrors that behavior for the Go driver.

## Relationship to #371 / ES-1934053
Split out of #371. #371 fixes the Sev1 **truncation** (result streaming
detached from `QueryContext` cancellation — *under*-reporting). This PR
fixes the independent **over-reporting** discovered during that
validation. The two are orthogonal and can merge in any order.

## Test plan
- `go test ./internal/rows/arrowbased/...` — new
`TestBatchIterator_RowCountCap` covers cap-down, exact boundary,
over-count, the `RowCount==0` safety case (no silent drop), and
inline-never-capped.
- `go test ./...` — full suite green.
- E2E against a real SQL warehouse: `TestE2ECloudFetchExactRowCount`
drains `SELECT id, repeat('x',64) FROM range(2000000)` over multiple
CloudFetch link pages and asserts exactly **2,000,000** rows
(over-reported without the cap).

This pull request and its description were written by Isaac.

Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants