Skip to content

Commit

Permalink
Merge pull request #58422 from ClickHouse/cherrypick/23.3/e69bda7f083…
Browse files Browse the repository at this point in the history
…e2ddee678e9ed99002753f8aa0d9b

Cherry pick #57438 to 23.3: Fix working with read buffers in StreamingFormatExecutor
  • Loading branch information
robot-clickhouse-ci-2 committed Jan 2, 2024
2 parents b179392 + 0cefc4f commit f1f6a41
Show file tree
Hide file tree
Showing 25 changed files with 192 additions and 96 deletions.
27 changes: 0 additions & 27 deletions src/IO/PeekableReadBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,6 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_
checkStateCorrect();
}

/// Public reset entry point. The body shown here only calls checkStateCorrect();
/// it assigns no member directly.
void PeekableReadBuffer::reset()
{
checkStateCorrect();
}

/// Rebinds this peekable buffer to another underlying buffer and re-initializes its state.
/// @param sub_buf_ New underlying buffer. Only its address is stored, so it has to stay
///                 alive for as long as this object refers to it.
void PeekableReadBuffer::setSubBuffer(ReadBuffer & sub_buf_)
{
sub_buf = &sub_buf_;
/// Clear peek/checkpoint bookkeeping and re-point the working buffer at the new sub-buffer.
resetImpl();
}

/// Resets the peek/checkpoint bookkeeping and makes this buffer's working area
/// mirror the current working buffer of sub_buf.
void PeekableReadBuffer::resetImpl()
{
/// Return the peek and checkpoint members to their cleared values.
peeked_size = 0;
checkpoint = std::nullopt;
checkpoint_in_own_memory = false;
use_stack_memory = true;

/// When not reading from own memory, write our current position back into sub_buf
/// before re-syncing with it.
if (!currentlyReadFromOwnMemory())
sub_buf->position() = pos;

/// Adopt sub_buf's working buffer (begin, size) together with its current offset.
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());

checkStateCorrect();
}

bool PeekableReadBuffer::peekNext()
{
checkStateCorrect();
Expand Down
6 changes: 0 additions & 6 deletions src/IO/PeekableReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
/// This data will be lost after destruction of peekable buffer.
bool hasUnreadData() const;

// for streaming reading (like in Kafka) we need to restore initial state of the buffer
// without recreating the buffer.
void reset();

void setSubBuffer(ReadBuffer & sub_buf_);

const ReadBuffer & getSubBuffer() const { return *sub_buf; }

private:
Expand Down
6 changes: 0 additions & 6 deletions src/Interpreters/AsynchronousInsertQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,6 @@ try
log_elements.reserve(data->entries.size());

StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
std::unique_ptr<ReadBuffer> last_buffer;
auto chunk_info = std::make_shared<ChunkOffsets>();
for (const auto & entry : data->entries)
{
Expand All @@ -440,10 +439,6 @@ try
total_rows += executor.execute(*buffer);
chunk_info->offsets.push_back(total_rows);

/// Keep buffer, because it still can be used
/// in destructor, while resetting buffer at next iteration.
last_buffer = std::move(buffer);

if (insert_log)
{
AsynchronousInsertLogElement elem;
Expand All @@ -470,7 +465,6 @@ try
}
}

format->addBuffer(std::move(last_buffer));
auto insert_query_id = insert_context->getCurrentQueryId();

if (total_rows == 0)
Expand Down
10 changes: 4 additions & 6 deletions src/Processors/Executors/StreamingFormatExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ MutableColumns StreamingFormatExecutor::getResultColumns()

size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
{
auto & initial_buf = format->getReadBuffer();
format->setReadBuffer(buffer);
size_t rows = execute();

/// Format destructor can touch read buffer (for example when we use PeekableReadBuffer),
/// but we cannot control lifetime of provided read buffer. To avoid heap use after free
/// we can set initial read buffer back, because initial read buffer was created before
/// format, so it will be destructed after it.
format->setReadBuffer(initial_buf);
return rows;
/// we call format->resetReadBuffer() method that resets all buffers inside format.
SCOPE_EXIT(format->resetReadBuffer());
return execute();
}

size_t StreamingFormatExecutor::execute()
Expand Down
4 changes: 3 additions & 1 deletion src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class IInputFormat : public ISource
virtual void resetParser();

virtual void setReadBuffer(ReadBuffer & in_);
ReadBuffer & getReadBuffer() const { return *in; }
virtual void resetReadBuffer() { in = nullptr; }

virtual const BlockMissingValues & getMissingValues() const
{
Expand All @@ -61,6 +61,8 @@ class IInputFormat : public ISource
virtual size_t getApproxBytesReadForChunk() const { return 0; }

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

ColumnMappingPtr column_mapping{};

InputFormatErrorsLoggerPtr errors_logger;
Expand Down
1 change: 0 additions & 1 deletion src/Processors/Formats/Impl/ArrowBufferedStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <IO/PeekableReadBuffer.h>
#include <arrow/buffer.h>
#include <arrow/util/future.h>
#include <arrow/io/memory.h>
Expand Down
9 changes: 5 additions & 4 deletions src/Processors/Formats/Impl/CSVRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ void CSVRowInputFormat::syncAfterError()

void CSVRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
}

void CSVRowInputFormat::resetParser()
void CSVRowInputFormat::resetReadBuffer()
{
RowInputFormatWithNamesAndTypes::resetParser();
buf->reset();
buf.reset();
RowInputFormatWithNamesAndTypes::resetReadBuffer();
}

static void skipEndOfLine(ReadBuffer & in)
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/CSVRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes
String getName() const override { return "CSVRowInputFormat"; }

void setReadBuffer(ReadBuffer & in_) override;
void resetParser() override;
void resetReadBuffer() override;

protected:
CSVRowInputFormat(const Block & header_, std::shared_ptr<PeekableReadBuffer> in_, const Params & params_,
Expand Down
15 changes: 8 additions & 7 deletions src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,20 @@ void CustomSeparatedRowInputFormat::syncAfterError()

void CustomSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
}

CustomSeparatedFormatReader::CustomSeparatedFormatReader(
PeekableReadBuffer & buf_, bool ignore_spaces_, const FormatSettings & format_settings_)
: FormatWithNamesAndTypesReader(buf_, format_settings_), buf(&buf_), ignore_spaces(ignore_spaces_)
/// Destroys the PeekableReadBuffer wrapper held in `buf`, then delegates to the
/// base-class resetReadBuffer().
void CustomSeparatedRowInputFormat::resetReadBuffer()
{
/// Smart-pointer reset: `buf` no longer holds a wrapper after this line.
buf.reset();
RowInputFormatWithNamesAndTypes::resetReadBuffer();
}

void CustomSeparatedRowInputFormat::resetParser()
CustomSeparatedFormatReader::CustomSeparatedFormatReader(
PeekableReadBuffer & buf_, bool ignore_spaces_, const FormatSettings & format_settings_)
: FormatWithNamesAndTypesReader(buf_, format_settings_), buf(&buf_), ignore_spaces(ignore_spaces_)
{
RowInputFormatWithNamesAndTypes::resetParser();
buf->reset();
}

void CustomSeparatedFormatReader::skipPrefixBeforeHeader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class CustomSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTyp
const Params & params_,
bool with_names_, bool with_types_, bool ignore_spaces_, const FormatSettings & format_settings_);

