Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/include/duckdb/web/utils/wasm_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
namespace duckdb {
namespace web {

struct DuckDBWasmResultsWrapper;

struct WASMResponse {
/// The status code
double statusCode = 1;
Expand Down Expand Up @@ -35,6 +37,8 @@ class WASMResponseBuffer {
/// Store the arrow status.
/// Returns wheather the result was OK
bool Store(WASMResponse& response, arrow::Status status);
/// Store a DuckDBWasmResultsWrapper
void Store(WASMResponse& response, DuckDBWasmResultsWrapper& value);
/// Store a string
void Store(WASMResponse& response, std::string value);
/// Store a string view
Expand Down
16 changes: 15 additions & 1 deletion lib/include/duckdb/web/webdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ namespace web {

struct BufferingArrowIPCStreamDecoder;

struct DuckDBWasmResultsWrapper {
// Additional ResponseStatuses to be >= 256, and mirrored to packages/duckdb-wasm/src/status.ts
// Missing mapping result in a throw, but they should eventually align (it's fine if typescript side only has a
// subset)
enum ResponseStatus : uint32_t { ARROW_BUFFER = 0, MAX_ARROW_ERROR = 255 };
DuckDBWasmResultsWrapper(arrow::Result<std::shared_ptr<arrow::Buffer>> res,
ResponseStatus status = ResponseStatus::ARROW_BUFFER)
: arrow_buffer(res), status(status) {}
DuckDBWasmResultsWrapper(arrow::Status res, ResponseStatus status = ResponseStatus::ARROW_BUFFER)
: arrow_buffer(res), status(status) {}
arrow::Result<std::shared_ptr<arrow::Buffer>> arrow_buffer;
ResponseStatus status;
};

class WebDB {
public:
/// A connection
Expand Down Expand Up @@ -93,7 +107,7 @@ class WebDB {
/// Cancel a pending query
bool CancelPendingQuery();
/// Fetch a data chunk from a pending query
arrow::Result<std::shared_ptr<arrow::Buffer>> FetchQueryResults();
DuckDBWasmResultsWrapper FetchQueryResults();
/// Get table names
arrow::Result<std::string> GetTableNames(std::string_view text);

Expand Down
10 changes: 10 additions & 0 deletions lib/src/utils/wasm_response.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstdint>

#include "arrow/buffer.h"
#include "duckdb/web/webdb.h"

namespace duckdb {
namespace web {
Expand All @@ -26,6 +27,15 @@ bool WASMResponseBuffer::Store(WASMResponse& response, arrow::Status status) {
return true;
}

void WASMResponseBuffer::Store(WASMResponse& response, DuckDBWasmResultsWrapper& value) {
if (value.status == DuckDBWasmResultsWrapper::ResponseStatus::ARROW_BUFFER) {
Store(response, std::move(value.arrow_buffer));
} else {
Clear();
response.statusCode = value.status;
}
}

void WASMResponseBuffer::Store(WASMResponse& response, std::string value) {
result_str_ = std::move(value);
response.statusCode = 0;
Expand Down
6 changes: 3 additions & 3 deletions lib/src/webdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ bool WebDB::Connection::CancelPendingQuery() {
}
}

arrow::Result<std::shared_ptr<arrow::Buffer>> WebDB::Connection::FetchQueryResults() {
DuckDBWasmResultsWrapper WebDB::Connection::FetchQueryResults() {
try {
// Fetch data if a query is active
duckdb::unique_ptr<duckdb::DataChunk> chunk;
if (current_query_result_ == nullptr) {
return nullptr;
return DuckDBWasmResultsWrapper{nullptr};
}
// Fetch next result chunk
chunk = current_query_result_->Fetch();
Expand All @@ -248,7 +248,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> WebDB::Connection::FetchQueryResul
current_query_result_.reset();
current_schema_.reset();
current_schema_patched_.reset();
return nullptr;
return DuckDBWasmResultsWrapper{nullptr};
}

// Serialize the record batch
Expand Down
2 changes: 1 addition & 1 deletion lib/src/webdb_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ bool duckdb_web_pending_query_cancel(ConnectionHdl connHdl, const char* script)
void duckdb_web_query_fetch_results(WASMResponse* packed, ConnectionHdl connHdl) {
auto c = reinterpret_cast<WebDB::Connection*>(connHdl);
auto r = c->FetchQueryResults();
WASMResponseBuffer::Get().Store(*packed, std::move(r));
WASMResponseBuffer::Get().Store(*packed, r);
}
/// Get table names
void duckdb_web_get_tablenames(WASMResponse* packed, ConnectionHdl connHdl, const char* query) {
Expand Down
5 changes: 4 additions & 1 deletion packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Logger } from '../log';
import { InstantiationProgress } from './progress';
import { DuckDBBindings } from './bindings_interface';
import { DuckDBConnection } from './connection';
import { StatusCode } from '../status';
import { StatusCode, IsArrowBuffer } from '../status';
import { dropResponseBuffers, DuckDBRuntime, readString, callSRet, copyBuffer, DuckDBDataProtocol } from './runtime';
import { CSVInsertOptions, JSONInsertOptions, ArrowInsertOptions } from './insert_options';
import { ScriptTokens } from './tokens';
Expand Down Expand Up @@ -224,6 +224,9 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
/** Fetch query results */
public fetchQueryResults(conn: number): Uint8Array {
const [s, d, n] = callSRet(this.mod, 'duckdb_web_query_fetch_results', ['number'], [conn]);
if (!IsArrowBuffer(s)) {
throw new Error("Unexpected StatusCode from duckdb_web_query_fetch_results (" + s + ") and with self reported error as" + readString(this.mod, d, n));
}
if (s !== StatusCode.SUCCESS) {
throw new Error(readString(this.mod, d, n));
}
Expand Down
7 changes: 6 additions & 1 deletion packages/duckdb-wasm/src/status.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
export enum StatusCode {
SUCCESS = 0,
SUCCESS = 0,
MAX_ARROW_ERROR = 255,
}

export function IsArrowBuffer(status: StatusCode): boolean {
return (status <= StatusCode.MAX_ARROW_ERROR);
}