Skip to content

Commit

Permalink
Backport #52853 to 23.8: Output valid JSON/XML on exception during HT…
Browse files Browse the repository at this point in the history
…TP query execution
  • Loading branch information
robot-clickhouse committed Dec 8, 2023
1 parent 32bfa93 commit 324be20
Show file tree
Hide file tree
Showing 30 changed files with 1,080 additions and 78 deletions.
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class IColumn;
\
M(UInt64, http_headers_progress_interval_ms, 100, "Do not send HTTP headers X-ClickHouse-Progress more frequently than at each specified interval.", 0) \
M(Bool, http_wait_end_of_query, false, "Enable HTTP response buffering on the server-side.", 0) \
M(Bool, http_write_exception_in_output_format, false, "Write exception in output format to produce valid output. Works with JSON and XML formats.", 0) \
M(UInt64, http_response_buffer_size, 0, "The number of bytes to buffer in the server memory before sending a HTTP response to the client or flushing to disk (when http_wait_end_of_query is enabled).", 0) \
\
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
Expand Down
7 changes: 6 additions & 1 deletion src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include <Formats/FormatFactory.h>

#include <algorithm>
#include <Core/Settings.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
Expand Down Expand Up @@ -229,6 +228,12 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
context->getRemoteHostFilter().checkURL(avro_schema_registry_url);
}

if (context->getClientInfo().interface == ClientInfo::Interface::HTTP && context->getSettingsRef().http_write_exception_in_output_format.value)
{
format_settings.json.valid_output_on_exception = true;
format_settings.xml.valid_output_on_exception = true;
}

return format_settings;
}

Expand Down
6 changes: 6 additions & 0 deletions src/Formats/FormatSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ struct FormatSettings
bool validate_types_from_metadata = true;
bool validate_utf8 = false;
bool allow_object_type = false;
bool valid_output_on_exception = false;
bool compact_allow_variable_number_of_columns = false;
} json;

Expand Down Expand Up @@ -405,6 +406,11 @@ struct FormatSettings
{
bool allow_types_conversion = true;
} native;

/// Settings for the XML format.
struct
{
/// Set to true by getFormatSettings() for queries that came through the HTTP interface
/// when the http_write_exception_in_output_format setting is enabled; false otherwise.
/// NOTE(review): presumably makes the XML output format write the exception inside the
/// document so the output stays valid — the consumer is not visible here, confirm.
bool valid_output_on_exception = false;
} xml;
};

}
6 changes: 6 additions & 0 deletions src/Formats/JSONUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,12 @@ namespace JSONUtils
}
}

/// Writes an "exception" field to `out`: the title "exception" (via writeTitle, with the given
/// `indent` and " " as the text placed after the title) followed by `exception_message`
/// serialized as a JSON string according to `settings`.
/// Only the field itself is written here; any surrounding delimiters (commas, braces, newlines)
/// are the caller's responsibility.
void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent)
{
writeTitle("exception", out, indent, " ");
writeJSONString(exception_message, out, settings);
}

Strings makeNamesValidJSONStrings(const Strings & names, const FormatSettings & settings, bool validate_utf8)
{
Strings result;
Expand Down
2 changes: 2 additions & 0 deletions src/Formats/JSONUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ namespace JSONUtils
bool write_statistics,
WriteBuffer & out);

void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent = 0);

void skipColon(ReadBuffer & in);
void skipComma(ReadBuffer & in);

Expand Down
85 changes: 85 additions & 0 deletions src/IO/PeekableWriteBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include <IO/PeekableWriteBuffer.h>