void resetParser() override;
String getName() const override { return "CustomSeparatedRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
CustomSeparatedRowInputFormat(
Expand Down
19 changes: 10 additions & 9 deletions src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ JSONAsRowInputFormat::JSONAsRowInputFormat(const Block & header_, std::unique_pt
header_.columns());
}

void JSONAsRowInputFormat::resetParser()

/// Wraps the given buffer in a fresh PeekableReadBuffer and makes the base class
/// read through that wrapper.
/// @param in_ Buffer to read from; the wrapper is constructed from a reference to it,
///            so it has to stay alive while the wrapper is in use.
void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
    buf = std::make_unique<PeekableReadBuffer>(in_);
    /// JSONAsRowInputFormat derives from IRowInputFormat (see its class declaration and
    /// the IRowInputFormat::resetParser() call it used before), so the base call must be
    /// qualified with IRowInputFormat; JSONEachRowRowInputFormat is not a base of this class.
    IRowInputFormat::setReadBuffer(*buf);
}

void JSONAsRowInputFormat::resetReadBuffer()
{
IRowInputFormat::resetParser();
buf->reset();
buf.reset();
JSONEachRowRowInputFormat::resetReadBuffer();
}

void JSONAsRowInputFormat::readPrefix()
Expand Down Expand Up @@ -97,12 +104,6 @@ bool JSONAsRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
return !buf->eof();
}

/// Pre-change variant: keeps the existing PeekableReadBuffer in `buf` and rebinds it
/// to `in_` via setSubBuffer(). No base-class call is made here.
void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
}


JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_)
: JSONAsRowInputFormat(header_, in_, params_)
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class JSONAsRowInputFormat : public IRowInputFormat
public:
JSONAsRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);

void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
JSONAsRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_);
Expand Down
18 changes: 12 additions & 6 deletions src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,21 @@ MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, std::unique_
void MsgPackRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf->reset();
visitor.reset();
}

/// Wraps the given buffer in a newly allocated PeekableReadBuffer and passes the
/// wrapper to the base-class setReadBuffer().
/// @param in_ Buffer to read from; the wrapper is constructed from a reference to it.
void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
/// Assigning a new unique_ptr replaces (and thereby destroys) any previous wrapper.
buf = std::make_unique<PeekableReadBuffer>(in_);
IRowInputFormat::setReadBuffer(*buf);
}

