Skip to content

Commit

Permalink
fix(clickhouse-driver): Initial support for DateTime64, fix #7537 (#7538
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ovr committed Dec 18, 2023
1 parent 75e201d commit 401e9e1
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 80 deletions.
12 changes: 6 additions & 6 deletions .github/actions/integration/clickhouse.sh
Expand Up @@ -4,23 +4,23 @@ set -eo pipefail
# Debug log for test containers
export DEBUG=testcontainers

export TEST_CLICKHOUSE_VERSION=21.1.2
export TEST_CLICKHOUSE_VERSION=23.11

echo "::group::Clickhouse ${TEST_CLICKHOUSE_VERSION}";
docker pull yandex/clickhouse-server:${TEST_CLICKHOUSE_VERSION}
docker pull clickhouse/clickhouse-server:${TEST_CLICKHOUSE_VERSION}
yarn lerna run --concurrency 1 --stream --no-prefix integration:clickhouse
echo "::endgroup::"

export TEST_CLICKHOUSE_VERSION=20.6
export TEST_CLICKHOUSE_VERSION=22.8

echo "::group::Clickhouse ${TEST_CLICKHOUSE_VERSION}";
docker pull yandex/clickhouse-server:${TEST_CLICKHOUSE_VERSION}
docker pull clickhouse/clickhouse-server:${TEST_CLICKHOUSE_VERSION}
yarn lerna run --concurrency 1 --stream --no-prefix integration:clickhouse
echo "::endgroup::"

export TEST_CLICKHOUSE_VERSION=19
export TEST_CLICKHOUSE_VERSION=21.8

