From 04c3f5efadee82a0f8b3ef80cbe2a805d34141ef Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Fri, 7 Apr 2023 16:15:36 +0200 Subject: [PATCH 01/12] RAI-11434 refactor polling transaction logic --- src/api/query/execAsyncApi.ts | 72 ++++++-------- src/rest.ts | 182 ++++++++++++++++++++++++++++++++++ 2 files changed, 213 insertions(+), 41 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index 3b22087..6fa1105 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -16,6 +16,7 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ +import { PollingPromise } from '../../rest'; import { TransactionAsyncApi } from '../transaction/transactionAsyncApi'; import { isTransactionDone, @@ -24,7 +25,6 @@ import { import { TransactionAsyncCompact, TransactionAsyncPayload, - TransactionAsyncResult, } from '../transaction/types'; import { CsvConfigSchema, CsvConfigSyntax, QueryInput } from './types'; import { makeQueryInput, schemaToRel, syntaxToRel } from './utils'; @@ -90,47 +90,37 @@ export class ExecAsyncApi extends TransactionAsyncApi { let transaction: TransactionAsyncCompact | undefined; - await new Promise((resolve, reject) => { - const checkState = () => { - setTimeout(async () => { - try { - transaction = await this.getTransaction(txnId); - // eslint-disable-next-line no-empty - } catch {} - - if (transaction && isTransactionDone(transaction.state)) { - resolve(); - } else { - if (Date.now() - startedAt > timeout) { - reject( - new Error( - `Polling transaction timeout of ${timeout}ms has been exceeded.`, - ), - ); - } - - checkState(); - } - }, interval); - }; - - checkState(); - }); - - const data = await Promise.all([ - this.getTransactionMetadata(txnId), - this.getTransactionProblems(txnId), - this.getTransactionResults(txnId), - ]); - const results = await makeArrowRelations(data[2], data[0]); - - const res: TransactionAsyncResult = { - transaction: transaction!, - problems: data[1], - results, - }; + return await new PollingPromise( + async resolve => { + try { + transaction = await this.getTransaction(txnId); + // eslint-disable-next-line no-empty + } catch {} - return res; + if (transaction && isTransactionDone(transaction.state)) { + resolve(); + } + }, + timeout, + interval, + startedAt, + ) + .then(() => + Promise.all([ + this.getTransactionMetadata(txnId), + this.getTransactionProblems(txnId), + this.getTransactionResults(txnId), + ]), + ) + .then(async data => ({ + results: await makeArrowRelations(data[2], data[0]), + problems: data[1], + })) + .then(({ results, problems }) => ({ + transaction: transaction!, + problems, + results, + })); } async loadJson( diff --git a/src/rest.ts b/src/rest.ts index 67e2233..2340899 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -150,3 +150,185 @@ function responseToInfo(response: Response, body: any) { return info; } + +enum PollingState { + PENDING, + FULFILLED, + REJECTED, +} + +export class PollingPromise implements Promise { + private state: PollingState = PollingState.PENDING; + private value: T | undefined = undefined; + private chain: { + onFulfilled: ((value: T) => any) | undefined | null; + onRejected: ((reason: any) => any) | undefined | null; + }[] = []; + constructor( + executor: ( + resolve: (value: T | PromiseLike) => void, + reject: (reason?: any) => void, + ) => void | PromiseLike, + public timeoutMs = Number.POSITIVE_INFINITY, + public intervalMs = 1000, + public startTimeMs = Date.now(), + public maxIntervalMs = 120000, + public overheadRate = 0.3, + ) { + const poll = async (newInterval: number) => { + await new Promise(res => setTimeout(res, newInterval)); + try { + await executor(this._resolve, this._reject); + } catch (error) { + this._reject(error); + } + + const currentDelay = Date.now() - this.startTimeMs; + + if (currentDelay > this.timeoutMs) { + this._reject('Polling timeout'); + } else if (this.state === PollingState.PENDING) { + poll(Math.min(this.maxIntervalMs, currentDelay * overheadRate)); + } + }; + + poll(this.intervalMs); + } + + private _resolve = (value: T | PromiseLike) => { + this.updateState(value, PollingState.FULFILLED); + }; + + private _reject = (reason: any) => { + this.updateState(reason, PollingState.REJECTED); + }; + + private isThenable = (value: any) => { + return ( + typeof value === 'object' && + value.then && + typeof value.then === 'function' + ); + }; + + private updateState = (value: any, state: PollingState) => { + setTimeout(() => { + /* + Process the promise if it is still in pending state. + An already rejected or resolved promise cannot be processed + */ + if (this.state !== PollingState.PENDING) { + return; + } + + // check if value is also a promise + if (this.isThenable(value)) { + return value.then(this._resolve, this._reject); + } + + this.value = value; + this.state = state; + + // execute chain if already attached + this.executeChain(); + }, 0); + }; + + private pushToChain(handlers: { + onFulfilled: ((value: T) => any) | undefined | null; + onRejected: ((reason: any) => any) | undefined | null; + }) { + this.chain.push(handlers); + this.executeChain(); + } + + private executeChain() { + // Don't execute chain if promise is not yet fulfilled or rejected + if (this.state === PollingState.PENDING) { + return null; + } + + // We have multiple handlers because add them for .finally block too + this.chain.forEach(({ onFulfilled, onRejected }) => { + if (this.state === PollingState.FULFILLED) { + return onFulfilled?.(this.value!); + } + return onRejected?.(this.value); + }); + // After processing the chain, reset to empty. + this.chain = []; + } + + readonly [Symbol.toStringTag]: string; + + then( + onFulfilled?: + | ((value: T) => PromiseLike | TResult1) + | undefined + | null, + onRejected?: + | ((reason: any) => PromiseLike | TResult2) + | undefined + | null, + ): Promise { + return new Promise((resolve, reject) => { + this.pushToChain({ + onFulfilled: value => { + // if no onFulfilled provided, resolve the value for the next promise chain + if (!onFulfilled) { + return resolve(value as any); + } + try { + return resolve(onFulfilled(value)); + } catch (error) { + return reject(error); + } + }, + onRejected: reason => { + // if no onRejected provided, reject the value for the next promise chain + if (!onRejected) { + return reject(reason); + } + try { + return resolve(onRejected(reason)); + } catch (error) { + return reject(error); + } + }, + }); + }); + } + + catch( + onRejected?: + | ((reason: any) => PromiseLike | TResult) + | undefined + | null, + ): Promise { + return this.then(null, onRejected); + } + + finally(onfinally?: (() => void) | undefined | null): Promise { + return new Promise((resolve, reject) => { + let val: any; + let wasRejected = false; + this.then( + value => { + wasRejected = false; + val = value; + return onfinally?.(); + }, + error => { + wasRejected = true; + val = error; + return onfinally?.(); + }, + ).then(() => { + if (!wasRejected) { + return resolve(val); + } + return reject(val); + }); + }); + } +} From d5dae5f51471572209c589dc6377ff27921d266f Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Fri, 7 Apr 2023 16:36:08 +0200 Subject: [PATCH 02/12] refactor --- src/api/query/execAsyncApi.ts | 44 +++++--- src/rest.ts | 182 ---------------------------------- 2 files changed, 28 insertions(+), 198 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index 6fa1105..ee92d66 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -16,7 +16,6 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ -import { PollingPromise } from '../../rest'; import { TransactionAsyncApi } from '../transaction/transactionAsyncApi'; import { isTransactionDone, @@ -87,24 +86,37 @@ export class ExecAsyncApi extends TransactionAsyncApi { timeout = Number.POSITIVE_INFINITY, ) { const startedAt = Date.now(); + const maxInterval = 60000; + const overheadRate = 0.3; let transaction: TransactionAsyncCompact | undefined; - return await new PollingPromise( - async resolve => { - try { - transaction = await this.getTransaction(txnId); - // eslint-disable-next-line no-empty - } catch {} - - if (transaction && isTransactionDone(transaction.state)) { - resolve(); - } - }, - timeout, - interval, - startedAt, - ) + return await new Promise((resolve, reject) => { + const poll = (newInterval: number) => { + setTimeout(async () => { + try { + transaction = await this.getTransaction(txnId); + // eslint-disable-next-line no-empty + } catch {} + + if (transaction && isTransactionDone(transaction.state)) { + resolve(); + } else { + const currentDelay = Date.now() - startedAt; + if (currentDelay > timeout) { + reject( + new Error( + `Polling transaction timeout of ${timeout}ms has been exceeded.`, + ), + ); + } + poll(Math.min(maxInterval, currentDelay * overheadRate)); + } + }, newInterval); + }; + + poll(interval); + }) .then(() => Promise.all([ this.getTransactionMetadata(txnId), diff --git a/src/rest.ts b/src/rest.ts index 2340899..67e2233 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -150,185 +150,3 @@ function responseToInfo(response: Response, body: any) { return info; } - -enum PollingState { - PENDING, - FULFILLED, - REJECTED, -} - -export class PollingPromise implements Promise { - private state: PollingState = PollingState.PENDING; - private value: T | undefined = undefined; - private chain: { - onFulfilled: ((value: T) => any) | undefined | null; - onRejected: ((reason: any) => any) | undefined | null; - }[] = []; - constructor( - executor: ( - resolve: (value: T | PromiseLike) => void, - reject: (reason?: any) => void, - ) => void | PromiseLike, - public timeoutMs = Number.POSITIVE_INFINITY, - public intervalMs = 1000, - public startTimeMs = Date.now(), - public maxIntervalMs = 120000, - public overheadRate = 0.3, - ) { - const poll = async (newInterval: number) => { - await new Promise(res => setTimeout(res, newInterval)); - try { - await executor(this._resolve, this._reject); - } catch (error) { - this._reject(error); - } - - const currentDelay = Date.now() - this.startTimeMs; - - if (currentDelay > this.timeoutMs) { - this._reject('Polling timeout'); - } else if (this.state === PollingState.PENDING) { - poll(Math.min(this.maxIntervalMs, currentDelay * overheadRate)); - } - }; - - poll(this.intervalMs); - } - - private _resolve = (value: T | PromiseLike) => { - this.updateState(value, PollingState.FULFILLED); - }; - - private _reject = (reason: any) => { - this.updateState(reason, PollingState.REJECTED); - }; - - private isThenable = (value: any) => { - return ( - typeof value === 'object' && - value.then && - typeof value.then === 'function' - ); - }; - - private updateState = (value: any, state: PollingState) => { - setTimeout(() => { - /* - Process the promise if it is still in pending state. - An already rejected or resolved promise cannot be processed - */ - if (this.state !== PollingState.PENDING) { - return; - } - - // check if value is also a promise - if (this.isThenable(value)) { - return value.then(this._resolve, this._reject); - } - - this.value = value; - this.state = state; - - // execute chain if already attached - this.executeChain(); - }, 0); - }; - - private pushToChain(handlers: { - onFulfilled: ((value: T) => any) | undefined | null; - onRejected: ((reason: any) => any) | undefined | null; - }) { - this.chain.push(handlers); - this.executeChain(); - } - - private executeChain() { - // Don't execute chain if promise is not yet fulfilled or rejected - if (this.state === PollingState.PENDING) { - return null; - } - - // We have multiple handlers because add them for .finally block too - this.chain.forEach(({ onFulfilled, onRejected }) => { - if (this.state === PollingState.FULFILLED) { - return onFulfilled?.(this.value!); - } - return onRejected?.(this.value); - }); - // After processing the chain, reset to empty. - this.chain = []; - } - - readonly [Symbol.toStringTag]: string; - - then( - onFulfilled?: - | ((value: T) => PromiseLike | TResult1) - | undefined - | null, - onRejected?: - | ((reason: any) => PromiseLike | TResult2) - | undefined - | null, - ): Promise { - return new Promise((resolve, reject) => { - this.pushToChain({ - onFulfilled: value => { - // if no onFulfilled provided, resolve the value for the next promise chain - if (!onFulfilled) { - return resolve(value as any); - } - try { - return resolve(onFulfilled(value)); - } catch (error) { - return reject(error); - } - }, - onRejected: reason => { - // if no onRejected provided, reject the value for the next promise chain - if (!onRejected) { - return reject(reason); - } - try { - return resolve(onRejected(reason)); - } catch (error) { - return reject(error); - } - }, - }); - }); - } - - catch( - onRejected?: - | ((reason: any) => PromiseLike | TResult) - | undefined - | null, - ): Promise { - return this.then(null, onRejected); - } - - finally(onfinally?: (() => void) | undefined | null): Promise { - return new Promise((resolve, reject) => { - let val: any; - let wasRejected = false; - this.then( - value => { - wasRejected = false; - val = value; - return onfinally?.(); - }, - error => { - wasRejected = true; - val = error; - return onfinally?.(); - }, - ).then(() => { - if (!wasRejected) { - return resolve(val); - } - return reject(val); - }); - }); - } -} From 64442f9916ce6a7f6295c50ca12a7643988dff2a Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Mon, 10 Apr 2023 12:22:34 +0200 Subject: [PATCH 03/12] match julia sdk --- src/api/query/execAsyncApi.ts | 61 +++++++++++++---------------------- src/api/query/types.ts | 5 +++ src/rest.ts | 46 ++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 38 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index ee92d66..7d77e22 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -16,6 +16,7 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ +import { pollWithOverhead } from '../../rest'; import { TransactionAsyncApi } from '../transaction/transactionAsyncApi'; import { isTransactionDone, @@ -25,7 +26,12 @@ import { TransactionAsyncCompact, TransactionAsyncPayload, } from '../transaction/types'; -import { CsvConfigSchema, CsvConfigSyntax, QueryInput } from './types'; +import { + CsvConfigSchema, + CsvConfigSyntax, + PollTransactionOptions, + QueryInput, +} from './types'; import { makeQueryInput, schemaToRel, syntaxToRel } from './utils'; export class ExecAsyncApi extends TransactionAsyncApi { @@ -60,9 +66,9 @@ export class ExecAsyncApi extends TransactionAsyncApi { inputs: QueryInput[] = [], readonly = true, tags: string[] = [], - interval = 1000, // 1 second timeout = Number.POSITIVE_INFINITY, ) { + const startTime = Date.now(); const result = await this.execAsync( database, engine, @@ -77,46 +83,25 @@ export class ExecAsyncApi extends TransactionAsyncApi { return result; } - return await this.pollTransaction(txnId, interval, timeout); + return await this.pollTransaction(txnId, { timeout, startTime }); } - async pollTransaction( - txnId: string, - interval = 1000, - timeout = Number.POSITIVE_INFINITY, - ) { - const startedAt = Date.now(); - const maxInterval = 60000; - const overheadRate = 0.3; - + async pollTransaction(txnId: string, options?: PollTransactionOptions) { + const startTime = options?.startTime ?? Date.now(); + const timeout = options?.timeout ?? Number.POSITIVE_INFINITY; let transaction: TransactionAsyncCompact | undefined; - return await new Promise((resolve, reject) => { - const poll = (newInterval: number) => { - setTimeout(async () => { - try { - transaction = await this.getTransaction(txnId); - // eslint-disable-next-line no-empty - } catch {} - - if (transaction && isTransactionDone(transaction.state)) { - resolve(); - } else { - const currentDelay = Date.now() - startedAt; - if (currentDelay > timeout) { - reject( - new Error( - `Polling transaction timeout of ${timeout}ms has been exceeded.`, - ), - ); - } - poll(Math.min(maxInterval, currentDelay * overheadRate)); - } - }, newInterval); - }; - - poll(interval); - }) + return await pollWithOverhead( + async () => { + transaction = await this.getTransaction(txnId); + + return transaction && isTransactionDone(transaction.state); + }, + { + startTime, + timeout, + }, + ) .then(() => Promise.all([ this.getTransactionMetadata(txnId), diff --git a/src/api/query/types.ts b/src/api/query/types.ts index ae4d861..8b57b69 100644 --- a/src/api/query/types.ts +++ b/src/api/query/types.ts @@ -34,3 +34,8 @@ export type CsvConfigSyntax = { export type CsvConfigSchema = { [colName: string]: string; }; + +export type PollTransactionOptions = { + timeout?: number; + startTime?: number; +}; diff --git a/src/rest.ts b/src/rest.ts index 67e2233..ea6588e 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -33,6 +33,13 @@ export type RequestOptions = { onResponse?: (r: ApiResponse) => void; }; +export type PollOptions = { + overheadRate?: number; + startTime?: number; + maxInterval?: number; + timeout?: number; +}; + function addDefaultHeaders(headers: RequestInit['headers'], url: string) { const sdkUserAgent = `rai-sdk-javascript/${VERSION}`; const defaultHeaders: RequestInit['headers'] = { @@ -150,3 +157,42 @@ function responseToInfo(response: Response, body: any) { return info; } + +export async function pollWithOverhead( + callback: () => boolean | PromiseLike, + { + overheadRate = 0.1, + startTime = Date.now(), + maxInterval = 120000, + timeout = Number.POSITIVE_INFINITY, + }: PollOptions, +) { + return new Promise((resolve, reject) => { + const poll = (delay: number) => { + console.log(delay); + setTimeout(async () => { + try { + const done = await callback(); + if (done) { + resolve(); + } else { + const currentDelay = Date.now() - startTime; + if (currentDelay > timeout) { + reject( + new Error( + `Polling transaction timeout of ${timeout}ms has been exceeded.`, + ), + ); + } + poll(Math.min(maxInterval, currentDelay * overheadRate)); + } + } catch (error: any) { + console.error(error); + reject(error); + } + }, delay); + }; + + poll(Math.min(maxInterval, Date.now() - startTime)); + }); +} From 0baa221a0b5abe22dd848e24d6f0459e292f8535 Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Mon, 10 Apr 2023 12:26:54 +0200 Subject: [PATCH 04/12] fix ts --- src/rest.ts | 2 -- src/testUtils.ts | 8 +++----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/rest.ts b/src/rest.ts index ea6588e..3994e78 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -169,7 +169,6 @@ export async function pollWithOverhead( ) { return new Promise((resolve, reject) => { const poll = (delay: number) => { - console.log(delay); setTimeout(async () => { try { const done = await callback(); @@ -187,7 +186,6 @@ export async function pollWithOverhead( poll(Math.min(maxInterval, currentDelay * overheadRate)); } } catch (error: any) { - console.error(error); reject(error); } }, delay); diff --git a/src/testUtils.ts b/src/testUtils.ts index ae8d1bb..d0bedd0 100644 --- a/src/testUtils.ts +++ b/src/testUtils.ts @@ -270,11 +270,9 @@ function logifyClient(client: Client) { client.pollTransaction = async (...args) => { testLog(`polling transaction ${args[0]}`); - return await pollTransaction( - args[0], - args[1], - timeout ? Number(timeout) : 120000, - ); + return await pollTransaction(args[0], { + timeout: timeout ? Number(timeout) : 120000, + }); }; } From ebbecc85b63c93633873c9bf2e7a1d932303f1ce Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Mon, 10 Apr 2023 12:28:55 +0200 Subject: [PATCH 05/12] refactor --- src/api/query/execAsyncApi.ts | 36 +++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index 7d77e22..330ab6b 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -25,6 +25,7 @@ import { import { TransactionAsyncCompact, TransactionAsyncPayload, + TransactionAsyncResult, } from '../transaction/types'; import { CsvConfigSchema, @@ -91,7 +92,7 @@ export class ExecAsyncApi extends TransactionAsyncApi { const timeout = options?.timeout ?? Number.POSITIVE_INFINITY; let transaction: TransactionAsyncCompact | undefined; - return await pollWithOverhead( + await pollWithOverhead( async () => { transaction = await this.getTransaction(txnId); @@ -101,23 +102,22 @@ export class ExecAsyncApi extends TransactionAsyncApi { startTime, timeout, }, - ) - .then(() => - Promise.all([ - this.getTransactionMetadata(txnId), - this.getTransactionProblems(txnId), - this.getTransactionResults(txnId), - ]), - ) - .then(async data => ({ - results: await makeArrowRelations(data[2], data[0]), - problems: data[1], - })) - .then(({ results, problems }) => ({ - transaction: transaction!, - problems, - results, - })); + ); + + const data = await Promise.all([ + this.getTransactionMetadata(txnId), + this.getTransactionProblems(txnId), + this.getTransactionResults(txnId), + ]); + const results = await makeArrowRelations(data[2], data[0]); + + const res: TransactionAsyncResult = { + transaction: transaction!, + problems: data[1], + results, + }; + + return res; } async loadJson( From 5837f93e8740816b8dffe3610fe65e1b570d0214 Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Thu, 4 May 2023 21:29:33 +0300 Subject: [PATCH 06/12] refactor --- src/api/query/execAsyncApi.ts | 54 ++++++++++++++--------------------- src/api/query/types.ts | 5 ---- src/rest.ts | 38 +++++++++++++----------- 3 files changed, 43 insertions(+), 54 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index 330ab6b..f793652 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -16,23 +16,17 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ -import { pollWithOverhead } from '../../rest'; +import { PollOptions, pollWithOverhead } from '../../rest'; import { TransactionAsyncApi } from '../transaction/transactionAsyncApi'; import { isTransactionDone, makeArrowRelations, } from '../transaction/transactionUtils'; import { - TransactionAsyncCompact, + TransactionAsync, TransactionAsyncPayload, - TransactionAsyncResult, } from '../transaction/types'; -import { - CsvConfigSchema, - CsvConfigSyntax, - PollTransactionOptions, - QueryInput, -} from './types'; +import { CsvConfigSchema, CsvConfigSyntax, QueryInput } from './types'; import { makeQueryInput, schemaToRel, syntaxToRel } from './utils'; export class ExecAsyncApi extends TransactionAsyncApi { @@ -87,37 +81,31 @@ export class ExecAsyncApi extends TransactionAsyncApi { return await this.pollTransaction(txnId, { timeout, startTime }); } - async pollTransaction(txnId: string, options?: PollTransactionOptions) { - const startTime = options?.startTime ?? Date.now(); - const timeout = options?.timeout ?? Number.POSITIVE_INFINITY; - let transaction: TransactionAsyncCompact | undefined; - - await pollWithOverhead( - async () => { - transaction = await this.getTransaction(txnId); - - return transaction && isTransactionDone(transaction.state); - }, - { - startTime, - timeout, - }, - ); + async pollTransaction(txnId: string, options?: PollOptions) { + const transaction = await pollWithOverhead(async () => { + try { + const transaction = await this.getTransaction(txnId); + return { + done: transaction && isTransactionDone(transaction.state), + result: transaction, + }; + // eslint-disable-next-line no-empty + } catch {} + + return { + done: false, + }; + }, options); const data = await Promise.all([ this.getTransactionMetadata(txnId), - this.getTransactionProblems(txnId), this.getTransactionResults(txnId), ]); - const results = await makeArrowRelations(data[2], data[0]); - const res: TransactionAsyncResult = { - transaction: transaction!, - problems: data[1], - results, + return { + transaction, + results: makeArrowRelations(data[1], data[0]), }; - - return res; } async loadJson( diff --git a/src/api/query/types.ts b/src/api/query/types.ts index 8b57b69..ae4d861 100644 --- a/src/api/query/types.ts +++ b/src/api/query/types.ts @@ -34,8 +34,3 @@ export type CsvConfigSyntax = { export type CsvConfigSchema = { [colName: string]: string; }; - -export type PollTransactionOptions = { - timeout?: number; - startTime?: number; -}; diff --git a/src/rest.ts b/src/rest.ts index 3994e78..03f0cd7 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -40,6 +40,15 @@ export type PollOptions = { timeout?: number; }; +export type PollingResult = + | { + done: boolean; + result: T; + } + | { + done: false; + }; + function addDefaultHeaders(headers: RequestInit['headers'], url: string) { const sdkUserAgent = `rai-sdk-javascript/${VERSION}`; const defaultHeaders: RequestInit['headers'] = { @@ -158,29 +167,26 @@ function responseToInfo(response: Response, body: any) { return info; } -export async function pollWithOverhead( - callback: () => boolean | PromiseLike, - { - overheadRate = 0.1, - startTime = Date.now(), - maxInterval = 120000, - timeout = Number.POSITIVE_INFINITY, - }: PollOptions, +export async function pollWithOverhead( + callback: () => PollingResult | PromiseLike>, + options?: PollOptions, ) { - return new Promise((resolve, reject) => { + const overheadRate = options?.overheadRate ?? 0.1; + const startTime = options?.startTime ?? Date.now(); + const timeout = options?.timeout ?? Number.POSITIVE_INFINITY; + const maxInterval = options?.maxInterval ?? 120000; + return new Promise((resolve, reject) => { const poll = (delay: number) => { setTimeout(async () => { try { - const done = await callback(); - if (done) { - resolve(); + const pollingResult = await callback(); + if (pollingResult.done) { + resolve(pollingResult.result); } else { const currentDelay = Date.now() - startTime; if (currentDelay > timeout) { reject( - new Error( - `Polling transaction timeout of ${timeout}ms has been exceeded.`, - ), + new Error(`Polling timeout of ${timeout}ms has been exceeded.`), ); } poll(Math.min(maxInterval, currentDelay * overheadRate)); @@ -191,6 +197,6 @@ export async function pollWithOverhead( }, delay); }; - poll(Math.min(maxInterval, Date.now() - startTime)); + poll(Math.min(maxInterval, (Date.now() - startTime) * overheadRate)); }); } From f55aeed40660c72416b9d7f80453958d195eb547 Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Thu, 4 May 2023 21:40:28 +0300 Subject: [PATCH 07/12] fix types --- src/api/query/execAsyncApi.ts | 10 ++++++++-- src/api/transaction/transactionUtils.ts | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index f793652..87d52a7 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -25,6 +25,7 @@ import { import { TransactionAsync, TransactionAsyncPayload, + TransactionAsyncResult, } from '../transaction/types'; import { CsvConfigSchema, CsvConfigSyntax, QueryInput } from './types'; import { makeQueryInput, schemaToRel, syntaxToRel } from './utils'; @@ -81,7 +82,10 @@ export class ExecAsyncApi extends TransactionAsyncApi { return await this.pollTransaction(txnId, { timeout, startTime }); } - async pollTransaction(txnId: string, options?: PollOptions) { + async pollTransaction( + txnId: string, + options?: PollOptions, + ): Promise { const transaction = await pollWithOverhead(async () => { try { const transaction = await this.getTransaction(txnId); @@ -99,12 +103,14 @@ export class ExecAsyncApi extends TransactionAsyncApi { const data = await Promise.all([ this.getTransactionMetadata(txnId), + this.getTransactionProblems(txnId), this.getTransactionResults(txnId), ]); return { transaction, - results: makeArrowRelations(data[1], data[0]), + problems: data[1], + results: makeArrowRelations(data[2], data[0]), }; } diff --git a/src/api/transaction/transactionUtils.ts b/src/api/transaction/transactionUtils.ts index 7260e48..565745b 100644 --- a/src/api/transaction/transactionUtils.ts +++ b/src/api/transaction/transactionUtils.ts @@ -26,7 +26,7 @@ import { TransactionAsyncFile, TransactionAsyncResult, TransactionAsyncState, -} from '../transaction/types'; +} from './types'; export function makeLabeledAction( name: string, @@ -105,7 +105,7 @@ export async function readArrowFiles(files: TransactionAsyncFile[]) { return results; } -export async function makeArrowRelations( +export function makeArrowRelations( results: ArrowResult[], metadata?: MetadataInfo, ) { From 00d531e540ba2b615965609a51e57059c2078a19 Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Thu, 4 May 2023 21:41:30 +0300 Subject: [PATCH 08/12] remove unused assertion --- src/api/query/execAsyncApi.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index 87d52a7..8304d9a 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -14,8 +14,6 @@ * under the License. */ -/* eslint-disable @typescript-eslint/no-non-null-assertion */ - import { PollOptions, pollWithOverhead } from '../../rest'; import { TransactionAsyncApi } from '../transaction/transactionAsyncApi'; import { From fc215adb5872d3821a674ad0acb9b31e51d5d4ec Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Mon, 8 May 2023 13:33:28 +0300 Subject: [PATCH 09/12] address feedback --- src/api/query/execAsyncApi.ts | 8 +++++-- src/rest.ts | 40 +++++++++++++++++++---------------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index 8304d9a..e7996d3 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -14,6 +14,7 @@ * under the License. */ +import { ApiError } from '../../errors'; import { PollOptions, pollWithOverhead } from '../../rest'; import { TransactionAsyncApi } from '../transaction/transactionAsyncApi'; import { @@ -91,8 +92,11 @@ export class ExecAsyncApi extends TransactionAsyncApi { done: transaction && isTransactionDone(transaction.state), result: transaction, }; - // eslint-disable-next-line no-empty - } catch {} + } catch (error: any) { + if (error instanceof ApiError && error.response.status < 500) { + throw error; + } + } return { done: false, diff --git a/src/rest.ts b/src/rest.ts index 03f0cd7..ef05465 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -38,16 +38,13 @@ export type PollOptions = { startTime?: number; maxInterval?: number; timeout?: number; + maxRetries?: number; }; -export type PollingResult = - | { - done: boolean; - result: T; - } - | { - done: false; - }; +export type PollingResult = { + done: boolean; + result?: T; +}; function addDefaultHeaders(headers: RequestInit['headers'], url: string) { const sdkUserAgent = `rai-sdk-javascript/${VERSION}`; @@ -175,25 +172,32 @@ export async function pollWithOverhead( const startTime = options?.startTime ?? Date.now(); const timeout = options?.timeout ?? Number.POSITIVE_INFINITY; const maxInterval = options?.maxInterval ?? 120000; + const maxRetry = options?.maxRetries ?? Number.POSITIVE_INFINITY; return new Promise((resolve, reject) => { + let tryNumber = 0; const poll = (delay: number) => { setTimeout(async () => { try { const pollingResult = await callback(); - if (pollingResult.done) { + if (pollingResult.done && pollingResult.result) { resolve(pollingResult.result); - } else { - const currentDelay = Date.now() - startTime; - if (currentDelay > timeout) { - reject( - new Error(`Polling timeout of ${timeout}ms has been exceeded.`), - ); - } - poll(Math.min(maxInterval, currentDelay * overheadRate)); + return; } } catch (error: any) { - reject(error); + if (++tryNumber >= maxRetry) { + reject(error); + return; + } + } + + const currentDelay = Date.now() - startTime; + if (currentDelay > timeout) { + reject( + new Error(`Polling timeout of ${timeout}ms has been exceeded.`), + ); + return; } + poll(Math.min(maxInterval, currentDelay * overheadRate)); }, delay); }; From ef25f7a15dabc2bbdf427d45795860c53585d357 Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Tue, 9 May 2023 16:33:00 +0300 Subject: [PATCH 10/12] remove try/catch --- src/api/query/execAsyncApi.ts | 11 +++-------- src/rest.ts | 18 ++++-------------- 2 files changed, 7 insertions(+), 22 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index e7996d3..c6dc3c8 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -14,7 +14,6 @@ * under the License. */ -import { ApiError } from '../../errors'; import { PollOptions, pollWithOverhead } from '../../rest'; import { TransactionAsyncApi } from '../transaction/transactionAsyncApi'; import { @@ -86,16 +85,12 @@ export class ExecAsyncApi extends TransactionAsyncApi { options?: PollOptions, ): Promise { const transaction = await pollWithOverhead(async () => { - try { - const transaction = await this.getTransaction(txnId); + const transaction = await this.getTransaction(txnId); + if (isTransactionDone(transaction.state)) { return { - done: transaction && isTransactionDone(transaction.state), + done: true, result: transaction, }; - } catch (error: any) { - if (error instanceof ApiError && error.response.status < 500) { - throw error; - } } return { diff --git a/src/rest.ts b/src/rest.ts index ef05465..d774312 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -38,7 +38,6 @@ export type PollOptions = { startTime?: number; maxInterval?: number; timeout?: number; - maxRetries?: number; }; export type PollingResult = { @@ -172,22 +171,13 @@ export async function pollWithOverhead( const startTime = options?.startTime ?? Date.now(); const timeout = options?.timeout ?? Number.POSITIVE_INFINITY; const maxInterval = options?.maxInterval ?? 120000; - const maxRetry = options?.maxRetries ?? Number.POSITIVE_INFINITY; return new Promise((resolve, reject) => { - let tryNumber = 0; const poll = (delay: number) => { setTimeout(async () => { - try { - const pollingResult = await callback(); - if (pollingResult.done && pollingResult.result) { - resolve(pollingResult.result); - return; - } - } catch (error: any) { - if (++tryNumber >= maxRetry) { - reject(error); - return; - } + const pollingResult = await callback(); + if (pollingResult.done && pollingResult.result) { + resolve(pollingResult.result); + return; } const currentDelay = Date.now() - startTime; From 41b94a4493de9bb2d0a2043aacf09f649ef5aaca Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Tue, 9 May 2023 16:43:01 +0300 Subject: [PATCH 11/12] address feedback --- src/api/query/execAsyncApi.ts | 5 ++--- src/rest.ts | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/api/query/execAsyncApi.ts b/src/api/query/execAsyncApi.ts index c6dc3c8..cb535eb 100644 --- a/src/api/query/execAsyncApi.ts +++ b/src/api/query/execAsyncApi.ts @@ -60,9 +60,8 @@ export class ExecAsyncApi extends TransactionAsyncApi { inputs: QueryInput[] = [], readonly = true, tags: string[] = [], - timeout = Number.POSITIVE_INFINITY, + options?: PollOptions, ) { - const startTime = Date.now(); const result = await this.execAsync( database, engine, @@ -77,7 +76,7 @@ export class ExecAsyncApi extends TransactionAsyncApi { return result; } - return await this.pollTransaction(txnId, { timeout, startTime }); + return await this.pollTransaction(txnId, options); } async pollTransaction( diff --git a/src/rest.ts b/src/rest.ts index d774312..1c83108 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -35,7 +35,6 @@ export type RequestOptions = { export type PollOptions = { overheadRate?: number; - startTime?: number; maxInterval?: number; timeout?: number; }; @@ -168,7 +167,7 @@ export async function pollWithOverhead( options?: PollOptions, ) { const overheadRate = options?.overheadRate ?? 0.1; - const startTime = options?.startTime ?? Date.now(); + const startTime = Date.now(); const timeout = options?.timeout ?? Number.POSITIVE_INFINITY; const maxInterval = options?.maxInterval ?? 120000; return new Promise((resolve, reject) => { From c2eb718de7595fc79493777ed1276a3e38ec3a5a Mon Sep 17 00:00:00 2001 From: Osama Alfakhouri Date: Tue, 9 May 2023 17:03:34 +0300 Subject: [PATCH 12/12] wrap with try/catch --- src/rest.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/rest.ts b/src/rest.ts index 1c83108..73c56cb 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -173,9 +173,14 @@ export async function pollWithOverhead( return new Promise((resolve, reject) => { const poll = (delay: number) => { setTimeout(async () => { - const pollingResult = await callback(); - if (pollingResult.done && pollingResult.result) { - resolve(pollingResult.result); + try { + const pollingResult = await callback(); + if (pollingResult.done && pollingResult.result) { + resolve(pollingResult.result); + return; + } + } catch (error: any) { + reject(error); return; }