Skip to content

Commit

Permalink
feat: Use single instance for Cube Store handler (#2229)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Feb 28, 2021
1 parent 8821b9e commit 35c140c
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 82 deletions.
3 changes: 2 additions & 1 deletion packages/cubejs-cubestore-driver/index.js
@@ -1,6 +1,6 @@
const { CubeStoreDriver } = require('./dist/src/CubeStoreDriver');
const { CubeStoreDevDriver } = require('./dist/src/CubeStoreDevDriver');
const { isCubeStoreSupported } = require('./dist/src/utils');
const { isCubeStoreSupported, CubeStoreHandler } = require('./dist/src/rexport');

/**
* After 5 years working with TypeScript, now I know
Expand All @@ -14,3 +14,4 @@ module.exports = CubeStoreDriver;
*/
module.exports.CubeStoreDevDriver = CubeStoreDevDriver;
module.exports.isCubeStoreSupported = isCubeStoreSupported;
module.exports.CubeStoreHandler = CubeStoreHandler;
26 changes: 6 additions & 20 deletions packages/cubejs-cubestore-driver/src/CubeStoreDevDriver.ts
@@ -1,14 +1,14 @@
import { CubeStoreHandler, startCubeStoreHandler } from '@cubejs-backend/cubestore';
import { CubeStoreHandler } from '@cubejs-backend/cubestore';

import { CubeStoreDriver } from './CubeStoreDriver';
import { ConnectionConfig } from './types';
import { AsyncConnection } from './connection';

export class CubeStoreDevDriver extends CubeStoreDriver {
// Let's use Promise as Mutex to protect multiple starting of Cube Store
protected cubeStoreHandler: Promise<CubeStoreHandler>|null = null;

public constructor(config?: Partial<ConnectionConfig>) {
public constructor(
protected readonly cubeStoreHandler: CubeStoreHandler,
config?: Partial<ConnectionConfig>
) {
super({
...config,
// @todo Make random port selection when 13306 is already used?
Expand All @@ -17,21 +17,7 @@ export class CubeStoreDevDriver extends CubeStoreDriver {
}

protected async acquireCubeStore() {
if (!this.cubeStoreHandler) {
this.cubeStoreHandler = startCubeStoreHandler({
stdout: (data) => {
console.log(data.toString().trim());
},
stderr: (data) => {
console.log(data.toString().trim());
},
onRestart: (code) => this.logger('Cube Store Restarting', {
warning: `Instance exit with ${code}, restarting`,
}),
});
}

await (await this.cubeStoreHandler).acquire();
return this.cubeStoreHandler.acquire();
}

public async withConnection(fn: (connection: AsyncConnection) => Promise<unknown>) {
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-cubestore-driver/src/index.ts
@@ -1,4 +1,4 @@
export * from './CubeStoreQuery';
export * from './CubeStoreDriver';
export * from './CubeStoreDevDriver';
export * from './utils';
export * from './rexport';
1 change: 1 addition & 0 deletions packages/cubejs-cubestore-driver/src/rexport.ts
@@ -0,0 +1 @@
export { isCubeStoreSupported, CubeStoreHandler } from '@cubejs-backend/cubestore';
9 changes: 0 additions & 9 deletions packages/cubejs-cubestore-driver/src/utils.ts

This file was deleted.

25 changes: 20 additions & 5 deletions packages/cubejs-server-core/src/core/server.ts
Expand Up @@ -17,7 +17,7 @@ import {

import type { Application as ExpressApplication } from 'express';
import type { BaseDriver } from '@cubejs-backend/query-orchestrator';
import type { CubeStoreDevDriver, isCubeStoreSupported } from '@cubejs-backend/cubestore-driver';
import type { CubeStoreDevDriver, CubeStoreHandler, isCubeStoreSupported } from '@cubejs-backend/cubestore-driver';
import type {
ContextToAppIdFn,
CreateOptions,
Expand All @@ -30,6 +30,7 @@ import type {
DriverContext,
SchemaFileRepository,
UserBackgroundContext,
LoggerFn,
} from './types';

import { FileRepository } from './FileRepository';
Expand Down Expand Up @@ -294,6 +295,9 @@ export class CubejsServerCore {
const dbType = opts.dbType || <DatabaseType|undefined>process.env.CUBEJS_DB_TYPE;
const externalDbType = opts.externalDbType || <DatabaseType|undefined>process.env.CUBEJS_EXT_DB_TYPE;
const devServer = process.env.NODE_ENV !== 'production' || process.env.CUBEJS_DEV_MODE === 'true';
const logger: LoggerFn = opts.logger || process.env.NODE_ENV !== 'production'
? devLogger(process.env.CUBEJS_LOG_LEVEL)
: prodLogger(process.env.CUBEJS_LOG_LEVEL);

let externalDriverFactory = externalDbType && (
() => new (CubejsServerCore.lookupDriverClass(externalDbType))({
Expand All @@ -311,6 +315,7 @@ export class CubejsServerCore {
if (!externalDbType && getEnv('devMode')) {
const cubeStorePackage = requireFromPackage<{
isCubeStoreSupported: typeof isCubeStoreSupported,
CubeStoreHandler: typeof CubeStoreHandler,
CubeStoreDevDriver: typeof CubeStoreDevDriver,
}>('@cubejs-backend/cubestore-driver', {
relative: isDockerImage(),
Expand All @@ -320,8 +325,20 @@ export class CubejsServerCore {
if (cubeStorePackage.isCubeStoreSupported()) {
console.log(`🔥 Cube Store (${version}) is assigned to 13306 port.`);

const cubeStoreHandler = new cubeStorePackage.CubeStoreHandler({
stdout: (data) => {
console.log(data.toString().trim());
},
stderr: (data) => {
console.log(data.toString().trim());
},
onRestart: (code) => logger('Cube Store Restarting', {
warning: `Instance exit with ${code}, restarting`,
}),
});

// Lazy loading for Cube Store
externalDriverFactory = () => new cubeStorePackage.CubeStoreDevDriver();
externalDriverFactory = () => new cubeStorePackage.CubeStoreDevDriver(cubeStoreHandler);
externalDialectFactory = () => cubeStorePackage.CubeStoreDevDriver.dialectClass();
}
}
Expand Down Expand Up @@ -349,9 +366,7 @@ export class CubejsServerCore {
devServer ? 'dev_pre_aggregations' : 'prod_pre_aggregations'
),
schemaPath: process.env.CUBEJS_SCHEMA_PATH || 'schema',
logger: opts.logger || process.env.NODE_ENV !== 'production'
? devLogger(process.env.CUBEJS_LOG_LEVEL)
: prodLogger(process.env.CUBEJS_LOG_LEVEL),
logger,
scheduledRefreshTimer: getEnv('scheduledRefresh') !== undefined ? getEnv('scheduledRefresh') : getEnv('refreshTimer'),
sqlCache: false,
...opts,
Expand Down
4 changes: 3 additions & 1 deletion packages/cubejs-server-core/src/core/types.ts
Expand Up @@ -95,14 +95,16 @@ export type ExternalDialectFactoryFn = (context: RequestContext) => BaseQuery;

export type DbTypeFn = (context: RequestContext) => DatabaseType;

export type LoggerFn = (msg: string, params: any) => void;

export interface CreateOptions {
dbType?: DatabaseType | DbTypeFn;
externalDbType?: DatabaseType | ExternalDbTypeFn;
schemaPath?: string;
basePath?: string;
devServer?: boolean;
apiSecret?: string;
logger?: (msg: string, params: any) => void;
logger?: LoggerFn;
driverFactory?: (context: DriverContext) => Promise<BaseDriver>|BaseDriver;
dialectFactory?: (context: DialectContext) => BaseQuery;
externalDriverFactory?: ExternalDriverFactoryFn;
Expand Down
4 changes: 2 additions & 2 deletions rust/js-wrapper/cubestore-dev.ts
@@ -1,7 +1,7 @@
import { startCubeStoreHandler } from './process';
import { CubeStoreHandler } from './process';

(async () => {
const handler = await startCubeStoreHandler({
const handler = new CubeStoreHandler({
stdout: (v) => {
console.log(v.toString());
},
Expand Down
28 changes: 21 additions & 7 deletions rust/js-wrapper/download.ts
Expand Up @@ -117,10 +117,7 @@ export function getTarget(): string {
}
}

async function fetchRelease() {
// eslint-disable-next-line global-require
const { version } = require('../package.json');

async function fetchRelease(version: string) {
const client = new Octokit();

const { data } = await client.request('GET /repos/{owner}/{repo}/releases/tags/{tag}', {
Expand All @@ -133,10 +130,15 @@ async function fetchRelease() {
}

export async function downloadBinaryFromRelease() {
const release = await fetchRelease();
// eslint-disable-next-line global-require
const { version } = require('../package.json');

const release = await fetchRelease(version);
if (release) {
if (release.assets.length === 0) {
throw new Error('No assets in release');
throw new Error(
`There are no artifacts for Cube Store v${version}. Most probably it is still building. Please try again later.`
);
}

const target = getTarget();
Expand All @@ -146,9 +148,21 @@ export async function downloadBinaryFromRelease() {
if (fileName.startsWith('cubestored-')) {
const assetTarget = fileName.substr('cubestored-'.length);
if (assetTarget === target) {
await downloadAndExtractFile(asset.browser_download_url, asset.name, path.resolve(__dirname, '..'));
return downloadAndExtractFile(
asset.browser_download_url,
asset.name,
path.resolve(__dirname, '..')
);
}
}
}

throw new Error(
`Cube Store v${version} Artifact for ${process.platform} is not found. Most probably it is still building. Please try again later.`
);
}

throw new Error(
`Unable to find Cube Store release v${version}. Most probably it was removed.`
);
}
82 changes: 46 additions & 36 deletions rust/js-wrapper/process.ts
Expand Up @@ -7,11 +7,6 @@ import { downloadBinaryFromRelease } from './download';

const binaryName = process.platform === 'win32' ? 'cubestored.exe' : 'cubestored';

export interface CubeStoreHandler {
acquire: () => Promise<void>;
release: () => Promise<void>;
}

export interface CubeStoreHandlerOptions {
stdout: (data: Buffer) => void;
stderr: (data: Buffer) => void;
Expand Down Expand Up @@ -55,44 +50,59 @@ async function startProcess(pathToExecutable: string, config: Readonly<StartProc
return cubeStore;
}

export async function startCubeStoreHandler(config: Readonly<CubeStoreHandlerOptions>): Promise<CubeStoreHandler> {
const pathToExecutable = path.join(__dirname, '..', 'downloaded', 'latest', 'bin', binaryName);
export function isCubeStoreSupported(): boolean {
if (process.arch !== 'x64') {
return false;
}

return ['win32', 'darwin', 'linux'].includes(process.platform);
}

if (!fs.existsSync(pathToExecutable)) {
await downloadBinaryFromRelease();
export class CubeStoreHandler {
protected cubeStore: Promise<ChildProcess> | null = null;

if (!fs.existsSync(pathToExecutable)) {
throw new Error('Something wrong with downloading');
public constructor(
protected readonly config: Readonly<CubeStoreHandlerOptions>
) {}

public async acquire() {
if (this.cubeStore) {
return this.cubeStore;
}
}

let cubeStore: Promise<ChildProcess> | null = null;
// eslint-disable-next-line no-async-promise-executor
this.cubeStore = new Promise<ChildProcess>(async (resolve) => {
const pathToExecutable = path.join(__dirname, '..', 'downloaded', 'latest', 'bin', binaryName);

if (!fs.existsSync(pathToExecutable)) {
await downloadBinaryFromRelease();

if (!fs.existsSync(pathToExecutable)) {
throw new Error('Something wrong with downloading Cube Store before running it.');
}
}

const onExit = (code: number|null) => {
this.config.onRestart(code);

this.cubeStore = startProcess(pathToExecutable, {
...this.config,
onExit
});
};

const onExit = (code: number|null) => {
config.onRestart(code);
this.cubeStore = startProcess(pathToExecutable, {
...this.config,
onExit
});

cubeStore = startProcess(pathToExecutable, {
...config,
onExit
resolve(this.cubeStore);
});
};

cubeStore = startProcess(pathToExecutable, {
...config,
onExit
});
return this.cubeStore;
}

return {
acquire: async () => {
if (cubeStore) {
await cubeStore;
}
},
release: async () => {
// @todo Use SIGTERM for gracefully shutdown?
// if (cubeStore) {
// (await cubeStore).kill();
// }
},
};
public async release() {
// @todo Use SIGTERM for gracefully shutdown?
}
}

0 comments on commit 35c140c

Please sign in to comment.