Skip to content

Commit

Permalink
feat(databricks-jdbc-driver): Support HLL (#8257)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed May 14, 2024
1 parent 66aa01d commit da231ed
Show file tree
Hide file tree
Showing 24 changed files with 4,939 additions and 2,084 deletions.
12 changes: 12 additions & 0 deletions packages/cubejs-databricks-jdbc-driver/src/DatabricksQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ export class DatabricksQuery extends BaseQuery {
return new DatabricksFilter(this, filter);
}

public hllInit(sql: string) {
return `hll_sketch_agg(${sql})`;
}

public hllMerge(sql: string) {
return `hll_union_agg(${sql})`;
}

public countDistinctApprox(sql: string) {
return `approx_count_distinct(${sql})`;
}

public convertTz(field: string) {
return `from_utc_timestamp(${field}, '${this.timezone}')`;
}
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-jdbc-driver/src/JDBCDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ export class JDBCDriver extends BaseDriver {
reject(err);
return;
}

const rowStream = new QueryStream(res.rows.next, highWaterMark);
resolve({
rowStream,
Expand Down Expand Up @@ -322,6 +323,7 @@ export class JDBCDriver extends BaseDriver {
if (options.streamImport) {
return this.stream(query, values, options);
}

return super.downloadQueryResults(query, values, options);
}

Expand Down
16 changes: 13 additions & 3 deletions packages/cubejs-jdbc-driver/src/QueryStream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
/* eslint-disable import/no-extraneous-dependencies */
import { Readable } from 'stream';
import { getEnv } from '@cubejs-backend/shared';

export type Row = {
[field: string]: boolean | number | string
Expand All @@ -25,6 +23,18 @@ export class QueryStream extends Readable {
this.next = nextFn;
}

protected transformRow(row: any) {
  // Normalizes a single result row before it is pushed downstream:
  // any `Int8Array` field (binary column data — presumably HLL sketch
  // bytes given this commit's context; confirm against the driver) is
  // re-encoded as a base64 string so the row stays JSON-serializable.
  // NOTE: mutates `row` in place and returns the same object.
  // eslint-disable-next-line no-restricted-syntax
  for (const [name, field] of Object.entries(row)) {
    if (field instanceof Int8Array) {
      row[name] = Buffer.from(field).toString('base64');
    }
  }

  return row;
}

/**
* @override
*/
Expand All @@ -34,7 +44,7 @@ export class QueryStream extends Readable {
if (this.next) {
const row = this.next();
if (row.value) {
this.push(row.value);
this.push(this.transformRow(row.value));
}
if (row.done) {
this.push(null);
Expand Down
15 changes: 15 additions & 0 deletions packages/cubejs-testing-drivers/fixtures/_schemas.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
"type": "count",
"sql": "customer_id"
},
{
"name": "countApproxByCustomer",
"type": "count_distinct_approx",
"sql": "customer_id"
},
{
"name": "runningTotal",
"type": "count",
Expand Down Expand Up @@ -127,6 +132,11 @@
"type": "count",
"sql": "customer_id"
},
{
"name": "countApproxByCustomer",
"type": "count_distinct_approx",
"sql": "customer_id"
},
{
"name": "totalQuantity",
"sql": "quantity",
Expand Down Expand Up @@ -244,6 +254,11 @@
"type": "count",
"sql": "customer_id"
},
{
"name": "countApproxByCustomer",
"type": "count_distinct_approx",
"sql": "customer_id"
},
{
"name": "totalQuantity",
"sql": "quantity",
Expand Down
20 changes: 20 additions & 0 deletions packages/cubejs-testing-drivers/fixtures/athena.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
],
"BigECommerce": [
Expand All @@ -77,6 +87,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
]
},
Expand Down
20 changes: 20 additions & 0 deletions packages/cubejs-testing-drivers/fixtures/bigquery.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
],
"BigECommerce": [
Expand All @@ -78,6 +88,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
]
},
Expand Down
20 changes: 20 additions & 0 deletions packages/cubejs-testing-drivers/fixtures/databricks-jdbc.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
],
"BigECommerce": [
Expand All @@ -88,6 +98,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
]
},
Expand Down
22 changes: 21 additions & 1 deletion packages/cubejs-testing-drivers/fixtures/postgres.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"ports" : ["4000", "5656"]
},
"data": {
"image": "postgres:13",
"image": "hbontempo/postgres-hll:16-v2.18",
"environment": [
"POSTGRES_PASSWORD=test",
"POSTGRES_USER=test",
Expand Down Expand Up @@ -74,6 +74,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
],
"BigECommerce": [
Expand All @@ -89,6 +99,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
]
},
Expand Down
20 changes: 20 additions & 0 deletions packages/cubejs-testing-drivers/fixtures/snowflake.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
],
"BigECommerce": [
Expand All @@ -89,6 +99,16 @@
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "CountByProduct",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.countApproxByCustomer"
]
}
]
},
Expand Down
4 changes: 2 additions & 2 deletions packages/cubejs-testing-drivers/src/dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ export const BigECommerce = {
select 7293 as row_id, 'CA-2017-109183' as order_id, ${DATE_PREFIX}'2020-12-04'${DATE_SUFFIX} as order_date, 'LR-16915' as customer_id, 'Columbus' as city, 'Technology' as category, 'Machines' as sub_category, 'Okidata C610n Printer' as product_name, 649.00000 as sales, 2 as quantity, 0.50000 as discount, -272.58000 as profit, ${falseLiteral} as is_returning
`;
if (!GENERATE_BIG_SERIES) {
return `SELECT row_id as id, row_id, order_id, order_date, city, category, sub_category, product_name, sales, quantity, discount, profit, is_returning from (${data}) d`;
return `SELECT row_id as id, row_id, order_id, order_date, city, category, sub_category, product_name, customer_id, sales, quantity, discount, profit, is_returning from (${data}) d`;
}
return `select value * 10000 + row_id as id, row_id, order_id, order_date, city, category, sub_category, product_name, sales, quantity, discount, profit, is_returning from ${GENERATE_BIG_SERIES} CROSS JOIN (${data}) d`;
return `select value * 10000 + row_id as id, row_id, order_id, order_date, city, category, sub_category, product_name, customer_id, sales, quantity, discount, profit, is_returning from ${GENERATE_BIG_SERIES} CROSS JOIN (${data}) d`;
},
create: (cast: Cast, name: string, suf?: string) => create(name, BigECommerce.select(cast), cast, suf),
};
30 changes: 23 additions & 7 deletions packages/cubejs-testing-drivers/src/tests/testQueries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ import {
} from '../helpers';
import { incrementalSchemaLoadingSuite } from './testIncrementalSchemaLoading';

export function testQueries(type: string, { includeIncrementalSchemaSuite, extendedEnv }: { includeIncrementalSchemaSuite?: boolean, extendedEnv?: string } = {}): void {
type TestQueriesOptions = {
  // When set, also runs the incremental schema loading suite for the driver.
  includeIncrementalSchemaSuite?: boolean;
  // When set, also builds/tests the HLL-backed pre-aggregation
  // (`BigECommerce.CountByProductExternal`).
  includeHLLSuite?: boolean;
  // Optional label for an extended environment; appended to the
  // describe-block title when present.
  extendedEnv?: string;
};

export function testQueries(type: string, { includeIncrementalSchemaSuite, extendedEnv, includeHLLSuite }: TestQueriesOptions = {}): void {
describe(`Queries with the @cubejs-backend/${type}-driver${extendedEnv ? ` ${extendedEnv}` : ''}`, () => {
jest.setTimeout(60 * 5 * 1000);

Expand All @@ -23,14 +29,14 @@ export function testQueries(type: string, { includeIncrementalSchemaSuite, exten
let driver: BaseDriver;
let queries: string[];
let env: Environment;
let connection: PgClient;

let connectionId = 0;

async function createPostgresClient(user: string, password: string, pgPort: number | undefined) {
if (!pgPort) {
return <any>undefined;
throw new Error('port must be defined');
}

connectionId++;
const currentConnId = connectionId;

Expand All @@ -39,16 +45,16 @@ export function testQueries(type: string, { includeIncrementalSchemaSuite, exten
const conn = new PgClient({
database: 'db',
port: pgPort,
host: 'localhost',
host: '127.0.0.1',
user,
password,
ssl: false,
});
conn.on('error', (err) => {
console.log(err);
console.log(`[pg] #${currentConnId}`, err);
});
conn.on('end', () => {
console.debug(`[pg] end ${currentConnId}`);
console.debug(`[pg] #${currentConnId} end`);
});

