Skip to content
65 changes: 21 additions & 44 deletions src/api/query/execAsyncApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
* under the License.
*/

/* eslint-disable @typescript-eslint/no-non-null-assertion */

import { PollOptions, pollWithOverhead } from '../../rest';
import { TransactionAsyncApi } from '../transaction/transactionAsyncApi';
import {
isTransactionDone,
makeArrowRelations,
} from '../transaction/transactionUtils';
import {
TransactionAsyncCompact,
TransactionAsync,
TransactionAsyncPayload,
TransactionAsyncResult,
} from '../transaction/types';
Expand Down Expand Up @@ -61,8 +60,7 @@ export class ExecAsyncApi extends TransactionAsyncApi {
inputs: QueryInput[] = [],
readonly = true,
tags: string[] = [],
interval = 1000, // 1 second
timeout = Number.POSITIVE_INFINITY,
options?: PollOptions,
) {
const result = await this.execAsync(
database,
Expand All @@ -78,59 +76,38 @@ export class ExecAsyncApi extends TransactionAsyncApi {
return result;
}

return await this.pollTransaction(txnId, interval, timeout);
return await this.pollTransaction(txnId, options);
}

async pollTransaction(
txnId: string,
interval = 1000,
timeout = Number.POSITIVE_INFINITY,
) {
const startedAt = Date.now();

let transaction: TransactionAsyncCompact | undefined;

await new Promise<void>((resolve, reject) => {
const checkState = () => {
setTimeout(async () => {
try {
transaction = await this.getTransaction(txnId);
// eslint-disable-next-line no-empty
} catch {}

if (transaction && isTransactionDone(transaction.state)) {
resolve();
} else {
if (Date.now() - startedAt > timeout) {
reject(
new Error(
`Polling transaction timeout of ${timeout}ms has been exceeded.`,
),
);
}

checkState();
}
}, interval);
options?: PollOptions,
): Promise<TransactionAsyncResult> {
const transaction = await pollWithOverhead<TransactionAsync>(async () => {
const transaction = await this.getTransaction(txnId);
if (isTransactionDone(transaction.state)) {
return {
done: true,
result: transaction,
};
}

return {
done: false,
};

checkState();
});
}, options);

const data = await Promise.all([
this.getTransactionMetadata(txnId),
this.getTransactionProblems(txnId),
this.getTransactionResults(txnId),
]);
const results = await makeArrowRelations(data[2], data[0]);

const res: TransactionAsyncResult = {
transaction: transaction!,
return {
transaction,
problems: data[1],
results,
results: makeArrowRelations(data[2], data[0]),
};

return res;
}

async loadJson(
Expand Down
4 changes: 2 additions & 2 deletions src/api/transaction/transactionUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
TransactionAsyncFile,
TransactionAsyncResult,
TransactionAsyncState,
} from '../transaction/types';
} from './types';

export function makeLabeledAction(
name: string,
Expand Down Expand Up @@ -105,7 +105,7 @@ export async function readArrowFiles(files: TransactionAsyncFile[]) {
return results;
}

export async function makeArrowRelations(
export function makeArrowRelations(
results: ArrowResult[],
metadata?: MetadataInfo,
) {
Expand Down
48 changes: 48 additions & 0 deletions src/rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ export type RequestOptions = {
onResponse?: (r: ApiResponse) => void;
};

export type PollOptions = {
overheadRate?: number;
maxInterval?: number;
timeout?: number;
};

export type PollingResult<T> = {
done: boolean;
result?: T;
};

function addDefaultHeaders(headers: RequestInit['headers'], url: string) {
const sdkUserAgent = `rai-sdk-javascript/${VERSION}`;
const defaultHeaders: RequestInit['headers'] = {
Expand Down Expand Up @@ -150,3 +161,40 @@ function responseToInfo(response: Response, body: any) {

return info;
}

export async function pollWithOverhead<T = void>(
callback: () => PollingResult<T> | PromiseLike<PollingResult<T>>,
options?: PollOptions,
) {
const overheadRate = options?.overheadRate ?? 0.1;
const startTime = Date.now();
const timeout = options?.timeout ?? Number.POSITIVE_INFINITY;
const maxInterval = options?.maxInterval ?? 120000;
return new Promise<T>((resolve, reject) => {
const poll = (delay: number) => {
setTimeout(async () => {
try {
const pollingResult = await callback();
if (pollingResult.done && pollingResult.result) {
resolve(pollingResult.result);
return;
}
} catch (error: any) {
reject(error);
return;
}

const currentDelay = Date.now() - startTime;
if (currentDelay > timeout) {
reject(
new Error(`Polling timeout of ${timeout}ms has been exceeded.`),
);
return;
}
poll(Math.min(maxInterval, currentDelay * overheadRate));
}, delay);
};

poll(Math.min(maxInterval, (Date.now() - startTime) * overheadRate));
});
}
8 changes: 3 additions & 5 deletions src/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,9 @@ function logifyClient(client: Client) {
client.pollTransaction = async (...args) => {
testLog(`polling transaction ${args[0]}`);

return await pollTransaction(
args[0],
args[1],
timeout ? Number(timeout) : 120000,
);
return await pollTransaction(args[0], {
timeout: timeout ? Number(timeout) : 120000,
});
};
}

Expand Down