Always read and insert Kafka messages as a whole #6950

Merged
merged 6 commits on Sep 20, 2019
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/SquashingTransform.cpp
@@ -16,7 +16,7 @@ SquashingTransform::Result SquashingTransform::add(MutableColumns && columns)
if (columns.empty())
return Result(std::move(accumulated_columns));

/// Just read block is alredy enough.
/// Just read block is already enough.
if (isEnoughSize(columns))
{
/// If no accumulated data, return just read block.
8 changes: 2 additions & 6 deletions dbms/src/Formats/FormatFactory.cpp
@@ -83,7 +83,6 @@ BlockInputStreamPtr FormatFactory::getInput(
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback) const
{
if (name == "Native")
@@ -98,11 +97,10 @@ BlockInputStreamPtr FormatFactory::getInput(
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings);

return input_getter(
buf, sample, context, max_block_size, rows_portion_size, callback ? callback : ReadCallback(), format_settings);
return input_getter(buf, sample, context, max_block_size, callback ? callback : ReadCallback(), format_settings);
}

auto format = getInputFormat(name, buf, sample, context, max_block_size, rows_portion_size, std::move(callback));
auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
return std::make_shared<InputStreamFromInputFormat>(std::move(format));
}

@@ -150,7 +148,6 @@ InputFormatPtr FormatFactory::getInputFormat(
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback) const
{
const auto & input_getter = getCreators(name).input_processor_creator;
@@ -164,7 +161,6 @@
params.max_block_size = max_block_size;
params.allow_errors_num = format_settings.input_allow_errors_num;
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
params.rows_portion_size = rows_portion_size;
params.callback = std::move(callback);
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;
3 changes: 0 additions & 3 deletions dbms/src/Formats/FormatFactory.h
@@ -51,7 +51,6 @@ class FormatFactory final : private boost::noncopyable
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback,
const FormatSettings & settings)>;

@@ -96,7 +95,6 @@ class FormatFactory final : private boost::noncopyable
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size = 0,
ReadCallback callback = {}) const;

BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
@@ -108,7 +106,6 @@ class FormatFactory final : private boost::noncopyable
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size = 0,
ReadCallback callback = {}) const;

OutputFormatPtr getOutputFormat(
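With the portion size gone, a caller only supplies the format name, buffer, sample header, context, block size and an optional per-row callback. A minimal sketch of such a call site against the slimmed-down signature, assuming a `ReadBuffer` named `buf`, a sample `Block`, a `Context` and a `process` consumer already exist (those names are illustrative, not part of this diff):

```cpp
// Hedged sketch of a call against the new getInput() signature; `buf`, `sample`,
// `context`, `max_block_size` and `process` are assumed to be defined elsewhere.
auto stream = FormatFactory::instance().getInput(
    "CSV", buf, sample, context, max_block_size,
    [] { /* e.g. collect per-row virtual columns */ });

while (Block block = stream->read())
    process(block);
```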
1 change: 0 additions & 1 deletion dbms/src/Formats/NativeFormat.cpp
@@ -13,7 +13,6 @@ void registerInputFormatNative(FormatFactory & factory)
const Block & sample,
const Context &,
UInt64 /* max_block_size */,
UInt64 /* min_read_rows */,
FormatFactory::ReadCallback /* callback */,
const FormatSettings &)
{
2 changes: 1 addition & 1 deletion dbms/src/Formats/tests/tab_separated_streams.cpp
@@ -39,7 +39,7 @@ try

FormatSettings format_settings;

RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, 0, []{}};
RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};

InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, params, false, false, format_settings);
BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));
39 changes: 4 additions & 35 deletions dbms/src/Processors/Formats/IRowInputFormat.cpp
@@ -20,8 +20,10 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
}

namespace
{

static bool isParseError(int code)
bool isParseError(int code)
{
return code == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED
|| code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
@@ -33,34 +35,8 @@ static bool isParseError(int code)
|| code == ErrorCodes::TOO_LARGE_STRING_SIZE;
}


static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}


static bool checkTimeLimit(const IRowInputFormat::Params & params, const Stopwatch & stopwatch)
{
if (params.max_execution_time != 0
&& stopwatch.elapsed() > static_cast<UInt64>(params.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(params.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(params.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);

return true;
}


Chunk IRowInputFormat::generate()
{
if (total_rows == 0)
@@ -76,15 +52,8 @@ Chunk IRowInputFormat::generate()

try
{
for (size_t rows = 0, batch = 0; rows < params.max_block_size; ++rows, ++batch)
for (size_t rows = 0; rows < params.max_block_size; ++rows)
{
if (params.rows_portion_size && batch == params.rows_portion_size)
{
batch = 0;
if (!checkTimeLimit(params, total_stopwatch) || isCancelled())
break;
}

try
{
++total_rows;
3 changes: 0 additions & 3 deletions dbms/src/Processors/Formats/IRowInputFormat.h
@@ -27,8 +27,6 @@ struct RowInputFormatParams
UInt64 allow_errors_num;
Float64 allow_errors_ratio;

UInt64 rows_portion_size;

using ReadCallback = std::function<void()>;
ReadCallback callback;

@@ -85,4 +83,3 @@ class IRowInputFormat : public IInputFormat
};

}

88 changes: 65 additions & 23 deletions dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -49,10 +49,13 @@ void KafkaBlockInputStream::readPrefixImpl()

buffer->subscribe(storage.getTopics());

const auto & limits_ = getLimits();
const size_t poll_timeout = buffer->pollTimeout();
size_t rows_portion_size = poll_timeout ? std::min<size_t>(max_block_size, limits_.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
rows_portion_size = std::max(rows_portion_size, 1ul);
broken = true;
}

Block KafkaBlockInputStream::readImpl()
{
if (!buffer)
return Block();

auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
auto read_callback = [this]
@@ -67,33 +70,72 @@ void KafkaBlockInputStream::readPrefixImpl()
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
};

auto child = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size, read_callback);
child->setLimits(limits_);
addChild(child);
auto merge_blocks = [] (Block & block1, Block && block2)
{
if (!block1)
{
// Need to make sure that resulting block has the same structure
block1 = std::move(block2);
return;
}

if (!block2)
return;

auto columns1 = block1.mutateColumns();
auto columns2 = block2.mutateColumns();
for (size_t i = 0, s = columns1.size(); i < s; ++i)
columns1[i]->insertRangeFrom(*columns2[i], 0, columns2[i]->size());
block1.setColumns(std::move(columns1));
};

broken = true;
}
auto read_kafka_message = [&, this]
{
Block result;
auto child = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
const auto virtual_header = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"});

Block KafkaBlockInputStream::readImpl()
{
if (!buffer)
return Block();
while (auto block = child->read())
{
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
virtual_columns = virtual_header.cloneEmptyColumns();

Block block = children.back()->read();
if (!block)
return block;
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
block.insert(column);

Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneWithColumns(std::move(virtual_columns));
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns();
/// FIXME: materialize MATERIALIZED columns here.

merge_blocks(result, std::move(block));
}

return result;
};

Block single_block;

UInt64 total_rows = 0;
while (total_rows < max_block_size)
{
auto new_block = read_kafka_message();
auto new_rows = new_block.rows();
total_rows += new_rows;
merge_blocks(single_block, std::move(new_block));

buffer->allowNext();

for (const auto & column : virtual_block.getColumnsWithTypeAndName())
block.insert(column);
if (!new_rows || !checkTimeLimit())
break;
}

/// FIXME: materialize MATERIALIZED columns here.
if (!single_block)
return Block();

return ConvertingBlockInputStream(
context, std::make_shared<OneBlockInputStream>(block), getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name)
context,
std::make_shared<OneBlockInputStream>(single_block),
getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name)
.read();
}

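To make the accumulation strategy easier to follow outside of the stream class, here is a self-contained sketch of the same idea with toy types (nothing below is ClickHouse API): each Kafka message is parsed as a whole into its own block, blocks are merged, and the loop stops once the row budget is spent or a message yields no rows.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

struct Block { std::vector<int> rows; };          // toy single-column block

// Mirrors the merge_blocks lambda: append src's rows to dst.
void mergeBlocks(Block & dst, Block && src)
{
    dst.rows.insert(dst.rows.end(), src.rows.begin(), src.rows.end());
}

// Read whole messages until the row budget is spent or a message comes back empty.
template <typename ReadMessage>
Block readUpTo(std::size_t max_block_size, ReadMessage && read_kafka_message)
{
    Block result;
    std::size_t total_rows = 0;
    while (total_rows < max_block_size)
    {
        Block next = read_kafka_message();        // parse one whole Kafka message
        const std::size_t new_rows = next.rows.size();
        total_rows += new_rows;
        mergeBlocks(result, std::move(next));
        if (new_rows == 0)                        // nothing polled (the real code also checks the time limit here)
            break;
    }
    return result;
}
```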
13 changes: 2 additions & 11 deletions dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -13,15 +13,13 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
char delimiter_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer(consumer_)
, log(log_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, intermediate_commit(intermediate_commit_)
, delimiter(delimiter_)
, stopped(stopped_)
, current(messages.begin())
{
@@ -140,16 +138,9 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
/// If we failed to poll any message once - don't try again.
/// Otherwise, the |poll_timeout| expectations get flawn.
if (stalled || stopped)
if (stalled || stopped || !allowed)
return false;

if (put_delimiter)
{
BufferBase::set(&delimiter, 1, 0);
put_delimiter = false;
return true;
}

if (current == messages.end())
{
if (intermediate_commit)
@@ -181,7 +172,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
// XXX: very fishy place with const casting.
auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
BufferBase::set(new_position, current->get_payload().get_size(), 0);
put_delimiter = (delimiter != 0);
allowed = false;

/// Since we can poll more messages than we already processed - commit only processed messages.
consumer->store_offset(*current);
6 changes: 2 additions & 4 deletions dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
@@ -25,10 +25,10 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
char delimiter_,
const std::atomic<bool> & stopped_);
~ReadBufferFromKafkaConsumer() override;

void allowNext() { allowed = true; } // Allow to read next message.
void commit(); // Commit all processed messages.
void subscribe(const Names & topics); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
@@ -51,9 +51,7 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
const size_t poll_timeout = 0;
bool stalled = false;
bool intermediate_commit = true;

char delimiter;
bool put_delimiter = false;
bool allowed = true;

const std::atomic<bool> & stopped;

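The delimiter-based framing is replaced by an explicit permission flag: the buffer exposes at most one message, then refuses to advance until the stream calls allowNext(). A minimal, self-contained sketch of that gating pattern with toy types (not the real `ReadBufferFromKafkaConsumer`):

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for the consumer buffer: one message per allowNext() call,
// instead of injecting a row delimiter between messages.
class ToyKafkaBuffer
{
public:
    explicit ToyKafkaBuffer(std::vector<std::string> msgs) : messages(std::move(msgs)) {}

    void allowNext() { allowed = true; }          // caller has finished the previous message

    std::optional<std::string> next()             // analogous to nextImpl()
    {
        if (!allowed || pos >= messages.size())
            return std::nullopt;                  // stalled, exhausted or not yet allowed
        allowed = false;                          // block further reads until allowNext()
        return messages[pos++];
    }

private:
    std::vector<std::string> messages;
    std::size_t pos = 0;
    bool allowed = true;                          // the very first read is permitted
};
```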
2 changes: 1 addition & 1 deletion dbms/src/Storages/Kafka/StorageKafka.cpp
@@ -278,7 +278,7 @@ ConsumerBufferPtr StorageKafka::createReadBuffer()
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();

/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, row_delimiter, stream_cancelled);
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, stream_cancelled);
}

