Skip to content

Commit

Permalink
feat: Smart and Semi-Fast shutdown (#8411)
Browse files Browse the repository at this point in the history
When CUBEJS_GRACEFUL_SHUTDOWN=<nsecs> is enabled, makes
Cube use "fast" shutdown upon SIGINT and "semifast" shutdown upon
SIGTERM.
  • Loading branch information
srh committed Jul 24, 2024
1 parent 63262d8 commit 0bc8e6f
Show file tree
Hide file tree
Showing 18 changed files with 600 additions and 44 deletions.
5 changes: 3 additions & 2 deletions packages/cubejs-api-gateway/src/sql-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
Request as NativeRequest,
LoadRequestMeta,
} from '@cubejs-backend/native';
import type { ShutdownMode } from '@cubejs-backend/native';
import { displayCLIWarning, getEnv } from '@cubejs-backend/shared';

import * as crypto from 'crypto';
Expand Down Expand Up @@ -347,7 +348,7 @@ export class SQLServer {
// @todo Implement
}

public async shutdown(): Promise<void> {
await shutdownInterface(this.sqlInterfaceInstance!);
public async shutdown(mode: ShutdownMode): Promise<void> {
await shutdownInterface(this.sqlInterfaceInstance!, mode);
}
}
6 changes: 4 additions & 2 deletions packages/cubejs-backend-native/js/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,12 @@ export const registerInterface = async (options: SQLInterfaceOptions): Promise<S
});
};

