Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
SASProtocol,
generateBlobSASQueryParameters,
} from '@azure/storage-blob';
import { DriverCapabilities, QueryColumnsResult, QueryOptions, QuerySchemasResult, QueryTablesResult, UnloadOptions } from '@cubejs-backend/base-driver';
import { DriverCapabilities, QueryColumnsResult, QueryOptions, QuerySchemasResult, QueryTablesResult, UnloadOptions, GenericDataBaseType } from '@cubejs-backend/base-driver';
import {
JDBCDriver,
JDBCDriverConfiguration,
Expand Down Expand Up @@ -99,7 +99,13 @@ type ShowDatabasesRow = {
databaseName: string,
};

type ColumnInfo = {
name: any;
type: GenericDataBaseType;
};

const DatabricksToGenericType: Record<string, string> = {
binary: 'hll_datasketches',
'decimal(10,0)': 'bigint',
};

Expand Down Expand Up @@ -537,8 +543,9 @@ export class DatabricksDriver extends JDBCDriver {
public async queryColumnTypes(
sql: string,
params?: unknown[]
): Promise<{ name: any; type: string; }[]> {
): Promise<ColumnInfo[]> {
const result = [];

// eslint-disable-next-line camelcase
const response = await this.query<{col_name: string; data_type: string}>(
`DESCRIBE QUERY ${sql}`,
Expand Down Expand Up @@ -631,7 +638,7 @@ export class DatabricksDriver extends JDBCDriver {
private async unloadWithSql(tableFullName: string, sql: string, params: unknown[]) {
const types = await this.queryColumnTypes(sql, params);

await this.createExternalTableFromSql(tableFullName, sql, params);
await this.createExternalTableFromSql(tableFullName, sql, params, types);

return types;
}
Expand All @@ -641,9 +648,8 @@ export class DatabricksDriver extends JDBCDriver {
*/
private async unloadWithTable(tableFullName: string) {
const types = await this.tableColumnTypes(tableFullName);
const columns = types.map(t => t.name).join(', ');

await this.createExternalTableFromTable(tableFullName, columns);
await this.createExternalTableFromTable(tableFullName, types);

return types;
}
Expand Down Expand Up @@ -735,14 +741,17 @@ export class DatabricksDriver extends JDBCDriver {
},
region: this.config.awsRegion,
});

const url = new URL(pathname);
const list = await client.listObjectsV2({
Bucket: url.host,
Prefix: url.pathname.slice(1),
});

if (list.Contents === undefined) {
throw new Error(`No content in specified path: ${pathname}`);
}

const csvFile = await Promise.all(
list.Contents
.filter(file => file.Key && /.csv$/i.test(file.Key))
Expand All @@ -758,9 +767,23 @@ export class DatabricksDriver extends JDBCDriver {
throw new Error('No CSV files were exported to the specified bucket. ' +
'Please check your export bucket configuration.');
}

return csvFile;
}

protected generateTableColumnsForExport(columns: ColumnInfo[]): string {
const wrapped = columns.map((c) => {
if (c.type === 'hll_datasketches') {
// [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support type \"BINARY\". SQLSTATE: 0A000
return `base64(${c.name})`;
} else {
return c.name;
}
});

return wrapped.join(', ');
}

/**
* Saves specified query to the configured bucket. This requires Databricks
* cluster to be configured.
Expand All @@ -778,14 +801,20 @@ export class DatabricksDriver extends JDBCDriver {
* `fs.s3a.access.key <aws-access-key>`
* `fs.s3a.secret.key <aws-secret-key>`
*/
private async createExternalTableFromSql(tableFullName: string, sql: string, params: unknown[]) {
private async createExternalTableFromSql(tableFullName: string, sql: string, params: unknown[], columns: ColumnInfo[]) {
let select = sql;

if (columns.find((column) => column.type === 'hll_datasketches')) {
select = `SELECT ${this.generateTableColumnsForExport(columns)} FROM (${sql})`;
}

try {
await this.query(
`
CREATE TABLE ${tableFullName}
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv'
OPTIONS (escape = '"')
AS (${sql});
AS (${select});
`,
params,
);
Expand All @@ -811,14 +840,14 @@ export class DatabricksDriver extends JDBCDriver {
* `fs.s3a.access.key <aws-access-key>`
* `fs.s3a.secret.key <aws-secret-key>`
*/
private async createExternalTableFromTable(tableFullName: string, columns: string) {
private async createExternalTableFromTable(tableFullName: string, columns: ColumnInfo[]) {
try {
await this.query(
`
CREATE TABLE _${tableFullName}
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv'
OPTIONS (escape = '"')
AS SELECT ${columns} FROM ${tableFullName}
AS SELECT ${this.generateTableColumnsForExport(columns)} FROM ${tableFullName}
`,
[],
);
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-druid-driver/src/DruidQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ export class DruidQuery extends BaseQuery {
}

public nowTimestampSql(): string {
return `CURRENT_TIMESTAMP`;
return 'CURRENT_TIMESTAMP';
}
}
25 changes: 15 additions & 10 deletions packages/cubejs-testing-drivers/src/helpers/getComposePath.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@
import fs from 'fs-extra';
import path from 'path';
import * as YAML from 'yaml';
import { getFixtures } from './getFixtures';

import type { Fixture } from '../types/Fixture';

/**
* Returns docker compose file by data source type.
*/
export function getComposePath(type: string, isLocal: boolean): [path: string, file: string] {
export function getComposePath(type: string, fixture: Fixture, isLocal: boolean): [path: string, file: string] {
const _path = path.resolve(process.cwd(), './.temp');
const _file = `${type}-compose.yaml`;
const { cube, data } = getFixtures(type);

const depends_on = ['store'];
if (cube.depends_on) {
depends_on.concat(cube.depends_on);
if (fixture.cube.depends_on) {
depends_on.concat(fixture.cube.depends_on);
}

const links = ['store'];
if (cube.links) {
depends_on.concat(cube.links);
if (fixture.cube.links) {
depends_on.concat(fixture.cube.links);
}

const volumes = [
'./cube.js:/cube/conf/cube.js',
'./package.json:/cube/conf/package.json',
Expand All @@ -29,7 +32,7 @@ export function getComposePath(type: string, isLocal: boolean): [path: string, f
services: {
...(!isLocal ? {
cube: {
...cube,
...fixture.cube,
container_name: 'cube',
image: 'cubejs/cube:testing-drivers',
depends_on,
Expand All @@ -46,12 +49,14 @@ export function getComposePath(type: string, isLocal: boolean): [path: string, f
}
}
};
if (data) {

if (fixture.data) {
compose.services.data = {
...data,
...fixture.data,
container_name: 'data',
};
}

fs.writeFileSync(
path.resolve(_path, _file),
YAML.stringify(compose),
Expand Down
37 changes: 16 additions & 21 deletions packages/cubejs-testing-drivers/src/helpers/runEnvironment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export async function runEnvironment(
suf?: string,
{ extendedEnv }: { extendedEnv?: string } = {}
): Promise<Environment> {
const fixtures = getFixtures(type, extendedEnv);
const fixture = getFixtures(type, extendedEnv);
getTempPath();
getSchemaPath(type, suf);
getCubeJsPath(type);
Expand All @@ -110,7 +110,7 @@ export async function runEnvironment(
})
.argv;
const isLocal = mode === 'local';
const [composePath, composeFile] = getComposePath(type, isLocal);
const [composePath, composeFile] = getComposePath(type, fixture, isLocal);
const compose = new DockerComposeEnvironment(
composePath,
composeFile,
Expand All @@ -120,16 +120,8 @@ export async function runEnvironment(
CUBEJS_TELEMETRY: 'false',
});

const _path = `${path.resolve(process.cwd(), `./fixtures/${type}.env`)}`;
if (fs.existsSync(_path)) {
config({
path: _path,
encoding: 'utf8',
override: true,
});
}
Object.keys(fixtures.cube.environment).forEach((key) => {
const val = fixtures.cube.environment[key];
Object.keys(fixture.cube.environment).forEach((key) => {
const val = fixture.cube.environment[key];
const { length } = val;

if (val.indexOf('${') === 0 && val.indexOf('}') === length - 1) {
Expand All @@ -144,8 +136,8 @@ export async function runEnvironment(

if (process.env[key]) {
compose.withEnvironment({ [key]: <string>process.env[key] });
} else if (fixtures.cube.environment[key]) {
process.env[key] = fixtures.cube.environment[key];
} else if (fixture.cube.environment[key]) {
process.env[key] = fixture.cube.environment[key];
}
});

Expand All @@ -166,8 +158,8 @@ export async function runEnvironment(
};

const cliEnv = isLocal ? new CubeCliEnvironment(composePath) : null;
const mappedDataPort = fixtures.data ? environment.getContainer('data').getMappedPort(
parseInt(fixtures.data.ports[0], 10),
const mappedDataPort = fixture.data ? environment.getContainer('data').getMappedPort(
parseInt(fixture.data.ports[0], 10),
) : null;
if (cliEnv) {
cliEnv.withEnvironment({
Expand All @@ -184,19 +176,19 @@ export async function runEnvironment(
}
const cube = cliEnv ? {
port: 4000,
pgPort: parseInt(fixtures.cube.ports[1], 10),
pgPort: parseInt(fixture.cube.ports[1], 10),
logs: cliEnv.cli?.stdout || process.stdout
} : {
port: environment.getContainer('cube').getMappedPort(
parseInt(fixtures.cube.ports[0], 10),
parseInt(fixture.cube.ports[0], 10),
),
pgPort: fixtures.cube.ports[1] && environment.getContainer('cube').getMappedPort(
parseInt(fixtures.cube.ports[1], 10),
pgPort: fixture.cube.ports[1] && environment.getContainer('cube').getMappedPort(
parseInt(fixture.cube.ports[1], 10),
) || undefined,
logs: await environment.getContainer('cube').logs(),
};

if (fixtures.data) {
if (fixture.data) {
const data = {
port: mappedDataPort!,
logs: await environment.getContainer('data').logs(),
Expand All @@ -207,6 +199,9 @@ export async function runEnvironment(
data,
stop: async () => {
await environment.down({ timeout: 30 * 1000 });
if (cliEnv) {
await cliEnv.down();
}
},
};
}
Expand Down