diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index 3b22087..cb535eb 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -14,15 +14,14 @@ * under the License. */ -/* eslint-disable @typescript-eslint/no-non-null-assertion */ - +import { PollOptions, pollWithOverhead } from '../../rest'; import { TransactionAsyncApi } from '../transaction/transactionAsyncApi'; import { isTransactionDone, makeArrowRelations, } from '../transaction/transactionUtils'; import { - TransactionAsyncCompact, + TransactionAsync, TransactionAsyncPayload, TransactionAsyncResult, } from '../transaction/types'; @@ -61,8 +60,7 @@ export class ExecAsyncApi extends TransactionAsyncApi { inputs: QueryInput[] = [], readonly = true, tags: string[] = [], - interval = 1000, // 1 second - timeout = Number.POSITIVE_INFINITY, + options?: PollOptions, ) { const result = await this.execAsync( database, @@ -78,59 +76,38 @@ export class ExecAsyncApi extends TransactionAsyncApi { return result; } - return await this.pollTransaction(txnId, interval, timeout); + return await this.pollTransaction(txnId, options); } async pollTransaction( txnId: string, - interval = 1000, - timeout = Number.POSITIVE_INFINITY, - ) { - const startedAt = Date.now(); - - let transaction: TransactionAsyncCompact | undefined; - - await new Promise((resolve, reject) => { - const checkState = () => { - setTimeout(async () => { - try { - transaction = await this.getTransaction(txnId); - // eslint-disable-next-line no-empty - } catch {} - - if (transaction && isTransactionDone(transaction.state)) { - resolve(); - } else { - if (Date.now() - startedAt > timeout) { - reject( - new Error( - `Polling transaction timeout of ${timeout}ms has been exceeded.`, - ), - ); - } - - checkState(); - } - }, interval); + options?: PollOptions, + ): Promise { + const transaction = await pollWithOverhead(async () => { + const transaction = await this.getTransaction(txnId); + if (isTransactionDone(transaction.state)) { + return { + done: true, + result: transaction, + }; + } + + return { + done: false, }; - - checkState(); - }); + }, options); const data = await Promise.all([ this.getTransactionMetadata(txnId), this.getTransactionProblems(txnId), this.getTransactionResults(txnId), ]); - const results = await makeArrowRelations(data[2], data[0]); - const res: TransactionAsyncResult = { - transaction: transaction!, + return { + transaction, problems: data[1], - results, + results: makeArrowRelations(data[2], data[0]), }; - - return res; } async loadJson( diff --git a/src/api/transaction/transactionUtils.ts b/src/api/transaction/transactionUtils.ts index 7260e48..565745b 100644 --- a/src/api/transaction/transactionUtils.ts +++ b/src/api/transaction/transactionUtils.ts @@ -26,7 +26,7 @@ import { TransactionAsyncFile, TransactionAsyncResult, TransactionAsyncState, -} from '../transaction/types'; +} from './types'; export function makeLabeledAction( name: string, @@ -105,7 +105,7 @@ export async function readArrowFiles(files: TransactionAsyncFile[]) { return results; } -export async function makeArrowRelations( +export function makeArrowRelations( results: ArrowResult[], metadata?: MetadataInfo, ) { diff --git a/src/rest.ts b/src/rest.ts index 67e2233..73c56cb 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -33,6 +33,17 @@ export type RequestOptions = { onResponse?: (r: ApiResponse) => void; }; +export type PollOptions = { + overheadRate?: number; + maxInterval?: number; + timeout?: number; +}; + +export type PollingResult = { + done: boolean; + result?: T; +}; + function addDefaultHeaders(headers: RequestInit['headers'], url: string) { const sdkUserAgent = `rai-sdk-javascript/${VERSION}`; const defaultHeaders: RequestInit['headers'] = { @@ -150,3 +161,40 @@ function responseToInfo(response: Response, body: any) { return info; } + +export async function pollWithOverhead( + callback: () => PollingResult | PromiseLike>, + options?: PollOptions, +) { + const overheadRate = options?.overheadRate ?? 0.1; + const startTime = Date.now(); + const timeout = options?.timeout ?? Number.POSITIVE_INFINITY; + const maxInterval = options?.maxInterval ?? 120000; + return new Promise((resolve, reject) => { + const poll = (delay: number) => { + setTimeout(async () => { + try { + const pollingResult = await callback(); + if (pollingResult.done && pollingResult.result) { + resolve(pollingResult.result); + return; + } + } catch (error: any) { + reject(error); + return; + } + + const currentDelay = Date.now() - startTime; + if (currentDelay > timeout) { + reject( + new Error(`Polling timeout of ${timeout}ms has been exceeded.`), + ); + return; + } + poll(Math.min(maxInterval, currentDelay * overheadRate)); + }, delay); + }; + + poll(Math.min(maxInterval, (Date.now() - startTime) * overheadRate)); + }); +} diff --git a/src/testUtils.ts b/src/testUtils.ts index ae8d1bb..d0bedd0 100644 --- a/src/testUtils.ts +++ b/src/testUtils.ts @@ -270,11 +270,9 @@ function logifyClient(client: Client) { client.pollTransaction = async (...args) => { testLog(`polling transaction ${args[0]}`); - return await pollTransaction( - args[0], - args[1], - timeout ? Number(timeout) : 120000, - ); + return await pollTransaction(args[0], { + timeout: timeout ? Number(timeout) : 120000, + }); }; }