diff --git a/docs-mintlify/reference/core-data-apis/rest-api/reference.mdx b/docs-mintlify/reference/core-data-apis/rest-api/reference.mdx index 1991e47c4aede..aa6676daae666 100644 --- a/docs-mintlify/reference/core-data-apis/rest-api/reference.mdx +++ b/docs-mintlify/reference/core-data-apis/rest-api/reference.mdx @@ -20,6 +20,13 @@ Run the query to the REST (JSON) API and get the results. | `queryType` | If multiple queries are passed in `query` for [data blending][ref-recipes-data-blending], this must be set to `multi` | ❌ No | | `cache` | See [cache control][ref-cache-control]. `stale-if-slow` by default | ❌ No | +### Headers + +| Header | Description | Required | +| --- | --- | --- | +| `Authorization` | API token for authentication | ✅ Yes | +| `x-request-id` | Custom request identifier. When provided, this ID is used to track the query through the system and can be used to [cancel the query](#base_pathv1running-queryrequestid). If not provided, a unique ID is generated automatically. | ❌ No | + Response - `query` - The query passed via params. It can be an array of queries and in @@ -401,6 +408,13 @@ This endpoint is part of the [SQL API][ref-sql-api]. | `timezone` | The [time zone][ref-time-zone] for this query in the [TZ Database Name][link-tzdb] format, e.g., `America/Los_Angeles` | ❌ No | | `cache` | See [cache control][ref-cache-control]. `stale-if-slow` by default | ❌ No | +### Headers + +| Header | Description | Required | +| --- | --- | --- | +| `Authorization` | API token for authentication | ✅ Yes | +| `x-request-id` | Custom request identifier. When provided, this ID is used to track the query through the system and can be used to [cancel the query](#base_pathv1running-queryrequestid). If not provided, a unique ID is generated automatically. | ❌ No | + Response: a stream of newline-delimited JSON objects. The first object contains the `schema` property with column names and types, and optionally `lastRefreshTime` indicating when the data was last refreshed. @@ -447,6 +461,57 @@ Response: {"data":[["Max VERSTAPPEN","10","3.730769230769231"],["Lando NORRIS","4","4"],["Charles LECLERC","4","4.730769230769231"],["Oscar PIASTRI","3","4.8076923076923075"],["Andrea Kimi ANTONELLI","0","5"]]} ``` +## `{base_path}/v1/running-query/{requestId}` + +Cancel a running query by its request ID. This endpoint cancels any in-flight +queries matching the given request ID across all query queues. + +The request ID can be obtained from the `x-request-id` header sent with +the original query request to endpoints like +[`/v1/load`](#base_pathv1load) or [`/v1/cubesql`](#base_pathv1cubesql). + +| Parameter | Description | Required | +| --- | --- | --- | +| `requestId` | The request ID of the query to cancel (URL path parameter) | ✅ Yes | + +Response: + +- `result` — an array of cancelled query definitions. Empty array if no + matching queries were found in the queue. + +### Example + +Cancel a query: + +```bash +curl \ + -X DELETE \ + -H "Authorization: TOKEN" \ + http://localhost:4000/cubejs-api/v1/running-query/my-request-id-span-1 +``` + +Example response when a query was cancelled: + +```json +{ + "result": [ + { + "queryHandler": "query", + "queryKey": "...", + "requestId": "my-request-id-span-1" + } + ] +} +``` + +Example response when no matching query was found: + +```json +{ + "result": [] +} +``` + ## `{base_path}/v1/pre-aggregations/jobs` Trigger pre-aggregation build jobs or retrieve statuses of such jobs. diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 070342511a6f1..6dbd852f4ed32 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -428,6 +428,14 @@ class ApiGateway { }); })); + app.delete(`${this.basePath}/v1/running-query/:requestId`, userMiddlewares, userAsyncHandler(async (req: any, res) => { + await this.cancelQuery({ + requestId: req.params.requestId, + context: req.context, + res: this.resToResultFn(res), + }); + })); + /** ************************************************************** * meta scope * *************************************************************** */ @@ -1261,6 +1269,23 @@ class ApiGateway { } } + public async cancelQuery( + { requestId, context, res }: { requestId: string, context: RequestContext, res: ResponseResultFn } + ) { + const requestStarted = new Date(); + try { + const orchestratorApi = await this.getAdapterApi(context); + const cancelled = await orchestratorApi.cancelQueryByRequestId(requestId); + await res({ + result: cancelled + }); + } catch (e: any) { + this.handleError({ + e, context, res, requestStarted + }); + } + } + /** * Convert incoming query parameter (JSON fetched from the HTTP) to * an array of query type and array of normalized queries. diff --git a/packages/cubejs-athena-driver/src/AthenaDriver.ts b/packages/cubejs-athena-driver/src/AthenaDriver.ts index b89dd5302688f..8cc8e6155c447 100644 --- a/packages/cubejs-athena-driver/src/AthenaDriver.ts +++ b/packages/cubejs-athena-driver/src/AthenaDriver.ts @@ -10,6 +10,7 @@ import { checkNonNullable, pausePromise, Required, + MaybeCancelablePromise, } from '@cubejs-backend/shared'; import { Athena, @@ -243,12 +244,13 @@ export class AthenaDriver extends BaseDriver implements DriverInterface { /** * Executes a query and returns either query result memory data or * query result stream, depending on options. + * Returns a cancelable promise that will stop the Athena query on cancel. */ - public async downloadQueryResults( + public downloadQueryResults( query: string, values: unknown[], options: DownloadQueryResultsOptions, - ): Promise { + ): MaybeCancelablePromise { if (!options.streamImport) { return this.memory(query, values); } else { @@ -259,59 +261,122 @@ export class AthenaDriver extends BaseDriver implements DriverInterface { /** * Executes query and returns table memory data that includes rows * and queried fields types. + * Returns a cancelable promise that will stop the Athena query on cancel. */ - public async memory( + public memory( query: string, values: unknown[], - ): Promise { - const qid = await this.startQuery(query, values); - await this.waitForSuccess(qid); - const iter = this.lazyRowIterator(qid, query, true); - const types = ((await iter.next()).value); - const rows: Row[] = []; - for await (const row of iter) { - rows.push(row); - } - return { types, rows }; + ): MaybeCancelablePromise { + let qid: AthenaQueryId | null = null; + let cancelled = false; + + const promise: any = (async () => { + qid = await this.startQuery(query, values); + if (cancelled) { + await this.stopQuery(qid); + throw new Error('Query was cancelled'); + } + await this.waitForSuccess(qid, () => cancelled); + const iter = this.lazyRowIterator(qid, query, true); + const types = ((await iter.next()).value); + const rows: Row[] = []; + for await (const row of iter) { + if (cancelled) throw new Error('Query was cancelled'); + rows.push(row); + } + return { types, rows }; + })(); + + promise.cancel = async () => { + cancelled = true; + if (qid) { + await this.stopQuery(qid); + } + }; + + return promise; } /** * Returns stream table object that includes query result stream and * queried fields types. + * Returns a cancelable promise that will stop the Athena query on cancel. */ - public async stream( + public stream( query: string, values: unknown[], options: StreamOptions, - ): Promise { - const qid = await this.startQuery(query, values); - await this.waitForSuccess(qid); - const iter = this.lazyRowIterator(qid, query, true); - const types = ((await iter.next()).value); - return { - rowStream: stream.Readable.from(iter, { - highWaterMark: options.highWaterMark, - }), - types, - release: async () => { /* canceling is missed in the iter */ }, + ): MaybeCancelablePromise { + let qid: AthenaQueryId | null = null; + let cancelled = false; + + const promise: any = (async () => { + qid = await this.startQuery(query, values); + if (cancelled) { + await this.stopQuery(qid); + throw new Error('Query was cancelled'); + } + await this.waitForSuccess(qid, () => cancelled); + const iter = this.lazyRowIterator(qid, query, true); + const types = ((await iter.next()).value); + return { + rowStream: stream.Readable.from(iter, { + highWaterMark: options.highWaterMark, + }), + types, + release: async () => { + if (qid) { + await this.stopQuery(qid); + } + }, + }; + })(); + + promise.cancel = async () => { + cancelled = true; + if (qid) { + await this.stopQuery(qid); + } }; + + return promise; } /** - * Executes query and rerutns queried rows. + * Executes query and returns queried rows. + * Returns a cancelable promise that will stop the Athena query on cancel. */ - public async query( + public query( query: string, values: unknown[], _options?: QueryOptions, - ): Promise { - const qid = await this.startQuery(query, values); - await this.waitForSuccess(qid); - const rows: R[] = []; - for await (const row of this.lazyRowIterator(qid, query)) { - rows.push(row); - } - return rows; + ): MaybeCancelablePromise { + let qid: AthenaQueryId | null = null; + let cancelled = false; + + const promise: any = (async () => { + qid = await this.startQuery(query, values); + if (cancelled) { + await this.stopQuery(qid); + throw new Error('Query was cancelled'); + } + await this.waitForSuccess(qid, () => cancelled); + const rows: R[] = []; + for await (const row of this.lazyRowIterator(qid, query)) { + if (cancelled) throw new Error('Query was cancelled'); + rows.push(row); + } + return rows; + })(); + + promise.cancel = async () => { + cancelled = true; + if (qid) { + await this.stopQuery(qid); + } + }; + + return promise; } /** @@ -368,18 +433,36 @@ export class AthenaDriver extends BaseDriver implements DriverInterface { /** * Save pre-aggregation data into a temp table. + * Returns a cancelable promise that will stop the Athena query on cancel. */ - public async loadPreAggregationIntoTable( + public loadPreAggregationIntoTable( preAggregationTableName: string, loadSql: string, params: any, - ): Promise { - if (this.config.S3OutputLocation === undefined) { - throw new Error('Unload is not configured. Please define CUBEJS_AWS_S3_OUTPUT_LOCATION env var '); - } + ): MaybeCancelablePromise { + let qid: AthenaQueryId | null = null; + let cancelled = false; - const qid = await this.startQuery(loadSql, params); - await this.waitForSuccess(qid); + const promise: any = (async () => { + if (this.config.S3OutputLocation === undefined) { + throw new Error('Unload is not configured. Please define CUBEJS_AWS_S3_OUTPUT_LOCATION env var '); + } + qid = await this.startQuery(loadSql, params); + if (cancelled) { + await this.stopQuery(qid); + throw new Error('Query was cancelled'); + } + await this.waitForSuccess(qid, () => cancelled); + })(); + + promise.cancel = async () => { + cancelled = true; + if (qid) { + await this.stopQuery(qid); + } + }; + + return promise; } /** @@ -543,9 +626,12 @@ export class AthenaDriver extends BaseDriver implements DriverInterface { return status === 'SUCCEEDED'; } - protected async waitForSuccess(qid: AthenaQueryId): Promise { + protected async waitForSuccess(qid: AthenaQueryId, isCancelled?: () => boolean): Promise { const startedTime = Date.now(); for (let i = 0; Date.now() - startedTime <= this.config.pollTimeout; i++) { + if (isCancelled?.()) { + throw new Error('Query was cancelled'); + } if (await this.checkStatus(qid)) { return; } diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 5121832fb0ed1..c5bdc1cc687d4 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -18,7 +18,7 @@ use crate::cubesql_utils::with_session; use crate::logger::NodeBridgeLogger; use crate::rest4sql::rest4sql; use crate::sql4sql::sql4sql; -use crate::stream::OnDrainHandler; +use crate::stream::{OnCloseHandler, OnDrainHandler}; use crate::tokio_runtime_node; use crate::transport::NodeBridgeTransport; use crate::utils::{batch_to_rows, NonDebugInRelease}; @@ -28,6 +28,7 @@ use cubesqlplanner::cube_bridge::base_query_options::NativeBaseQueryOptions; use cubesqlplanner::planner::base_query::BaseQuery; use std::rc::Rc; use std::sync::Arc; +use tokio::sync::oneshot; use cubesql::telemetry::LocalReporter; use cubesql::{telemetry::ReportingLogger, CubeError}; @@ -303,6 +304,11 @@ async fn handle_sql_query( let session_clone = Arc::clone(&session); let span_id_clone = span_id.clone(); + let (close_tx, close_rx) = oneshot::channel::<()>(); + let close_handler = + OnCloseHandler::new(channel.clone(), stream_methods.stream.clone(), close_tx); + close_handler.handle(stream_methods.on.clone()).await?; + let execute = || async move { // todo: can we use compiler_cache? let meta_context = transport_service.meta(native_auth_ctx).await?; @@ -432,7 +438,12 @@ async fn handle_sql_query( Ok::<(), CubeError>(()) }; - let result = execute().await; + let result = tokio::select! { + _ = close_rx => { + Err(CubeError::internal("Client disconnected".to_string())) + } + res = execute() => res + }; match &result { Ok(_) => { diff --git a/packages/cubejs-backend-native/src/stream.rs b/packages/cubejs-backend-native/src/stream.rs index a39093a28ef15..4a496b3537cca 100644 --- a/packages/cubejs-backend-native/src/stream.rs +++ b/packages/cubejs-backend-native/src/stream.rs @@ -89,6 +89,72 @@ impl OnDrainHandler { } } +fn handle_on_close(mut cx: FunctionContext) -> JsResult { + let this = cx + .this::>()? + .downcast_or_throw::, _>(&mut cx)?; + this.on_close(); + + Ok(cx.undefined()) +} + +pub struct OnCloseHandler { + channel: Arc, + js_stream: Arc>, + sender: Arc>>>, +} + +unsafe impl Sync for OnCloseHandler {} + +impl Finalize for OnCloseHandler {} + +impl OnCloseHandler { + pub fn new( + channel: Arc, + js_stream: Arc>, + sender: oneshot::Sender<()>, + ) -> Self { + Self { + channel, + js_stream, + sender: Arc::new(Mutex::new(Some(sender))), + } + } + + pub async fn handle(&self, js_stream_on_fn: Arc>) -> Result<(), CubeError> { + let js_stream_obj = self.js_stream.clone(); + let handler = Self { + channel: self.channel.clone(), + js_stream: self.js_stream.clone(), + sender: self.sender.clone(), + }; + + call_js_fn( + self.channel.clone(), + js_stream_on_fn, + Box::new(|cx| { + let on_close_fn = JsFunction::new(cx, handle_on_close)?; + + let this = cx.boxed(handler).upcast::(); + let on_close_fn = bind_method(cx, on_close_fn, this)?; + + let event_arg = cx.string("close").upcast::(); + + Ok(vec![event_arg, on_close_fn.upcast::()]) + }), + Box::new(|_, _| Ok(())), + js_stream_obj, + ) + .await + } + + fn on_close(&self) { + if let Some(sender) = self.sender.lock().unwrap().take() { + let _ = sender.send(()); + } + } +} + pub struct JsWriteStream { sender: Sender, ready_sender: Mutex>>>, diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 70b04ade38ddb..316ab3183e2c8 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -24,7 +24,7 @@ import { ContinueWaitError } from './ContinueWaitError'; import { LocalCacheDriver } from './LocalCacheDriver'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; import { LoadPreAggregationResult, PreAggregationDescription } from './PreAggregations'; -import { getCacheHash } from './utils'; +import { getCacheHash, extractRequestUUID } from './utils'; import { CacheAndQueryDriverType, MetadataOperationType } from './QueryOrchestrator'; export type CacheQueryResultOptions = { @@ -408,9 +408,7 @@ export class QueryCache { } public static extractRequestUUID(requestId: string): string { - const idx = requestId.lastIndexOf('-span-'); - - return idx !== -1 ? requestId.substring(0, idx) : requestId; + return extractRequestUUID(requestId); } protected static replaceAll(replaceThis, withThis, inThis) { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index e257579fa5397..c7cfaf8827542 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -435,6 +435,20 @@ export class QueryOrchestrator { return this.preAggregations.cancelQueriesFromQueue(queryKeys, dataSource); } + public async cancelQueryByRequestId(requestId: string) { + const cancelled = []; + + for (const queue of Object.values(this.queryCache.getQueues())) { + cancelled.push(...await queue.cancelQueryByRequestId(requestId)); + } + + for (const queue of Object.values(this.preAggregations.getQueues())) { + cancelled.push(...await queue.cancelQueryByRequestId(requestId)); + } + + return cancelled; + } + public async updateRefreshEndReached() { return this.preAggregations.updateRefreshEndReached(); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts index 19f0dcbeba660..d3699314f818b 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts @@ -16,6 +16,7 @@ import { ContinueWaitError } from './ContinueWaitError'; import { LocalQueueDriver } from './LocalQueueDriver'; import { QueryStream } from './QueryStream'; import { CacheAndQueryDriverType } from './QueryOrchestrator'; +import { extractRequestUUID } from './utils'; export type CancelHandlerFn = (query: QueryDef) => Promise; export type QueryHandlerFn = (query: QueryDef, cancelHandler: CancelHandlerFn) => Promise; @@ -517,6 +518,21 @@ export class QueryQueue { } } + public async cancelQueryByRequestId(requestId: string): Promise { + const targetUUID = extractRequestUUID(requestId); + const queries: any[] = await this.getQueries(); + const cancelled: QueryDef[] = []; + + for (const query of queries) { + if (query.requestId && extractRequestUUID(query.requestId) === targetUUID) { + await this.cancelQuery(query.queryKey, null); + cancelled.push(query); + } + } + + return cancelled; + } + /** * Reconciliation logic: cancel stalled and orphaned queries from the queue * and pick some planned to be processed queries to process. @@ -754,6 +770,10 @@ export class QueryQueue { if (query && insertedCount && activated && processingLockAcquired) { let executionResult; + let queryExecutionFinished = false; + // Set by the query handler's setCancelHandler callback once execution begins. + // Not available on the original query def from retrieveForProcessing. + let localCancelHandler: unknown = null; const startQueryTime = (new Date()).getTime(); const timeInQueue = (new Date()).getTime() - query.addedToQueueTime; this.logger('Performing query', { @@ -774,7 +794,7 @@ export class QueryQueue { let queryProcessHeartbeat = Date.now(); const heartBeatTimer = setInterval( - () => { + async () => { if ((Date.now() - queryProcessHeartbeat) > 5 * 60 * 1000) { this.logger('Query processing heartbeat', { queueId, @@ -784,7 +804,46 @@ export class QueryQueue { queryProcessHeartbeat = Date.now(); } - return queueConnection.updateHeartBeat(queryKeyHashed, queueId); + try { + await queueConnection.updateHeartBeat(queryKeyHashed, queueId); + } catch (e: any) { + this.logger('Error updating heartbeat', { + queueId, + queryKey: query.queryKey, + error: e.stack || e, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId, + }); + } + + if (!queryExecutionFinished && localCancelHandler !== null) { + try { + const currentDef = await queueConnection.getQueryDef(queryKeyHashed, queueId); + if (!currentDef && !queryExecutionFinished) { + this.logger('Cancelling query due to external cancellation', { + queueId, + queryKey: query.queryKey, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId, + metadata: query.query?.metadata, + preAggregationId: query.query?.preAggregation?.preAggregationId, + newVersionEntry: query.query?.newVersionEntry, + preAggregation: query.query?.preAggregation, + addedToQueueTime: query.addedToQueueTime, + }); + const cancelQuery = { ...query, cancelHandler: localCancelHandler }; + await this.processCancel(cancelQuery, queueId); + } + } catch (e: any) { + this.logger('Error checking for external cancellation', { + queueId, + queryKey: query.queryKey, + error: e.stack || e, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId, + }); + } + } }, this.heartBeatInterval * 1000 ); @@ -813,6 +872,7 @@ export class QueryQueue { this.queryHandlers[handler]( query.query, async (cancelHandler) => { + localCancelHandler = cancelHandler; try { await queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId, queueId); } catch (e: any) { @@ -891,7 +951,7 @@ export class QueryQueue { } } } finally { - // catch block can throw an exception, it's why it's important to clearInterval here + queryExecutionFinished = true; clearInterval(heartBeatTimer); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts index 1e9c366a09a61..9a32c023e3ab5 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts @@ -31,3 +31,11 @@ export function getCacheHash(queryKey: QueryKey | CacheKey, processUid?: string) .digest('hex') as any; } } + +/** + * Extracts the UUID prefix from a request ID by stripping the `-span-N` suffix. + */ +export function extractRequestUUID(requestId: string): string { + const idx = requestId.lastIndexOf('-span-'); + return idx !== -1 ? requestId.substring(0, idx) : requestId; +} diff --git a/packages/cubejs-server-core/src/core/OrchestratorApi.ts b/packages/cubejs-server-core/src/core/OrchestratorApi.ts index 59f73ec26d398..e4c8b910b04e6 100644 --- a/packages/cubejs-server-core/src/core/OrchestratorApi.ts +++ b/packages/cubejs-server-core/src/core/OrchestratorApi.ts @@ -297,6 +297,10 @@ export class OrchestratorApi { return this.orchestrator.cancelPreAggregationQueriesFromQueue(queryKeys, dataSource); } + public async cancelQueryByRequestId(requestId: string) { + return this.orchestrator.cancelQueryByRequestId(requestId); + } + public async updateRefreshEndReached() { return this.orchestrator.updateRefreshEndReached(); } diff --git a/packages/cubejs-testing/birdbox-fixtures/postgresql/schema/SlowQuery.js b/packages/cubejs-testing/birdbox-fixtures/postgresql/schema/SlowQuery.js new file mode 100644 index 0000000000000..536bd118afc49 --- /dev/null +++ b/packages/cubejs-testing/birdbox-fixtures/postgresql/schema/SlowQuery.js @@ -0,0 +1,17 @@ +cube(`SlowQuery`, { + sql: `SELECT pg_sleep(90), 1 as id`, + + measures: { + count: { + type: `count`, + }, + }, + + dimensions: { + id: { + sql: `id`, + type: `number`, + primaryKey: true, + }, + }, +}); diff --git a/packages/cubejs-testing/test/smoke-cubesql.test.ts b/packages/cubejs-testing/test/smoke-cubesql.test.ts index 1dc83840f00f4..389454d969d8b 100644 --- a/packages/cubejs-testing/test/smoke-cubesql.test.ts +++ b/packages/cubejs-testing/test/smoke-cubesql.test.ts @@ -1093,4 +1093,83 @@ filter_subq AS ( expect(res.rows).toMatchSnapshot(); }); }); + + describe('Query cancellation by request ID', () => { + test('cancel a running query via REST API', async () => { + const requestId = 'cancel-test-request-id-12345'; + const token = jwt.sign({ user: 'admin' }, DEFAULT_CONFIG.CUBEJS_API_SECRET, { expiresIn: '1h' }); + + // Connect directly to the underlying Postgres to check pg_stat_activity + const pgConn = new PgClient({ + host: db.getHost(), + port: db.getMappedPort(5432), + database: 'test', + user: 'test', + password: 'test', + ssl: false, + }); + await pgConn.connect(); + + try { + // Start a slow query (pg_sleep(30) in the SlowQuery cube SQL) + // via the REST SQL API. Use AbortController so we can close the + // connection to trigger the Rust-side close detection. + const abortController = new AbortController(); + const queryPromise = fetch(`${birdbox.configuration.apiUrl}/cubesql`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: token, + 'x-request-id': requestId, + }, + body: JSON.stringify({ + query: 'SELECT count FROM SlowQuery', + }), + signal: abortController.signal, + }).then(r => r.text()).catch(e => e); + + // Wait for pg_sleep to appear in pg_stat_activity + let sleepRunning = false; + for (let i = 0; i < 20; i++) { + await new Promise(resolve => setTimeout(resolve, 500)); + const { rows } = await pgConn.query( + "SELECT count(*) as cnt FROM pg_stat_activity WHERE query LIKE '%pg_sleep%' AND state = 'active' AND pid != pg_backend_pid()" + ); + if (parseInt(rows[0].cnt, 10) > 0) { + sleepRunning = true; + break; + } + } + expect(sleepRunning).toBe(true); + + // Close the HTTP connection — this triggers the Rust-side + // OnCloseHandler which drops the execute future via tokio::select! + abortController.abort(); + await queryPromise; + + // Cancel the query in the queue by request ID + const cancelRes = await fetch(`${birdbox.configuration.apiUrl}/running-query/${requestId}`, { + method: 'DELETE', + headers: { Authorization: token }, + }); + expect(cancelRes.status).toBe(200); + + await new Promise(resolve => setTimeout(resolve, 2000)); + + // A second cancel should return empty — the query is gone from the queue + const cancelRes2 = await fetch(`${birdbox.configuration.apiUrl}/running-query/${requestId}`, { + method: 'DELETE', + headers: { Authorization: token }, + }); + const cancelBody2 = await cancelRes2.json() as any; + expect(cancelBody2.result).toEqual([]); + } finally { + // pg_terminate_backend for any lingering pg_sleep queries + await pgConn.query( + "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query LIKE '%pg_sleep%' AND state = 'active' AND pid != pg_backend_pid()" + ); + await pgConn.end(); + } + }); + }); });