diff --git a/examples/README.md b/examples/README.md index 42adf0fa..8901595e 100644 --- a/examples/README.md +++ b/examples/README.md @@ -22,6 +22,7 @@ If something is missing, or you found a mistake in one of these examples, please - [read_only_user.ts](read_only_user.ts) - an example of using the client with a read-only user, with possible read-only user limitations highlights. - [basic_tls.ts](node/basic_tls.ts) - (Node.js only) using certificates for basic TLS authentication. - [mutual_tls.ts](node/mutual_tls.ts) - (Node.js only) using certificates for mutual TLS authentication. +- [custom_json_handling.ts](custom_json_handling.ts) - Customize JSON serialization/deserialization by providing a custom `parse` and `stringify` function. This is particularly useful when working with obscure data formats like `bigint`s. #### Creating tables diff --git a/examples/custom_json_handling.ts b/examples/custom_json_handling.ts new file mode 100644 index 00000000..f11e50cc --- /dev/null +++ b/examples/custom_json_handling.ts @@ -0,0 +1,65 @@ +import { createClient } from '@clickhouse/client' // or '@clickhouse/client-web' + +/** + * Similar to `examples/insert_js_dates.ts` but testing custom JSON handling + * + * JSON.stringify does not handle BigInt data types by default, so we'll provide + * a custom serializer before passing it to the JSON.stringify function. + * + * This example also shows how you can serialize Date objects in a custom way. + */ +void (async () => { + const valueSerializer = (value: unknown): unknown => { + if (value instanceof Date) { + // if you would have put this in the `replacer` parameter of JSON.stringify, (e.x: JSON.stringify(obj, replacerFn)) + // it would have been an ISO string, but since we are serializing before `stringify`ing, + // it will convert it before the `.toJSON()` method has been called + return value.getTime() + } + + if (typeof value === 'bigint') { + return value.toString() + } + + if (Array.isArray(value)) { + return value.map(valueSerializer) + } + + return value + } + + const tableName = 'inserts_custom_json_handling' + const client = createClient({ + json: { + parse: JSON.parse, + stringify: (obj: unknown) => JSON.stringify(valueSerializer(obj)), + }, + }) + await client.command({ + query: `DROP TABLE IF EXISTS ${tableName}`, + }) + await client.command({ + query: ` + CREATE TABLE ${tableName} + (id UInt64, dt DateTime64(3, 'UTC')) + ENGINE MergeTree() + ORDER BY (id) + `, + }) + await client.insert({ + table: tableName, + values: [ + { + id: BigInt(250000000000000200), + dt: new Date(), + }, + ], + format: 'JSONEachRow', + }) + const rows = await client.query({ + query: `SELECT * FROM ${tableName}`, + format: 'JSONEachRow', + }) + console.info(await rows.json()) + await client.close() +})() diff --git a/packages/client-common/__tests__/integration/data_types.test.ts b/packages/client-common/__tests__/integration/data_types.test.ts index ee6a6b26..2c538e79 100644 --- a/packages/client-common/__tests__/integration/data_types.test.ts +++ b/packages/client-common/__tests__/integration/data_types.test.ts @@ -216,6 +216,58 @@ describe('data types', () => { }) }) + it('should work with custom JSON handling (BigInt and Date)', async () => { + const TEST_BIGINT = BigInt(25000000000000000) + const TEST_DATE = new Date('2023-12-06T10:54:48.123Z') + const values = [ + { + big_id: TEST_BIGINT, + dt: TEST_DATE, + }, + ] + + const valueSerializer = (value: unknown): unknown => { + if (value instanceof Date) { + return value.getTime() + } + if (typeof value === 'bigint') { + return value.toString() + } + if (Array.isArray(value)) { + return value.map(valueSerializer) + } + if (value && typeof value === 'object') { + return Object.fromEntries( + Object.entries(value).map(([k, v]) => [k, valueSerializer(v)]), + ) + } + return value + } + + // modify the client to handle BigInt and Date serialization + client = createTestClient({ + json: { + parse: JSON.parse, + stringify: (obj: unknown) => { + const seralized = valueSerializer(obj) + return JSON.stringify(seralized) + }, + }, + }) + + const table = await createTableWithFields( + client, + "big_id UInt64, dt DateTime64(3, 'UTC')", + ) + + await insertAndAssert(table, values, {}, [ + { + dt: TEST_DATE.toISOString().replace('T', ' ').replace('Z', ''), // clickhouse returns DateTime64 in UTC without timezone info + big_id: TEST_BIGINT.toString(), // clickhouse by default returns UInt64 as string to be safe + }, + ]) + }) + it('should work with string enums', async () => { const values = [ { e1: 'Foo', e2: 'Qaz' }, @@ -753,8 +805,9 @@ describe('data types', () => { table: string, data: T[], clickhouse_settings: ClickHouseSettings = {}, + expectedDataBack?: unknown[], ) { await insertData(table, data, clickhouse_settings) - await assertData(table, data, clickhouse_settings) + await assertData(table, expectedDataBack ?? data, clickhouse_settings) } }) diff --git a/packages/client-common/__tests__/unit/config.test.ts b/packages/client-common/__tests__/unit/config.test.ts index 375790e1..ff012b9a 100644 --- a/packages/client-common/__tests__/unit/config.test.ts +++ b/packages/client-common/__tests__/unit/config.test.ts @@ -376,6 +376,10 @@ describe('config', () => { keep_alive: { enabled: true }, application_id: undefined, http_headers: {}, + json: { + parse: JSON.parse, + stringify: JSON.stringify, + }, }) }) @@ -426,6 +430,10 @@ describe('config', () => { log_writer: jasmine.any(LogWriter), keep_alive: { enabled: false }, application_id: 'my_app', + json: { + parse: JSON.parse, + stringify: JSON.stringify, + }, }) }) @@ -496,6 +504,10 @@ describe('config', () => { keep_alive: { enabled: true }, application_id: undefined, http_headers: {}, + json: { + parse: JSON.parse, + stringify: JSON.stringify, + }, }) }) }) diff --git a/packages/client-common/src/client.ts b/packages/client-common/src/client.ts index acc5eddb..5a56fc2c 100644 --- a/packages/client-common/src/client.ts +++ b/packages/client-common/src/client.ts @@ -10,11 +10,16 @@ import type { WithClickHouseSummary, WithResponseHeaders, } from '@clickhouse/client-common' -import { type DataFormat, DefaultLogger } from '@clickhouse/client-common' +import { + type DataFormat, + defaultJSONHandling, + DefaultLogger, +} from '@clickhouse/client-common' import type { InsertValues, NonEmptyArray } from './clickhouse_types' import type { ImplementationDetails, ValuesEncoder } from './config' import { getConnectionParams, prepareConfigWithURL } from './config' import type { ConnPingResult } from './connection' +import type { JSONHandling } from './parse/json_handling' import type { BaseResultSet } from './result' export interface BaseQueryParams { @@ -170,6 +175,7 @@ export class ClickHouseClient { private readonly sessionId?: string private readonly role?: string | Array private readonly logWriter: LogWriter + private readonly jsonHandling: JSONHandling constructor( config: BaseClickHouseClientConfigOptions & ImplementationDetails, @@ -192,7 +198,12 @@ export class ClickHouseClient { this.connectionParams, ) this.makeResultSet = config.impl.make_result_set - this.valuesEncoder = config.impl.values_encoder + this.jsonHandling = { + ...defaultJSONHandling, + ...config.json, + } + + this.valuesEncoder = config.impl.values_encoder(this.jsonHandling) } /** @@ -231,6 +242,7 @@ export class ClickHouseClient { }) }, response_headers, + this.jsonHandling, ) } diff --git a/packages/client-common/src/config.ts b/packages/client-common/src/config.ts index 86bb34bd..5be63be9 100644 --- a/packages/client-common/src/config.ts +++ b/packages/client-common/src/config.ts @@ -3,6 +3,7 @@ import type { Connection, ConnectionParams } from './connection' import type { DataFormat } from './data_formatter' import type { Logger } from './logger' import { ClickHouseLogLevel, LogWriter } from './logger' +import { defaultJSONHandling, type JSONHandling } from './parse/json_handling' import type { BaseResultSet } from './result' import type { ClickHouseSettings } from './settings' @@ -86,6 +87,12 @@ export interface BaseClickHouseClientConfigOptions { * @default true */ enabled?: boolean } + /** + * Custom parsing when handling with JSON objects + * + * Defaults to using standard `JSON.parse` and `JSON.stringify` + */ + json?: Partial } export type MakeConnection< @@ -102,8 +109,13 @@ export type MakeResultSet = < query_id: string, log_error: (err: Error) => void, response_headers: ResponseHeaders, + jsonHandling: JSONHandling, ) => ResultSet +export type MakeValuesEncoder = ( + jsonHandling: JSONHandling, +) => ValuesEncoder + export interface ValuesEncoder { validateInsertValues( values: InsertValues, @@ -150,7 +162,7 @@ export interface ImplementationDetails { impl: { make_connection: MakeConnection make_result_set: MakeResultSet - values_encoder: ValuesEncoder + values_encoder: MakeValuesEncoder handle_specific_url_params?: HandleImplSpecificURLParams } } @@ -241,6 +253,10 @@ export function getConnectionParams( keep_alive: { enabled: config.keep_alive?.enabled ?? true }, clickhouse_settings: config.clickhouse_settings ?? {}, http_headers: config.http_headers ?? {}, + json: { + ...defaultJSONHandling, + ...config.json, + }, } } diff --git a/packages/client-common/src/connection.ts b/packages/client-common/src/connection.ts index 24dcb859..64e2d5ce 100644 --- a/packages/client-common/src/connection.ts +++ b/packages/client-common/src/connection.ts @@ -1,3 +1,4 @@ +import type { JSONHandling } from '.' import type { WithClickHouseSummary, WithResponseHeaders, @@ -21,6 +22,7 @@ export interface ConnectionParams { application_id?: string http_headers?: Record auth: ConnectionAuth + json?: JSONHandling } export interface CompressionSettings { diff --git a/packages/client-common/src/data_formatter/formatter.ts b/packages/client-common/src/data_formatter/formatter.ts index c6aa7389..288a455c 100644 --- a/packages/client-common/src/data_formatter/formatter.ts +++ b/packages/client-common/src/data_formatter/formatter.ts @@ -1,3 +1,5 @@ +import type { JSONHandling } from '../parse' + export const StreamableJSONFormats = [ 'JSONEachRow', 'JSONStringsEachRow', @@ -112,9 +114,13 @@ export function validateStreamFormat( * @param format One of the supported JSON formats: https://clickhouse.com/docs/en/interfaces/formats/ * @returns string */ -export function encodeJSON(value: any, format: DataFormat): string { +export function encodeJSON( + value: any, + format: DataFormat, + stringifyFn: JSONHandling['stringify'], +): string { if ((SupportedJSONFormats as readonly string[]).includes(format)) { - return JSON.stringify(value) + '\n' + return stringifyFn(value) + '\n' } throw new Error( `The client does not support JSON encoding in [${format}] format.`, diff --git a/packages/client-common/src/index.ts b/packages/client-common/src/index.ts index f0107856..7951b2df 100644 --- a/packages/client-common/src/index.ts +++ b/packages/client-common/src/index.ts @@ -80,8 +80,13 @@ export type { ParsedColumnTuple, ParsedColumnMap, ParsedColumnType, + JSONHandling, +} from './parse' +export { + SimpleColumnTypes, + parseColumnType, + defaultJSONHandling, } from './parse' -export { SimpleColumnTypes, parseColumnType } from './parse' /** For implementation usage only - should not be re-exported */ export { diff --git a/packages/client-common/src/parse/index.ts b/packages/client-common/src/parse/index.ts index 12260e55..3fb05308 100644 --- a/packages/client-common/src/parse/index.ts +++ b/packages/client-common/src/parse/index.ts @@ -1 +1,2 @@ export * from './column_types' +export * from './json_handling' diff --git a/packages/client-common/src/parse/json_handling.ts b/packages/client-common/src/parse/json_handling.ts new file mode 100644 index 00000000..e5653d05 --- /dev/null +++ b/packages/client-common/src/parse/json_handling.ts @@ -0,0 +1,23 @@ +export interface JSONHandling { + /** + * Custom parser for JSON strings + * + * @param input stringified JSON + * @default JSON.parse // See {@link JSON.parse} + * @returns parsed object + */ + parse: (input: string) => T + /** + * Custom stringifier for JSON objects + * + * @param input any JSON-compatible object + * @default JSON.stringify // See {@link JSON.stringify} + * @returns stringified JSON + */ + stringify: (input: T) => string // T is any because it can LITERALLY be anything +} + +export const defaultJSONHandling: JSONHandling = { + parse: JSON.parse, + stringify: JSON.stringify, +} diff --git a/packages/client-node/__tests__/integration/node_stream_json_formats.test.ts b/packages/client-node/__tests__/integration/node_stream_json_formats.test.ts index 3810ea86..5d351cc1 100644 --- a/packages/client-node/__tests__/integration/node_stream_json_formats.test.ts +++ b/packages/client-node/__tests__/integration/node_stream_json_formats.test.ts @@ -4,6 +4,7 @@ import { assertJsonValues, jsonValues } from '@test/fixtures/test_data' import { createTestClient, guid } from '@test/utils' import { requireServerVersionAtLeast } from '@test/utils/jasmine' import Stream from 'stream' +import * as simdjson from 'simdjson' import { makeObjectStream } from '../utils/stream' describe('[Node.js] stream JSON formats', () => { @@ -330,6 +331,225 @@ describe('[Node.js] stream JSON formats', () => { }), ) }) + + describe('custom JSON handling', () => { + it('should use custom stringify when inserting with JSONEachRow stream', async () => { + let stringifyCalls = 0 + const customClient = createTestClient({ + json: { + parse: JSON.parse, + stringify: (value) => { + stringifyCalls++ + return JSON.stringify(value) + }, + }, + }) + + const stream = makeObjectStream() + stream.push({ id: '42', name: 'foo', sku: [0, 1] }) + stream.push({ id: '43', name: 'bar', sku: [2, 3] }) + setTimeout(() => stream.push(null), 100) + + await customClient.insert({ + table: tableName, + values: stream, + format: 'JSONEachRow', + }) + + expect(stringifyCalls).toBe(2) + + const result = await customClient.query({ + query: `SELECT * FROM ${tableName} ORDER BY id ASC`, + format: 'JSONEachRow', + }) + expect(await result.json()).toEqual([ + { id: '42', name: 'foo', sku: [0, 1] }, + { id: '43', name: 'bar', sku: [2, 3] }, + ]) + + await customClient.close() + }) + + it('should use custom stringify when inserting with JSONEachRow array', async () => { + let stringifyCalls = 0 + const customClient = createTestClient({ + json: { + parse: JSON.parse, + stringify: (value) => { + stringifyCalls++ + return JSON.stringify(value) + }, + }, + }) + + const values = [ + { id: '42', name: 'foo', sku: [0, 1] }, + { id: '43', name: 'bar', sku: [2, 3] }, + ] + + await customClient.insert({ + table: tableName, + values, + format: 'JSONEachRow', + }) + + expect(stringifyCalls).toBe(2) + + const result = await customClient.query({ + query: `SELECT * FROM ${tableName} ORDER BY id ASC`, + format: 'JSONEachRow', + }) + expect(await result.json()).toEqual(values) + + await customClient.close() + }) + + it('should use custom parse when querying with JSONEachRow', async () => { + let parseCalls = 0 + const customClient = createTestClient({ + json: { + parse: (text) => { + parseCalls++ + return JSON.parse(text) + }, + stringify: JSON.stringify, + }, + }) + + const stream = makeObjectStream() + stream.push({ id: '42', name: 'foo', sku: [0, 1] }) + stream.push({ id: '43', name: 'bar', sku: [2, 3] }) + setTimeout(() => stream.push(null), 100) + + await customClient.insert({ + table: tableName, + values: stream, + format: 'JSONEachRow', + }) + + parseCalls = 0 + const result = await customClient.query({ + query: `SELECT * FROM ${tableName} ORDER BY id ASC`, + format: 'JSONEachRow', + }) + await result.json() + + expect(parseCalls).toBeGreaterThan(0) + + await customClient.close() + }) + + it('should work with simdjson parser', async () => { + const customClient = createTestClient({ + json: { + parse: simdjson.parse, + stringify: JSON.stringify, + }, + }) + + const stream = makeObjectStream() + stream.push({ id: '42', name: 'foo', sku: [0, 1] }) + stream.push({ id: '43', name: 'bar', sku: [2, 3] }) + setTimeout(() => stream.push(null), 100) + + await customClient.insert({ + table: tableName, + values: stream, + format: 'JSONEachRow', + }) + + const result = await customClient.query({ + query: `SELECT * FROM ${tableName} ORDER BY id ASC`, + format: 'JSONEachRow', + }) + expect(await result.json()).toEqual([ + { id: '42', name: 'foo', sku: [0, 1] }, + { id: '43', name: 'bar', sku: [2, 3] }, + ]) + + await customClient.close() + }) + + it('should use custom stringify with JSONCompactEachRow', async () => { + let stringifyCalls = 0 + const customClient = createTestClient({ + json: { + parse: JSON.parse, + stringify: (value) => { + stringifyCalls++ + return JSON.stringify(value) + }, + }, + }) + + const stream = makeObjectStream() + stream.push(['42', 'foo', [0, 1]]) + stream.push(['43', 'bar', [2, 3]]) + setTimeout(() => stream.push(null), 100) + + await customClient.insert({ + table: tableName, + values: stream, + format: 'JSONCompactEachRow', + }) + + expect(stringifyCalls).toBe(2) + + const result = await customClient.query({ + query: `SELECT * FROM ${tableName} ORDER BY id ASC`, + format: 'JSONCompactEachRow', + }) + expect(await result.json()).toEqual([ + ['42', 'foo', [0, 1]], + ['43', 'bar', [2, 3]], + ]) + + await customClient.close() + }) + + it('should use custom stringify for data transformation', async () => { + const customClient = createTestClient({ + json: { + parse: JSON.parse, + stringify: (value) => { + if ( + typeof value === 'object' && + value !== null && + 'id' in value + ) { + return JSON.stringify({ + ...value, + id: String(Number(value.id) * 10), + }) + } + return JSON.stringify(value) + }, + }, + }) + + const values = [ + { id: '4', name: 'foo', sku: [0, 1] }, + { id: '5', name: 'bar', sku: [2, 3] }, + ] + + await customClient.insert({ + table: tableName, + values, + format: 'JSONEachRow', + }) + + const result = await customClient.query({ + query: `SELECT * FROM ${tableName} ORDER BY id ASC`, + format: 'JSONEachRow', + }) + expect(await result.json()).toEqual([ + { id: '40', name: 'foo', sku: [0, 1] }, + { id: '50', name: 'bar', sku: [2, 3] }, + ]) + + await customClient.close() + }) + }) }) it('does not throw if stream closes prematurely', async () => { diff --git a/packages/client-node/__tests__/unit/node_client.test.ts b/packages/client-node/__tests__/unit/node_client.test.ts index ab33f3fd..69a84f22 100644 --- a/packages/client-node/__tests__/unit/node_client.test.ts +++ b/packages/client-node/__tests__/unit/node_client.test.ts @@ -72,7 +72,13 @@ describe('[Node.js] createClient', () => { ].join('&'), }) expect(createConnectionStub).toHaveBeenCalledWith({ - connection_params: params, + connection_params: { + ...params, + json: { + parse: JSON.parse, + stringify: JSON.stringify, + }, + }, tls: undefined, keep_alive: { enabled: true, @@ -103,6 +109,10 @@ describe('[Node.js] createClient', () => { connection_params: { ...params, url: new URL('https://my.host:8443/my_proxy'), + json: { + parse: JSON.parse, + stringify: JSON.stringify, + }, }, tls: undefined, keep_alive: { @@ -138,6 +148,10 @@ describe('[Node.js] createClient', () => { ...params, url: new URL('https://my.host:8443/my_proxy'), auth: { username, password, type: 'Credentials' }, + json: { + parse: JSON.parse, + stringify: JSON.stringify, + }, }, tls: undefined, keep_alive: { diff --git a/packages/client-node/__tests__/unit/node_values_encoder.test.ts b/packages/client-node/__tests__/unit/node_values_encoder.test.ts index 94e95a45..2109ce09 100644 --- a/packages/client-node/__tests__/unit/node_values_encoder.test.ts +++ b/packages/client-node/__tests__/unit/node_values_encoder.test.ts @@ -5,6 +5,7 @@ import type { } from '@clickhouse/client-common' import Stream from 'stream' import { NodeValuesEncoder } from '../../src/utils' +import * as simdjson from 'simdjson' describe('[Node.js] ValuesEncoder', () => { const rawFormats = [ @@ -46,7 +47,10 @@ describe('[Node.js] ValuesEncoder', () => { 'JSONCompactStringsEachRowWithNamesAndTypes', ] - const encoder = new NodeValuesEncoder() + const encoder = new NodeValuesEncoder({ + parse: JSON.parse, + stringify: JSON.stringify, + }) describe('[Node.js] validateInsertValues', () => { it('should allow object mode stream for JSON* and raw for Tab* or CSV*', async () => { @@ -122,22 +126,154 @@ describe('[Node.js] ValuesEncoder', () => { } }) - it('should encode JSON input', async () => { + it('should encode JSON input (with and without custom JSON handling)', async () => { + const encoders = [ + encoder, + new NodeValuesEncoder({ + parse: simdjson.parse, + stringify: JSON.stringify, // simdjson doesn't have a stringify handler + }), + ] + + for (const encoder of encoders) { + const values: InputJSON = { + meta: [ + { + name: 'name', + type: 'string', + }, + ], + data: [{ name: 'foo' }, { name: 'bar' }], + } + const result = encoder.encodeValues(values, 'JSON') + let encoded = '' + for await (const chunk of result) { + encoded += chunk + } + expect(encoded).toEqual(JSON.stringify(values) + '\n') + } + }) + + it('should use custom stringify for JSON streams', async () => { + const customEncoder = new NodeValuesEncoder({ + parse: JSON.parse, + stringify: (value) => `custom:${JSON.stringify(value)}`, + }) + + const values = Stream.Readable.from([{ name: 'foo' }, { name: 'bar' }], { + objectMode: true, + }) + const result = customEncoder.encodeValues(values, 'JSON') + let encoded = '' + for await (const chunk of result) { + encoded += chunk + } + expect(encoded).toEqual('custom:{"name":"foo"}\ncustom:{"name":"bar"}\n') + }) + + it('should use custom stringify for JSON arrays', async () => { + const customEncoder = new NodeValuesEncoder({ + parse: JSON.parse, + stringify: (value) => `[${JSON.stringify(value)}]`, + }) + + const values = [{ id: 1 }, { id: 2 }] + const result = customEncoder.encodeValues(values, 'JSONEachRow') + let encoded = '' + for await (const chunk of result) { + encoded += chunk + } + expect(encoded).toEqual('[{"id":1}]\n[{"id":2}]\n') + }) + + it('should use custom stringify for InputJSON objects', async () => { + const customEncoder = new NodeValuesEncoder({ + parse: JSON.parse, + stringify: (value) => JSON.stringify(value).toUpperCase(), + }) + const values: InputJSON = { - meta: [ - { - name: 'name', - type: 'string', - }, - ], - data: [{ name: 'foo' }, { name: 'bar' }], + meta: [{ name: 'id', type: 'UInt32' }], + data: [{ id: 1 }], } - const result = encoder.encodeValues(values, 'JSON') + const result = customEncoder.encodeValues(values, 'JSON') let encoded = '' for await (const chunk of result) { encoded += chunk } - expect(encoded).toEqual(JSON.stringify(values) + '\n') + expect(encoded).toEqual(JSON.stringify(values).toUpperCase() + '\n') + }) + + it('should use custom stringify for JSONObjectEachRow', async () => { + const customEncoder = new NodeValuesEncoder({ + parse: JSON.parse, + stringify: (value) => `CUSTOM_${JSON.stringify(value)}`, + }) + + const values: InputJSONObjectEachRow = { + row1: { name: 'test1' }, + row2: { name: 'test2' }, + } + const result = customEncoder.encodeValues(values, 'JSONObjectEachRow') + let encoded = '' + for await (const chunk of result) { + encoded += chunk + } + expect(encoded).toEqual(`CUSTOM_${JSON.stringify(values)}\n`) + }) + + it('should handle custom stringify with complex objects', async () => { + const customEncoder = new NodeValuesEncoder({ + parse: JSON.parse, + stringify: (value) => { + if ( + typeof value === 'object' && + value !== null && + 'timestamp' in value + ) { + return JSON.stringify({ + ...value, + timestamp: new Date(value.timestamp as string).toISOString(), + }) + } + return JSON.stringify(value) + }, + }) + + const values = [ + { id: 1, timestamp: '2024-01-01' }, + { id: 2, timestamp: '2024-01-02' }, + ] + const result = customEncoder.encodeValues(values, 'JSONEachRow') + let encoded = '' + for await (const chunk of result) { + encoded += chunk + } + expect(encoded).toContain('"timestamp":"2024-01-01T00:00:00.000Z"') + expect(encoded).toContain('"timestamp":"2024-01-02T00:00:00.000Z"') + }) + + it('should use custom stringify across different json formats', async () => { + const customEncoder = new NodeValuesEncoder({ + parse: JSON.parse, + stringify: (value) => `>>>${JSON.stringify(value)}<<<`, + }) + + const testFormats = [ + 'JSONEachRow', + 'JSONStringsEachRow', + 'JSONCompactEachRow', + ] + + for (const format of testFormats) { + const values = [{ test: 'data' }] + const result = customEncoder.encodeValues(values, format as DataFormat) + let encoded = '' + for await (const chunk of result) { + encoded += chunk + } + expect(encoded).toEqual('>>>{"test":"data"}<<<\n') + } }) it('should encode JSONObjectEachRow input', async () => { diff --git a/packages/client-node/package.json b/packages/client-node/package.json index 490847a4..39d85229 100644 --- a/packages/client-node/package.json +++ b/packages/client-node/package.json @@ -24,5 +24,8 @@ ], "dependencies": { "@clickhouse/client-common": "*" + }, + "devDependencies": { + "simdjson": "^0.9.2" } } diff --git a/packages/client-node/src/config.ts b/packages/client-node/src/config.ts index bd122109..19fef20e 100644 --- a/packages/client-node/src/config.ts +++ b/packages/client-node/src/config.ts @@ -1,6 +1,7 @@ import type { DataFormat, ImplementationDetails, + JSONHandling, ResponseHeaders, } from '@clickhouse/client-common' import { @@ -131,13 +132,15 @@ export const NodeConfigImpl: Required< tls, }) }, - values_encoder: new NodeValuesEncoder(), + values_encoder: (jsonHandling: JSONHandling) => + new NodeValuesEncoder(jsonHandling), make_result_set: (( stream: Stream.Readable, format: DataFormat, query_id: string, log_error: (err: Error) => void, response_headers: ResponseHeaders, + jsonHandling: JSONHandling, ) => ResultSet.instance({ stream, @@ -145,5 +148,6 @@ export const NodeConfigImpl: Required< query_id, log_error, response_headers, + jsonHandling, })) as any, } diff --git a/packages/client-node/src/connection/node_base_connection.ts b/packages/client-node/src/connection/node_base_connection.ts index 2728057d..f407a076 100644 --- a/packages/client-node/src/connection/node_base_connection.ts +++ b/packages/client-node/src/connection/node_base_connection.ts @@ -11,6 +11,7 @@ import type { ConnOperation, ConnPingResult, ConnQueryResult, + JSONHandling, LogWriter, ResponseHeaders, } from '@clickhouse/client-common' @@ -82,6 +83,7 @@ export abstract class NodeBaseConnection protected readonly defaultAuthHeader: string protected readonly defaultHeaders: Http.OutgoingHttpHeaders + private readonly jsonHandling: JSONHandling private readonly logger: LogWriter private readonly knownSockets = new WeakMap() private readonly idleSocketTTL: number @@ -106,6 +108,10 @@ export abstract class NodeBaseConnection } this.logger = params.log_writer this.idleSocketTTL = params.keep_alive.idle_socket_ttl + this.jsonHandling = params.json ?? { + parse: JSON.parse, + stringify: JSON.stringify, + } } async ping(params: ConnPingParams): Promise { @@ -421,7 +427,7 @@ export abstract class NodeBaseConnection const summaryHeader = response.headers['x-clickhouse-summary'] if (typeof summaryHeader === 'string') { try { - return JSON.parse(summaryHeader) + return this.jsonHandling.parse(summaryHeader) } catch (err) { this.logger.error({ message: `${op}: failed to parse X-ClickHouse-Summary header.`, diff --git a/packages/client-node/src/result_set.ts b/packages/client-node/src/result_set.ts index 1ed02534..535ee64e 100644 --- a/packages/client-node/src/result_set.ts +++ b/packages/client-node/src/result_set.ts @@ -1,6 +1,7 @@ import type { BaseResultSet, DataFormat, + JSONHandling, ResponseHeaders, ResultJSONType, ResultStream, @@ -8,6 +9,7 @@ import type { } from '@clickhouse/client-common' import { checkErrorInChunkAtIndex, + defaultJSONHandling, EXCEPTION_TAG_HEADER_NAME, } from '@clickhouse/client-common' import { @@ -53,6 +55,7 @@ export interface ResultSetOptions { query_id: string log_error: (error: Error) => void response_headers: ResponseHeaders + jsonHandling?: JSONHandling } export class ResultSet @@ -62,6 +65,7 @@ export class ResultSet private readonly exceptionTag: string | undefined = undefined private readonly log_error: (error: Error) => void + private readonly jsonHandling: JSONHandling constructor( private _stream: Stream.Readable, @@ -69,7 +73,12 @@ export class ResultSet public readonly query_id: string, log_error?: (error: Error) => void, _response_headers?: ResponseHeaders, + jsonHandling?: JSONHandling, ) { + this.jsonHandling = { + ...defaultJSONHandling, + ...jsonHandling, + } // eslint-disable-next-line no-console this.log_error = log_error ?? ((err: Error) => console.error(err)) @@ -108,7 +117,7 @@ export class ResultSet // JSON, JSONObjectEachRow, etc. if (isNotStreamableJSONFamily(this.format as DataFormat)) { const text = await getAsText(this._stream) - return JSON.parse(text) + return this.jsonHandling.parse(text) } // should not be called for CSV, etc. throw new Error(`Cannot decode ${this.format} as JSON`) @@ -128,6 +137,7 @@ export class ResultSet let incompleteChunks: Buffer[] = [] const logError = this.log_error const exceptionTag = this.exceptionTag + const jsonHandling = this.jsonHandling const toRows = new Transform({ transform( chunk: Buffer, @@ -163,7 +173,7 @@ export class ResultSet rows.push({ text, json(): T { - return JSON.parse(text) + return jsonHandling.parse(text) }, }) lastIdx = idx + 1 // skipping newline character @@ -207,8 +217,16 @@ export class ResultSet query_id, log_error, response_headers, + jsonHandling, }: ResultSetOptions): ResultSet { - return new ResultSet(stream, format, query_id, log_error, response_headers) + return new ResultSet( + stream, + format, + query_id, + log_error, + response_headers, + jsonHandling, + ) } } diff --git a/packages/client-node/src/utils/encoder.ts b/packages/client-node/src/utils/encoder.ts index fadc60c5..43611ee6 100644 --- a/packages/client-node/src/utils/encoder.ts +++ b/packages/client-node/src/utils/encoder.ts @@ -1,6 +1,7 @@ import type { DataFormat, InsertValues, + JSONHandling, ValuesEncoder, } from '@clickhouse/client-common' import { encodeJSON, isSupportedRawFormat } from '@clickhouse/client-common' @@ -8,6 +9,12 @@ import Stream from 'stream' import { isStream, mapStream } from './stream' export class NodeValuesEncoder implements ValuesEncoder { + private readonly json: JSONHandling + + constructor(customJSONConfig: JSONHandling) { + this.json = customJSONConfig + } + encodeValues( values: InsertValues, format: DataFormat, @@ -20,17 +27,19 @@ export class NodeValuesEncoder implements ValuesEncoder { // JSON* formats streams return Stream.pipeline( values, - mapStream((value) => encodeJSON(value, format)), + mapStream((value) => encodeJSON(value, format, this.json.stringify)), pipelineCb, ) } // JSON* arrays if (Array.isArray(values)) { - return values.map((value) => encodeJSON(value, format)).join('') + return values + .map((value) => encodeJSON(value, format, this.json.stringify)) + .join('') } // JSON & JSONObjectEachRow format input if (typeof values === 'object') { - return encodeJSON(values, format) + return encodeJSON(values, format, this.json.stringify) } throw new Error( `Cannot encode values of type ${typeof values} with ${format} format`, diff --git a/packages/client-web/src/config.ts b/packages/client-web/src/config.ts index b8339f82..7df293f0 100644 --- a/packages/client-web/src/config.ts +++ b/packages/client-web/src/config.ts @@ -3,6 +3,7 @@ import type { ConnectionParams, DataFormat, ImplementationDetails, + JSONHandling, ResponseHeaders, } from '@clickhouse/client-common' import { WebConnection } from './connection' @@ -32,5 +33,6 @@ export const WebImpl: ImplementationDetails['impl'] = { _log_error: (err: Error) => void, response_headers: ResponseHeaders, ) => new ResultSet(stream, format, query_id, response_headers)) as any, - values_encoder: new WebValuesEncoder(), + values_encoder: (jsonHandling: JSONHandling) => + new WebValuesEncoder(jsonHandling), } diff --git a/packages/client-web/src/result_set.ts b/packages/client-web/src/result_set.ts index 981f070a..e2f9f3db 100644 --- a/packages/client-web/src/result_set.ts +++ b/packages/client-web/src/result_set.ts @@ -1,6 +1,7 @@ import type { BaseResultSet, DataFormat, + JSONHandling, ResponseHeaders, ResultJSONType, ResultStream, @@ -23,18 +24,25 @@ export class ResultSet private readonly exceptionTag: string | undefined = undefined private isAlreadyConsumed = false + private readonly jsonHandling: JSONHandling constructor( private _stream: ReadableStream, private readonly format: Format, public readonly query_id: string, _response_headers?: ResponseHeaders, + jsonHandling: JSONHandling = { + parse: JSON.parse, + stringify: JSON.stringify, + }, ) { this.response_headers = _response_headers !== undefined ? Object.freeze(_response_headers) : {} this.exceptionTag = this.response_headers['x-clickhouse-exception-tag'] as | string | undefined + + this.jsonHandling = jsonHandling } /** See {@link BaseResultSet.text} */ @@ -64,7 +72,7 @@ export class ResultSet // JSON, JSONObjectEachRow, etc. if (isNotStreamableJSONFamily(this.format as DataFormat)) { const text = await getAsText(this._stream) - return JSON.parse(text) + return this.jsonHandling.parse(text) } // should not be called for CSV, etc. throw new Error(`Cannot decode ${this.format} as JSON`) @@ -79,6 +87,7 @@ export class ResultSet let totalIncompleteLength = 0 const exceptionTag = this.exceptionTag + const jsonHandling = this.jsonHandling const decoder = new TextDecoder('utf-8') const transform = new TransformStream({ start() { @@ -135,7 +144,7 @@ export class ResultSet rows.push({ text, json(): T { - return JSON.parse(text) + return jsonHandling.parse(text) }, }) diff --git a/packages/client-web/src/utils/encoder.ts b/packages/client-web/src/utils/encoder.ts index 0f7f6ea7..b9bbd7a5 100644 --- a/packages/client-web/src/utils/encoder.ts +++ b/packages/client-web/src/utils/encoder.ts @@ -3,10 +3,21 @@ import type { InsertValues, ValuesEncoder, } from '@clickhouse/client-common' -import { encodeJSON } from '@clickhouse/client-common' +import { encodeJSON, type JSONHandling } from '@clickhouse/client-common' import { isStream } from './stream' export class WebValuesEncoder implements ValuesEncoder { + private readonly json: JSONHandling + + constructor( + jsonHandling: JSONHandling = { + parse: JSON.parse, + stringify: JSON.stringify, + }, + ) { + this.json = jsonHandling + } + encodeValues( values: InsertValues, format: DataFormat, @@ -14,11 +25,13 @@ export class WebValuesEncoder implements ValuesEncoder { throwIfStream(values) // JSON* arrays if (Array.isArray(values)) { - return values.map((value) => encodeJSON(value, format)).join('') + return values + .map((value) => encodeJSON(value, format, this.json.stringify)) + .join('') } // JSON & JSONObjectEachRow format input if (typeof values === 'object') { - return encodeJSON(values, format) + return encodeJSON(values, format, this.json.stringify) } throw new Error( `Cannot encode values of type ${typeof values} with ${format} format`,