Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 1.14.0

## New features

- Added an `ignore_error_response` key to `client.exec`, which allows callers to manually handle errors. ([#483])

# 1.13.0

## New features
Expand Down
11 changes: 11 additions & 0 deletions packages/client-common/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ export type ExecParams = BaseQueryParams & {
* @note 2) In case of an error, the stream will be decompressed anyway, regardless of this setting.
* @default true */
decompress_response_stream?: boolean
/**
* If set to `true`, the client will ignore error responses from the server and return them as-is in the response stream.
* This could be useful if you want to handle error responses manually.
* @note 1) Node.js only. This setting will have no effect on the Web version.
* @note 2) Default behavior is to not ignore error responses, and throw an error when an error response
* is received. This includes decompressing the error response stream if it is compressed.
* @default false
*/
ignore_error_response?: boolean
}
export type ExecParamsWithValues<Stream> = ExecParams & {
/** If you have a custom INSERT statement to run with `exec`, the data from this stream will be inserted.
Expand Down Expand Up @@ -277,10 +286,12 @@ export class ClickHouseClient<Stream = unknown> {
const query = removeTrailingSemi(params.query.trim())
const values = 'values' in params ? params.values : undefined
const decompress_response_stream = params.decompress_response_stream ?? true
const ignore_error_response = params.ignore_error_response ?? false
return await this.connection.exec({
query,
values,
decompress_response_stream,
ignore_error_response,
...this.withClientQueryParams(params),
})
}
Expand Down
1 change: 1 addition & 0 deletions packages/client-common/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export interface ConnInsertParams<Stream> extends ConnBaseQueryParams {
export interface ConnExecParams<Stream> extends ConnBaseQueryParams {
values?: Stream
decompress_response_stream?: boolean
ignore_error_response?: boolean
}

export interface ConnBaseResult extends WithResponseHeaders {
Expand Down
41 changes: 35 additions & 6 deletions packages/client-node/__tests__/integration/node_exec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,42 @@ describe('[Node.js] exec', () => {
}),
)
})
})

function decompress(stream: Stream.Readable) {
return Stream.pipeline(stream, Zlib.createGunzip(), (err) => {
if (err) {
console.error(err)
}
describe('ignore error response', () => {
beforeEach(() => {
client = createTestClient({
compression: {
response: true,
},
})
}
})

it('should get a decompressed response stream if ignore_error_response is true and default decompression config is passed', async () => {
const result = await client.exec({
query: 'invalid',
ignore_error_response: true,
})
const text = await getAsText(result.stream)
expect(text).toContain('Syntax error')
})

it('should get a compressed response stream if ignore_error_response is true and decompression is disabled', async () => {
const result = await client.exec({
query: 'invalid',
decompress_response_stream: false,
ignore_error_response: true,
})
const text = await getAsText(decompress(result.stream))
expect(text).toContain('Syntax error')
})
})
})

function decompress(stream: Stream.Readable) {
return Stream.pipeline(stream, Zlib.createGunzip(), (err) => {
if (err) {
console.error(err)
}
})
}
13 changes: 11 additions & 2 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ export interface RequestParams {
enable_request_compression?: boolean
// if there are compression headers, attempt to decompress it
try_decompress_response_stream?: boolean
// if the response contains an error, ignore it and return the stream as-is
ignore_error_response?: boolean
parse_summary?: boolean
query: string
}
Expand Down Expand Up @@ -468,6 +470,7 @@ export abstract class NodeBaseConnection
: // there is nothing useful in the response stream for the `Command` operation,
// and it is immediately destroyed; never decompress it
false
const ignoreErrorResponse = params.ignore_error_response ?? false
try {
const { stream, summary, response_headers } = await this.request(
{
Expand All @@ -480,6 +483,7 @@ export abstract class NodeBaseConnection
enable_response_compression:
this.params.compression.decompress_response,
try_decompress_response_stream: tryDecompressResponseStream,
ignore_error_response: ignoreErrorResponse,
headers: this.buildRequestHeaders(params),
query: params.query,
},
Expand Down Expand Up @@ -537,9 +541,13 @@ export abstract class NodeBaseConnection
this.logResponse(op, request, params, _response, start)
const tryDecompressResponseStream =
params.try_decompress_response_stream ?? true
const ignoreErrorResponse = params.ignore_error_response ?? false
// even if the stream decompression is disabled, we have to decompress it in case of an error
const isFailedResponse = !isSuccessfulResponse(_response.statusCode)
if (tryDecompressResponseStream || isFailedResponse) {
if (
tryDecompressResponseStream ||
(isFailedResponse && !ignoreErrorResponse)
) {
const decompressionResult = decompressResponse(_response, this.logger)
if (isDecompressionError(decompressionResult)) {
const err = enhanceStackTrace(
Expand All @@ -552,7 +560,7 @@ export abstract class NodeBaseConnection
} else {
responseStream = _response
}
if (isFailedResponse) {
if (isFailedResponse && !ignoreErrorResponse) {
try {
const errorMessage = await getAsText(responseStream)
const err = enhanceStackTrace(
Expand Down Expand Up @@ -795,6 +803,7 @@ type RunExecParams = ConnBaseQueryParams & {
op: 'Exec' | 'Command'
values?: ConnExecParams<Stream.Readable>['values']
decompress_response_stream?: boolean
ignore_error_response?: boolean
}

const PingQuery = `SELECT 'ping'`
Loading