echo "::group::Clickhouse ${TEST_CLICKHOUSE_VERSION}";
docker pull yandex/clickhouse-server:${TEST_CLICKHOUSE_VERSION}
docker pull clickhouse/clickhouse-server:${TEST_CLICKHOUSE_VERSION}
yarn lerna run --concurrency 1 --stream --no-prefix integration:clickhouse
echo "::endgroup::"
4 changes: 2 additions & 2 deletions packages/cubejs-clickhouse-driver/package.json
Expand Up @@ -31,16 +31,16 @@
"@cubejs-backend/base-driver": "^0.34.33",
"@cubejs-backend/shared": "^0.34.33",
"generic-pool": "^3.6.0",
"moment": "^2.24.0",
"sqlstring": "^2.3.1",
"uuid": "^8.3.2"
},
"license": "Apache-2.0",
"devDependencies": {
"@cubejs-backend/testing-shared": "^0.34.35",
"@cubejs-backend/linter": "^0.34.25",
"@types/jest": "^27",
"jest": "27",
"stream-to-array": "^2.3.0",
"testcontainers": "^8.12",
"typescript": "~5.2.2"
},
"publishConfig": {
Expand Down
32 changes: 9 additions & 23 deletions packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts
Expand Up @@ -21,7 +21,9 @@ import {
import genericPool, { Pool } from 'generic-pool';
import { v4 as uuidv4 } from 'uuid';
import sqlstring from 'sqlstring';
import { HydrationStream } from './HydrationStream';
import * as moment from 'moment';

import { HydrationStream, transformRow } from './HydrationStream';

const ClickHouse = require('@apla/clickhouse');

Expand Down Expand Up @@ -225,30 +227,14 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
}

protected normaliseResponse(res: any) {
//
//
// ClickHouse returns DateTime as strings in format "YYYY-DD-MM HH:MM:SS"
// cube.js expects them in format "YYYY-DD-MMTHH:MM:SS.000", so translate them based on the metadata returned
//
// ClickHouse returns some number types as js numbers, others as js string, normalise them all to strings
//
//
if (res.data) {
const meta = res.meta.reduce(
(state: any, element: any) => ({ [element.name]: element, ...state }),
{}
);

res.data.forEach((row: any) => {
Object.keys(row).forEach(field => {
const value = row[field];
if (value !== null) {
const meta = res.meta.find((m: any) => m.name === field);
if (meta.type.includes('DateTime')) {
row[field] = `${value.substring(0, 10)}T${value.substring(11, 22)}.000`;
} else if (meta.type.includes('Date')) {
row[field] = `${value}T00:00:00.000`;
} else if (meta.type.includes('Int') || meta.type.includes('Float') || meta.type.includes('Decimal')) {
// convert all numbers into strings
row[field] = `${value}`;
}
}
});
transformRow(row, meta);
});
}
return res.data;
Expand Down
44 changes: 26 additions & 18 deletions packages/cubejs-clickhouse-driver/src/HydrationStream.ts
@@ -1,29 +1,37 @@
/* eslint-disable no-restricted-syntax */
import stream, { TransformCallback } from 'stream';
import * as moment from 'moment';

export type HydrationMap = Record<string, any>;
// ClickHouse returns DateTime as strings in format "YYYY-DD-MM HH:MM:SS"
// cube.js expects them in format "YYYY-DD-MMTHH:MM:SS.000", so translate them based on the metadata returned
//
// ClickHouse returns some number types as js numbers, others as js string, normalise them all to strings
export function transformRow(row: Record<string, any>, meta: any) {
for (const [fieldName, value] of Object.entries(row)) {
if (value !== null) {
const metaForField = meta[fieldName];
if (metaForField.type === 'DateTime') {
row[fieldName] = `${value.substring(0, 10)}T${value.substring(11, 22)}.000`;
} else if (metaForField.type.includes('DateTime64')) {
row[fieldName] = moment.utc(value).format(moment.HTML5_FMT.DATETIME_LOCAL_MS);
} else if (metaForField.type.includes('Date')) {
row[fieldName] = `${value}T00:00:00.000`;
} else if (metaForField.type.includes('Int')
|| metaForField.type.includes('Float')
|| metaForField.type.includes('Decimal')
) {
// convert all numbers into strings
row[fieldName] = `${value}`;
}
}
}
}

export class HydrationStream extends stream.Transform {
public constructor(meta: any) {
super({
objectMode: true,
transform(row: any[], encoding: BufferEncoding, callback: TransformCallback) {
for (const [index, value] of Object.entries(row)) {
if (value !== null) {
const metaForField = meta[index];
if (metaForField.type.includes('DateTime')) {
row[<any>index] = `${value.substring(0, 10)}T${value.substring(11, 22)}.000`;
} else if (metaForField.type.includes('Date')) {
row[<any>index] = `${value}T00:00:00.000`;
} else if (metaForField.type.includes('Int')
|| metaForField.type.includes('Float')
|| metaForField.type.includes('Decimal')
) {
// convert all numbers into strings
row[<any>index] = `${value}`;
}
}
}
transformRow(row, meta);

this.push(row);
callback();
Expand Down
56 changes: 27 additions & 29 deletions packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts
@@ -1,10 +1,11 @@
import { GenericContainer } from 'testcontainers';
import { ClickhouseDBRunner } from '@cubejs-backend/testing-shared';
import { streamToArray } from '@cubejs-backend/shared';

import { ClickHouseDriver } from '../src/ClickHouseDriver';

const streamToArray = require('stream-to-array');
import { ClickHouseDriver } from '../src';

describe('ClickHouseDriver', () => {
jest.setTimeout(20 * 1000);

let container: any;
let config: any;

Expand All @@ -20,13 +21,7 @@ describe('ClickHouseDriver', () => {

// eslint-disable-next-line func-names
beforeAll(async () => {
jest.setTimeout(20 * 1000);

const version = process.env.TEST_CLICKHOUSE_VERSION || 'latest';

container = await new GenericContainer(`yandex/clickhouse-server:${version}`)
.withExposedPorts(8123)
.start();
container = await ClickhouseDBRunner.startContainer({});

config = {
host: 'localhost',
Expand All @@ -35,13 +30,14 @@ describe('ClickHouseDriver', () => {

await doWithDriver(async (driver) => {
await driver.createSchemaIfNotExists('test');
// Unsupported in old servers
// datetime64 DateTime64,
await driver.query(
`
CREATE TABLE test.types_test (
date Date,
datetime DateTime,
datetime64_millis DateTime64(3, 'UTC'),
datetime64_micros DateTime64(6, 'UTC'),
datetime64_nanos DateTime64(9, 'UTC'),
int8 Int8,
int16 Int16,
int32 Int32,
Expand All @@ -60,17 +56,17 @@ describe('ClickHouseDriver', () => {
[]
);

await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
'2020-01-01', '2020-01-01 00:00:00', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.01, 1.01, 1.01
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00.000', '2020-01-01 00:00:00.000000', '2020-01-01 00:00:00.000000000', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.01, 1.01, 1.01
]);
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
'2020-01-02', '2020-01-02 00:00:00', 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2.02, 2.02, 2.02
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
'2020-01-02', '2020-01-02 00:00:00', '2020-01-02 00:00:00.123', '2020-01-02 00:00:00.123456', '2020-01-02 00:00:00.123456789', 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2.02, 2.02, 2.02
]);
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
'2020-01-03', '2020-01-03 00:00:00', 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3.03, 3.03, 3.03
await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [
'2020-01-03', '2020-01-03 00:00:00', '2020-01-03 00:00:00.234', '2020-01-03 00:00:00.234567', '2020-01-03 00:00:00.234567890', 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3.03, 3.03, 3.03
]);
});
});
}, 30 * 1000);

// eslint-disable-next-line func-names
afterAll(async () => {
Expand All @@ -83,7 +79,7 @@ describe('ClickHouseDriver', () => {
if (container) {
await container.stop();
}
});
}, 30 * 1000);

it('should construct', async () => {
await doWithDriver(async () => {
Expand Down Expand Up @@ -160,8 +156,9 @@ describe('ClickHouseDriver', () => {
expect(values).toEqual([{
date: '2020-01-01T00:00:00.000',
datetime: '2020-01-01T00:00:00.000',
// Unsupported in old servers
// datetime64: '2020-01-01T00:00:00.00.000',
datetime64_millis: '2020-01-01T00:00:00.000',
datetime64_micros: '2020-01-01T00:00:00.000',
datetime64_nanos: '2020-01-01T00:00:00.000',
int8: '1',
int16: '1',
int32: '1',
Expand Down Expand Up @@ -252,8 +249,9 @@ describe('ClickHouseDriver', () => {
expect(tableData.types).toEqual([
{ name: 'date', type: 'date' },
{ name: 'datetime', type: 'timestamp' },
// Unsupported in old servers
// { name: 'datetime64', type: 'timestamp' },
{ name: 'datetime64_millis', type: 'timestamp' },
{ name: 'datetime64_micros', type: 'timestamp' },
{ name: 'datetime64_nanos', type: 'timestamp' },
{ name: 'int8', type: 'int' },
{ name: 'int16', type: 'int' },
{ name: 'int32', type: 'int' },
Expand All @@ -268,10 +266,10 @@ describe('ClickHouseDriver', () => {
{ name: 'decimal64', type: 'decimal' },
{ name: 'decimal128', type: 'decimal' },
]);
expect(await streamToArray(tableData.rowStream)).toEqual([
['2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1.01', '1.01', '1.01'],
['2020-01-02T00:00:00.000', '2020-01-02T00:00:00.000', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2.02', '2.02', '2.02'],
['2020-01-03T00:00:00.000', '2020-01-03T00:00:00.000', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3.03', '3.03', '3.03'],
expect(await streamToArray(tableData.rowStream as any)).toEqual([
['2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1.01', '1.01', '1.01'],
['2020-01-02T00:00:00.000', '2020-01-02T00:00:00.000', '2020-01-02T00:00:00.123', '2020-01-02T00:00:00.123', '2020-01-02T00:00:00.123', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2.02', '2.02', '2.02'],
['2020-01-03T00:00:00.000', '2020-01-03T00:00:00.000', '2020-01-03T00:00:00.234', '2020-01-03T00:00:00.234', '2020-01-03T00:00:00.234', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3.03', '3.03', '3.03'],
]);
} finally {
// @ts-ignore
Expand Down
Expand Up @@ -68,9 +68,9 @@ export class ClickHouseDbRunner {

testQueries = async (queries, prepareDataSet) => {
if (!this.container && !process.env.TEST_CLICKHOUSE_HOST) {
const version = process.env.TEST_CLICKHOUSE_VERSION || '21.1.2';
const version = process.env.TEST_CLICKHOUSE_VERSION || '23.11';

this.container = await new GenericContainer(`yandex/clickhouse-server:${version}`)
this.container = await new GenericContainer(`clickhouse/clickhouse-server:${version}`)
.withExposedPorts(8123)
.start();
}
Expand Down
26 changes: 26 additions & 0 deletions packages/cubejs-testing-shared/src/db/clickhouse.ts
@@ -0,0 +1,26 @@
import { GenericContainer, Wait } from 'testcontainers';

import { DbRunnerAbstract, DBRunnerContainerOptions } from './db-runner.abstract';

type ClickhouseStartOptions = DBRunnerContainerOptions & {
version?: string,
};

export class ClickhouseDBRunner extends DbRunnerAbstract {
public static startContainer(options: ClickhouseStartOptions) {
const version = process.env.TEST_CLICKHOUSE_VERSION || options.version || '23.11';

const container = new GenericContainer(`clickhouse/clickhouse-server:${version}`)
.withExposedPorts(8123)
.withStartupTimeout(10 * 1000);

if (options.volumes) {
// eslint-disable-next-line no-restricted-syntax
for (const { source, target, bindMode } of options.volumes) {
container.withBindMount(source, target, bindMode);
}
}

return container.start();
}
}
1 change: 1 addition & 0 deletions packages/cubejs-testing-shared/src/db/index.ts
@@ -1,6 +1,7 @@
export * from './mysql';
export * from './postgres';
export * from './cubestore';
export * from './clickhouse';
export * from './questdb';
export * from './materialize';
export * from './crate';
Expand Down

0 comments on commit 401e9e1

Please sign in to comment.