namespace DB
{

/// Wraps `sub_buf_`, which is stored by reference (not owned).
/// The base BufferWithOwnMemory is constructed with size 0; own memory is only resized later,
/// in nextImpl(), when data has to be held back because of a checkpoint.
PeekableWriteBuffer::PeekableWriteBuffer(DB::WriteBuffer & sub_buf_) : BufferWithOwnMemory(0), sub_buf(sub_buf_)
{
Buffer & sub_working = sub_buf.buffer();
/// Use the part of the sub-buffer's working buffer that starts at its current offset
/// as our working buffer, with our own offset starting at 0.
BufferBase::set(sub_working.begin() + sub_buf.offset(), sub_working.size() - sub_buf.offset(), 0);
}

/// Provides a new working buffer.
/// NOTE(review): presumably invoked by WriteBuffer::next() when the working buffer is full — confirm.
///  - With a checkpoint set, nothing is passed to sub_buf.next(): further data goes to own memory.
///  - Without a checkpoint, this is a pass-through to the sub-buffer.
void PeekableWriteBuffer::nextImpl()
{
if (checkpoint)
{
if (write_to_own_memory)
{
/// Already writing to own memory: double its size and continue right after
/// the bytes written so far (prev_size becomes the new offset).
size_t prev_size = position() - memory.data();
size_t new_size = memory.size() * 2;
memory.resize(new_size);
BufferBase::set(memory.data(), memory.size(), prev_size);
return;
}

/// First switch to own memory while the checkpoint is active: allocate it lazily.
if (memory.size() == 0)
memory.resize(DBMS_DEFAULT_BUFFER_SIZE);

/// Record in the sub-buffer how far it has been filled, then redirect
/// all subsequent writes to own memory, starting from its beginning.
sub_buf.position() = position();
BufferBase::set(memory.data(), memory.size(), 0);
write_to_own_memory = true;
return;
}

/// No checkpoint: publish our position to the sub-buffer, let it process its data via next(),
/// and adopt its (possibly new) working buffer and offset as ours.
sub_buf.position() = position();
sub_buf.next();
BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
}


/// Forgets the checkpoint. If data written after the checkpoint was diverted to own memory,
/// it is now written to the sub-buffer and the working buffer is switched back to the sub-buffer.
///
/// Requires an active checkpoint (checked with assert).
/// If writing to the sub-buffer throws, the working buffer is still switched back to the
/// sub-buffer so this object is left in a consistent state, and the exception is rethrown
/// to the caller: the data held in own memory was not (fully) written, and that failure
/// must not be hidden.
void PeekableWriteBuffer::dropCheckpoint()
{
    assert(checkpoint);
    checkpoint = std::nullopt;
    /// If we have saved data in own memory, write it to sub-buf.
    if (write_to_own_memory)
    {
        try
        {
            /// Let the sub-buffer process what it already holds, then append the data we held back.
            sub_buf.next();
            sub_buf.write(memory.data(), position() - memory.data());
            Buffer & sub_working = sub_buf.buffer();
            BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
            write_to_own_memory = false;
        }
        catch (...)
        {
            /// If exception happened during writing to sub buffer, we should
            /// update buffer to not leave it in invalid state.
            Buffer & sub_working = sub_buf.buffer();
            BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
            write_to_own_memory = false;
            /// Propagate the failure: swallowing it here would silently drop the buffered data.
            throw;
        }
    }
}

void PeekableWriteBuffer::rollbackToCheckpoint(bool drop)
{
assert(checkpoint);

/// Just ignore all data written after checkpoint.
if (write_to_own_memory)
{
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
write_to_own_memory = false;
}

position() = *checkpoint;

if (drop)
checkpoint = std::nullopt;
}

}
59 changes: 59 additions & 0 deletions src/IO/PeekableWriteBuffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <stack>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