await conn.connect();
Expand Down Expand Up @@ -106,7 +112,6 @@ export function testQueries(type: string, { includeIncrementalSchemaSuite, exten
console.log('Error creating fixtures', e.stack);
throw e;
}
connection = await createPostgresClient('admin', 'admin_password', env.cube.pgPort);
});

afterAll(async () => {
Expand Down Expand Up @@ -147,6 +152,14 @@ export function testQueries(type: string, { includeIncrementalSchemaSuite, exten
preAggregations: ['BigECommerce.TAExternal'],
contexts: [{ securityContext: { tenant: 't1' } }],
});

if (includeHLLSuite) {
await buildPreaggs(env.cube.port, apiToken, {
timezones: ['UTC'],
preAggregations: ['BigECommerce.CountByProductExternal'],
contexts: [{ securityContext: { tenant: 't1' } }],
});
}
});

execute('must not fetch a hidden cube', async () => {
Expand Down Expand Up @@ -1500,6 +1513,7 @@ export function testQueries(type: string, { includeIncrementalSchemaSuite, exten
}

executePg('SQL API: powerbi min max push down', async () => {
const connection = await createPostgresClient('admin', 'admin_password', env.cube.pgPort);
const res = await connection.query(`
select
max("rows"."orderDate") as "a0",
Expand All @@ -1516,6 +1530,7 @@ from
});

executePg('SQL API: powerbi min max ungrouped flag', async () => {
const connection = await createPostgresClient('admin', 'admin_password', env.cube.pgPort);
const res = await connection.query(`
select
count(distinct("rows"."totalSales")) + max(
Expand All @@ -1538,6 +1553,7 @@ from
});

executePg('SQL API: ungrouped pre-agg', async () => {
const connection = await createPostgresClient('admin', 'admin_password', env.cube.pgPort);
const res = await connection.query(`
select
"productName",
Expand Down

0 comments on commit da231ed

Please sign in to comment.