Better tests for Kafka engine #4117

Merged: 16 commits, Feb 11, 2019
Changes from all commits
2 changes: 1 addition & 1 deletion .gitmodules
@@ -63,7 +63,7 @@
url = https://github.com/ClickHouse-Extras/libgsasl.git
[submodule "contrib/cppkafka"]
path = contrib/cppkafka
-url = https://github.com/mfontanini/cppkafka.git
+url = https://github.com/ClickHouse-Extras/cppkafka.git
[submodule "contrib/pdqsort"]
path = contrib/pdqsort
url = https://github.com/orlp/pdqsort
2 changes: 1 addition & 1 deletion contrib/cppkafka
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/IBlockInputStream.h
@@ -258,7 +258,7 @@ class IBlockInputStream
Block extremes;


-void addChild(BlockInputStreamPtr & child)
+void addChild(const BlockInputStreamPtr & child)
{
std::unique_lock lock(children_mutex);
children.push_back(child);
52 changes: 52 additions & 0 deletions dbms/src/IO/DelimitedReadBuffer.h
@@ -0,0 +1,52 @@
#pragma once

#include <IO/ReadBuffer.h>
#include <Common/typeid_cast.h>

namespace DB
{
/// Reads from a single sub-buffer in a loop, delimiting each produced chunk with a character.
/// Owns the sub-buffer.
class DelimitedReadBuffer : public ReadBuffer
{
public:
DelimitedReadBuffer(ReadBuffer * buffer_, char delimiter_) : ReadBuffer(nullptr, 0), buffer(buffer_), delimiter(delimiter_)
{
// TODO: check that `buffer_` is not nullptr.
}

template <class BufferType>
BufferType * subBufferAs()
{
return typeid_cast<BufferType *>(buffer.get());
}

protected:
// XXX: there is no way to guarantee that this method is called only after all previously returned data has been read.
bool nextImpl() override
{
if (put_delimiter)
{
BufferBase::set(&delimiter, 1, 0);
put_delimiter = false;
}
else
{
if (!buffer->next())
return false;

BufferBase::set(buffer->position(), buffer->available(), 0);
put_delimiter = true;
}

return true;
}

private:
std::unique_ptr<ReadBuffer> buffer; // FIXME: should be `const`, but `ReadBuffer` doesn't allow it
char delimiter; // FIXME: should be `const`, but `ReadBuffer` doesn't allow it

bool put_delimiter = false;
};

} // namespace DB
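
A note on the contract here, since it is easy to miss: `DelimitedReadBuffer` never looks at data the sub-buffer already holds; it only forwards chunks that arrive through the sub-buffer's `nextImpl()`, alternating each chunk with the delimiter. A minimal sketch (the `ChunkReadBuffer` helper is hypothetical, not part of the PR):

```cpp
#include <IO/DelimitedReadBuffer.h>

#include <iostream>
#include <string>
#include <vector>

/// Hypothetical sub-buffer: each nextImpl() call exposes one pre-cooked chunk.
class ChunkReadBuffer : public DB::ReadBuffer
{
public:
    explicit ChunkReadBuffer(std::vector<std::string> chunks_)
        : DB::ReadBuffer(nullptr, 0), chunks(std::move(chunks_)) {}

private:
    bool nextImpl() override
    {
        if (index >= chunks.size())
            return false;
        auto & chunk = chunks[index++];
        BufferBase::set(chunk.data(), chunk.size(), 0);
        return true;
    }

    std::vector<std::string> chunks;
    size_t index = 0;
};

int main()
{
    /// The wrapper takes ownership of the raw pointer (as readPrefixImpl() does later in this PR).
    DB::DelimitedReadBuffer buffer(new ChunkReadBuffer({"first", "second"}), '\n');

    while (!buffer.eof())
    {
        std::cout << *buffer.position(); /// prints "first\nsecond\n"
        ++buffer.position();
    }
}
```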
1 change: 1 addition & 0 deletions dbms/src/IO/ReadBuffer.h
@@ -41,6 +41,7 @@ class ReadBuffer : public BufferBase
*/
ReadBuffer(Position ptr, size_t size, size_t offset) : BufferBase(ptr, size, offset) {}

+// FIXME: behavior differs greatly from `BufferBase::set()`, which is very confusing.
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); working_buffer.resize(0); }

/** read next data and fill a buffer with it; set position to the beginning;
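
To see what this FIXME means in practice, a minimal sketch using only the public constructor and `set()` shown above: the `BufferBase`-style constructor makes the bytes readable immediately, while `ReadBuffer::set()` empties the working buffer, so the region becomes readable only once `nextImpl()` fills it.

```cpp
#include <IO/ReadBuffer.h>

#include <cassert>

int main()
{
    char data[] = "hello";

    DB::ReadBuffer reader(data, 5, 0); /// BufferBase-style setup: all 5 bytes readable now
    assert(reader.available() == 5);

    reader.set(data, 5);               /// ReadBuffer::set(): working buffer is emptied,
    assert(reader.available() == 0);   /// so the bytes reappear only via nextImpl()
}
```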
69 changes: 69 additions & 0 deletions dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -0,0 +1,69 @@
#include <Storages/Kafka/KafkaBlockInputStream.h>

#include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>

namespace DB
{

KafkaBlockInputStream::KafkaBlockInputStream(
StorageKafka & storage_, const Context & context_, const String & schema, UInt64 max_block_size_)
: storage(storage_), context(context_), max_block_size(max_block_size_)
{
context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_allow_errors_ratio", 0.);
context.setSetting("input_format_allow_errors_num", storage.skip_broken);

if (!schema.empty())
context.setSetting("format_schema", schema);
}

KafkaBlockInputStream::~KafkaBlockInputStream()
{
if (!claimed)
return;

if (broken)
{
LOG_TRACE(storage.log, "Re-joining claimed consumer after failure");
consumer->unsubscribe();
}

storage.pushConsumer(consumer);
}

void KafkaBlockInputStream::readPrefixImpl()
{
consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
claimed = !!consumer;

if (!consumer)
consumer = std::make_shared<cppkafka::Consumer>(storage.createConsumerConfiguration());

// While we wait for an assignment after subscription, we'll poll zero messages anyway.
// If we're doing a manual select, it's better to get something after a wait than nothing immediately.
if (consumer->get_subscription().empty())
{
using namespace std::chrono_literals;

consumer->pause(); // don't accidentally read any messages
consumer->subscribe(storage.topics);
consumer->poll(5s);
consumer->resume();
}

buffer = std::make_unique<DelimitedReadBuffer>(
new ReadBufferFromKafkaConsumer(consumer, storage.log, max_block_size), storage.row_delimiter);
addChild(FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size));

broken = true;
}

void KafkaBlockInputStream::readSuffixImpl()
{
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();

broken = false;
}

} // namespace DB
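
For orientation, a hypothetical driver showing the lifecycle this stream expects from its caller (in the PR the real callers are StorageKafka's background streaming and the SELECT pipeline): `readPrefixImpl()` claims a consumer and builds the buffer chain, `readSuffixImpl()` commits offsets, and the destructor returns the consumer, unsubscribing first when the stream did not finish cleanly (`broken` still set).

```cpp
#include <Storages/Kafka/KafkaBlockInputStream.h>

/// Hypothetical driver; drainKafkaStream is an illustrative name, not PR code.
void drainKafkaStream(DB::KafkaBlockInputStream & stream)
{
    stream.readPrefix();             /// claim a consumer, subscribe, build the buffer chain
    while (DB::Block block = stream.read())
    {
        /// push `block` downstream; `broken` stays true while reading,
        /// so a throw here makes the destructor re-join the consumer
    }
    stream.readSuffix();             /// commit offsets and clear `broken`
}                                    /// destructor: consumer goes back to the pool
```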
35 changes: 35 additions & 0 deletions dbms/src/Storages/Kafka/KafkaBlockInputStream.h
@@ -0,0 +1,35 @@
#pragma once

#include <DataStreams/IBlockInputStream.h>
#include <IO/DelimitedReadBuffer.h>
#include <Interpreters/Context.h>

#include <Storages/Kafka/StorageKafka.h>

namespace DB
{

class KafkaBlockInputStream : public IBlockInputStream
{
public:
KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, UInt64 max_block_size_);
~KafkaBlockInputStream() override;

String getName() const override { return storage.getName(); }
Block readImpl() override { return children.back()->read(); }
Block getHeader() const override { return storage.getSampleBlock(); }

void readPrefixImpl() override;
void readSuffixImpl() override;

private:
StorageKafka & storage;
Context context;
UInt64 max_block_size;

ConsumerPtr consumer;
std::unique_ptr<DelimitedReadBuffer> buffer;
bool broken = true, claimed = false;
};

} // namespace DB
53 changes: 53 additions & 0 deletions dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -0,0 +1,53 @@
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>

namespace DB
{
namespace
{
const auto READ_POLL_MS = 500; /// How long to wait for a batch of messages.
} // namespace

void ReadBufferFromKafkaConsumer::commit()
{
if (messages.empty() || current == messages.begin())
return;

auto & previous = *std::prev(current);
LOG_TRACE(log, "Committing message with offset " << previous.get_offset());
consumer->async_commit(previous);
}

/// Commits the previous batch implicitly once it has been fully processed.
bool ReadBufferFromKafkaConsumer::nextImpl()
{
if (current == messages.end())
{
commit();
messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(READ_POLL_MS));
current = messages.begin();

LOG_TRACE(log, "Polled batch of " << messages.size() << " messages");
}

if (messages.empty() || current == messages.end())
return false;

if (auto err = current->get_error())
{
++current;

// TODO: should throw an exception instead
LOG_ERROR(log, "Consumer error: " << err);
return false;
}

// XXX: fishy const cast: the payload is const, but BufferBase::set() takes a mutable pointer.
auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
BufferBase::set(new_position, current->get_payload().get_size(), 0);

++current;

return true;
}

} // namespace DB
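
The commit discipline above deserves a spelled-out example: offsets for a batch are committed only after the whole batch has been consumed, either implicitly when `nextImpl()` polls the next batch or explicitly via `commit()` at read-suffix time, which gives at-least-once delivery. A standalone cppkafka sketch of the same idea (broker address, group id, and topic are placeholders):

```cpp
#include <cppkafka/cppkafka.h>

#include <chrono>

int main()
{
    cppkafka::Consumer consumer(cppkafka::Configuration{
        {"metadata.broker.list", "localhost:9092"},
        {"group.id", "example"},
        {"enable.auto.commit", false}}); /// commits stay explicit, as in this PR

    consumer.subscribe({"some_topic"});

    for (size_t batches = 0; batches < 10; ++batches)
    {
        auto messages = consumer.poll_batch(8, std::chrono::milliseconds(500));
        if (messages.empty())
            continue;

        for (const auto & message : messages)
            if (!message.get_error())
                ; /// parse message.get_payload() here

        /// Commit only after the whole batch is processed: if we crash before
        /// this line, the uncommitted offsets are simply polled again.
        consumer.async_commit(messages.back());
    }
}
```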
36 changes: 36 additions & 0 deletions dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
@@ -0,0 +1,36 @@
#pragma once

#include <IO/ReadBuffer.h>
#include <common/logger_useful.h>

#include <cppkafka/cppkafka.h>

namespace DB
{
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;

class ReadBufferFromKafkaConsumer : public ReadBuffer
{
public:
ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size)
: ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), batch_size(max_batch_size), current(messages.begin())
{
}

// Commit all processed messages.
void commit();

private:
using Messages = std::vector<cppkafka::Message>;

ConsumerPtr consumer;
Poco::Logger * log;
const size_t batch_size = 1;

Messages messages;
Messages::const_iterator current;

bool nextImpl() override;
};

} // namespace DB
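
Putting the two new buffers together, a condensed, hypothetical restatement of what `KafkaBlockInputStream::readPrefixImpl()` and `readSuffixImpl()` do (`makeKafkaBuffer` and `commitParsedBlock` are illustrative names, not PR code):

```cpp
#include <IO/DelimitedReadBuffer.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>

#include <memory>

/// Build the buffer chain: the delimiter wrapper owns the Kafka buffer.
std::unique_ptr<DB::DelimitedReadBuffer> makeKafkaBuffer(
    DB::ConsumerPtr consumer, Poco::Logger * log, size_t batch_size, char delimiter)
{
    return std::make_unique<DB::DelimitedReadBuffer>(
        new DB::ReadBufferFromKafkaConsumer(consumer, log, batch_size), delimiter);
}

/// After a block is fully parsed, commit through the typed sub-buffer;
/// subBufferAs() typeid_casts the owned ReadBuffer back to its concrete type.
void commitParsedBlock(DB::DelimitedReadBuffer & buffer)
{
    buffer.subBufferAs<DB::ReadBufferFromKafkaConsumer>()->commit();
}
```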