/// Destroys the PeekableReadBuffer wrapper held in `buf`, then delegates to the
/// base-class resetReadBuffer().
void MsgPackRowInputFormat::resetReadBuffer()
{
/// Smart-pointer reset: `buf` no longer holds a wrapper after this line.
buf.reset();
IRowInputFormat::resetReadBuffer();
}

void MsgPackVisitor::set_info(IColumn & column, DataTypePtr type, UInt8 & read) // NOLINT
{
while (!info_stack.empty())
Expand Down Expand Up @@ -523,11 +534,6 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return true;
}

/// Pre-change variant: keeps the existing PeekableReadBuffer in `buf` and rebinds it
/// to `in_` via setSubBuffer(). No base-class call is made here.
void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
}

MsgPackSchemaReader::MsgPackSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowSchemaReader(buf, format_settings_), buf(in_), number_of_columns(format_settings_.msgpack.number_of_columns)
{
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/MsgPackRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class MsgPackRowInputFormat : public IRowInputFormat
String getName() const override { return "MagPackRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
MsgPackRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_, const FormatSettings & settings);
Expand Down
17 changes: 9 additions & 8 deletions src/Processors/Formats/Impl/RegexpRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@ RegexpRowInputFormat::RegexpRowInputFormat(
{
}

void RegexpRowInputFormat::resetParser()
/// Wraps the given buffer in a newly allocated PeekableReadBuffer and passes the
/// wrapper to the base-class setReadBuffer().
/// @param in_ Buffer to read from; the wrapper is constructed from a reference to it.
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
/// Assigning a new unique_ptr replaces (and thereby destroys) any previous wrapper.
buf = std::make_unique<PeekableReadBuffer>(in_);
IRowInputFormat::setReadBuffer(*buf);
}

void RegexpRowInputFormat::resetReadBuffer()
{
IRowInputFormat::resetParser();
buf->reset();
buf.reset();
IRowInputFormat::resetReadBuffer();
}

bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
Expand Down Expand Up @@ -129,11 +135,6 @@ bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return true;
}

/// Pre-change variant: keeps the existing PeekableReadBuffer in `buf` and rebinds it
/// to `in_` via setSubBuffer(). No base-class call is made here.
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
}

RegexpSchemaReader::RegexpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowSchemaReader(
buf,
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/RegexpRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ class RegexpRowInputFormat final : public IRowInputFormat
RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);

String getName() const override { return "RegexpRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
RegexpRowInputFormat(std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, Params params_, const FormatSettings & format_settings_);
Expand Down
9 changes: 5 additions & 4 deletions src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(

void TabSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
}

void TabSeparatedRowInputFormat::resetParser()
void TabSeparatedRowInputFormat::resetReadBuffer()
{
RowInputFormatWithNamesAndTypes::resetParser();
buf->reset();
buf.reset();
RowInputFormatWithNamesAndTypes::resetReadBuffer();
}

TabSeparatedFormatReader::TabSeparatedFormatReader(PeekableReadBuffer & in_, const FormatSettings & format_settings_, bool is_raw_)
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class TabSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes
String getName() const override { return "TabSeparatedRowInputFormat"; }

void setReadBuffer(ReadBuffer & in_) override;
void resetParser() override;
void resetReadBuffer() override;

private:
TabSeparatedRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> in_, const Params & params_,
Expand Down
11 changes: 9 additions & 2 deletions src/Processors/Formats/Impl/TemplateRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,19 @@ void TemplateRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
end_of_stream = false;
buf->reset();
}

void TemplateRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithDiagnosticInfo::setReadBuffer(*buf);
format_reader->setReadBuffer(*buf);
}

/// Destroys the PeekableReadBuffer wrapper held in `buf`, then delegates to the
/// base-class resetReadBuffer().
void TemplateRowInputFormat::resetReadBuffer()
{
/// Smart-pointer reset: `buf` no longer holds a wrapper after this line.
buf.reset();
RowInputFormatWithDiagnosticInfo::resetReadBuffer();
}

TemplateFormatReader::TemplateFormatReader(
Expand Down
4 changes: 2 additions & 2 deletions src/Processors/Formats/Impl/TemplateRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class TemplateRowInputFormat final : public RowInputFormatWithDiagnosticInfo
String getName() const override { return "TemplateRowInputFormat"; }

void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
TemplateRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_,
Expand All @@ -50,8 +52,6 @@ class TemplateRowInputFormat final : public RowInputFormatWithDiagnosticInfo

bool isGarbageAfterField(size_t after_col_idx, ReadBuffer::Position pos) override;

void setReadBuffer(ReadBuffer & in_) override;

std::unique_ptr<PeekableReadBuffer> buf;
const DataTypes data_types;

Expand Down

0 comments on commit f1f6a41

Please sign in to comment.