diff --git a/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts b/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts index 53e04c9479dea..7de53aea1eeb0 100644 --- a/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts +++ b/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts @@ -17,7 +17,7 @@ import { SASProtocol, generateBlobSASQueryParameters, } from '@azure/storage-blob'; -import { DriverCapabilities, QueryColumnsResult, QueryOptions, QuerySchemasResult, QueryTablesResult, UnloadOptions } from '@cubejs-backend/base-driver'; +import { DriverCapabilities, QueryColumnsResult, QueryOptions, QuerySchemasResult, QueryTablesResult, UnloadOptions, GenericDataBaseType } from '@cubejs-backend/base-driver'; import { JDBCDriver, JDBCDriverConfiguration, @@ -99,7 +99,13 @@ type ShowDatabasesRow = { databaseName: string, }; +type ColumnInfo = { + name: any; + type: GenericDataBaseType; +}; + const DatabricksToGenericType: Record = { + binary: 'hll_datasketches', 'decimal(10,0)': 'bigint', }; @@ -537,8 +543,9 @@ export class DatabricksDriver extends JDBCDriver { public async queryColumnTypes( sql: string, params?: unknown[] - ): Promise<{ name: any; type: string; }[]> { + ): Promise { const result = []; + // eslint-disable-next-line camelcase const response = await this.query<{col_name: string; data_type: string}>( `DESCRIBE QUERY ${sql}`, @@ -631,7 +638,7 @@ export class DatabricksDriver extends JDBCDriver { private async unloadWithSql(tableFullName: string, sql: string, params: unknown[]) { const types = await this.queryColumnTypes(sql, params); - await this.createExternalTableFromSql(tableFullName, sql, params); + await this.createExternalTableFromSql(tableFullName, sql, params, types); return types; } @@ -641,9 +648,8 @@ export class DatabricksDriver extends JDBCDriver { */ private async unloadWithTable(tableFullName: string) { const types = await this.tableColumnTypes(tableFullName); - const columns = types.map(t => t.name).join(', '); - await this.createExternalTableFromTable(tableFullName, columns); + await this.createExternalTableFromTable(tableFullName, types); return types; } @@ -735,14 +741,17 @@ export class DatabricksDriver extends JDBCDriver { }, region: this.config.awsRegion, }); + const url = new URL(pathname); const list = await client.listObjectsV2({ Bucket: url.host, Prefix: url.pathname.slice(1), }); + if (list.Contents === undefined) { throw new Error(`No content in specified path: ${pathname}`); } + const csvFile = await Promise.all( list.Contents .filter(file => file.Key && /.csv$/i.test(file.Key)) @@ -758,9 +767,23 @@ export class DatabricksDriver extends JDBCDriver { throw new Error('No CSV files were exported to the specified bucket. ' + 'Please check your export bucket configuration.'); } + return csvFile; } + protected generateTableColumnsForExport(columns: ColumnInfo[]): string { + const wrapped = columns.map((c) => { + if (c.type === 'hll_datasketches') { + // [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support type \"BINARY\". SQLSTATE: 0A000 + return `base64(${c.name})`; + } else { + return c.name; + } + }); + + return wrapped.join(', '); + } + /** * Saves specified query to the configured bucket. This requires Databricks * cluster to be configured. @@ -778,14 +801,20 @@ export class DatabricksDriver extends JDBCDriver { * `fs.s3a.access.key ` * `fs.s3a.secret.key ` */ - private async createExternalTableFromSql(tableFullName: string, sql: string, params: unknown[]) { + private async createExternalTableFromSql(tableFullName: string, sql: string, params: unknown[], columns: ColumnInfo[]) { + let select = sql; + + if (columns.find((column) => column.type === 'hll_datasketches')) { + select = `SELECT ${this.generateTableColumnsForExport(columns)} FROM (${sql})`; + } + try { await this.query( ` CREATE TABLE ${tableFullName} USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv' OPTIONS (escape = '"') - AS (${sql}); + AS (${select}); `, params, ); @@ -811,14 +840,14 @@ export class DatabricksDriver extends JDBCDriver { * `fs.s3a.access.key ` * `fs.s3a.secret.key ` */ - private async createExternalTableFromTable(tableFullName: string, columns: string) { + private async createExternalTableFromTable(tableFullName: string, columns: ColumnInfo[]) { try { await this.query( ` CREATE TABLE _${tableFullName} USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${tableFullName}.csv' OPTIONS (escape = '"') - AS SELECT ${columns} FROM ${tableFullName} + AS SELECT ${this.generateTableColumnsForExport(columns)} FROM ${tableFullName} `, [], ); diff --git a/packages/cubejs-druid-driver/src/DruidQuery.ts b/packages/cubejs-druid-driver/src/DruidQuery.ts index 5cd3d975de336..cdcedeb7e97df 100644 --- a/packages/cubejs-druid-driver/src/DruidQuery.ts +++ b/packages/cubejs-druid-driver/src/DruidQuery.ts @@ -50,6 +50,6 @@ export class DruidQuery extends BaseQuery { } public nowTimestampSql(): string { - return `CURRENT_TIMESTAMP`; + return 'CURRENT_TIMESTAMP'; } } diff --git a/packages/cubejs-testing-drivers/src/helpers/getComposePath.ts b/packages/cubejs-testing-drivers/src/helpers/getComposePath.ts index c0e85299de1da..5c0f895100251 100644 --- a/packages/cubejs-testing-drivers/src/helpers/getComposePath.ts +++ b/packages/cubejs-testing-drivers/src/helpers/getComposePath.ts @@ -2,23 +2,26 @@ import fs from 'fs-extra'; import path from 'path'; import * as YAML from 'yaml'; -import { getFixtures } from './getFixtures'; + +import type { Fixture } from '../types/Fixture'; /** * Returns docker compose file by data source type. */ -export function getComposePath(type: string, isLocal: boolean): [path: string, file: string] { +export function getComposePath(type: string, fixture: Fixture, isLocal: boolean): [path: string, file: string] { const _path = path.resolve(process.cwd(), './.temp'); const _file = `${type}-compose.yaml`; - const { cube, data } = getFixtures(type); + const depends_on = ['store']; - if (cube.depends_on) { - depends_on.concat(cube.depends_on); + if (fixture.cube.depends_on) { + depends_on.concat(fixture.cube.depends_on); } + const links = ['store']; - if (cube.links) { - depends_on.concat(cube.links); + if (fixture.cube.links) { + depends_on.concat(fixture.cube.links); } + const volumes = [ './cube.js:/cube/conf/cube.js', './package.json:/cube/conf/package.json', @@ -29,7 +32,7 @@ export function getComposePath(type: string, isLocal: boolean): [path: string, f services: { ...(!isLocal ? { cube: { - ...cube, + ...fixture.cube, container_name: 'cube', image: 'cubejs/cube:testing-drivers', depends_on, @@ -46,12 +49,14 @@ export function getComposePath(type: string, isLocal: boolean): [path: string, f } } }; - if (data) { + + if (fixture.data) { compose.services.data = { - ...data, + ...fixture.data, container_name: 'data', }; } + fs.writeFileSync( path.resolve(_path, _file), YAML.stringify(compose), diff --git a/packages/cubejs-testing-drivers/src/helpers/runEnvironment.ts b/packages/cubejs-testing-drivers/src/helpers/runEnvironment.ts index 41e1b96aec2e3..1c103393de7d9 100644 --- a/packages/cubejs-testing-drivers/src/helpers/runEnvironment.ts +++ b/packages/cubejs-testing-drivers/src/helpers/runEnvironment.ts @@ -90,7 +90,7 @@ export async function runEnvironment( suf?: string, { extendedEnv }: { extendedEnv?: string } = {} ): Promise { - const fixtures = getFixtures(type, extendedEnv); + const fixture = getFixtures(type, extendedEnv); getTempPath(); getSchemaPath(type, suf); getCubeJsPath(type); @@ -110,7 +110,7 @@ export async function runEnvironment( }) .argv; const isLocal = mode === 'local'; - const [composePath, composeFile] = getComposePath(type, isLocal); + const [composePath, composeFile] = getComposePath(type, fixture, isLocal); const compose = new DockerComposeEnvironment( composePath, composeFile, @@ -120,16 +120,8 @@ export async function runEnvironment( CUBEJS_TELEMETRY: 'false', }); - const _path = `${path.resolve(process.cwd(), `./fixtures/${type}.env`)}`; - if (fs.existsSync(_path)) { - config({ - path: _path, - encoding: 'utf8', - override: true, - }); - } - Object.keys(fixtures.cube.environment).forEach((key) => { - const val = fixtures.cube.environment[key]; + Object.keys(fixture.cube.environment).forEach((key) => { + const val = fixture.cube.environment[key]; const { length } = val; if (val.indexOf('${') === 0 && val.indexOf('}') === length - 1) { @@ -144,8 +136,8 @@ export async function runEnvironment( if (process.env[key]) { compose.withEnvironment({ [key]: process.env[key] }); - } else if (fixtures.cube.environment[key]) { - process.env[key] = fixtures.cube.environment[key]; + } else if (fixture.cube.environment[key]) { + process.env[key] = fixture.cube.environment[key]; } }); @@ -166,8 +158,8 @@ export async function runEnvironment( }; const cliEnv = isLocal ? new CubeCliEnvironment(composePath) : null; - const mappedDataPort = fixtures.data ? environment.getContainer('data').getMappedPort( - parseInt(fixtures.data.ports[0], 10), + const mappedDataPort = fixture.data ? environment.getContainer('data').getMappedPort( + parseInt(fixture.data.ports[0], 10), ) : null; if (cliEnv) { cliEnv.withEnvironment({ @@ -184,19 +176,19 @@ export async function runEnvironment( } const cube = cliEnv ? { port: 4000, - pgPort: parseInt(fixtures.cube.ports[1], 10), + pgPort: parseInt(fixture.cube.ports[1], 10), logs: cliEnv.cli?.stdout || process.stdout } : { port: environment.getContainer('cube').getMappedPort( - parseInt(fixtures.cube.ports[0], 10), + parseInt(fixture.cube.ports[0], 10), ), - pgPort: fixtures.cube.ports[1] && environment.getContainer('cube').getMappedPort( - parseInt(fixtures.cube.ports[1], 10), + pgPort: fixture.cube.ports[1] && environment.getContainer('cube').getMappedPort( + parseInt(fixture.cube.ports[1], 10), ) || undefined, logs: await environment.getContainer('cube').logs(), }; - if (fixtures.data) { + if (fixture.data) { const data = { port: mappedDataPort!, logs: await environment.getContainer('data').logs(), @@ -207,6 +199,9 @@ export async function runEnvironment( data, stop: async () => { await environment.down({ timeout: 30 * 1000 }); + if (cliEnv) { + await cliEnv.down(); + } }, }; }