Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions lib/contracts/OperationStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,33 @@ export interface OperationStatus {
* to `WaitUntilReadyOptions.callback` for the consumer to interpret.
*/
progressUpdateResponse?: unknown;

/**
* Number of rows modified by a DML statement (UPDATE / INSERT / DELETE /
* MERGE). `undefined`/`null` for SELECT and on backends/warehouses that do
* not surface the counter. Mirrors Thrift's
* `TGetOperationStatusResp.numModifiedRows`.
*/
numModifiedRows?: number | null;

/**
* Server-supplied user-facing message, when the backend exposes one. Mirrors
* Thrift's `TGetOperationStatusResp.displayMessage`. May contain SQL
* fragments or parameter values — treat as potentially sensitive.
*/
displayMessage?: string | null;

/**
* Server-supplied diagnostic detail (multi-line operator / stack context),
* when available. Mirrors Thrift's `TGetOperationStatusResp.diagnosticInfo`.
* For support surfaces, not user-facing.
*/
diagnosticInfo?: string | null;

/**
* Server-supplied JSON blob with extended error details, when available.
* Mirrors Thrift's `TGetOperationStatusResp.errorDetailsJson`. Pass-through
* string — callers parse with `JSON.parse` if they need structured access.
*/
errorDetailsJson?: string | null;
}
138 changes: 129 additions & 9 deletions lib/sea/SeaOperationBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,33 @@ export type SeaOperationStatement = SeaStatementHandle & Partial<SeaStatement>;
*/
type SeaFetchHandle = Pick<SeaStatement, 'fetchNextBatch' | 'schema'>;

/**
 * Structural view of the four rich operation-status accessors
 * (`numModifiedRows` / `displayMessage` / `diagnosticInfo` /
 * `errorDetailsJson`) that the kernel exposes on a terminal sync `Statement`.
 *
 * These accessors live ONLY on the blocking `Statement` (metadata path + sync
 * `runAsync:false` path once `result()` resolves) — the async
 * `AsyncStatement` / `AsyncResultHandle` do not expose them. Any reader built
 * on this type is therefore best-effort: it reports an all-null record when
 * the handle predates this surface or the operation never resolved to a
 * `Statement`.
 */
type SeaStatusFieldsHandle = Pick<
SeaStatement,
'numModifiedRows' | 'displayMessage' | 'diagnosticInfo' | 'errorDetailsJson'
>;

/**
 * The rich operation-status fields, as the kernel returns them (each `null`
 * when the server didn't supply it — e.g. `numModifiedRows` is null for a
 * SELECT). Carried onto the neutral `OperationStatus` and ultimately into the
 * Thrift `TGetOperationStatusResp` so SEA reports parity with the Thrift path.
 *
 * Unlike the optional (`?:`) fields on `OperationStatus`, every key here is
 * always present; "not supplied" is expressed as `null`, never `undefined`.
 */
interface SeaRichStatusFields {
/** Number of rows modified by a DML statement; `null` when not supplied (e.g. a SELECT). */
numModifiedRows: number | null;
/** Server-supplied user-facing message; `null` when the server did not supply one. */
displayMessage: string | null;
/** Server-supplied diagnostic detail; `null` when the server did not supply it. */
diagnosticInfo: string | null;
/** Server-supplied JSON blob with extended error details, kept as an unparsed string; `null` when absent. */
errorDetailsJson: string | null;
}

/** Poll cadence for the async `status()` loop — matches the Thrift backend's 100ms. */
const STATUS_POLL_INTERVAL_MS = 100;

Expand Down Expand Up @@ -377,7 +404,9 @@ export default class SeaOperationBackend implements IOperationBackend {
if (this.asyncStatement) {
// Async query path: report the real kernel state (single
// GetStatementStatus RPC — no polling here; `waitUntilReady` owns the
// poll loop).
// poll loop). The rich status fields (`numModifiedRows` etc.) live on the
// terminal sync `Statement`, which the async path never produces, so they
// stay undefined here.
const state = statusStringToOperationState(await this.asyncStatement.status());
return { state, hasResultSet: true };
}
Expand All @@ -386,11 +415,16 @@ export default class SeaOperationBackend implements IOperationBackend {
// server-side; there is no per-status RPC to query while it runs. Report
// Running until `result()` has materialised the terminal statement, then
// Succeeded — mirroring the kernel's blocking-then-terminal lifecycle.
const state = this.fetchHandlePromise ? OperationState.Succeeded : OperationState.Running;
return { state, hasResultSet: true };
if (!this.fetchHandlePromise) {
return { state: OperationState.Running, hasResultSet: true };
}
// The blocking `result()` has resolved a terminal `Statement` — surface
// its rich status fields alongside the Succeeded state.
return { state: OperationState.Succeeded, hasResultSet: true, ...(await this.readRichStatusFields()) };
}
// Metadata path: the kernel statement is already terminal.
return { state: OperationState.Succeeded, hasResultSet: true };
// Metadata path: the kernel statement is already terminal — read its rich
// fields too (they are `null` for metadata results, by design).
return { state: OperationState.Succeeded, hasResultSet: true, ...(await this.readRichStatusFields()) };
}

