diff --git a/lib/include/duckdb/web/webdb.h b/lib/include/duckdb/web/webdb.h index 5421435e7..0cd847169 100644 --- a/lib/include/duckdb/web/webdb.h +++ b/lib/include/duckdb/web/webdb.h @@ -33,12 +33,14 @@ struct DuckDBWasmResultsWrapper { // Additional ResponseStatuses to be >= 256, and mirrored to packages/duckdb-wasm/src/status.ts // Missing mapping result in a throw, but they should eventually align (it's fine if typescript side only has a // subset) - enum ResponseStatus : uint32_t { ARROW_BUFFER = 0, MAX_ARROW_ERROR = 255 }; + enum ResponseStatus : uint32_t { ARROW_BUFFER = 0, MAX_ARROW_ERROR = 255, DUCKDB_WASM_RETRY = 256 }; DuckDBWasmResultsWrapper(arrow::Result> res, ResponseStatus status = ResponseStatus::ARROW_BUFFER) : arrow_buffer(res), status(status) {} DuckDBWasmResultsWrapper(arrow::Status res, ResponseStatus status = ResponseStatus::ARROW_BUFFER) : arrow_buffer(res), status(status) {} + DuckDBWasmResultsWrapper(ResponseStatus status = ResponseStatus::ARROW_BUFFER) + : DuckDBWasmResultsWrapper(nullptr, status) {} arrow::Result> arrow_buffer; ResponseStatus status; }; diff --git a/lib/src/webdb.cc b/lib/src/webdb.cc index 659cb4a42..869465657 100644 --- a/lib/src/webdb.cc +++ b/lib/src/webdb.cc @@ -239,6 +239,46 @@ DuckDBWasmResultsWrapper WebDB::Connection::FetchQueryResults() { if (current_query_result_ == nullptr) { return DuckDBWasmResultsWrapper{nullptr}; } + + if (current_query_result_->type == QueryResultType::STREAM_RESULT) { + auto& stream_result = current_query_result_->Cast(); + + auto before = std::chrono::steady_clock::now(); + uint64_t elapsed; + auto polling_interval = + webdb_.config_->query.query_polling_interval.value_or(DEFAULT_QUERY_POLLING_INTERVAL); + bool ready = false; + do { + switch (stream_result.ExecuteTask()) { + case StreamExecutionResult::EXECUTION_ERROR: + return arrow::Status{arrow::StatusCode::ExecutionError, + std::move(current_query_result_->GetError())}; + case StreamExecutionResult::EXECUTION_CANCELLED: + return arrow::Status{arrow::StatusCode::ExecutionError, + "The execution of the query was cancelled before it could finish, likely " + "caused by executing a different query"}; + case StreamExecutionResult::CHUNK_READY: + case StreamExecutionResult::EXECUTION_FINISHED: + ready = true; + break; + case StreamExecutionResult::BLOCKED: + stream_result.WaitForTask(); + return DuckDBWasmResultsWrapper::ResponseStatus::DUCKDB_WASM_RETRY; + case StreamExecutionResult::NO_TASKS_AVAILABLE: + return DuckDBWasmResultsWrapper::ResponseStatus::DUCKDB_WASM_RETRY; + case StreamExecutionResult::CHUNK_NOT_READY: + break; + } + + auto after = std::chrono::steady_clock::now(); + elapsed = std::chrono::duration_cast(after - before).count(); + } while (!ready && elapsed < polling_interval); + + if (!ready) { + return DuckDBWasmResultsWrapper::ResponseStatus::DUCKDB_WASM_RETRY; + } + } + // Fetch next result chunk chunk = current_query_result_->Fetch(); if (current_query_result_->HasError()) { diff --git a/packages/duckdb-wasm/src/bindings/bindings_base.ts b/packages/duckdb-wasm/src/bindings/bindings_base.ts index 08cbc2bbe..8e8a1aecd 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_base.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_base.ts @@ -4,7 +4,7 @@ import { Logger } from '../log'; import { InstantiationProgress } from './progress'; import { DuckDBBindings } from './bindings_interface'; import { DuckDBConnection } from './connection'; -import { StatusCode, IsArrowBuffer } from '../status'; +import { StatusCode, IsArrowBuffer, IsDuckDBWasmRetry } from '../status'; import { dropResponseBuffers, DuckDBRuntime, readString, callSRet, copyBuffer, DuckDBDataProtocol } from './runtime'; import { CSVInsertOptions, JSONInsertOptions, ArrowInsertOptions } from './insert_options'; import { ScriptTokens } from './tokens'; @@ -222,14 +222,20 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { return this.mod.ccall('duckdb_web_pending_query_cancel', 'boolean', ['number'], [conn]); } /** Fetch query results */ - public fetchQueryResults(conn: number): Uint8Array { + public fetchQueryResults(conn: number): Uint8Array | null { const [s, d, n] = callSRet(this.mod, 'duckdb_web_query_fetch_results', ['number'], [conn]); + if (IsDuckDBWasmRetry(s)) { + dropResponseBuffers(this.mod); + return null; // Retry + } + if (!IsArrowBuffer(s)) { throw new Error("Unexpected StatusCode from duckdb_web_query_fetch_results (" + s + ") and with self reported error as" + readString(this.mod, d, n)); } if (s !== StatusCode.SUCCESS) { throw new Error(readString(this.mod, d, n)); } + const res = copyBuffer(this.mod, d, n); dropResponseBuffers(this.mod); return res; diff --git a/packages/duckdb-wasm/src/bindings/bindings_interface.ts b/packages/duckdb-wasm/src/bindings/bindings_interface.ts index 271a42ef9..2920972b2 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_interface.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_interface.ts @@ -19,7 +19,7 @@ export interface DuckDBBindings { startPendingQuery(conn: number, text: string, allowStreamResult: boolean): Uint8Array | null; pollPendingQuery(conn: number): Uint8Array | null; cancelPendingQuery(conn: number): boolean; - fetchQueryResults(conn: number): Uint8Array; + fetchQueryResults(conn: number): Uint8Array | null; getTableNames(conn: number, text: string): string[]; createPrepared(conn: number, text: string): number; diff --git a/packages/duckdb-wasm/src/bindings/connection.ts b/packages/duckdb-wasm/src/bindings/connection.ts index 70b5f018d..fb54a3660 100644 --- a/packages/duckdb-wasm/src/bindings/connection.ts +++ b/packages/duckdb-wasm/src/bindings/connection.ts @@ -53,7 +53,6 @@ export class DuckDBConnection { //Otherwise, reject with the error reject(e); } - } }); } @@ -125,7 +124,10 @@ export class ResultStreamIterator implements Iterable { if (this._depleted) { return { done: true, value: null }; } - const bufferI8 = this.bindings.fetchQueryResults(this.conn); + let bufferI8 = null; + do { + bufferI8 = this.bindings.fetchQueryResults(this.conn); + } while (bufferI8 == null); this._depleted = bufferI8.length == 0; return { done: this._depleted, diff --git a/packages/duckdb-wasm/src/parallel/async_bindings.ts b/packages/duckdb-wasm/src/parallel/async_bindings.ts index dc8a81e53..9d8a1c2e7 100644 --- a/packages/duckdb-wasm/src/parallel/async_bindings.ts +++ b/packages/duckdb-wasm/src/parallel/async_bindings.ts @@ -442,8 +442,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { } /** Fetch query results */ - public async fetchQueryResults(conn: ConnectionID): Promise { - const task = new WorkerTask( + public async fetchQueryResults(conn: ConnectionID): Promise { + const task = new WorkerTask( WorkerRequestType.FETCH_QUERY_RESULTS, conn, ); diff --git a/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts b/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts index 97ba2b191..da1e7cbf6 100644 --- a/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts +++ b/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts @@ -22,7 +22,7 @@ export interface AsyncDuckDBBindings { startPendingQuery(conn: number, text: string, allowStreamResult: boolean): Promise; pollPendingQuery(conn: number): Promise; cancelPendingQuery(conn: number): Promise; - fetchQueryResults(conn: number): Promise; + fetchQueryResults(conn: number): Promise; createPrepared(conn: number, text: string): Promise; closePrepared(conn: number, statement: number): Promise; diff --git a/packages/duckdb-wasm/src/parallel/async_connection.ts b/packages/duckdb-wasm/src/parallel/async_connection.ts index 783a18bd5..d4bdd8ecd 100644 --- a/packages/duckdb-wasm/src/parallel/async_connection.ts +++ b/packages/duckdb-wasm/src/parallel/async_connection.ts @@ -115,7 +115,7 @@ export class AsyncResultStreamIterator implements AsyncIterable { /** Reached end of stream? */ protected _depleted: boolean; /** In-flight */ - protected _inFlight: Promise | null; + protected _inFlight: Promise | null; constructor( protected readonly db: AsyncDuckDB, @@ -135,17 +135,21 @@ export class AsyncResultStreamIterator implements AsyncIterable { if (this._depleted) { return { done: true, value: null }; } - let buffer: Uint8Array; + let buffer: Uint8Array | null = null; if (this._inFlight != null) { buffer = await this._inFlight; this._inFlight = null; - } else { + } + + while (buffer == null) { buffer = await this.db.fetchQueryResults(this.conn); } + this._depleted = buffer.length == 0; if (!this._depleted) { this._inFlight = this.db.fetchQueryResults(this.conn); } + return { done: this._depleted, value: buffer, diff --git a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts index 3a5a8f295..b637b78b4 100644 --- a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts +++ b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts @@ -279,6 +279,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger { } case WorkerRequestType.FETCH_QUERY_RESULTS: { const result = this._bindings.fetchQueryResults(request.data); + const transfer = result ? [result.buffer] : []; this.postMessage( { messageId: this._nextMessageId++, @@ -286,7 +287,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger { type: WorkerResponseType.QUERY_RESULT_CHUNK, data: result, }, - [result.buffer], + transfer, ); break; } diff --git a/packages/duckdb-wasm/src/parallel/worker_request.ts b/packages/duckdb-wasm/src/parallel/worker_request.ts index 38502b7b6..071510d16 100644 --- a/packages/duckdb-wasm/src/parallel/worker_request.ts +++ b/packages/duckdb-wasm/src/parallel/worker_request.ts @@ -160,7 +160,7 @@ export type WorkerResponseVariant = | WorkerResponse | WorkerResponse | WorkerResponse - | WorkerResponse + | WorkerResponse | WorkerResponse | WorkerResponse | WorkerResponse @@ -180,7 +180,7 @@ export type WorkerTaskVariant = | WorkerTask | WorkerTask | WorkerTask - | WorkerTask + | WorkerTask | WorkerTask | WorkerTask | WorkerTask diff --git a/packages/duckdb-wasm/src/status.ts b/packages/duckdb-wasm/src/status.ts index a308e1e56..4ae0f8f98 100644 --- a/packages/duckdb-wasm/src/status.ts +++ b/packages/duckdb-wasm/src/status.ts @@ -1,8 +1,13 @@ export enum StatusCode { SUCCESS = 0, MAX_ARROW_ERROR = 255, + DUCKDB_WASM_RETRY = 256, } export function IsArrowBuffer(status: StatusCode): boolean { - return (status <= StatusCode.MAX_ARROW_ERROR); + return status <= StatusCode.MAX_ARROW_ERROR; +} + +export function IsDuckDBWasmRetry(status: StatusCode): boolean { + return status === StatusCode.DUCKDB_WASM_RETRY; }