From 35c140cac864b5b588fa88e90fec3d8b7de6acda Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sun, 28 Feb 2021 18:52:32 +0300 Subject: [PATCH] feat: Use single instance for Cube Store handler (#2229) --- packages/cubejs-cubestore-driver/index.js | 3 +- .../src/CubeStoreDevDriver.ts | 26 ++---- packages/cubejs-cubestore-driver/src/index.ts | 2 +- .../cubejs-cubestore-driver/src/rexport.ts | 1 + packages/cubejs-cubestore-driver/src/utils.ts | 9 -- .../cubejs-server-core/src/core/server.ts | 25 ++++-- packages/cubejs-server-core/src/core/types.ts | 4 +- rust/js-wrapper/cubestore-dev.ts | 4 +- rust/js-wrapper/download.ts | 28 +++++-- rust/js-wrapper/process.ts | 82 +++++++++++-------- 10 files changed, 102 insertions(+), 82 deletions(-) create mode 100644 packages/cubejs-cubestore-driver/src/rexport.ts delete mode 100644 packages/cubejs-cubestore-driver/src/utils.ts diff --git a/packages/cubejs-cubestore-driver/index.js b/packages/cubejs-cubestore-driver/index.js index 54b233f3e515..843734718594 100644 --- a/packages/cubejs-cubestore-driver/index.js +++ b/packages/cubejs-cubestore-driver/index.js @@ -1,6 +1,6 @@ const { CubeStoreDriver } = require('./dist/src/CubeStoreDriver'); const { CubeStoreDevDriver } = require('./dist/src/CubeStoreDevDriver'); -const { isCubeStoreSupported } = require('./dist/src/utils'); +const { isCubeStoreSupported, CubeStoreHandler } = require('./dist/src/rexport'); /** * After 5 years working with TypeScript, now I know @@ -14,3 +14,4 @@ module.exports = CubeStoreDriver; */ module.exports.CubeStoreDevDriver = CubeStoreDevDriver; module.exports.isCubeStoreSupported = isCubeStoreSupported; +module.exports.CubeStoreHandler = CubeStoreHandler; diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDevDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDevDriver.ts index 2824df65b853..fa06d918c963 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDevDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDevDriver.ts @@ -1,14 +1,14 @@ -import { CubeStoreHandler, startCubeStoreHandler } from '@cubejs-backend/cubestore'; +import { CubeStoreHandler } from '@cubejs-backend/cubestore'; import { CubeStoreDriver } from './CubeStoreDriver'; import { ConnectionConfig } from './types'; import { AsyncConnection } from './connection'; export class CubeStoreDevDriver extends CubeStoreDriver { - // Let's use Promise as Mutex to protect multiple starting of Cube Store - protected cubeStoreHandler: Promise|null = null; - - public constructor(config?: Partial) { + public constructor( + protected readonly cubeStoreHandler: CubeStoreHandler, + config?: Partial + ) { super({ ...config, // @todo Make random port selection when 13306 is already used? @@ -17,21 +17,7 @@ export class CubeStoreDevDriver extends CubeStoreDriver { } protected async acquireCubeStore() { - if (!this.cubeStoreHandler) { - this.cubeStoreHandler = startCubeStoreHandler({ - stdout: (data) => { - console.log(data.toString().trim()); - }, - stderr: (data) => { - console.log(data.toString().trim()); - }, - onRestart: (code) => this.logger('Cube Store Restarting', { - warning: `Instance exit with ${code}, restarting`, - }), - }); - } - - await (await this.cubeStoreHandler).acquire(); + return this.cubeStoreHandler.acquire(); } public async withConnection(fn: (connection: AsyncConnection) => Promise) { diff --git a/packages/cubejs-cubestore-driver/src/index.ts b/packages/cubejs-cubestore-driver/src/index.ts index a402174fa28d..77fdd6ac2862 100644 --- a/packages/cubejs-cubestore-driver/src/index.ts +++ b/packages/cubejs-cubestore-driver/src/index.ts @@ -1,4 +1,4 @@ export * from './CubeStoreQuery'; export * from './CubeStoreDriver'; export * from './CubeStoreDevDriver'; -export * from './utils'; +export * from './rexport'; diff --git a/packages/cubejs-cubestore-driver/src/rexport.ts b/packages/cubejs-cubestore-driver/src/rexport.ts new file mode 100644 index 000000000000..cb94769d40cd --- /dev/null +++ b/packages/cubejs-cubestore-driver/src/rexport.ts @@ -0,0 +1 @@ +export { isCubeStoreSupported, CubeStoreHandler } from '@cubejs-backend/cubestore'; diff --git a/packages/cubejs-cubestore-driver/src/utils.ts b/packages/cubejs-cubestore-driver/src/utils.ts deleted file mode 100644 index ad2d46b3969b..000000000000 --- a/packages/cubejs-cubestore-driver/src/utils.ts +++ /dev/null @@ -1,9 +0,0 @@ -import process from 'process'; - -export function isCubeStoreSupported(): boolean { - if (process.arch !== 'x64') { - return false; - } - - return ['win32', 'darwin', 'linux'].includes(process.platform); -} diff --git a/packages/cubejs-server-core/src/core/server.ts b/packages/cubejs-server-core/src/core/server.ts index d9868840d747..844532671bc5 100644 --- a/packages/cubejs-server-core/src/core/server.ts +++ b/packages/cubejs-server-core/src/core/server.ts @@ -17,7 +17,7 @@ import { import type { Application as ExpressApplication } from 'express'; import type { BaseDriver } from '@cubejs-backend/query-orchestrator'; -import type { CubeStoreDevDriver, isCubeStoreSupported } from '@cubejs-backend/cubestore-driver'; +import type { CubeStoreDevDriver, CubeStoreHandler, isCubeStoreSupported } from '@cubejs-backend/cubestore-driver'; import type { ContextToAppIdFn, CreateOptions, @@ -30,6 +30,7 @@ import type { DriverContext, SchemaFileRepository, UserBackgroundContext, + LoggerFn, } from './types'; import { FileRepository } from './FileRepository'; @@ -294,6 +295,9 @@ export class CubejsServerCore { const dbType = opts.dbType || process.env.CUBEJS_DB_TYPE; const externalDbType = opts.externalDbType || process.env.CUBEJS_EXT_DB_TYPE; const devServer = process.env.NODE_ENV !== 'production' || process.env.CUBEJS_DEV_MODE === 'true'; + const logger: LoggerFn = opts.logger || process.env.NODE_ENV !== 'production' + ? devLogger(process.env.CUBEJS_LOG_LEVEL) + : prodLogger(process.env.CUBEJS_LOG_LEVEL); let externalDriverFactory = externalDbType && ( () => new (CubejsServerCore.lookupDriverClass(externalDbType))({ @@ -311,6 +315,7 @@ export class CubejsServerCore { if (!externalDbType && getEnv('devMode')) { const cubeStorePackage = requireFromPackage<{ isCubeStoreSupported: typeof isCubeStoreSupported, + CubeStoreHandler: typeof CubeStoreHandler, CubeStoreDevDriver: typeof CubeStoreDevDriver, }>('@cubejs-backend/cubestore-driver', { relative: isDockerImage(), @@ -320,8 +325,20 @@ export class CubejsServerCore { if (cubeStorePackage.isCubeStoreSupported()) { console.log(`🔥 Cube Store (${version}) is assigned to 13306 port.`); + const cubeStoreHandler = new cubeStorePackage.CubeStoreHandler({ + stdout: (data) => { + console.log(data.toString().trim()); + }, + stderr: (data) => { + console.log(data.toString().trim()); + }, + onRestart: (code) => logger('Cube Store Restarting', { + warning: `Instance exit with ${code}, restarting`, + }), + }); + // Lazy loading for Cube Store - externalDriverFactory = () => new cubeStorePackage.CubeStoreDevDriver(); + externalDriverFactory = () => new cubeStorePackage.CubeStoreDevDriver(cubeStoreHandler); externalDialectFactory = () => cubeStorePackage.CubeStoreDevDriver.dialectClass(); } } @@ -349,9 +366,7 @@ export class CubejsServerCore { devServer ? 'dev_pre_aggregations' : 'prod_pre_aggregations' ), schemaPath: process.env.CUBEJS_SCHEMA_PATH || 'schema', - logger: opts.logger || process.env.NODE_ENV !== 'production' - ? devLogger(process.env.CUBEJS_LOG_LEVEL) - : prodLogger(process.env.CUBEJS_LOG_LEVEL), + logger, scheduledRefreshTimer: getEnv('scheduledRefresh') !== undefined ? getEnv('scheduledRefresh') : getEnv('refreshTimer'), sqlCache: false, ...opts, diff --git a/packages/cubejs-server-core/src/core/types.ts b/packages/cubejs-server-core/src/core/types.ts index a77e050ebffd..af9fe14d7873 100644 --- a/packages/cubejs-server-core/src/core/types.ts +++ b/packages/cubejs-server-core/src/core/types.ts @@ -95,6 +95,8 @@ export type ExternalDialectFactoryFn = (context: RequestContext) => BaseQuery; export type DbTypeFn = (context: RequestContext) => DatabaseType; +export type LoggerFn = (msg: string, params: any) => void; + export interface CreateOptions { dbType?: DatabaseType | DbTypeFn; externalDbType?: DatabaseType | ExternalDbTypeFn; @@ -102,7 +104,7 @@ export interface CreateOptions { basePath?: string; devServer?: boolean; apiSecret?: string; - logger?: (msg: string, params: any) => void; + logger?: LoggerFn; driverFactory?: (context: DriverContext) => Promise|BaseDriver; dialectFactory?: (context: DialectContext) => BaseQuery; externalDriverFactory?: ExternalDriverFactoryFn; diff --git a/rust/js-wrapper/cubestore-dev.ts b/rust/js-wrapper/cubestore-dev.ts index af752b159df4..11a03806071b 100644 --- a/rust/js-wrapper/cubestore-dev.ts +++ b/rust/js-wrapper/cubestore-dev.ts @@ -1,7 +1,7 @@ -import { startCubeStoreHandler } from './process'; +import { CubeStoreHandler } from './process'; (async () => { - const handler = await startCubeStoreHandler({ + const handler = new CubeStoreHandler({ stdout: (v) => { console.log(v.toString()); }, diff --git a/rust/js-wrapper/download.ts b/rust/js-wrapper/download.ts index 239c4a7215de..739e6b2a6a8c 100644 --- a/rust/js-wrapper/download.ts +++ b/rust/js-wrapper/download.ts @@ -117,10 +117,7 @@ export function getTarget(): string { } } -async function fetchRelease() { - // eslint-disable-next-line global-require - const { version } = require('../package.json'); - +async function fetchRelease(version: string) { const client = new Octokit(); const { data } = await client.request('GET /repos/{owner}/{repo}/releases/tags/{tag}', { @@ -133,10 +130,15 @@ async function fetchRelease() { } export async function downloadBinaryFromRelease() { - const release = await fetchRelease(); + // eslint-disable-next-line global-require + const { version } = require('../package.json'); + + const release = await fetchRelease(version); if (release) { if (release.assets.length === 0) { - throw new Error('No assets in release'); + throw new Error( + `There are no artifacts for Cube Store v${version}. Most probably it is still building. Please try again later.` + ); } const target = getTarget(); @@ -146,9 +148,21 @@ export async function downloadBinaryFromRelease() { if (fileName.startsWith('cubestored-')) { const assetTarget = fileName.substr('cubestored-'.length); if (assetTarget === target) { - await downloadAndExtractFile(asset.browser_download_url, asset.name, path.resolve(__dirname, '..')); + return downloadAndExtractFile( + asset.browser_download_url, + asset.name, + path.resolve(__dirname, '..') + ); } } } + + throw new Error( + `Cube Store v${version} Artifact for ${process.platform} is not found. Most probably it is still building. Please try again later.` + ); } + + throw new Error( + `Unable to find Cube Store release v${version}. Most probably it was removed.` + ); } diff --git a/rust/js-wrapper/process.ts b/rust/js-wrapper/process.ts index 5d24631072c8..428f786a92f0 100644 --- a/rust/js-wrapper/process.ts +++ b/rust/js-wrapper/process.ts @@ -7,11 +7,6 @@ import { downloadBinaryFromRelease } from './download'; const binaryName = process.platform === 'win32' ? 'cubestored.exe' : 'cubestored'; -export interface CubeStoreHandler { - acquire: () => Promise; - release: () => Promise; -} - export interface CubeStoreHandlerOptions { stdout: (data: Buffer) => void; stderr: (data: Buffer) => void; @@ -55,44 +50,59 @@ async function startProcess(pathToExecutable: string, config: Readonly): Promise { - const pathToExecutable = path.join(__dirname, '..', 'downloaded', 'latest', 'bin', binaryName); +export function isCubeStoreSupported(): boolean { + if (process.arch !== 'x64') { + return false; + } + + return ['win32', 'darwin', 'linux'].includes(process.platform); +} - if (!fs.existsSync(pathToExecutable)) { - await downloadBinaryFromRelease(); +export class CubeStoreHandler { + protected cubeStore: Promise | null = null; - if (!fs.existsSync(pathToExecutable)) { - throw new Error('Something wrong with downloading'); + public constructor( + protected readonly config: Readonly + ) {} + + public async acquire() { + if (this.cubeStore) { + return this.cubeStore; } - } - let cubeStore: Promise | null = null; + // eslint-disable-next-line no-async-promise-executor + this.cubeStore = new Promise(async (resolve) => { + const pathToExecutable = path.join(__dirname, '..', 'downloaded', 'latest', 'bin', binaryName); + + if (!fs.existsSync(pathToExecutable)) { + await downloadBinaryFromRelease(); + + if (!fs.existsSync(pathToExecutable)) { + throw new Error('Something wrong with downloading Cube Store before running it.'); + } + } + + const onExit = (code: number|null) => { + this.config.onRestart(code); + + this.cubeStore = startProcess(pathToExecutable, { + ...this.config, + onExit + }); + }; - const onExit = (code: number|null) => { - config.onRestart(code); + this.cubeStore = startProcess(pathToExecutable, { + ...this.config, + onExit + }); - cubeStore = startProcess(pathToExecutable, { - ...config, - onExit + resolve(this.cubeStore); }); - }; - cubeStore = startProcess(pathToExecutable, { - ...config, - onExit - }); + return this.cubeStore; + } - return { - acquire: async () => { - if (cubeStore) { - await cubeStore; - } - }, - release: async () => { - // @todo Use SIGTERM for gracefully shutdown? - // if (cubeStore) { - // (await cubeStore).kill(); - // } - }, - }; + public async release() { + // @todo Use SIGTERM for gracefully shutdown? + } }