Add setting to Kafka Engine to control number of parsing errors #4094

Merged (3 commits, Jan 18, 2019)
dbms/src/Storages/Kafka/KafkaSettings.h: 5 changes (3 additions, 2 deletions)
@@ -23,11 +23,12 @@ struct KafkaSettings
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
- M(SettingString, kafka_format, "", "Message format for Kafka engine.") \
+ M(SettingString, kafka_format, "", "The message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
- M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.")
+ M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
+ M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block")

#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};
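Once registered this way, the new setting can be supplied through the table's SETTINGS clause like the other Kafka engine settings. A minimal sketch (the table, broker, and topic names are illustrative, not taken from the PR):

-- Hypothetical usage: tolerate up to 10 unparsable messages per consumed block.
CREATE TABLE kafka_queue (a UInt8, b String)
ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'CSV')
SETTINGS kafka_skip_broken_messages = 10;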
dbms/src/Storages/Kafka/StorageKafka.cpp: 35 changes (32 additions, 3 deletions)
@@ -139,6 +139,12 @@ class KafkaBlockInputStream : public IProfilingBlockInputStream
{
// Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_skip_unknown_fields", 1u);

// We don't use ratio since the number of Kafka messages may vary from stream to stream.
// Thus, ratio is meaningless.
context.setSetting("input_format_allow_errors_ratio", 1.);
context.setSetting("input_format_allow_errors_num", storage.skip_broken);

if (schema.size() > 0)
context.setSetting("format_schema", schema);
}
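The stream reuses ClickHouse's generic text-format error-tolerance settings: pinning the ratio at 1.0 leaves only the absolute counter in effect, and that counter is fed from the table-level skip limit. As a rough stand-alone illustration of the same two settings at session level (the value 10 is arbitrary):

-- Up to 10 malformed rows are tolerated before parsing fails;
-- setting the ratio to 1.0 effectively disables the ratio-based cap.
SET input_format_allow_errors_ratio = 1.0;
SET input_format_allow_errors_num = 10;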
@@ -248,7 +254,7 @@ StorageKafka::StorageKafka(
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_,
- size_t num_consumers_, size_t max_block_size_)
+ size_t num_consumers_, size_t max_block_size_, size_t skip_broken_)
: IStorage{columns_},
table_name(table_name_), database_name(database_name_), global_context(context_),
topics(global_context.getMacros()->expand(topics_)),
@@ -258,7 +264,8 @@
row_delimiter(row_delimiter_),
schema_name(global_context.getMacros()->expand(schema_name_)),
num_consumers(num_consumers_), max_block_size(max_block_size_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
- semaphore(0, num_consumers_), mutex(), consumers()
+ semaphore(0, num_consumers_), mutex(), consumers(),
+ skip_broken(skip_broken_)
{
task = global_context.getSchedulePool().createTask(log->name(), [this]{ streamThread(); });
task->deactivate();
@@ -538,6 +545,8 @@ void registerStorageKafka(StorageFactory & factory)
* - Row delimiter
* - Schema (optional, if the format supports it)
* - Number of consumers
* - Max block size for background consumption
* - Skip (at least) unreadable messages number
*/

// Check arguments and settings
@@ -571,6 +580,7 @@ void registerStorageKafka(StorageFactory & factory)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size)
CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages)
#undef CHECK_KAFKA_STORAGE_ARGUMENT

// Get and check broker list
@@ -728,6 +738,7 @@ void registerStorageKafka(StorageFactory & factory)
}
else
{
// TODO: no check if the integer is really positive
throw Exception("Maximum block size must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
}
@@ -736,9 +747,27 @@ void registerStorageKafka(StorageFactory & factory)
max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size.value);
}

size_t skip_broken = 0;
if (args_count >= 9)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[8].get());
if (ast && ast->value.getType() == Field::Types::UInt64)
{
skip_broken = static_cast<size_t>(safeGet<UInt64>(ast->value));
}
else
{
throw Exception("Number of broken messages to skip must be a non-negative integer", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_skip_broken_messages.changed)
{
skip_broken = static_cast<size_t>(kafka_settings.kafka_skip_broken_messages.value);
}

return StorageKafka::create(
args.table_name, args.database_name, args.context, args.columns,
- brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size);
+ brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken);
});
}

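With the ninth positional argument wired up, the same limit could also be passed inline in the engine clause, provided the earlier optional arguments are spelled out as well. A sketch under that assumption (all values illustrative; the SETTINGS form above is the safer spelling):

-- Positional form: brokers, topics, group, format, row delimiter, schema,
-- number of consumers, max block size, and the number of broken messages to skip.
CREATE TABLE kafka_queue_positional (a UInt8, b String)
ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'CSV', '\n', '', 1, 65536, 10);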
dbms/src/Storages/Kafka/StorageKafka.h: 4 changes (3 additions, 1 deletion)
@@ -80,6 +80,8 @@ friend class KafkaBlockOutputStream
std::mutex mutex;
std::vector<ConsumerPtr> consumers; /// Available consumers

size_t skip_broken;

// Stream thread
BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false};
@@ -101,7 +103,7 @@ friend class KafkaBlockOutputStream
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_,
- size_t num_consumers_, size_t max_block_size_);
+ size_t num_consumers_, size_t max_block_size_, size_t skip_broken);
};

}
dbms/tests/instructions/kafka.txt: 45 changes (45 additions, 0 deletions, new file)
@@ -0,0 +1,45 @@
Use this config for docker-compose:

version: '3'

services:

kafka:
depends_on:
- zookeeper
hostname: kafka
image: wurstmeister/kafka
environment:
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ports:
- "9092:9092"
- "9094:9094"

security_opt:
- label:disable

zookeeper:
hostname: zookeeper
image: zookeeper

security_opt:
- label:disable

Start containers with `docker-compose up`.

In clickhouse-client, create a table like:

CREATE TABLE kafka ( a UInt8, b String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'CSV') SETTINGS kafka_row_delimiter = '\n';

Log in to the Kafka container and stream some data:

docker exec -it <kafka_container_id> bash --login
vi data.csv
cat data.csv | /opt/kafka/bin/kafka-console-producer.sh --topic topic --broker-list localhost:9092

Read the data in ClickHouse:

SELECT * FROM kafka;
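To try the new setting with this setup, the table can be recreated with a skip limit and fed a few deliberately broken lines; a sketch assuming the same broker and topic as above (table name and limit are illustrative):

CREATE TABLE kafka_tolerant ( a UInt8, b String )
ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'CSV')
SETTINGS kafka_row_delimiter = '\n', kafka_skip_broken_messages = 2;

-- After producing a couple of malformed CSV lines (e.g. "oops") alongside valid rows,
-- the broken lines should be dropped (up to 2 per block) instead of failing the read:
SELECT * FROM kafka_tolerant;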