/// Similar to PeekableReadBuffer.
/// Allows to set checkpoint at some position in stream and come back to this position later.
/// When next() is called, saves data between checkpoint and current position to own memory instead of writing it to sub-buffer.
/// So, all the data after checkpoint won't be written in sub-buffer until checkpoint is dropped.
/// Rollback to checkpoint means that all data after checkpoint will be ignored and not sent to sub-buffer.
/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer (unless
/// you reset() the state of peekable buffer after each change of underlying buffer)
/// If position() of peekable buffer is explicitly set to some position before checkpoint
/// (e.g. by istr.position() = prev_pos), behavior is undefined.
class PeekableWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
/// NOTE(review): PeekableWriteBufferCheckpoint is not declared in this header — confirm it exists.
friend class PeekableWriteBufferCheckpoint;
public:
/// Wraps sub_buf_ by reference; the sub-buffer must outlive this object.
explicit PeekableWriteBuffer(WriteBuffer & sub_buf_);

/// Sets checkpoint at current position
/// Only one checkpoint may be active at a time: throws LOGICAL_ERROR if one is already set.
ALWAYS_INLINE inline void setCheckpoint()
{
if (checkpoint)
throw Exception(ErrorCodes::LOGICAL_ERROR, "PeekableWriteBuffer does not support recursive checkpoints.");

checkpoint.emplace(pos);
}

/// Forget checkpoint and send all data from checkpoint to position to sub-buffer.
void dropCheckpoint();

/// Sets position at checkpoint and forget all data written from checkpoint to position.
/// All pointers (such as this->buffer().end()) may be invalidated
/// If `drop` is true, the checkpoint is also forgotten.
void rollbackToCheckpoint(bool drop = false);

/// Expects that no checkpoint is active (assert) and publishes the current position
/// to the sub-buffer. The sub-buffer itself is not flushed or finalized here.
void finalizeImpl() override
{
assert(!checkpoint);
sub_buf.position() = position();
}

private:
/// Supplies a new working buffer: own memory while a checkpoint is active,
/// otherwise the sub-buffer's working buffer.
void nextImpl() override;

/// Underlying buffer; not owned.
WriteBuffer & sub_buf;
/// True while the working buffer points into own memory instead of the sub-buffer.
bool write_to_own_memory = false;
/// Position saved by setCheckpoint(); empty when no checkpoint is active.
/// NOTE(review): std::optional is used but this header only includes <stack> from the
/// standard library — <optional> is presumably pulled in transitively; consider including it directly.
std::optional<Position> checkpoint = std::nullopt;
};

}
18 changes: 11 additions & 7 deletions src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,8 @@ void executeQuery(
bool allow_into_outfile,
ContextMutablePtr context,
SetResultDetailsFunc set_result_details,
const std::optional<FormatSettings> & output_format_settings)
const std::optional<FormatSettings> & output_format_settings,
HandleExceptionInOutputFormatFunc handle_exception_in_output_format)
{
PODArray<char> parse_buf;
const char * begin;
Expand Down Expand Up @@ -1319,6 +1320,7 @@ void executeQuery(

ASTPtr ast;
BlockIO streams;
OutputFormatPtr output_format;

std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, &istr);
auto & pipeline = streams.pipeline;
Expand Down Expand Up @@ -1361,30 +1363,30 @@ void executeQuery(
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();

auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(
output_format = FormatFactory::instance().getOutputFormatParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
materializeBlock(pipeline.getHeader()),
context,
output_format_settings);

out->setAutoFlush();
output_format->setAutoFlush();

/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context->getProgressCallback();

/// NOTE Progress callback takes shared ownership of 'output_format'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
pipeline.setProgressCallback([output_format, previous_progress_callback] (const Progress & progress)
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
output_format->onProgress(progress);
});

result_details.content_type = out->getContentType();
result_details.content_type = output_format->getContentType();
result_details.format = format_name;

pipeline.complete(std::move(out));
pipeline.complete(output_format);
}
else
{
Expand Down Expand Up @@ -1414,6 +1416,8 @@ void executeQuery(
}
catch (...)
{
if (handle_exception_in_output_format && output_format)
handle_exception_in_output_format(*output_format);
streams.onException();
throw;
}
Expand Down
5 changes: 4 additions & 1 deletion src/Interpreters/executeQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace DB
class IInterpreter;
class ReadBuffer;
class WriteBuffer;
class IOutputFormat;
struct QueryStatusInfo;

struct QueryResultDetails
Expand All @@ -26,6 +27,7 @@ struct QueryResultDetails
};

using SetResultDetailsFunc = std::function<void(const QueryResultDetails &)>;
using HandleExceptionInOutputFormatFunc = std::function<void(IOutputFormat & output_format)>;