export const shutdownInterface = async (instance: SqlInterfaceInstance): Promise<void> => {
export type ShutdownMode = 'fast' | 'semifast' | 'smart';

export const shutdownInterface = async (instance: SqlInterfaceInstance, shutdownMode: ShutdownMode): Promise<void> => {
const native = loadNative();

await native.shutdownInterface(instance);
await native.shutdownInterface(instance, shutdownMode);
};

export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string, stream: any, securityContext?: any): Promise<void> => {
Expand Down
14 changes: 13 additions & 1 deletion packages/cubejs-backend-native/src/node_export.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use cubesql::compile::{convert_sql_to_cube_query, get_df_batches};
use cubesql::config::processing_loop::ShutdownMode;
use cubesql::config::ConfigObj;
use cubesql::sql::{DatabaseProtocol, SessionManager};
use cubesql::transport::TransportService;
Expand Down Expand Up @@ -123,6 +124,17 @@ fn register_interface<C: NodeConfiguration>(mut cx: FunctionContext) -> JsResult

fn shutdown_interface(mut cx: FunctionContext) -> JsResult<JsPromise> {
let interface = cx.argument::<JsBox<SQLInterface>>(0)?;
let js_shutdown_mode = cx.argument::<JsString>(1)?;
let shutdown_mode = match js_shutdown_mode.value(&mut cx).as_str() {
"fast" => ShutdownMode::Fast,
"semifast" => ShutdownMode::SemiFast,
"smart" => ShutdownMode::Smart,
_ => {
return cx.throw_range_error::<&str, Handle<JsPromise>>(
"ShutdownMode param must be 'fast', 'semifast', or 'smart'",
);
}
};

let (deferred, promise) = cx.promise();
let channel = cx.channel();
Expand All @@ -131,7 +143,7 @@ fn shutdown_interface(mut cx: FunctionContext) -> JsResult<JsPromise> {
let runtime = tokio_runtime_node(&mut cx)?;

runtime.spawn(async move {
match services.stop_processing_loops().await {
match services.stop_processing_loops(shutdown_mode).await {
Ok(_) => {
if let Err(err) = services.await_processing_loops().await {
log::error!("Error during awaiting on shutdown: {}", err)
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-backend-native/test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ const meta_fixture = require('./meta');
console.log('SIGINT signal');

try {
await native.shutdownInterface(server);
await native.shutdownInterface(server, 'fast');
} catch (e) {
console.log(e);
} finally {
Expand Down
4 changes: 2 additions & 2 deletions packages/cubejs-backend-native/test/sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ describe('SQLInterface', () => {

await connection.end();
} finally {
await native.shutdownInterface(instance);
await native.shutdownInterface(instance, 'fast');
}
});

Expand Down Expand Up @@ -346,7 +346,7 @@ describe('SQLInterface', () => {

expect(rows).toBe(100000);

await native.shutdownInterface(instance);
await native.shutdownInterface(instance, 'fast');
} else {
expect(process.env.CUBESQL_STREAM_MODE).toBeFalsy();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export class CubejsServer {

if (this.sqlServer) {
locks.push(
this.sqlServer.shutdown()
this.sqlServer.shutdown(graceful && (signal === 'SIGTERM') ? 'semifast' : 'fast')
);
}

Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server/src/server/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ export class ServerContainer {
process.exit(1);
}
} else {
console.log(`Recevied ${signal} signal, terminating with process exit`);
process.exit(0);
}
});
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-testing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
"smoke:crate:snapshot": "jest --verbose --updateSnapshot -i dist/test/smoke-crate.test.js",
"smoke:firebolt": "jest --verbose -i dist/test/smoke-firebolt.test.js",
"smoke:firebolt:snapshot": "jest --updateSnapshot --verbose -i dist/test/smoke-firebolt.test.js",
"smoke:graceful-shutdown": "jest --verbose -i dist/test/smoke-graceful-shutdown.test.js",
"smoke:graceful-shutdown:snapshot": "jest --updateSnapshot --verbose -i dist/test/smoke-graceful-shutdown.test.js",
"smoke:lambda": "jest --verbose -i dist/test/smoke-lambda.test.js",
"smoke:lambda:snapshot": "jest --updateSnapshot --verbose -i dist/test/smoke-lambda.test.js",
"smoke:materialize": "jest --verbose -i dist/test/smoke-materialize.test.js",
Expand Down
55 changes: 54 additions & 1 deletion packages/cubejs-testing/src/birdbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,17 @@ function prepareTestData(type: DriverType, schemas?: Schemas) {
}
}

// Some logic to kill Cube in stop is more precise if we know killCube is only used to send signals
// that get the process terminated.
type KillCubeSignal = 'SIGINT' | 'SIGTERM';

/**
* Birdbox object interface.
*/
export interface BirdBox {
stop: () => Promise<void>;
killCube: (signal: KillCubeSignal) => void;
onCubeExit: () => Promise<number | null>;
stdout: internal.Readable | null;
configuration: {
playgroundUrl: string;
Expand All @@ -246,11 +252,21 @@ export async function startBirdBoxFromContainer(
if (process.env.TEST_CUBE_HOST) {
const host = process.env.TEST_CUBE_HOST || 'localhost';
const port = process.env.TEST_CUBE_PORT || '8888';
const pid = process.env.TEST_CUBE_PID ? Number(process.env.TEST_CUBE_PID) : null;

return {
stop: async () => {
process.stdout.write('[Birdbox] Closed\n');
},
killCube: (signal: KillCubeSignal) => {
if (pid !== null) {
process.kill(pid, signal);
} else {
process.stdout.write(`[Birdbox] Cannot kill Cube instance running in TEST_CUBE_HOST mode without TEST_CUBE_PID defined\n`);
throw new Error('Attempted to use killCube while running with TEST_CUBE_HOST');
}
},
onCubeExit: (): Promise<number | null> => Promise.reject(new Error('onCubeExit not implemented')), // TODO: Implement
stdout: null,
configuration: {
playgroundUrl: `http://${host}:${port}`,
Expand Down Expand Up @@ -409,6 +425,15 @@ export async function startBirdBoxFromContainer(
process.stdout.write('[Birdbox] Closed\n');
}
},
killCube: (signal: KillCubeSignal) => {
process.stdout.write(`[Birdbox] killCube (with signal ${signal}) not implemented for containers\n`);
throw new Error('killCube not implemented for containers');
},
onCubeExit: (): Promise<number | null> => {
const _ = 0;
return Promise.reject(new Error('onCubeExit not implemented for containers'));
// TODO: Implement.
},
configuration: {
playgroundUrl: `http://${host}:${playgroundPort}`,
apiUrl: `http://${host}:${port}/cubejs-api/v1`,
Expand Down Expand Up @@ -559,6 +584,11 @@ export async function startBirdBoxFromCli(
...options.env,
};

let exitResolve: (code: number | null) => void;
const exitPromise = new Promise<number | null>((res, _rej) => {
exitResolve = res;
});

try {
cli = spawn(
options.useCubejsServerBinary
Expand Down Expand Up @@ -589,13 +619,18 @@ export async function startBirdBoxFromCli(
process.stdout.write(msg);
});
}
cli.on('exit', (code, signal) => {
process.stdout.write(`[Birdbox] Child process '${cli.pid}' exited with 'exit' event code ${code}, signal ${signal}\n`);
exitResolve(code);
});
await pausePromise(10 * 1000);
} catch (e) {
process.stdout.write(`Error spawning cube: ${e}\n`);
// @ts-ignore
db.stop();
}

let sentKillSignal = false;
return {
// @ts-expect-error
stdout: cli.stdout,
Expand All @@ -611,12 +646,30 @@ export async function startBirdBoxFromCli(
process.stdout.write('[Birdbox] Done with DB\n');
}
if (cli.pid) {
process.kill(-cli.pid, 'SIGINT');
process.stdout.write(`[Birdbox] Killing process group '${cli.pid}'\n`);
// Here, normally, we kill the process group by passing -cli.pid (a negative value), but
// with killCube we just kill the main process, and then can't kill any process group --
// maybe that test has poor cleanup actions.
try {
process.kill(-cli.pid, 'SIGINT');
} catch (error) {
if (!sentKillSignal) {
throw error;
}
}
}
if (options.log === Log.PIPE) {
process.stdout.write('[Birdbox] Closed\n');
}
},
killCube: (signal: KillCubeSignal) => {
process.stdout.write(`[Birdbox] Killing Cube (pid = '${cli.pid}') with signal ${signal}\n`);
if (cli.pid) {
process.kill(cli.pid, signal);
sentKillSignal = true;
}
},
onCubeExit: (): Promise<number | null> => exitPromise,
configuration: {
playgroundUrl: 'http://127.0.0.1:4000',
apiUrl: 'http://127.0.0.1:4000/cubejs-api/v1',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`graceful shutdown PgClient Graceful Shutdown Finishing Transaction: sql_orders 1`] = `
Array [
Object {
"cn": "2",
"status": "processed",
},
Object {
"cn": "2",
"status": "new",
},
Object {
"cn": "1",
"status": "shipped",
},
]
`;

exports[`graceful shutdown PgClient Graceful Shutdown SIGINT: sql_orders 1`] = `
Array [
Object {
"cn": "2",
"status": "processed",
},
Object {
"cn": "2",
"status": "new",
},
Object {
"cn": "1",
"status": "shipped",
},
]
`;

exports[`graceful shutdown PgClient Graceful Shutdown SIGTERM: sql_orders 1`] = `
Array [
Object {
"cn": "2",
"status": "processed",
},
Object {
"cn": "2",
"status": "new",
},
Object {
"cn": "1",
"status": "shipped",
},
]
`;
Loading

0 comments on commit 0bc8e6f

Please sign in to comment.