Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 1.5.0 (Node.js)

## New features

- It is now possible to disable the automatic decompression of the response stream with the `exec` method. See `ExecParams.decompress_response_stream` for more details. ([#298](https://github.com/ClickHouse/clickhouse-js/issues/298)).

# 1.4.1 (Node.js, Web)

## Improvements
Expand Down
41 changes: 29 additions & 12 deletions packages/client-common/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,17 @@ export type ExecParams = BaseQueryParams & {
* If {@link ExecParamsWithValues.values} are defined, the query is sent as a request parameter,
* and the values are sent in the request body instead. */
query: string
/** If set to `false`, the client _will not_ decompress the response stream, even if the response compression
* was requested by the client via the {@link BaseClickHouseClientConfigOptions.compression.response } setting.
* This could be useful if the response stream is passed to another application as-is,
* and the decompression is handled there.
* @note 1) Node.js only. This setting will have no effect on the Web version.
* @note 2) In case of an error, the stream will be decompressed anyway, regardless of this setting.
* @default true */
decompress_response_stream?: boolean
}
export type ExecParamsWithValues<Stream> = ExecParams & {
/** If you have a custom INSERT statement to run with `exec`,
* the data from this stream will be inserted.
/** If you have a custom INSERT statement to run with `exec`, the data from this stream will be inserted.
*
* NB: the data in the stream is expected to be serialized accordingly to the FORMAT clause
* used in {@link ExecParams.query} in this case.
Expand Down Expand Up @@ -170,11 +177,12 @@ export class ClickHouseClient<Stream = unknown> {
}

/**
* Used for most statements that can have a response, such as SELECT.
* FORMAT clause should be specified separately via {@link QueryParams.format} (default is JSON)
* Consider using {@link ClickHouseClient.insert} for data insertion,
* or {@link ClickHouseClient.command} for DDLs.
* Used for most statements that can have a response, such as `SELECT`.
* FORMAT clause should be specified separately via {@link QueryParams.format} (default is `JSON`).
* Consider using {@link ClickHouseClient.insert} for data insertion, or {@link ClickHouseClient.command} for DDLs.
* Returns an implementation of {@link BaseResultSet}.
*
* See {@link DataFormat} for the formats supported by the client.
*/
async query<Format extends DataFormat = 'JSON'>(
params: QueryParamsWithFormat<Format>,
Expand Down Expand Up @@ -211,7 +219,9 @@ export class ClickHouseClient<Stream = unknown> {
* when the format clause is not applicable, or when you are not interested in the response at all.
* Response stream is destroyed immediately as we do not expect useful information there.
* Examples of such statements are DDLs or custom inserts.
* If you are interested in the response data, consider using {@link ClickHouseClient.exec}
*
* @note if you have a custom query that does not work with {@link ClickHouseClient.query},
* and you are interested in the response data, consider using {@link ClickHouseClient.exec}.
*/
async command(params: CommandParams): Promise<CommandResult> {
const query = removeTrailingSemi(params.query.trim())
Expand All @@ -222,18 +232,23 @@ export class ClickHouseClient<Stream = unknown> {
}

/**
* Similar to {@link ClickHouseClient.command}, but for the cases where the output is expected,
* but format clause is not applicable. The caller of this method is expected to consume the stream,
* otherwise, the request will eventually be timed out.
* Similar to {@link ClickHouseClient.command}, but for the cases where the output _is expected_,
* but format clause is not applicable. The caller of this method _must_ consume the stream,
* as the underlying socket will not be released until then, and the request will eventually be timed out.
*
* @note it is not intended to use this method to execute the DDLs, such as `CREATE TABLE` or similar;
* use {@link ClickHouseClient.command} instead.
*/
async exec(
params: ExecParams | ExecParamsWithValues<Stream>,
): Promise<ExecResult<Stream>> {
const query = removeTrailingSemi(params.query.trim())
const values = 'values' in params ? params.values : undefined
const decompress_response_stream = params.decompress_response_stream ?? true
return await this.connection.exec({
query,
values,
decompress_response_stream,
...this.withClientQueryParams(params),
})
}
Expand All @@ -242,8 +257,10 @@ export class ClickHouseClient<Stream = unknown> {
* The primary method for data insertion. It is recommended to avoid arrays in case of large inserts
* to reduce application memory consumption and consider streaming for most of such use cases.
* As the insert operation does not provide any output, the response stream is immediately destroyed.
* In case of a custom insert operation, such as, for example, INSERT FROM SELECT,
* consider using {@link ClickHouseClient.command}, passing the entire raw query there (including FORMAT clause).
*
* @note in case of a custom insert operation (e.g., `INSERT FROM SELECT`),
* consider using {@link ClickHouseClient.command}, passing the entire raw query there
* (including the `FORMAT` clause).
*/
async insert<T>(params: InsertParams<Stream, T>): Promise<InsertResult> {
if (Array.isArray(params.values) && params.values.length === 0) {
Expand Down
1 change: 1 addition & 0 deletions packages/client-common/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export interface ConnInsertParams<Stream> extends ConnBaseQueryParams {

export interface ConnExecParams<Stream> extends ConnBaseQueryParams {
values?: Stream
decompress_response_stream?: boolean
}

export interface ConnBaseResult extends WithResponseHeaders {
Expand Down
12 changes: 6 additions & 6 deletions packages/client-common/src/utils/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ export type HttpHeaders = Record<string, HttpHeader | undefined>

export function withCompressionHeaders({
headers,
compress_request,
decompress_response,
enable_request_compression,
enable_response_compression,
}: {
headers: HttpHeaders
compress_request: boolean | undefined
decompress_response: boolean | undefined
enable_request_compression: boolean | undefined
enable_response_compression: boolean | undefined
}): Record<string, string> {
return {
...headers,
...(decompress_response ? { 'Accept-Encoding': 'gzip' } : {}),
...(compress_request ? { 'Content-Encoding': 'gzip' } : {}),
...(enable_response_compression ? { 'Accept-Encoding': 'gzip' } : {}),
...(enable_request_compression ? { 'Content-Encoding': 'gzip' } : {}),
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/client-common/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '1.4.1'
export default '1.5.0'
46 changes: 43 additions & 3 deletions packages/client-node/__tests__/integration/node_exec.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { ClickHouseClient } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import { guid } from '@test/utils'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { createTestClient, guid } from '@test/utils'
import Stream from 'stream'
import { getAsText } from '../../src/utils'
import Zlib from 'zlib'
import { drainStream, ResultSet } from '../../src'
import { getAsText } from '../../src/utils'

describe('[Node.js] exec', () => {
let client: ClickHouseClient<Stream.Readable>
Expand Down Expand Up @@ -165,4 +165,44 @@ describe('[Node.js] exec', () => {
expect(await rs.json()).toEqual(expected)
}
})

describe('disabled stream decompression', () => {
beforeEach(() => {
client = createTestClient({
compression: {
response: true,
},
})
})

it('should get a compressed response stream without decompressing it', async () => {
const result = await client.exec({
query: 'SELECT 42 AS result FORMAT JSONEachRow',
decompress_response_stream: false,
})
const text = await getAsText(decompress(result.stream))
expect(text).toEqual('{"result":42}\n')
})

it('should force decompress in case of an error', async () => {
await expectAsync(
client.exec({
query: 'invalid',
decompress_response_stream: false,
}),
).toBeRejectedWith(
jasmine.objectContaining({
message: jasmine.stringContaining('Syntax error'),
}),
)
})

function decompress(stream: Stream.Readable) {
return Stream.pipeline(stream, Zlib.createGunzip(), (err) => {
if (err) {
console.error(err)
}
})
}
})
})
69 changes: 49 additions & 20 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import type {
LogWriter,
ResponseHeaders,
} from '@clickhouse/client-common'
import { sleep } from '@clickhouse/client-common'
import {
isSuccessfulResponse,
parseError,
sleep,
toSearchParams,
transformUrl,
withHttpSettings,
Expand Down Expand Up @@ -63,8 +63,10 @@ export interface RequestParams {
body?: string | Stream.Readable
// provided by the user and wrapped around internally
abort_signal: AbortSignal
decompress_response?: boolean
compress_request?: boolean
enable_response_compression?: boolean
enable_request_compression?: boolean
// if there are compression headers, attempt to decompress it
try_decompress_response_stream?: boolean
parse_summary?: boolean
}

Expand All @@ -73,7 +75,6 @@ export abstract class NodeBaseConnection
{
protected readonly defaultAuthHeader: string
protected readonly defaultHeaders: Http.OutgoingHttpHeaders
protected readonly additionalHTTPHeaders: Record<string, string>

private readonly logger: LogWriter
private readonly knownSockets = new WeakMap<net.Socket, SocketInfo>()
Expand All @@ -83,12 +84,11 @@ export abstract class NodeBaseConnection
protected readonly params: NodeConnectionParams,
protected readonly agent: Http.Agent,
) {
this.additionalHTTPHeaders = params.http_headers ?? {}
this.defaultAuthHeader = `Basic ${Buffer.from(
`${params.username}:${params.password}`,
).toString('base64')}`
this.defaultHeaders = {
...this.additionalHTTPHeaders,
...(params.http_headers ?? {}),
// KeepAlive agent for some reason does not set this on its own
Connection: this.params.keep_alive.enabled ? 'keep-alive' : 'close',
'User-Agent': getUserAgent(this.params.application_id),
Expand Down Expand Up @@ -137,21 +137,23 @@ export abstract class NodeBaseConnection
)
const searchParams = toSearchParams({
database: this.params.database,
clickhouse_settings,
query_params: params.query_params,
session_id: params.session_id,
clickhouse_settings,
query_id,
})
const decompressResponse = clickhouse_settings.enable_http_compression === 1
const { controller, controllerCleanup } = this.getAbortController(params)
// allows to enforce the compression via the settings even if the client instance has it disabled
const enableResponseCompression =
clickhouse_settings.enable_http_compression === 1
try {
const { stream, response_headers } = await this.request(
{
method: 'POST',
url: transformUrl({ url: this.params.url, searchParams }),
body: params.query,
abort_signal: controller.signal,
decompress_response: decompressResponse,
enable_response_compression: enableResponseCompression,
headers: this.buildRequestHeaders(params),
},
'Query',
Expand All @@ -170,7 +172,7 @@ export abstract class NodeBaseConnection
search_params: searchParams,
err: err as Error,
extra_args: {
decompress_response: decompressResponse,
decompress_response: enableResponseCompression,
clickhouse_settings,
},
})
Expand Down Expand Up @@ -200,7 +202,7 @@ export abstract class NodeBaseConnection
url: transformUrl({ url: this.params.url, searchParams }),
body: params.values,
abort_signal: controller.signal,
compress_request: this.params.compression.compress_request,
enable_request_compression: this.params.compression.compress_request,
parse_summary: true,
headers: this.buildRequestHeaders(params),
},
Expand Down Expand Up @@ -371,16 +373,28 @@ export abstract class NodeBaseConnection
): Promise<ConnExecResult<Stream.Readable>> {
const query_id = this.getQueryId(params.query_id)
const sendQueryInParams = params.values !== undefined
const clickhouse_settings = withHttpSettings(
params.clickhouse_settings,
this.params.compression.decompress_response,
)
const toSearchParamsOptions = {
query: sendQueryInParams ? params.query : undefined,
database: this.params.database,
clickhouse_settings: params.clickhouse_settings,
query_params: params.query_params,
session_id: params.session_id,
clickhouse_settings,
query_id,
}
const searchParams = toSearchParams(toSearchParamsOptions)
const { controller, controllerCleanup } = this.getAbortController(params)
const tryDecompressResponseStream =
params.op === 'Exec'
? // allows to disable stream decompression for the `Exec` operation only
params.decompress_response_stream ??
this.params.compression.decompress_response
: // there is nothing useful in the response stream for the `Command` operation,
// and it is immediately destroyed; never decompress it
false
try {
const { stream, summary, response_headers } = await this.request(
{
Expand All @@ -389,6 +403,10 @@ export abstract class NodeBaseConnection
body: sendQueryInParams ? params.values : params.query,
abort_signal: controller.signal,
parse_summary: true,
enable_request_compression: this.params.compression.compress_request,
enable_response_compression:
this.params.compression.decompress_response,
try_decompress_response_stream: tryDecompressResponseStream,
headers: this.buildRequestHeaders(params),
},
params.op,
Expand Down Expand Up @@ -438,20 +456,30 @@ export abstract class NodeBaseConnection
): Promise<void> => {
this.logResponse(op, request, params, _response, start)

const decompressionResult = decompressResponse(_response)
if (isDecompressionError(decompressionResult)) {
return reject(decompressionResult.error)
let responseStream: Stream.Readable
const tryDecompressResponseStream =
params.try_decompress_response_stream ?? true
// even if the stream decompression is disabled, we have to decompress it in case of an error
const isFailedResponse = !isSuccessfulResponse(_response.statusCode)
if (tryDecompressResponseStream || isFailedResponse) {
const decompressionResult = decompressResponse(_response)
if (isDecompressionError(decompressionResult)) {
return reject(decompressionResult.error)
}
responseStream = decompressionResult.response
} else {
responseStream = _response
}
if (isSuccessfulResponse(_response.statusCode)) {
if (isFailedResponse) {
reject(parseError(await getAsText(responseStream)))
} else {
return resolve({
stream: decompressionResult.response,
stream: responseStream,
summary: params.parse_summary
? this.parseSummary(op, _response)
: undefined,
response_headers: { ..._response.headers },
})
} else {
reject(parseError(await getAsText(decompressionResult.response)))
}
}

Expand Down Expand Up @@ -492,7 +520,7 @@ export abstract class NodeBaseConnection
}
}

if (params.compress_request) {
if (params.enable_request_compression) {
Stream.pipeline(bodyStream, Zlib.createGzip(), request, callback)
} else {
Stream.pipeline(bodyStream, request, callback)
Expand Down Expand Up @@ -626,4 +654,5 @@ interface SocketInfo {
type RunExecParams = ConnBaseQueryParams & {
op: 'Exec' | 'Command'
values?: ConnExecParams<Stream.Readable>['values']
decompress_response_stream?: boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ export class NodeCustomAgentConnection extends NodeBaseConnection {
protected createClientRequest(params: RequestParams): Http.ClientRequest {
const headers = withCompressionHeaders({
headers: params.headers,
compress_request: params.compress_request,
decompress_response: params.decompress_response,
enable_request_compression: params.enable_request_compression,
enable_response_compression: params.enable_response_compression,
})
return Http.request(params.url, {
method: params.method,
Expand Down
4 changes: 2 additions & 2 deletions packages/client-node/src/connection/node_http_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export class NodeHttpConnection extends NodeBaseConnection {
protected createClientRequest(params: RequestParams): Http.ClientRequest {
const headers = withCompressionHeaders({
headers: params.headers,
compress_request: params.compress_request,
decompress_response: params.decompress_response,
enable_request_compression: params.enable_request_compression,
enable_response_compression: params.enable_response_compression,
})
return Http.request(params.url, {
method: params.method,
Expand Down
Loading