feat: support distributed query cancellation in query queue and cancellation promise for Athena#10953
Conversation
|
Claude finished @paveltiunov's task in 2m 17s —— View job Code Review: Distributed Query Cancellation + Athena Cancel Support
Overall: Solid design — piggybacking cancellation detection on the existing heartbeat is a pragmatic approach that avoids CubeStore protocol changes. The Athena cancellation support is a meaningful improvement over the previous no-op Bugs Found
Race Condition (Low Severity)The heartbeat-based cancellation detection in QueryQueue has a TOCTOU window between Code Quality
TestingNo tests were added for either change. Key scenarios that would benefit from coverage:
Security & Performance
|
| const promise: any = (async () => { | ||
| qid = await this.startQuery(query, values); | ||
| if (cancelled) { | ||
| await this.stopQuery(qid); | ||
| throw new Error('Query was cancelled'); | ||
| } | ||
| await this.waitForSuccess(qid, () => cancelled); | ||
| const iter = this.lazyRowIterator(qid, query, true); | ||
| const types = <TableStructure><unknown>((await iter.next()).value); | ||
| const rows: Row[] = []; | ||
| for await (const row of iter) { | ||
| if (cancelled) break; | ||
| rows.push(<Row>row); | ||
| } | ||
| return { types, rows }; | ||
| })(); | ||
|
|
||
| promise.cancel = async () => { | ||
| cancelled = true; | ||
| if (qid) { | ||
| await this.stopQuery(qid); | ||
| } | ||
| }; | ||
|
|
||
| return promise; |
There was a problem hiding this comment.
Nit (code quality): The cancelable-promise wrapping pattern (declare qid/cancelled, create IIFE, attach .cancel()) is duplicated nearly identically across memory(), stream(), query(), and loadPreAggregationIntoTable() — four copies of the same boilerplate.
Consider extracting a small helper like:
private makeCancelable<T>(
fn: (isCancelled: () => boolean) => Promise<T>
): MaybeCancelablePromise<T> {
let qid: AthenaQueryId | null = null;
let cancelled = false;
const promise: any = fn(() => cancelled);
promise.cancel = async () => {
cancelled = true;
if (qid) await this.stopQuery(qid);
};
return promise;
}Not blocking, but would reduce the surface area for the partial-result bug found above from four sites to one.
| } | ||
| } finally { | ||
| // catch block can throw an exception, it's why it's important to clearInterval here | ||
| queryExecutionFinished = true; |
There was a problem hiding this comment.
Good: The old comment (// catch block can throw an exception, it's why it's important to clearInterval here) was removed, and queryExecutionFinished = true is now set before clearInterval. This correctly signals to any in-flight heartbeat callback that execution is done.
One edge case: if a heartbeat callback is currently executing its getQueryDef check when queryExecutionFinished is set to true, the callback will exit early via the if (queryExecutionFinished) return; guard — which is correct behavior.
| protected async waitForSuccess(qid: AthenaQueryId, isCancelled?: () => boolean): Promise<void> { | ||
| const startedTime = Date.now(); | ||
| for (let i = 0; Date.now() - startedTime <= this.config.pollTimeout; i++) { | ||
| if (isCancelled?.()) { |
There was a problem hiding this comment.
The cancellation check runs before checkStatus(), so if the query was cancelled between polls but the Athena query already succeeded, waitForSuccess will throw "Query was cancelled" even though the data is ready. This is fine in practice (the stopQuery call is best-effort and a no-op on a completed query), but worth noting that the ordering means cancellation always wins — even a microsecond before success. This seems intentional for consistency.
|
Claude finished @paveltiunov's task in 4m 31s —— View job Comprehensive Code Review: Distributed Query Cancellation + Athena Cancel Support
SummaryThe design is sound — piggybacking cancellation detection on the existing heartbeat avoids CubeStore protocol changes, and adding Bugs Found
Race Condition Analysis (QueryQueue)I traced through the possible interleavings of the heartbeat-based cancellation detection and found the double-check pattern is correct. The key invariant: Code Quality
Security & Performance
TestingNo tests were added for either change. Key scenarios that would benefit from coverage:
RecommendationFix the two |
| if (!queryExecutionFinished && localCancelHandler !== null) { | ||
| try { | ||
| const currentDef = await queueConnection.getQueryDef(queryKeyHashed, queueId); | ||
| if (!currentDef && !queryExecutionFinished) { | ||
| this.logger('Cancelling query due to external cancellation', { | ||
| queueId, | ||
| queryKey: query.queryKey, | ||
| queuePrefix: this.redisQueuePrefix, | ||
| requestId: query.requestId, | ||
| metadata: query.query?.metadata, | ||
| preAggregationId: query.query?.preAggregation?.preAggregationId, | ||
| newVersionEntry: query.query?.newVersionEntry, | ||
| preAggregation: query.query?.preAggregation, | ||
| addedToQueueTime: query.addedToQueueTime, | ||
| }); | ||
| const cancelQuery = { ...query, cancelHandler: localCancelHandler }; | ||
| if (this.cancelHandlers[query.queryHandler]) { | ||
| await this.cancelHandlers[query.queryHandler](cancelQuery); | ||
| } | ||
| } |
There was a problem hiding this comment.
Race condition analysis — actually well-handled. I traced through the possible interleavings:
When a query completes normally, the finally block (line 928) sets queryExecutionFinished = true before setResultAndRemoveQuery (line 932) removes the queue item. So if a heartbeat callback is mid-flight:
- It passes the first
!queryExecutionFinishedguard (line 791) getQueryDefresolves — even if the item was just removed bysetResultAndRemoveQuery,queryExecutionFinishedis alreadytrue- The second check
!queryExecutionFinished(line 794) catches it — no false cancellation
Since both reads in !currentDef && !queryExecutionFinished happen in the same synchronous expression (no microtask boundary between them), this double-check pattern is sound in single-threaded JS.
One suggestion: consider adding a comment documenting this invariant ("queryExecutionFinished must be set before the queue item is removed") since it's the load-bearing ordering guarantee.
| let qid: AthenaQueryId | null = null; | ||
| let cancelled = false; | ||
|
|
||
| const promise: any = (async () => { | ||
| qid = await this.startQuery(query, values); | ||
| if (cancelled) { | ||
| await this.stopQuery(qid); | ||
| throw new Error('Query was cancelled'); | ||
| } | ||
| await this.waitForSuccess(qid, () => cancelled); | ||
| const iter = this.lazyRowIterator(qid, query, true); | ||
| const types = <TableStructure><unknown>((await iter.next()).value); | ||
| const rows: Row[] = []; | ||
| for await (const row of iter) { | ||
| if (cancelled) break; | ||
| rows.push(<Row>row); | ||
| } | ||
| return { types, rows }; | ||
| })(); | ||
|
|
||
| promise.cancel = async () => { | ||
| cancelled = true; | ||
| if (qid) { | ||
| await this.stopQuery(qid); | ||
| } | ||
| }; | ||
|
|
||
| return promise; |
There was a problem hiding this comment.
Nit (code quality): The cancelable-promise wrapping pattern (let qid/cancelled, create async IIFE, attach .cancel()) is repeated nearly identically across memory(), stream(), query(), and loadPreAggregationIntoTable(). A small helper would centralize the logic and reduce the surface area for bugs like the partial-result issue:
private makeCancelable<T>(
fn: (isCancelled: () => boolean) => Promise<T>
): MaybeCancelablePromise<T> {
let qid: AthenaQueryId | null = null;
let cancelled = false;
const promise: any = fn(() => cancelled);
promise.cancel = async () => {
cancelled = true;
if (qid) await this.stopQuery(qid);
};
return promise;
}Not blocking, but would cut four ~15-line boilerplate blocks down to one.
| }); | ||
| })); | ||
|
|
||
| app.delete(`${this.basePath}/v1/running-query/:requestId`, userMiddlewares, userAsyncHandler(async (req: any, res) => { |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #10953 +/- ##
===========================================
+ Coverage 58.51% 78.84% +20.33%
===========================================
Files 216 470 +254
Lines 17014 93380 +76366
Branches 3457 3462 +5
===========================================
+ Hits 9956 73629 +63673
- Misses 6555 19248 +12693
Partials 503 503
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
…llation promise for Athena Distributed Query Cancellation (QueryQueue): - During the heartbeat interval, the executing node now checks if the queue item was removed externally (by another node calling cancelQuery or reconcile removing orphaned/stalled queries) - If the queue item is gone and a cancel handler is registered locally, the local cancel handler is invoked, triggering actual DB query cancellation - This closes the gap where cancellation from one node would remove the queue entry but the actual DB query kept running on the executing node Athena Cancellation Promise (AthenaDriver): - query(), memory(), stream(), downloadQueryResults(), and loadPreAggregationIntoTable() now return MaybeCancelablePromise with a .cancel() method - Calling cancel() sets a flag that aborts the poll loop in waitForSuccess() and calls stopQueryExecution() on AWS Athena - waitForSuccess() now accepts an optional isCancelled callback to detect cancellation during the poll loop - The stream() release function now properly stops the Athena query - This enables the query orchestrator to cancel in-flight Athena queries on timeout, manual cancel, or pre-aggregation cancellation via cancelCombinator Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
…llback Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
Add DELETE /cubejs-api/v1/running-query/:requestId endpoint that cancels in-flight queries matching the given request ID across all SQL and pre-aggregation queues. - QueryQueue.cancelQueryByRequestId: scans queued queries and cancels those matching the request ID - QueryOrchestrator.cancelQueryByRequestId: searches across all SQL query and pre-aggregation queues for all data sources - OrchestratorApi.cancelQueryByRequestId: delegates to orchestrator - ApiGateway: registers DELETE route with user auth middleware Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
…ding - Remove timeout argument from test() call — jest.setTimeout at the describe level already applies, and the types don't support the 3-argument overload (TS2769) - Remove blank line before closing brace (padded-blocks lint error) Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
…tion Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
…on detection" This reverts commit ae4de19.
Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
The SQL API appends -span-N suffixes to request IDs on each continue-wait retry, so the same logical query may have different span suffixes in the queue. Strip the -span-N suffix before comparing to match all queries from the same request. Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
The /cubesql endpoint's native transport has an internal retry loop
that re-submits cancelled queries with new span IDs, so cancellation
doesn't surface as an error to the client. Use /load instead where
the continue-wait loop is client-controlled.
After cancellation, /load returns { error: 'Continue wait' } since
the queue item is gone. Assert result.error is defined.
Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
When a client closes the connection to /v1/cubesql, the native transport's continue-wait retry loop would keep polling indefinitely. Fix: listen for res 'close' event in the gateway and mark the request ID as closed. On each retry iteration, the sqlApiLoad callback checks if the request was closed and throws 'Client disconnected' — a non-continue-wait error that breaks the transport retry loop. Closed request IDs are auto-cleaned after 5 minutes. Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
This reverts commit 1a33564.
When a client closes the HTTP connection to /v1/cubesql, the native transport's continue-wait retry loop would keep polling indefinitely. Fix: register a 'close' event listener on the Node.js response stream from the Rust side (OnCloseHandler, modeled after OnDrainHandler). When the stream closes, a oneshot channel fires. In handle_sql_query, tokio::select! races the execute() future against the close signal. If the client disconnects, the execute future is dropped, which cancels the transport retry loop and all downstream operations. Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
76def12 to
1cd4b72
Compare
Review: latest synchronize (Rust client-disconnect + UUID-prefix cancel matching)Verdict: No blocking issues. 0 high, 0 medium, 2 low (both inline). Prior partial-result and unhandled-rejection bugs are fixed (verified in DetailsTasks
Previously-flagged bugs (verified fixed in
|
| let result = tokio::select! { | ||
| _ = close_rx => { | ||
| Err(CubeError::internal("Client disconnected".to_string())) | ||
| } | ||
| res = execute() => res | ||
| }; |
There was a problem hiding this comment.
Question — does cancellation actually propagate to the database? When the disconnect arm wins, tokio::select! drops the execute() future, which cancels the in-flight stream.next().await on the next yield. That stops consumption of batches, but whether the underlying database query is actually aborted depends on how get_df_batches / the upstream driver handles drop. If it spawns work on a separate task (or if a downstream driver like Athena holds an HTTP connection that completes regardless of consumer), the query keeps running until natural completion. Worth confirming this is "stop polling/streaming on disconnect" (analogous to the SQL API REST flow) rather than "cancel the underlying query."
Stream 'close' also fires on normal end. Node's Writable emits 'close' after the response naturally finishes. In the success path, execute() resolves first and tokio::select! returns Ok; close_rx is dropped and the JS listener firing later is harmless (the Arc<Mutex<Option<Sender>>>::take() makes subsequent on_close calls no-ops). Just noting it's deliberate that the close listener outlives close_rx.
Listener never detached. Like the existing OnDrainHandler, the 'close' listener stays attached for the lifetime of stream_methods.stream. Consistent with the existing pattern, but worth noting if the same JS stream is ever reused across queries — a stale handler from a prior query would fire.
| public async cancelQueryByRequestId(requestId: string): Promise<QueryDef[]> { | ||
| const extractUUID = (id: string) => { | ||
| const idx = id.lastIndexOf('-span-'); | ||
| return idx !== -1 ? id.substring(0, idx) : id; | ||
| }; | ||
|
|
||
| const targetUUID = extractUUID(requestId); | ||
| const queries: any[] = await this.getQueries(); | ||
| const cancelled: QueryDef[] = []; | ||
|
|
||
| for (const query of queries) { | ||
| if (query.requestId && extractUUID(query.requestId) === targetUUID) { |
There was a problem hiding this comment.
UUID-prefix match is broader than the prior exact match. Two observations:
-
Cross-span cancellation is now possible. If two logical requests share the same UUID prefix but differ only in
-span-N, cancelling one cancels the other. This appears intentional (the goal is to cancel all sub-spans of a parent request), but it's an implicit semantic widening from the previous exact-match behavior — worth a one-line comment documenting the intent. -
lastIndexOf('-span-')accepts any input. IfrequestIdis ever user-supplied (e.g., viax-request-idheader), a string likeevil-span-1would be silently truncated toeviland match any query whose UUID happens to beevil. In practice the prefix is a generated UUID so collisions are negligible, but if the goal is strictly "strip a server-generated-span-Nsuffix," consider anchoring with a regex like/-span-\d+$/so unrelated strings aren't quietly normalized.
Neither blocks merge.
- Start slow query via /v1/cubesql (REST SQL API) - Poll pg_stat_activity on the underlying Postgres to confirm pg_sleep is running - Abort the HTTP connection (triggers Rust-side close detection) - Cancel the query in the queue via DELETE /running-query/:requestId - Poll pg_stat_activity again to confirm pg_sleep has stopped, verifying the query is no longer executing in Postgres Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
|
Claude finished @cursor[bot]'s task in 1m 16s —— View job Review of latest synchronize (test rewrite)Verdict: No blocking issues. 0 high, 0 medium, 0 low new. Diff since prior review ( DetailsTasks
New: test rewrite (
|
- Start slow query via /v1/cubesql REST SQL API - Confirm pg_sleep is running via pg_stat_activity on the underlying Postgres - Abort HTTP connection (triggers Rust-side close detection) - Cancel query in queue via DELETE /running-query/:requestId - Verify second cancel returns empty (query is gone from queue) - Clean up lingering pg_sleep via pg_terminate_backend in finally Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
|
Claude finished @cursor[bot]'s task in 34s —— View job Review of latest synchronize (test refinement in
|
Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
|
Claude finished @cursor[bot]'s task in 23s —— View job Review of latest synchronize (
|
Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
|
Claude finished @cursor[bot]'s task in 24s —— View job Review of latest synchronize (
|
Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
|
Claude finished @cursor[bot]'s task in 22s —— View job Review of latest synchronize (
|
…e and QueryCache Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
|
Claude finished @cursor[bot]'s task in 26s —— View job Review of latest synchronize (
|
Description of Changes Made
Distributed Query Cancellation (QueryQueue)
In a distributed/multi-node Cube deployment with CubeStore, when a query is cancelled (manually, via orphan/stale detection, or timeout reconciliation) by a node different from the one executing the query, the queue entry is removed but the actual database query keeps running until it completes or times out. This is because the cancel handler (
queue.handles[cancelHandler]) is process-local — it only exists on the node executing the query.Changes:
processQueryto detect external cancellationgetQueryDefprocessCancel, triggering actual database query cancellationqueryExecutionFinishedflag to prevent false positives (e.g., detecting "gone" after natural completion)localCancelHandlertracking to capture the cancel handle registered by the query handlerThis enables distributed query cancellation at heartbeat interval granularity (default 30s) without requiring inter-process messaging or CubeStore protocol changes.
Athena Cancellation Promise (AthenaDriver)
The Athena driver previously returned plain
Promiseobjects from query methods, which meant the query orchestrator could not cancel in-flight Athena queries on timeout, manual cancel, or pre-aggregation cancellation.Changes:
query(),memory(),stream(),downloadQueryResults(), andloadPreAggregationIntoTable()now returnMaybeCancelablePromisewith a.cancel()method.cancel()sets a cancellation flag and callsstopQueryExecution()on AWS Athena via the existingstopQuery()methodwaitForSuccess()now accepts an optionalisCancelledcallback to detect cancellation during the poll loop, enabling fast exit from the polling cyclestream()releasefunction now properly stops the Athena query instead of being a no-opstartQuery(), during each poll iteration inwaitForSuccess(), and during result row iterationThis integrates with the query orchestrator's existing cancel machinery — the
cancelCombinatorpattern inPreAggregationLoaderand the cancel handler registration inQueryCache.createQueuewill automatically detect and use the.cancel()method.REST API for Query Cancellation by Request ID
Added a new public API endpoint for cancelling running queries by their request ID.
Endpoint:
DELETE /cubejs-api/v1/running-query/:requestIdFlow:
ApiGatewayregisters the route with user auth middlewareOrchestratorApi.cancelQueryByRequestId()delegates to the orchestratorQueryOrchestrator.cancelQueryByRequestId()searches across all SQL query and pre-aggregation queues for all data sourcesQueryQueue.cancelQueryByRequestId()scans queued queries, matches byrequestId, and cancels viacancelQuery()Check List