Skip to content

Commit

Permalink
feat: introduce graceful shutdown (#1683)
Browse files Browse the repository at this point in the history
* feat: Introduce Gracefull shutdown

* feat(server): Ability to reload configuration

* feat(server): Support reloading configuration with devMode changes

* feat(server): Graceful http shutdown

* feat: Close WS server before http, because http handle ws

* feat: Introduce CUBEJS_GRACEFUL_SHUTDOWN

* feat(shared): Introduce retryWithTimeout, withTimeout

* feat(shared): createCancelableInterval - handle too fast intervals
  • Loading branch information
ovr committed Jan 11, 2021
1 parent c3e97b5 commit 118232f
Show file tree
Hide file tree
Showing 18 changed files with 869 additions and 175 deletions.
6 changes: 5 additions & 1 deletion packages/cubejs-api-gateway/src/LocalSubscriptionStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ interface LocalSubscriptionStoreOptions {
}

export class LocalSubscriptionStore {
protected readonly connections = {};
protected connections = {};

protected readonly hearBeatInterval: number;

Expand Down Expand Up @@ -64,4 +64,8 @@ export class LocalSubscriptionStore {

return this.connections[connectionId];
}

public clear() {
this.connections = {};
}
}
4 changes: 4 additions & 0 deletions packages/cubejs-api-gateway/src/SubscriptionServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,8 @@ export class SubscriptionServer {
public async disconnect(connectionId: string) {
await this.subscriptionStore.cleanupSubscriptions(connectionId);
}

public clear() {
this.subscriptionStore.clear();
}
}
2 changes: 2 additions & 0 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ const variables = {
.asBoolStrict(),
refreshTimer: () => process.env.CUBEJS_SCHEDULED_REFRESH_TIMER
&& asBoolOrTime(process.env.CUBEJS_SCHEDULED_REFRESH_TIMER, 'CUBEJS_SCHEDULED_REFRESH_TIMER'),
gracefulShutdown: () => get('CUBEJS_GRACEFUL_SHUTDOWN')
.asIntPositive(),
scheduledRefresh: () => get('CUBEJS_SCHEDULED_REFRESH')
.asBool(),
dockerImageVersion: () => get('CUBEJS_DOCKER_IMAGE_VERSION')
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-backend-shared/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export { getEnv, isDockerImage } from './env';
export * from './package';
export * from './track';
export * from './errors';
export * from './promises';
184 changes: 184 additions & 0 deletions packages/cubejs-backend-shared/src/promises.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
export interface CancelablePromise<T> extends Promise<T> {
cancel: (waitExecution?: boolean) => Promise<any>;
}

export function pausePromise(ms: number): CancelablePromise<void> {
let cancel: Function = () => {
//
};

const promise: any = new Promise((resolve) => {
cancel = resolve;

setTimeout(resolve, ms);
});
promise.cancel = cancel;

return promise;
}

class CancelToken {
protected readonly deferred: (() => Promise<void>|void)[] = [];

protected readonly withQueue: (CancelablePromise<void>)[] = [];

protected canceled = false;

public async cancel(): Promise<void> {
if (this.canceled) {
throw new Error('CancelToken was already canceled');
}

this.canceled = true;

if (this.deferred.length) {
await Promise.all(this.deferred.map(async (queued) => queued()));
}

if (this.withQueue.length) {
// eslint-disable-next-line no-restricted-syntax
for (const queued of this.withQueue) {
await queued.cancel(false);
}
}
}

public defer(fn: () => Promise<void>|void): void {
this.deferred.push(fn);
}

public async with(fn: CancelablePromise<void>) {
this.withQueue.push(fn);

return fn;
}

public isCanceled() {
return this.canceled;
}
}

export function createCancelablePromise<T>(
fn: (cancel: CancelToken) => Promise<T>,
): CancelablePromise<T> {
const token = new CancelToken();

const promise: any = fn(token);
promise.cancel = async (waitExecution: boolean = true) => {
const locks: Promise<any>[] = [
token.cancel(),
];

if (waitExecution) {
locks.push(promise);
}

return Promise.all(locks);
};

return promise;
}

export interface CancelableInterval {
cancel: (waitExecution?: boolean) => Promise<void>,
}

/**
* It's helps to create an interval that can be canceled with awaiting latest execution
*/
export function createCancelableInterval<T>(
fn: (token: CancelToken) => Promise<T>,
interval: number,
): CancelableInterval {
let execution: CancelablePromise<T>|null = null;

const timeout = setInterval(
async () => {
if (execution) {
process.emitWarning(
'Execution of previous interval was not finished, new execution will be skipped',
'UnexpectedBehaviour'
);

return;
}

execution = createCancelablePromise(fn);

await execution;

execution = null;
},
interval,
);

return {
cancel: async (waitExecution: boolean = true) => {
clearInterval(timeout);

if (execution) {
await execution.cancel(waitExecution);
}
}
};
}

interface RetryWithTimeoutOptions {
timeout: number,
intervalPause: (iteration: number) => number,
}

export const withTimeout = <T>(
fn: CancelablePromise<T>,
timeout: number,
): Promise<T> => {
let timer: NodeJS.Timeout|null = null;

return Promise.race<any>([
fn,
new Promise((resolve, reject) => {
timer = setTimeout(async () => {
await fn.cancel(false);

reject(new Error(`Timeout reached after ${timeout}ms`));
}, timeout);

fn.then(resolve).catch(reject);
})
]).then((v) => {
if (timer) {
clearTimeout(timer);
}

return v;
}, (err) => {
if (timer) {
clearTimeout(timer);
}

throw err;
});
};

export const retryWithTimeout = <T>(
fn: (token: CancelToken) => Promise<T>,
{ timeout, intervalPause }: RetryWithTimeoutOptions,
) => withTimeout(
createCancelablePromise<T|null>(async (token) => {
let i = 0;

while (!token.isCanceled()) {
i++;

const result = await fn(token);
if (result) {
return result;
}

await token.with(pausePromise(intervalPause(i)));
}

return null;
}),
timeout
);

0 comments on commit 118232f

Please sign in to comment.