Commit 4492bc6

Support SQL-created named collections in Kafka Storage for librdkafka settings
GrigoryPervakov committed Feb 6, 2024
1 parent f68d7f9 commit 4492bc6
Showing 5 changed files with 121 additions and 62 deletions.
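
A minimal sketch of what this commit enables, condensed from the integration test added below: librdkafka settings (under the kafka. prefix) and engine-level settings now live together in one SQL-created named collection, and the Kafka engine consumes the collection directly.

CREATE NAMED COLLECTION kafka_config AS
    kafka.security_protocol = 'SASL_PLAINTEXT', -- librdkafka setting: the 'kafka.' prefix is cut, '_' becomes '.'
    kafka_broker_list = 'kerberized_kafka1:19092', -- engine-level settings keep their usual names
    kafka_topic_list = 'kafka_json_as_string',
    kafka_group_name = 'kafka_json_as_string',
    kafka_format = 'JSONAsString';

CREATE TABLE test.kafka (field String) ENGINE = Kafka(kafka_config);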
123 changes: 64 additions & 59 deletions src/Storages/Kafka/StorageKafka.cpp
@@ -246,64 +246,81 @@ namespace
const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
const String CONFIG_NAME_TAG = "name";

void setKafkaConfigValue(LoggerPtr log, cppkafka::Configuration & kafka_config, String key, String value)
{
if (key.starts_with(CONFIG_KAFKA_TOPIC_TAG) || key == CONFIG_NAME_TAG) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
return; /// used by new per-topic configuration, ignore

/// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String setting_name_in_kafka_config = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
LOG_INFO(log, "Set kafka_config {}={}", setting_name_in_kafka_config, value);
kafka_config.set(setting_name_in_kafka_config, value);
}
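
The underscore-to-dot rename above can be reproduced from SQL with ClickHouse's built-in replaceAll function — a hypothetical sanity check, not part of this commit:

SELECT replaceAll('fetch_min_bytes', '_', '.') AS librdkafka_name; -- 'fetch.min.bytes'; 'log_level' is the one key left untouched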

/// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
void loadFromConfig(LoggerPtr log, cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, String collection_name, const String & config_prefix)
{
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);

for (const auto & tag : tags)
if (!collection_name.empty())
{
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(-1, config_prefix))
{
/// Cut the prefix and the following '.' to get the actual config tag.
auto param_name = key.substr(config_prefix.size() + 1);
setKafkaConfigValue(log, kafka_config, std::move(param_name), collection->get<String>(key));
}
}
else
{
if (tag.starts_with(CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
continue; /// used by new per-topic configuration, ignore
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);

const String setting_path = config_prefix + "." + tag;
const String setting_value = config.getString(setting_path);

/// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String setting_name_in_kafka_config = (tag == "log_level") ? tag : boost::replace_all_copy(tag, "_", ".");
kafka_config.set(setting_name_in_kafka_config, setting_value);
for (const auto & tag : tags)
{
const String setting_path = config_prefix + "." + tag;
String setting_value = config.getString(setting_path);
setKafkaConfigValue(log, kafka_config, tag, std::move(setting_value));
}
}
}
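
In the named-collection branch above, getKeys(-1, config_prefix) yields fully qualified keys such as 'kafka.sasl_mechanism', and the substr call cuts the 'kafka.' prefix before handing the tag to setKafkaConfigValue. A hypothetical collection illustrating the mapping:

CREATE NAMED COLLECTION kafka_config_sketch AS
    kafka.sasl_mechanism = 'GSSAPI', -- key 'kafka.sasl_mechanism' -> tag 'sasl_mechanism' -> librdkafka 'sasl.mechanism'
    kafka.debug = 'security'; -- key 'kafka.debug' -> tag 'debug' -> librdkafka 'debug'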

/// Read server configuration into cppkafka configuration, used by new per-topic configuration
void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const String & topic)
void loadTopicConfig(LoggerPtr log, cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, String collection_name, const String & config_prefix, const String & topic)
{
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);

for (const auto & tag : tags)
if (!collection_name.empty())
{
/// Only consider tag <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG))
continue;
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(1, config_prefix)) {
/// Only consider key <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
if (!key.starts_with(config_prefix + "." + CONFIG_KAFKA_TOPIC_TAG))
continue;

/// Read topic name between <name>...</name>
const String kafka_topic_path = config_prefix + "." + tag;
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
const String kafka_topic_path = config_prefix + "." + key;
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
if (topic == collection->get<String>(kafka_topic_name_path))
/// Found it! Now read the per-topic configuration into cppkafka.
loadFromConfig(log, kafka_config, config, collection_name, kafka_topic_path);
}
}
else
{
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);

const String topic_name = config.getString(kafka_topic_name_path);
if (topic_name == topic)
for (const auto & tag : tags)
{
/// Found it! Now read the per-topic configuration into cppkafka.
Poco::Util::AbstractConfiguration::Keys inner_tags;
config.keys(kafka_topic_path, inner_tags);
for (const auto & inner_tag : inner_tags)
{
if (inner_tag == CONFIG_NAME_TAG)
continue; // ignore <name>
/// Only consider tag <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG))
continue;

/// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String setting_path = kafka_topic_path + "." + inner_tag;
const String setting_value = config.getString(setting_path);
/// Read topic name between <name>...</name>
const String kafka_topic_path = config_prefix + "." + tag;
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;

const String setting_name_in_kafka_config = (inner_tag == "log_level") ? inner_tag : boost::replace_all_copy(inner_tag, "_", ".");
kafka_config.set(setting_name_in_kafka_config, setting_value);
}
const String topic_name = config.getString(kafka_topic_name_path);
if (topic_name == topic)
/// Found it! Now read the per-topic configuration into cppkafka.
loadFromConfig(log, kafka_config, config, collection_name, kafka_topic_path);
}
}
}
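
The per-topic branch above matches the <name> subkey of kafka_topic entries in the collection against the table's topics before loading that subtree. A hypothetical collection carrying such an override (the exact key nesting accepted by SQL named collections is an assumption here):

CREATE NAMED COLLECTION kafka_config_topics AS
    kafka.kafka_topic.name = 'football', -- assumed key shape: <prefix>.kafka_topic.name
    kafka.kafka_topic.retry_backoff_ms = '250', -- applied only to tables consuming the 'football' topic
    kafka_broker_list = 'kafka1:19092',
    kafka_topic_list = 'football',
    kafka_group_name = 'football_group',
    kafka_format = 'JSONEachRow';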
@@ -728,13 +745,6 @@ size_t StorageKafka::getPollTimeoutMillisecond() const
: getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
}

String StorageKafka::getConfigPrefix() const
{
if (!collection_name.empty())
return "named_collections." + collection_name + "." + CONFIG_KAFKA_TAG; /// Add one more level to separate librdkafka configuration.
return CONFIG_KAFKA_TAG;
}

void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
{
// Update consumer configuration from the configuration. Example:
@@ -743,9 +753,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka>
const auto & config = getContext()->getConfigRef();
auto config_prefix = getConfigPrefix();
if (config.has(config_prefix))
loadFromConfig(kafka_config, config, config_prefix);
loadFromConfig(log, kafka_config, config, collection_name, CONFIG_KAFKA_TAG);

#if USE_KRB5
if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
@@ -784,9 +792,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
// as <kafka> are ugly.
for (const auto & topic : topics)
{
const auto topic_config_key = config_prefix + "_" + topic;
if (config.has(topic_config_key))
loadFromConfig(kafka_config, config, topic_config_key);
loadFromConfig(log, kafka_config, config, collection_name, CONFIG_KAFKA_TAG + "_" + topic);
}

// Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
@@ -805,8 +811,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
// Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), everything
// Kafka-related is below <kafka>.
for (const auto & topic : topics)
if (config.has(config_prefix))
loadTopicConfig(kafka_config, config, config_prefix, topic);
loadTopicConfig(log, kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topic);

// No need to add any prefix, messages can be distinguished
kafka_config.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
@@ -817,7 +822,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)

/// NOTE: statistics should be consumed, otherwise it creates too much
/// entries in the queue, that leads to memory leak and slow shutdown.
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
if (!kafka_config.has_property("statistics.interval.ms"))
{
// every 3 seconds by default. set to 0 to disable.
kafka_config.set("statistics.interval.ms", "3000");
1 change: 0 additions & 1 deletion src/Storages/Kafka/StorageKafka.h
@@ -145,7 +145,6 @@ class StorageKafka final : public IStorage, WithContext
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & kafka_config);

String getConfigPrefix() const;
void threadFunc(size_t idx);

size_t getPollMaxBatchSize() const;
4 changes: 2 additions & 2 deletions tests/integration/test_storage_kafka/configs/kafka.xml
@@ -11,8 +11,8 @@
<debug>cgrp,consumer,topic,protocol</debug>

<!-- librdkafka stat in system.kafka_consumers -->
<!-- default 3000 (every three seconds) seems too long for tests -->
<statistics_interval_ms>600</statistics_interval_ms>
<!-- default 3000 (every three seconds) seems too long for tests -->
<statistics_interval_ms>600</statistics_interval_ms>

<kafka_topic>
<name>consumer_hang</name>
@@ -105,6 +105,9 @@ create_keytabs() {
kadmin.local -q "addprinc -randkey kafkauser/instance@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@${REALM}"

kadmin.local -q "addprinc -randkey anotherkafkauser/instance@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab anotherkafkauser/instance@${REALM}"

chmod g+r /tmp/keytab/clickhouse.keytab

}
52 changes: 52 additions & 0 deletions tests/integration/test_storage_kerberized_kafka/test.py
@@ -227,6 +227,58 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
assert instance.contains_in_log("KerberosInit failure:")


def test_kafka_config_from_sql_named_collection(kafka_cluster):
kafka_produce(
kafka_cluster,
"kafka_json_as_string",
[
'{"t": 123, "e": {"x": "woof"} }',
"",
'{"t": 124, "e": {"x": "test"} }',
'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
],
)

instance.query(
"""
CREATE NAMED COLLECTION kafka_config AS
kafka.security_protocol = 'SASL_PLAINTEXT',
kafka.sasl_mechanism = 'GSSAPI',
kafka.sasl_kerberos_service_name = 'kafka',
kafka.sasl_kerberos_keytab = '/tmp/keytab/clickhouse.keytab',
kafka.sasl_kerberos_principal = 'anotherkafkauser/instance@TEST.CLICKHOUSE.TECH',
kafka.debug = 'security',
kafka.api_version_request = 'false',
kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms = 1000;
"""
)
instance.query(
"""
CREATE TABLE test.kafka (field String)
ENGINE = Kafka(kafka_config);
"""
)

time.sleep(3)

result = instance.query("SELECT * FROM test.kafka;")
expected = """\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
"""
assert TSV(result) == TSV(expected)
assert instance.contains_in_log(
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
)


if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")