Skip to content

Commit

Permalink
feat: Dont introspect schema, if driver can detect types
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed May 20, 2021
1 parent 7685ffd commit 3467b44
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 73 deletions.
2 changes: 1 addition & 1 deletion packages/cubejs-cli/src/command/proxy-command.ts
@@ -1,13 +1,13 @@
import { CommanderStatic } from 'commander';
import chalk from 'chalk';
import semver from 'semver';
import type { Command, flags } from '@oclif/command';
import {
isDockerImage,
packageExists,
requireFromPackage,
requirePackageManifest,
} from '@cubejs-backend/shared';
import type { Command, flags } from '@oclif/command';
import { displayError, loadCliManifest, } from '../utils';

export async function proxyCommand(program: CommanderStatic, command: string) {
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-cli/src/command/token.ts
@@ -1,7 +1,7 @@
import chalk from 'chalk';
import jwt from 'jsonwebtoken';
import type { CommanderStatic } from 'commander';
import { isDockerImage, requireFromPackage } from '@cubejs-backend/shared';
import type { CommanderStatic } from 'commander';

import { displayError, displayWarning, event } from '../utils';

Expand Down
3 changes: 2 additions & 1 deletion packages/cubejs-postgres-driver/package.json
Expand Up @@ -21,7 +21,8 @@
"build": "rm -rf dist && npm run tsc",
"tsc": "tsc",
"watch": "tsc -w",
"lint": "eslint **/*.js"
"lint": "eslint src/* --ext .ts",
"lint:fix": "eslint --fix src/* --ext .ts"
},
"dependencies": {
"@cubejs-backend/query-orchestrator": "^0.27.16",
Expand Down
3 changes: 1 addition & 2 deletions packages/cubejs-postgres-driver/src/PostgresDriver.ts
Expand Up @@ -86,7 +86,7 @@ export class PostgresDriver extends BaseDriver implements DriverInterface {
public async stream(
query: string,
values: unknown[],
{ batchSize, highWaterMark }: StreamOptions
{ highWaterMark }: StreamOptions
): Promise<StreamTableDataWithTypes> {
const conn = await this.pool.connect();

Expand All @@ -97,7 +97,6 @@ export class PostgresDriver extends BaseDriver implements DriverInterface {
types: {
getTypeParser,
},
batchSize,
highWaterMark
});
const rowStream: QueryStream = await conn.query(queryStream);
Expand Down
Expand Up @@ -41,7 +41,6 @@ export interface ExternalDriverCompatibilities {
streamImport?: true,
}
export type StreamOptions = {
batchSize: number
highWaterMark: number
};

Expand Down
Expand Up @@ -693,6 +693,8 @@ class PreAggregationLoader {
protected async refreshImplStreamExternalStrategy(client, newVersionEntry, saveCancelFn, invalidationKeys) {
const [sql, params] =
Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []];

// @todo Deprecated, BaseDriver already implements it, before remove we need to add check for factoryDriver
if (!client.downloadQueryResults) {
throw new Error('Can\'t load external pre-aggregation: source driver doesn\'t support downloadQueryResults()');
}
Expand Down Expand Up @@ -726,14 +728,17 @@ class PreAggregationLoader {
}

protected getStreamingOptions(): StreamOptions {
return { batchSize: 1000, highWaterMark: 2000 }
return {
// Default: 16384 (16KB), or 16 for objectMode streams. PostgreSQL/MySQL use object streams
highWaterMark: 10000
};
}

/**
* Create table (for db with write permissions) and extract data via memory/stream/unload
*/
protected async downloadTempExternalPreAggregation(client: DriverInterface, newVersionEntry, saveCancelFn) {
// @todo Absolute, before remove we need to add checks for factoryDriver, that it extends from BaseDriver
// @todo Deprecated, BaseDriver already implements it, before remove we need to add check for factoryDriver
if (!client.downloadTable) {
throw new Error('Can\'t load external pre-aggregation: source driver doesn\'t support downloadTable()');
}
Expand All @@ -752,7 +757,10 @@ class PreAggregationLoader {
? client.stream(`SELECT * FROM ${table}`, [], this.getStreamingOptions())
: client.downloadTable(table, capabilities)
);
tableData.types = await saveCancelFn(client.tableColumnTypes(table));

if (!tableData.types) {
tableData.types = await saveCancelFn(client.tableColumnTypes(table));
}

return tableData;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-schema-compiler/package.json
Expand Up @@ -42,6 +42,7 @@
"@babel/standalone": "^7.12.10",
"@babel/traverse": "^7.12.10",
"@babel/types": "^7.12.12",
"@cubejs-backend/shared": "^0.27.15",
"@hapi/joi": "^15.1.1",
"antlr4ts": "0.5.0-alpha.4",
"camelcase": "^6.2.0",
Expand All @@ -58,7 +59,6 @@
"devDependencies": {
"@apla/clickhouse": "^1.5.5",
"@cubejs-backend/linter": "^0.27.0",
"@cubejs-backend/shared": "^0.27.15",
"@types/babel__code-frame": "^7.0.2",
"@types/babel__generator": "^7.6.2",
"@types/babel__parser": "^7.1.1",
Expand Down
Expand Up @@ -34,6 +34,6 @@ export class SnowflakeQuery extends BaseQuery {
}

nowTimestampSql() {
return `CURRENT_TIMESTAMP`;
return 'CURRENT_TIMESTAMP';
}
}
4 changes: 2 additions & 2 deletions packages/cubejs-server-core/src/core/DevServer.ts
@@ -1,15 +1,15 @@
/* eslint-disable global-require,no-restricted-syntax */
import type { ChildProcess } from 'child_process';
import dotenv from '@cubejs-backend/dotenv';
import spawn from 'cross-spawn';
import path from 'path';
import fs from 'fs-extra';
import { getRequestIdFromRequest } from '@cubejs-backend/api-gateway';
import { LivePreviewWatcher } from '@cubejs-backend/cloud';
import { AppContainer, DependencyTree, PackageFetcher, DevPackageFetcher } from '@cubejs-backend/templates';
import type { Application as ExpressApplication } from 'express';
import jwt from 'jsonwebtoken';
import isDocker from 'is-docker';
import type { Application as ExpressApplication } from 'express';
import type { ChildProcess } from 'child_process';

import type { BaseDriver } from '@cubejs-backend/query-orchestrator';

Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-server/src/websocket-server.ts
@@ -1,10 +1,10 @@
import WebSocket from 'ws';
import crypto from 'crypto';
import util from 'util';
import { CancelableInterval, createCancelableInterval } from '@cubejs-backend/shared';
import type { CubejsServerCore } from '@cubejs-backend/server-core';
import type http from 'http';
import type https from 'https';
import { CancelableInterval, createCancelableInterval } from '@cubejs-backend/shared';

export interface WebSocketServerOptions {
processSubscriptionsInterval?: number,
Expand Down

0 comments on commit 3467b44

Please sign in to comment.