Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/include/duckdb/web/webdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ struct DuckDBWasmResultsWrapper {
// Additional ResponseStatuses to be >= 256, and mirrored to packages/duckdb-wasm/src/status.ts
// Missing mapping result in a throw, but they should eventually align (it's fine if typescript side only has a
// subset)
enum ResponseStatus : uint32_t { ARROW_BUFFER = 0, MAX_ARROW_ERROR = 255 };
enum ResponseStatus : uint32_t { ARROW_BUFFER = 0, MAX_ARROW_ERROR = 255, DUCKDB_WASM_RETRY = 256 };
DuckDBWasmResultsWrapper(arrow::Result<std::shared_ptr<arrow::Buffer>> res,
ResponseStatus status = ResponseStatus::ARROW_BUFFER)
: arrow_buffer(res), status(status) {}
DuckDBWasmResultsWrapper(arrow::Status res, ResponseStatus status = ResponseStatus::ARROW_BUFFER)
: arrow_buffer(res), status(status) {}
DuckDBWasmResultsWrapper(ResponseStatus status = ResponseStatus::ARROW_BUFFER)
: DuckDBWasmResultsWrapper(nullptr, status) {}
arrow::Result<std::shared_ptr<arrow::Buffer>> arrow_buffer;
ResponseStatus status;
};
Expand Down
40 changes: 40 additions & 0 deletions lib/src/webdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,46 @@ DuckDBWasmResultsWrapper WebDB::Connection::FetchQueryResults() {
if (current_query_result_ == nullptr) {
return DuckDBWasmResultsWrapper{nullptr};
}

if (current_query_result_->type == QueryResultType::STREAM_RESULT) {
auto& stream_result = current_query_result_->Cast<duckdb::StreamQueryResult>();

auto before = std::chrono::steady_clock::now();
uint64_t elapsed;
auto polling_interval =
webdb_.config_->query.query_polling_interval.value_or(DEFAULT_QUERY_POLLING_INTERVAL);
bool ready = false;
do {
switch (stream_result.ExecuteTask()) {
case StreamExecutionResult::EXECUTION_ERROR:
return arrow::Status{arrow::StatusCode::ExecutionError,
std::move(current_query_result_->GetError())};
case StreamExecutionResult::EXECUTION_CANCELLED:
return arrow::Status{arrow::StatusCode::ExecutionError,
"The execution of the query was cancelled before it could finish, likely "
"caused by executing a different query"};
case StreamExecutionResult::CHUNK_READY:
case StreamExecutionResult::EXECUTION_FINISHED:
ready = true;
break;
case StreamExecutionResult::BLOCKED:
stream_result.WaitForTask();
return DuckDBWasmResultsWrapper::ResponseStatus::DUCKDB_WASM_RETRY;
case StreamExecutionResult::NO_TASKS_AVAILABLE:
return DuckDBWasmResultsWrapper::ResponseStatus::DUCKDB_WASM_RETRY;
case StreamExecutionResult::CHUNK_NOT_READY:
break;
}

auto after = std::chrono::steady_clock::now();
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(after - before).count();
} while (!ready && elapsed < polling_interval);

if (!ready) {
return DuckDBWasmResultsWrapper::ResponseStatus::DUCKDB_WASM_RETRY;
}
}

// Fetch next result chunk
chunk = current_query_result_->Fetch();
if (current_query_result_->HasError()) {
Expand Down
10 changes: 8 additions & 2 deletions packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Logger } from '../log';
import { InstantiationProgress } from './progress';
import { DuckDBBindings } from './bindings_interface';
import { DuckDBConnection } from './connection';
import { StatusCode, IsArrowBuffer } from '../status';
import { StatusCode, IsArrowBuffer, IsDuckDBWasmRetry } from '../status';
import { dropResponseBuffers, DuckDBRuntime, readString, callSRet, copyBuffer, DuckDBDataProtocol } from './runtime';
import { CSVInsertOptions, JSONInsertOptions, ArrowInsertOptions } from './insert_options';
import { ScriptTokens } from './tokens';
Expand Down Expand Up @@ -222,14 +222,20 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
return this.mod.ccall('duckdb_web_pending_query_cancel', 'boolean', ['number'], [conn]);
}
/** Fetch query results */
public fetchQueryResults(conn: number): Uint8Array {
public fetchQueryResults(conn: number): Uint8Array | null {
const [s, d, n] = callSRet(this.mod, 'duckdb_web_query_fetch_results', ['number'], [conn]);
if (IsDuckDBWasmRetry(s)) {
dropResponseBuffers(this.mod);
return null; // Retry
}

if (!IsArrowBuffer(s)) {
throw new Error("Unexpected StatusCode from duckdb_web_query_fetch_results (" + s + ") and with self reported error as" + readString(this.mod, d, n));
}
if (s !== StatusCode.SUCCESS) {
throw new Error(readString(this.mod, d, n));
}

const res = copyBuffer(this.mod, d, n);
dropResponseBuffers(this.mod);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dropResponseBuffers(this.mod);
dropResponseBuffers(this.mod);

I am not sure if this needs to be performed, or moved afterwards.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it may need to be revisited though might be worth a dedicated PR?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking moving this below the return null might be it, but I need to check.
In any case, this impact only retry cases, that are currently not exposed in the wild, so might be ok-ish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want it in both "SUCCESS" cases since a buffer is instantiated, otherwise we would leak the (1-byte) retry buffer each time no?

return res;
Expand Down
2 changes: 1 addition & 1 deletion packages/duckdb-wasm/src/bindings/bindings_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export interface DuckDBBindings {
startPendingQuery(conn: number, text: string, allowStreamResult: boolean): Uint8Array | null;
pollPendingQuery(conn: number): Uint8Array | null;
cancelPendingQuery(conn: number): boolean;
fetchQueryResults(conn: number): Uint8Array;
fetchQueryResults(conn: number): Uint8Array | null;
getTableNames(conn: number, text: string): string[];

createPrepared(conn: number, text: string): number;
Expand Down
6 changes: 4 additions & 2 deletions packages/duckdb-wasm/src/bindings/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ export class DuckDBConnection {
//Otherwise, reject with the error
reject(e);
}

}
});
}
Expand Down Expand Up @@ -125,7 +124,10 @@ export class ResultStreamIterator implements Iterable<Uint8Array> {
if (this._depleted) {
return { done: true, value: null };
}
const bufferI8 = this.bindings.fetchQueryResults(this.conn);
let bufferI8 = null;
do {
bufferI8 = this.bindings.fetchQueryResults(this.conn);
} while (bufferI8 == null);
this._depleted = bufferI8.length == 0;
return {
done: this._depleted,
Expand Down
4 changes: 2 additions & 2 deletions packages/duckdb-wasm/src/parallel/async_bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
}

/** Fetch query results */
public async fetchQueryResults(conn: ConnectionID): Promise<Uint8Array> {
const task = new WorkerTask<WorkerRequestType.FETCH_QUERY_RESULTS, ConnectionID, Uint8Array>(
public async fetchQueryResults(conn: ConnectionID): Promise<Uint8Array | null> {
const task = new WorkerTask<WorkerRequestType.FETCH_QUERY_RESULTS, ConnectionID, Uint8Array | null>(
WorkerRequestType.FETCH_QUERY_RESULTS,
conn,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export interface AsyncDuckDBBindings {
startPendingQuery(conn: number, text: string, allowStreamResult: boolean): Promise<Uint8Array | null>;
pollPendingQuery(conn: number): Promise<Uint8Array | null>;
cancelPendingQuery(conn: number): Promise<boolean>;
fetchQueryResults(conn: number): Promise<Uint8Array>;
fetchQueryResults(conn: number): Promise<Uint8Array | null>;

createPrepared(conn: number, text: string): Promise<number>;
closePrepared(conn: number, statement: number): Promise<void>;
Expand Down
10 changes: 7 additions & 3 deletions packages/duckdb-wasm/src/parallel/async_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class AsyncResultStreamIterator implements AsyncIterable<Uint8Array> {
/** Reached end of stream? */
protected _depleted: boolean;
/** In-flight */
protected _inFlight: Promise<Uint8Array> | null;
protected _inFlight: Promise<Uint8Array | null> | null;

constructor(
protected readonly db: AsyncDuckDB,
Expand All @@ -135,17 +135,21 @@ export class AsyncResultStreamIterator implements AsyncIterable<Uint8Array> {
if (this._depleted) {
return { done: true, value: null };
}
let buffer: Uint8Array;
let buffer: Uint8Array | null = null;
if (this._inFlight != null) {
buffer = await this._inFlight;
this._inFlight = null;
} else {
}

while (buffer == null) {
buffer = await this.db.fetchQueryResults(this.conn);
}

this._depleted = buffer.length == 0;
if (!this._depleted) {
this._inFlight = this.db.fetchQueryResults(this.conn);
}

return {
done: this._depleted,
value: buffer,
Expand Down
3 changes: 2 additions & 1 deletion packages/duckdb-wasm/src/parallel/worker_dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,15 @@ export abstract class AsyncDuckDBDispatcher implements Logger {
}
case WorkerRequestType.FETCH_QUERY_RESULTS: {
const result = this._bindings.fetchQueryResults(request.data);
const transfer = result ? [result.buffer] : [];
this.postMessage(
{
messageId: this._nextMessageId++,
requestId: request.messageId,
type: WorkerResponseType.QUERY_RESULT_CHUNK,
data: result,
},
[result.buffer],
transfer,
);
break;
}
Expand Down
4 changes: 2 additions & 2 deletions packages/duckdb-wasm/src/parallel/worker_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export type WorkerResponseVariant =
| WorkerResponse<WorkerResponseType.PREPARED_STATEMENT_ID, number>
| WorkerResponse<WorkerResponseType.QUERY_PLAN, Uint8Array>
| WorkerResponse<WorkerResponseType.QUERY_RESULT, Uint8Array>
| WorkerResponse<WorkerResponseType.QUERY_RESULT_CHUNK, Uint8Array>
| WorkerResponse<WorkerResponseType.QUERY_RESULT_CHUNK, Uint8Array | null>
| WorkerResponse<WorkerResponseType.QUERY_RESULT_HEADER, Uint8Array>
| WorkerResponse<WorkerResponseType.QUERY_RESULT_HEADER_OR_NULL, Uint8Array | null>
| WorkerResponse<WorkerResponseType.SCRIPT_TOKENS, ScriptTokens>
Expand All @@ -180,7 +180,7 @@ export type WorkerTaskVariant =
| WorkerTask<WorkerRequestType.DROP_FILE, string, null>
| WorkerTask<WorkerRequestType.DROP_FILES, null, null>
| WorkerTask<WorkerRequestType.EXPORT_FILE_STATISTICS, string, FileStatistics>
| WorkerTask<WorkerRequestType.FETCH_QUERY_RESULTS, ConnectionID, Uint8Array>
| WorkerTask<WorkerRequestType.FETCH_QUERY_RESULTS, ConnectionID, Uint8Array | null>
| WorkerTask<WorkerRequestType.FLUSH_FILES, null, null>
| WorkerTask<WorkerRequestType.GET_FEATURE_FLAGS, null, number>
| WorkerTask<WorkerRequestType.GET_TABLE_NAMES, [number, string], string[]>
Expand Down
7 changes: 6 additions & 1 deletion packages/duckdb-wasm/src/status.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
export enum StatusCode {
SUCCESS = 0,
MAX_ARROW_ERROR = 255,
DUCKDB_WASM_RETRY = 256,
}

export function IsArrowBuffer(status: StatusCode): boolean {
return (status <= StatusCode.MAX_ARROW_ERROR);
return status <= StatusCode.MAX_ARROW_ERROR;
}

export function IsDuckDBWasmRetry(status: StatusCode): boolean {
return status === StatusCode.DUCKDB_WASM_RETRY;
}