/// Parse and execute a query.
void executeQuery(
Expand All @@ -34,7 +36,8 @@ void executeQuery(
bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file.
ContextMutablePtr context, /// DB, tables, data types, storage engines, functions, aggregate functions...
SetResultDetailsFunc set_result_details, /// If a non-empty callback is passed, it will be called with the query id, the content-type, the format, and the timezone.
const std::optional<FormatSettings> & output_format_settings = std::nullopt /// Format settings for output format, will be calculated from the context if not set.
const std::optional<FormatSettings> & output_format_settings = std::nullopt, /// Format settings for output format, will be calculated from the context if not set.
HandleExceptionInOutputFormatFunc handle_exception_in_output_format = {} /// If a non-empty callback is passed, it will be called on exception with created output format.
);


Expand Down
8 changes: 8 additions & 0 deletions src/Processors/Formats/IOutputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class IOutputFormat : public IProcessor
consumeExtremes(Chunk(extremes.getColumns(), extremes.rows()));
}

virtual bool supportsWritingException() const { return false; }
virtual void setException(const String & /*exception_message*/) {}

size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_bytes; }

Expand Down Expand Up @@ -162,6 +165,11 @@ class IOutputFormat : public IProcessor
/// outputs them in finalize() method.
virtual bool areTotalsAndExtremesUsedInFinalize() const { return false; }

/// Derived classes can use some wrappers around out WriteBuffer
/// and can override this method to return wrapper
/// that should be used in its derived classes.
virtual WriteBuffer * getWriteBufferPtr() { return &out; }

WriteBuffer & out;

Chunk current_chunk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ namespace DB

JSONColumnsBlockOutputFormatBase::JSONColumnsBlockOutputFormatBase(
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool validate_utf8)
: OutputFormatWithUTF8ValidationAdaptor(validate_utf8, header_, out_)
: OutputFormatWithUTF8ValidationAdaptor(header_, out_, validate_utf8)
, format_settings(format_settings_)
, serializations(header_.getSerializations())
{
ostr = OutputFormatWithUTF8ValidationAdaptor::getWriteBufferPtr();
}

void JSONColumnsBlockOutputFormatBase::consume(Chunk chunk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class JSONColumnsBlockOutputFormatBase : public OutputFormatWithUTF8ValidationAd
Chunk mono_chunk;

size_t written_rows = 0;
WriteBuffer * ostr;
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer
bool with_names_,
bool with_types_,
bool yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_)
: RowOutputFormatWithExceptionHandlerAdaptor<RowOutputFormatWithUTF8ValidationAdaptor, bool>(header_, out_, settings_.json.valid_output_on_exception, settings_.json.validate_utf8)
, settings(settings_)
, with_names(with_names_)
, with_types(with_types_)
, yield_strings(yield_strings_)
{
ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
}


Expand Down Expand Up @@ -102,6 +103,25 @@ void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
IRowOutputFormat::consumeTotals(std::move(chunk));
}

/// If an exception message has been recorded, appends it to the output as one more row
/// consisting of a single JSON string. Writes nothing when there is no exception message.
void JSONCompactEachRowRowOutputFormat::writeSuffix()
{
    if (exception_message.empty())
        return;

    /// Separate the exception row from previously written rows, if any.
    if (haveWrittenData())
        writeRowBetweenDelimiter();

    writeRowStartDelimiter();
    writeJSONString(exception_message, *ostr, settings);
    writeRowEndDelimiter();
}

/// Resets the state of the base adaptor and then re-fetches the write buffer pointer from it,
/// so that `ostr` does not keep pointing at a buffer wrapper from before the reset.
/// NOTE(review): presumably the adaptor may recreate its wrapper buffer on reset — confirm.
void JSONCompactEachRowRowOutputFormat::resetFormatterImpl()
{
RowOutputFormatWithExceptionHandlerAdaptor::resetFormatterImpl();
ostr = RowOutputFormatWithExceptionHandlerAdaptor::getWriteBufferPtr();
}

void registerOutputFormatJSONCompactEachRow(FormatFactory & factory)
{
for (bool yield_strings : {false, true})
Expand Down

0 comments on commit 324be20

Please sign in to comment.