Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #57438 to 23.3: Fix working with read buffers in StreamingFormatExecutor #58433

Merged
merged 4 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 0 additions & 27 deletions src/IO/PeekableReadBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,6 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_
checkStateCorrect();
}

void PeekableReadBuffer::reset()
{
checkStateCorrect();
}

void PeekableReadBuffer::setSubBuffer(ReadBuffer & sub_buf_)
{
sub_buf = &sub_buf_;
resetImpl();
}

void PeekableReadBuffer::resetImpl()
{
peeked_size = 0;
checkpoint = std::nullopt;
checkpoint_in_own_memory = false;
use_stack_memory = true;

if (!currentlyReadFromOwnMemory())
sub_buf->position() = pos;

Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());

checkStateCorrect();
}

bool PeekableReadBuffer::peekNext()
{
checkStateCorrect();
Expand Down
6 changes: 0 additions & 6 deletions src/IO/PeekableReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
/// This data will be lost after destruction of peekable buffer.
bool hasUnreadData() const;

// for streaming reading (like in Kafka) we need to restore initial state of the buffer
// without recreating the buffer.
void reset();

void setSubBuffer(ReadBuffer & sub_buf_);

const ReadBuffer & getSubBuffer() const { return *sub_buf; }

private:
Expand Down
6 changes: 0 additions & 6 deletions src/Interpreters/AsynchronousInsertQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,6 @@ try
log_elements.reserve(data->entries.size());

StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
std::unique_ptr<ReadBuffer> last_buffer;
auto chunk_info = std::make_shared<ChunkOffsets>();
for (const auto & entry : data->entries)
{
Expand All @@ -440,10 +439,6 @@ try
total_rows += executor.execute(*buffer);
chunk_info->offsets.push_back(total_rows);

/// Keep buffer, because it still can be used
/// in destructor, while resetting buffer at next iteration.
last_buffer = std::move(buffer);

if (insert_log)
{
AsynchronousInsertLogElement elem;
Expand All @@ -470,7 +465,6 @@ try
}
}

format->addBuffer(std::move(last_buffer));
auto insert_query_id = insert_context->getCurrentQueryId();

if (total_rows == 0)
Expand Down
11 changes: 5 additions & 6 deletions src/Processors/Executors/StreamingFormatExecutor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <base/scope_guard.h>
#include <iostream>

namespace DB
Expand Down Expand Up @@ -35,15 +36,13 @@ MutableColumns StreamingFormatExecutor::getResultColumns()

size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
{
auto & initial_buf = format->getReadBuffer();
format->setReadBuffer(buffer);
size_t rows = execute();

/// Format destructor can touch read buffer (for example when we use PeekableReadBuffer),
/// but we cannot control lifetime of provided read buffer. To avoid heap use after free
/// we can set initial read buffer back, because initial read buffer was created before
/// format, so it will be destructed after it.
format->setReadBuffer(initial_buf);
return rows;
/// we call format->resetReadBuffer() method that resets all buffers inside format.
SCOPE_EXIT(format->resetReadBuffer());
return execute();
}

size_t StreamingFormatExecutor::execute()
Expand Down
4 changes: 3 additions & 1 deletion src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class IInputFormat : public ISource
virtual void resetParser();

virtual void setReadBuffer(ReadBuffer & in_);
ReadBuffer & getReadBuffer() const { return *in; }
virtual void resetReadBuffer() { in = nullptr; }

virtual const BlockMissingValues & getMissingValues() const
{
Expand All @@ -61,6 +61,8 @@ class IInputFormat : public ISource
virtual size_t getApproxBytesReadForChunk() const { return 0; }

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

ColumnMappingPtr column_mapping{};

InputFormatErrorsLoggerPtr errors_logger;
Expand Down
9 changes: 5 additions & 4 deletions src/Processors/Formats/Impl/CSVRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ void CSVRowInputFormat::syncAfterError()

void CSVRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
}

void CSVRowInputFormat::resetParser()
void CSVRowInputFormat::resetReadBuffer()
{
RowInputFormatWithNamesAndTypes::resetParser();
buf->reset();
buf.reset();
RowInputFormatWithNamesAndTypes::resetReadBuffer();
}

static void skipEndOfLine(ReadBuffer & in)
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/CSVRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes
String getName() const override { return "CSVRowInputFormat"; }

void setReadBuffer(ReadBuffer & in_) override;
void resetParser() override;
void resetReadBuffer() override;

protected:
CSVRowInputFormat(const Block & header_, std::shared_ptr<PeekableReadBuffer> in_, const Params & params_,
Expand Down
15 changes: 8 additions & 7 deletions src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,20 @@ void CustomSeparatedRowInputFormat::syncAfterError()

void CustomSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
}

CustomSeparatedFormatReader::CustomSeparatedFormatReader(
PeekableReadBuffer & buf_, bool ignore_spaces_, const FormatSettings & format_settings_)
: FormatWithNamesAndTypesReader(buf_, format_settings_), buf(&buf_), ignore_spaces(ignore_spaces_)
void CustomSeparatedRowInputFormat::resetReadBuffer()
{
buf.reset();
RowInputFormatWithNamesAndTypes::resetReadBuffer();
}

void CustomSeparatedRowInputFormat::resetParser()
CustomSeparatedFormatReader::CustomSeparatedFormatReader(
PeekableReadBuffer & buf_, bool ignore_spaces_, const FormatSettings & format_settings_)
: FormatWithNamesAndTypesReader(buf_, format_settings_), buf(&buf_), ignore_spaces(ignore_spaces_)
{
RowInputFormatWithNamesAndTypes::resetParser();
buf->reset();
}

void CustomSeparatedFormatReader::skipPrefixBeforeHeader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class CustomSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTyp
const Params & params_,
bool with_names_, bool with_types_, bool ignore_spaces_, const FormatSettings & format_settings_);