public async waitUntilReady(options?: IOperationBackendWaitOptions): Promise<void> {
Expand All @@ -402,8 +436,11 @@ export default class SeaOperationBackend implements IOperationBackend {
}
// Metadata path: the kernel statement has already resolved, so there is
// nothing to poll. seaFinished fires the progress callback once with a
// synthesised completion tick, matching the Thrift path's final tick.
return seaFinished(this.lifecycle, options);
// synthesised completion tick, matching the Thrift path's final tick. The
// rich-field reader is passed lazily so it only runs when a callback is
// wired (metadata statements report all-null, but the surface stays
// consistent with the query paths).
return seaFinished(this.lifecycle, options, () => this.readRichStatusFields());
}

public async cancel(): Promise<Status> {
Expand All @@ -418,6 +455,85 @@ export default class SeaOperationBackend implements IOperationBackend {
// Internals.
// ---------------------------------------------------------------------------

/**
 * Best-effort read of the kernel's rich operation-status fields
 * (`numModifiedRows` / `displayMessage` / `diagnosticInfo` /
 * `errorDetailsJson`) from the terminal sync `Statement`.
 *
 * Only the blocking `Statement` carries these accessors (metadata path, or
 * the sync `runAsync:false` path once `result()` has resolved); the async
 * `AsyncStatement` / `AsyncResultHandle` do not. The lookup therefore goes:
 *
 * - sync path: await `getFetchHandle()` first — it drives `result()` to
 *   completion and stores the resolved `Statement` on `blockingStatement`.
 *   If that rejects (operation failed/cancelled) all-null is returned; the
 *   failure itself surfaces through the wait/fetch path;
 * - async path: there is no `Statement` to read from, so all-null is
 *   returned;
 * - metadata path: `blockingStatement` is read as-is;
 * - no resolved statement, or an (older) binding without the accessors:
 *   all-null rather than a throw — `getOperationStatus()` must never fail
 *   just because the rich fields are unavailable.
 *
 * A failing individual accessor is logged at debug level and reported as
 * `null`: these fields are best-effort metadata, not the operation outcome.
 *
 * @returns The four rich fields, each `null` when unavailable.
 */
private async readRichStatusFields(): Promise<SeaRichStatusFields> {
  const allNull: SeaRichStatusFields = {
    numModifiedRows: null,
    displayMessage: null,
    diagnosticInfo: null,
    errorDetailsJson: null,
  };

  if (this.cancellableExecution) {
    // Sync path: make sure the blocking `result()` has resolved and left the
    // terminal `Statement` on `blockingStatement` before looking at it.
    try {
      await this.getFetchHandle();
    } catch {
      // Failed/cancelled operation — nothing for the status fields to add.
      return allNull;
    }
  } else if (this.asyncStatement) {
    // Async path: no terminal sync `Statement` is ever produced.
    return allNull;
  }

  // `numModifiedRows` doubles as the feature probe for the whole accessor set.
  const candidate = this.blockingStatement as Partial<SeaStatusFieldsHandle> | undefined;
  if (typeof candidate?.numModifiedRows !== 'function') {
    return allNull;
  }
  const statement = candidate as SeaStatusFieldsHandle;

  // Wraps a single accessor call so that a throw/rejection becomes `null`.
  const guarded = async <T>(accessor: () => Promise<T | null>): Promise<T | null> => {
    try {
      return await accessor();
    } catch (cause) {
      this.context
        .getLogger()
        .log(
          LogLevel.debug,
          `SEA status-field read failed for operation ${this._id}; reporting null. Cause: ${
            cause instanceof Error ? cause.message : String(cause)
          }`,
        );
      return null;
    }
  };

  // Start all four reads before awaiting any of them.
  const pendingRows = guarded(() => statement.numModifiedRows());
  const pendingMessage = guarded(() => statement.displayMessage());
  const pendingDiagnostics = guarded(() => statement.diagnosticInfo());
  const pendingErrorDetails = guarded(() => statement.errorDetailsJson());

  const [numModifiedRows, displayMessage, diagnosticInfo, errorDetailsJson] = await Promise.all([
    pendingRows,
    pendingMessage,
    pendingDiagnostics,
    pendingErrorDetails,
  ]);

  return { numModifiedRows, displayMessage, diagnosticInfo, errorDetailsJson };
}

/**
* Poll the kernel `AsyncStatement` to a terminal state on a fixed 100ms
* cadence, mirroring the Thrift backend's `waitUntilReady` loop. We poll
Expand Down Expand Up @@ -547,9 +663,13 @@ export default class SeaOperationBackend implements IOperationBackend {
// `getFetchHandle()` drives `result()` and memoises the resolved Statement
// (also stored on `blockingStatement` so `close()` can reach it).
await this.getFetchHandle();
// Single completion tick, matching the metadata path.
// Single completion tick, matching the metadata path — carrying the rich
// status fields (numModifiedRows etc.) read off the now-terminal Statement.
if (options?.callback) {
await Promise.resolve(options.callback({ state: OperationState.Succeeded, hasResultSet: true }));
const richFields = await this.readRichStatusFields();
await Promise.resolve(
options.callback({ state: OperationState.Succeeded, hasResultSet: true, ...richFields }),
);
}
}

Expand Down
23 changes: 16 additions & 7 deletions lib/sea/SeaOperationLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,21 @@ export async function seaClose(
}

/**
 * Synthesize an {@link OperationStatus} reporting the "finished" state.
 *
 * `IOperationBackend.waitUntilReady` is backend-neutral surface — its
 * `callback` receives an {@link OperationStatus}, not a Thrift wire struct
 * (the public Thrift-shaped `OperationStatusCallback` is adapted at the
 * `DBSQLOperation` facade boundary). The result starts from
 * `state: Succeeded` / `hasResultSet: true` and merges in any rich status
 * fields (`numModifiedRows` / `displayMessage` / `diagnosticInfo` /
 * `errorDetailsJson`) the backend resolved off the terminal kernel statement,
 * so a `finished({callback})` consumer sees the same surface as a subsequent
 * `getOperationStatus()` call.
 *
 * @param extra Optional fields to merge into the status. They are spread
 *   last, so a `state` or `hasResultSet` key in `extra` would override the
 *   defaults; callers are expected to pass only the rich status fields.
 * @returns A new status object; `extra` itself is not mutated.
 */
function synthesizeFinishedStatus(extra?: Partial<OperationStatus>): OperationStatus {
  return {
    state: OperationState.Succeeded,
    hasResultSet: true,
    ...extra,
  };
}
}

Expand Down Expand Up @@ -227,13 +230,19 @@ export async function seaFinished(
progress?: boolean;
callback?: (status: OperationStatus) => unknown;
},
// Rich status fields the backend read off the terminal statement, merged into
// the synthesised completion tick so callback consumers see them. Lazy (a
// thunk) so the (potentially RPC-backed) read only happens when a callback is
// actually wired.
richFields?: () => Promise<Partial<OperationStatus>>,
): Promise<void> {
if (state.isCancelled || state.isClosed) {
return;
}

if (options?.callback) {
const response = synthesizeFinishedStatus();
const extra = richFields ? await richFields() : undefined;
const response = synthesizeFinishedStatus(extra);
// Await the callback in case it returns a promise — matches the
// Thrift code path at `lib/DBSQLOperation.ts:348-351`.
await Promise.resolve(options.callback(response));
Expand Down
27 changes: 23 additions & 4 deletions lib/thrift-backend/wireSynthesis.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Int64 from 'node-int64';
import {
TGetOperationStatusResp,
TGetResultSetMetadataResp,
Expand Down Expand Up @@ -77,10 +78,17 @@ function resultFormatToThrift(format: ResultFormat): TSparkRowSetType {
* `OperationStatus` DTO. Used by `DBSQLOperation.status()` when running
* against a non-Thrift backend (e.g. SEA) so the public API stays Thrift-shaped.
*
* Lossy by design: Thrift-only fields not carried by `OperationStatus`
* (`taskStatus`, `numModifiedRows`, `operationStarted`, `operationCompleted`,
* `displayMessage`, `diagnosticInfo`) are left undefined. Consumers that
* read those fields will see `undefined` on non-Thrift backends.
* Carries the rich status fields when the backend supplies them
* (`numModifiedRows`, `displayMessage`, `diagnosticInfo`, `errorDetailsJson`)
* — the SEA backend reads these off the terminal kernel statement, so DML
* operations report `numModifiedRows` at parity with the Thrift path.
* `numModifiedRows` is re-boxed as a Thrift `Int64` (`node-int64`) to match the
* wire shape the Thrift deserializer produces, so consumers can read it
* uniformly across backends.
*
* Still lossy for Thrift-only fields not carried by `OperationStatus`
* (`taskStatus`, `operationStarted`, `operationCompleted`), which are left
* undefined.
*/
export function synthesizeThriftStatus(status: OperationStatus): TGetOperationStatusResp {
return {
Expand All @@ -90,6 +98,17 @@ export function synthesizeThriftStatus(status: OperationStatus): TGetOperationSt
errorMessage: status.errorMessage,
hasResultSet: status.hasResultSet,
progressUpdateResponse: status.progressUpdateResponse as TGetOperationStatusResp['progressUpdateResponse'],
// Rich status fields: only present on backends that surface them (SEA on a
// terminal sync statement). `null` (server didn't supply) maps to
// `undefined` so the synthesized response matches the Thrift path, where an
// absent field is simply not set.
numModifiedRows:
status.numModifiedRows === undefined || status.numModifiedRows === null
? undefined
: new Int64(status.numModifiedRows),
displayMessage: status.displayMessage ?? undefined,
diagnosticInfo: status.diagnosticInfo ?? undefined,
errorDetailsJson: status.errorDetailsJson ?? undefined,
} as TGetOperationStatusResp;
}

Expand Down
Loading