Skip to content

Commit

Permalink
Merge pull request #52853 from Avogar/http-valid-json-on-exception
Browse files Browse the repository at this point in the history
Output valid JSON/XML on excetpion during HTTP query execution
  • Loading branch information
Avogar committed Sep 26, 2023
2 parents 6cd32eb + e163670 commit 69a17bb
Show file tree
Hide file tree
Showing 32 changed files with 1,163 additions and 79 deletions.
81 changes: 81 additions & 0 deletions docs/en/interfaces/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -720,3 +720,84 @@ $ curl -vv -H 'XXX:xxx' 'http://localhost:8123/get_relative_path_static_handler'
<html><body>Relative Path File</body></html>
* Connection #0 to host localhost left intact
```
## Valid JSON/XML response on exception during HTTP streaming {valid-output-on-exception-http-streaming}
While query execution over HTTP an exception can happen when part of the data has already been sent. Usually an exception is sent to the client in plain text
even if some specific data format was used to output data and the output may become invalid in terms of specified data format.
To prevent it, you can use setting `http_write_exception_in_output_format` (enabled by default) that will tell ClickHouse to write an exception in specified format (currently supported for XML and JSON* formats).
Examples:
```bash
$ curl 'http://localhost:8123/?query=SELECT+number,+throwIf(number>3)+from+system.numbers+format+JSON+settings+max_block_size=1&http_write_exception_in_output_format=1'
{
"meta":
[
{
"name": "number",
"type": "UInt64"
},
{
"name": "throwIf(greater(number, 2))",
"type": "UInt8"
}
],
"data":
[
{
"number": "0",
"throwIf(greater(number, 2))": 0
},
{
"number": "1",
"throwIf(greater(number, 2))": 0
},
{
"number": "2",
"throwIf(greater(number, 2))": 0
}
],
"rows": 3,
"exception": "Code: 395. DB::Exception: Value passed to 'throwIf' function is non-zero: while executing 'FUNCTION throwIf(greater(number, 2) :: 2) -> throwIf(greater(number, 2)) UInt8 : 1'. (FUNCTION_THROW_IF_VALUE_IS_NON_ZERO) (version 23.8.1.1)"
}
```
```bash
$ curl 'http://localhost:8123/?query=SELECT+number,+throwIf(number>2)+from+system.numbers+format+XML+settings+max_block_size=1&http_write_exception_in_output_format=1'
<?xml version='1.0' encoding='UTF-8' ?>
<result>
<meta>
<columns>
<column>
<name>number</name>
<type>UInt64</type>
</column>
<column>
<name>throwIf(greater(number, 2))</name>
<type>UInt8</type>
</column>
</columns>
</meta>
<data>
<row>
<number>0</number>
<field>0</field>
</row>
<row>
<number>1</number>
<field>0</field>
</row>
<row>
<number>2</number>
<field>0</field>
</row>
</data>
<rows>3</rows>
<exception>Code: 395. DB::Exception: Value passed to 'throwIf' function is non-zero: while executing 'FUNCTION throwIf(greater(number, 2) :: 2) -> throwIf(greater(number, 2)) UInt8 : 1'. (FUNCTION_THROW_IF_VALUE_IS_NON_ZERO) (version 23.8.1.1)</exception>
</result>
```
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ class IColumn;
\
M(UInt64, http_headers_progress_interval_ms, 100, "Do not send HTTP headers X-ClickHouse-Progress more frequently than at each specified interval.", 0) \
M(Bool, http_wait_end_of_query, false, "Enable HTTP response buffering on the server-side.", 0) \
M(Bool, http_write_exception_in_output_format, true, "Write exception in output format to produce valid output. Works with JSON and XML formats.", 0) \
M(UInt64, http_response_buffer_size, 0, "The number of bytes to buffer in the server memory before sending a HTTP response to the client or flushing to disk (when http_wait_end_of_query is enabled).", 0) \
\
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
Expand Down
3 changes: 2 additions & 1 deletion src/Core/SettingsChangesHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"}}},
{"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"},
{"http_write_exception_in_output_format", false, true, "Output valid JSON/XML on exception in HTTP streaming."}}},
{"23.8", {{"rewrite_count_distinct_if_with_count_distinct_implementation", false, true, "Rewrite countDistinctIf with count_distinct_implementation configuration"}}},
{"23.7", {{"function_sleep_max_microseconds_per_block", 0, 3000000, "In previous versions, the maximum sleep time of 3 seconds was applied only for `sleep`, but not for `sleepEachRow` function. In the new version, we introduce this setting. If you set compatibility with the previous versions, we will disable the limit altogether."}}},
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
Expand Down
7 changes: 6 additions & 1 deletion src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include <Formats/FormatFactory.h>

#include <algorithm>
#include <Core/Settings.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
Expand Down Expand Up @@ -230,6 +229,12 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
context->getRemoteHostFilter().checkURL(avro_schema_registry_url);
}

if (context->getClientInfo().interface == ClientInfo::Interface::HTTP && context->getSettingsRef().http_write_exception_in_output_format.value)
{
format_settings.json.valid_output_on_exception = true;
format_settings.xml.valid_output_on_exception = true;
}

return format_settings;
}

Expand Down
6 changes: 6 additions & 0 deletions src/Formats/FormatSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ struct FormatSettings
bool validate_types_from_metadata = true;
bool validate_utf8 = false;
bool allow_object_type = false;
bool valid_output_on_exception = false;
bool compact_allow_variable_number_of_columns = false;
} json;

Expand Down Expand Up @@ -414,6 +415,11 @@ struct FormatSettings
bool allow_types_conversion = true;
} native;

struct
{
bool valid_output_on_exception = false;
} xml;

struct
{
bool escape_special_characters = false;
Expand Down
6 changes: 6 additions & 0 deletions src/Formats/JSONUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,12 @@ namespace JSONUtils
}
}

void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent)
{
writeTitle("exception", out, indent, " ");
writeJSONString(exception_message, out, settings);
}

Strings makeNamesValidJSONStrings(const Strings & names, const FormatSettings & settings, bool validate_utf8)
{
Strings result;
Expand Down
2 changes: 2 additions & 0 deletions src/Formats/JSONUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ namespace JSONUtils
bool write_statistics,
WriteBuffer & out);

void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent = 0);

void skipColon(ReadBuffer & in);
void skipComma(ReadBuffer & in);

Expand Down
85 changes: 85 additions & 0 deletions src/IO/PeekableWriteBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include <IO/PeekableWriteBuffer.h>

namespace DB
{

PeekableWriteBuffer::PeekableWriteBuffer(DB::WriteBuffer & sub_buf_) : BufferWithOwnMemory(0), sub_buf(sub_buf_)
{
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin() + sub_buf.offset(), sub_working.size() - sub_buf.offset(), 0);
}

void PeekableWriteBuffer::nextImpl()
{
if (checkpoint)
{
if (write_to_own_memory)
{
size_t prev_size = position() - memory.data();
size_t new_size = memory.size() * 2;
memory.resize(new_size);
BufferBase::set(memory.data(), memory.size(), prev_size);
return;
}

if (memory.size() == 0)
memory.resize(DBMS_DEFAULT_BUFFER_SIZE);

sub_buf.position() = position();
BufferBase::set(memory.data(), memory.size(), 0);
write_to_own_memory = true;
return;
}

sub_buf.position() = position();
sub_buf.next();
BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
}


void PeekableWriteBuffer::dropCheckpoint()
{
assert(checkpoint);
checkpoint = std::nullopt;
/// If we have saved data in own memory, write it to sub-buf.
if (write_to_own_memory)
{
try
{
sub_buf.next();
sub_buf.write(memory.data(), position() - memory.data());
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
write_to_own_memory = false;
}
catch (...)
{
/// If exception happened during writing to sub buffer, we should
/// update buffer to not leave it in invalid state.
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
write_to_own_memory = false;
}
}

}

void PeekableWriteBuffer::rollbackToCheckpoint(bool drop)
{
assert(checkpoint);

/// Just ignore all data written after checkpoint.
if (write_to_own_memory)
{
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
write_to_own_memory = false;
}

position() = *checkpoint;

if (drop)
checkpoint = std::nullopt;
}

}
59 changes: 59 additions & 0 deletions src/IO/PeekableWriteBuffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <stack>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

/// Similar to PeekableReadBuffer.
/// Allows to set checkpoint at some position in stream and come back to this position later.
/// When next() is called, saves data between checkpoint and current position to own memory instead of writing it to sub-buffer.
/// So, all the data after checkpoint won't be written in sub-buffer until checkpoint is dropped.
/// Rollback to checkpoint means that all data after checkpoint will be ignored and not sent to sub-buffer.
/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer (unless
/// you reset() the state of peekable buffer after each change of underlying buffer)
/// If position() of peekable buffer is explicitly set to some position before checkpoint
/// (e.g. by istr.position() = prev_pos), behavior is undefined.
class PeekableWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
friend class PeekableWriteBufferCheckpoint;
public:
explicit PeekableWriteBuffer(WriteBuffer & sub_buf_);

/// Sets checkpoint at current position
ALWAYS_INLINE inline void setCheckpoint()
{
if (checkpoint)
throw Exception(ErrorCodes::LOGICAL_ERROR, "PeekableWriteBuffer does not support recursive checkpoints.");

checkpoint.emplace(pos);
}

/// Forget checkpoint and send all data from checkpoint to position to sub-buffer.
void dropCheckpoint();

/// Sets position at checkpoint and forget all data written from checkpoint to position.
/// All pointers (such as this->buffer().end()) may be invalidated
void rollbackToCheckpoint(bool drop = false);

void finalizeImpl() override
{
assert(!checkpoint);
sub_buf.position() = position();
}

private:
void nextImpl() override;

WriteBuffer & sub_buf;
bool write_to_own_memory = false;
std::optional<Position> checkpoint = std::nullopt;
};

}
18 changes: 11 additions & 7 deletions src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,8 @@ void executeQuery(
bool allow_into_outfile,
ContextMutablePtr context,
SetResultDetailsFunc set_result_details,
const std::optional<FormatSettings> & output_format_settings)
const std::optional<FormatSettings> & output_format_settings,
HandleExceptionInOutputFormatFunc handle_exception_in_output_format)
{
PODArray<char> parse_buf;
const char * begin;
Expand Down Expand Up @@ -1324,6 +1325,7 @@ void executeQuery(

ASTPtr ast;
BlockIO streams;
OutputFormatPtr output_format;

std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, &istr);
auto & pipeline = streams.pipeline;
Expand Down Expand Up @@ -1366,30 +1368,30 @@ void executeQuery(
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();

auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(
output_format = FormatFactory::instance().getOutputFormatParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
materializeBlock(pipeline.getHeader()),
context,
output_format_settings);

out->setAutoFlush();
output_format->setAutoFlush();

/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context->getProgressCallback();

/// NOTE Progress callback takes shared ownership of 'out'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
pipeline.setProgressCallback([output_format, previous_progress_callback] (const Progress & progress)
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
output_format->onProgress(progress);
});

result_details.content_type = out->getContentType();
result_details.content_type = output_format->getContentType();
result_details.format = format_name;

pipeline.complete(std::move(out));
pipeline.complete(output_format);
}
else
{
Expand Down Expand Up @@ -1419,6 +1421,8 @@ void executeQuery(
}
catch (...)
{
if (handle_exception_in_output_format && output_format)
handle_exception_in_output_format(*output_format);
streams.onException();
throw;
}
Expand Down

0 comments on commit 69a17bb

Please sign in to comment.