void resetParser() override;
String getName() const override { return "CustomSeparatedRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
CustomSeparatedRowInputFormat(
Expand Down
19 changes: 10 additions & 9 deletions src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ JSONAsRowInputFormat::JSONAsRowInputFormat(const Block & header_, std::unique_pt
header_.columns());
}

void JSONAsRowInputFormat::resetParser()

void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IRowInputFormat::setReadBuffer(*buf);
}

void JSONAsRowInputFormat::resetReadBuffer()
{
IRowInputFormat::resetParser();
buf->reset();
buf.reset();
IRowInputFormat::resetReadBuffer();
}

void JSONAsRowInputFormat::readPrefix()
Expand Down Expand Up @@ -97,12 +104,6 @@ bool JSONAsRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
return !buf->eof();
}

void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
}


JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_)
: JSONAsRowInputFormat(header_, in_, params_)
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class JSONAsRowInputFormat : public IRowInputFormat
public:
JSONAsRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);

void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
JSONAsRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_);
Expand Down
18 changes: 12 additions & 6 deletions src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,21 @@ MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, std::unique_
void MsgPackRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf->reset();
visitor.reset();
}

void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IRowInputFormat::setReadBuffer(*buf);
}

void MsgPackRowInputFormat::resetReadBuffer()
{
buf.reset();
IRowInputFormat::resetReadBuffer();
}

void MsgPackVisitor::set_info(IColumn & column, DataTypePtr type, UInt8 & read) // NOLINT
{
while (!info_stack.empty())
Expand Down Expand Up @@ -523,11 +534,6 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return true;
}

void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
}

MsgPackSchemaReader::MsgPackSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowSchemaReader(buf, format_settings_), buf(in_), number_of_columns(format_settings_.msgpack.number_of_columns)
{
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/MsgPackRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class MsgPackRowInputFormat : public IRowInputFormat
String getName() const override { return "MagPackRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
MsgPackRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_, const FormatSettings & settings);
Expand Down
17 changes: 9 additions & 8 deletions src/Processors/Formats/Impl/RegexpRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@ RegexpRowInputFormat::RegexpRowInputFormat(
{
}

void RegexpRowInputFormat::resetParser()
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IRowInputFormat::setReadBuffer(*buf);
}

void RegexpRowInputFormat::resetReadBuffer()
{
IRowInputFormat::resetParser();
buf->reset();
buf.reset();
IRowInputFormat::resetReadBuffer();
}

bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
Expand Down Expand Up @@ -129,11 +135,6 @@ bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return true;
}

void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
}

RegexpSchemaReader::RegexpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowSchemaReader(
buf,
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/RegexpRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ class RegexpRowInputFormat final : public IRowInputFormat
RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);

String getName() const override { return "RegexpRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
RegexpRowInputFormat(std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, Params params_, const FormatSettings & format_settings_);
Expand Down
9 changes: 5 additions & 4 deletions src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(

void TabSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
}

void TabSeparatedRowInputFormat::resetParser()
void TabSeparatedRowInputFormat::resetReadBuffer()
{
RowInputFormatWithNamesAndTypes::resetParser();
buf->reset();
buf.reset();
RowInputFormatWithNamesAndTypes::resetReadBuffer();
}

TabSeparatedFormatReader::TabSeparatedFormatReader(PeekableReadBuffer & in_, const FormatSettings & format_settings_, bool is_raw_)
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class TabSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes
String getName() const override { return "TabSeparatedRowInputFormat"; }

void setReadBuffer(ReadBuffer & in_) override;
void resetParser() override;
void resetReadBuffer() override;

private:
TabSeparatedRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> in_, const Params & params_,
Expand Down
11 changes: 9 additions & 2 deletions src/Processors/Formats/Impl/TemplateRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,19 @@ void TemplateRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
end_of_stream = false;
buf->reset();
}

void TemplateRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithDiagnosticInfo::setReadBuffer(*buf);
format_reader->setReadBuffer(*buf);
}

void TemplateRowInputFormat::resetReadBuffer()
{
buf.reset();
RowInputFormatWithDiagnosticInfo::resetReadBuffer();
}

TemplateFormatReader::TemplateFormatReader(
Expand Down
4 changes: 2 additions & 2 deletions src/Processors/Formats/Impl/TemplateRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class TemplateRowInputFormat final : public RowInputFormatWithDiagnosticInfo
String getName() const override { return "TemplateRowInputFormat"; }

void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;

private:
TemplateRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_,
Expand All @@ -50,8 +52,6 @@ class TemplateRowInputFormat final : public RowInputFormatWithDiagnosticInfo

bool isGarbageAfterField(size_t after_col_idx, ReadBuffer::Position pos) override;

void setReadBuffer(ReadBuffer & in_) override;

std::unique_ptr<PeekableReadBuffer> buf;
const DataTypes data_types;

Expand Down