Support SQL-created named collections in Kafka Storage for librdkafka settings #59710

Merged (1 commit) on Feb 28, 2024
117 changes: 62 additions & 55 deletions src/Storages/Kafka/StorageKafka.cpp
@@ -246,64 +246,83 @@ namespace
     const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
     const String CONFIG_NAME_TAG = "name";
 
+    void setKafkaConfigValue(cppkafka::Configuration & kafka_config, const String & key, const String & value)
+    {
+        if (key.starts_with(CONFIG_KAFKA_TOPIC_TAG) || key == CONFIG_NAME_TAG) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+            return; /// used by the new per-topic configuration, ignore
+
+        /// "log_level" has a valid underscore; the remaining librdkafka settings use dot.separated.format, which isn't acceptable for XML.
+        /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+        const String setting_name_in_kafka_config = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
+        kafka_config.set(setting_name_in_kafka_config, value);
+    }
+
     /// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
-    void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
+    void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix)
     {
+        if (!collection_name.empty())
+        {
+            const auto & collection = NamedCollectionFactory::instance().get(collection_name);
+            for (const auto & key : collection->getKeys(-1, config_prefix))
+            {
+                // Cut off the prefix and the '.' before the actual config tag.
+                const auto param_name = key.substr(config_prefix.size() + 1);
+                setKafkaConfigValue(kafka_config, param_name, collection->get<String>(key));
+            }
+            return;
+        }
+
         /// Read all tags one level below <kafka>
         Poco::Util::AbstractConfiguration::Keys tags;
         config.keys(config_prefix, tags);
 
         for (const auto & tag : tags)
         {
             if (tag.starts_with(CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
                 continue; /// used by the new per-topic configuration, ignore
 
-            const String setting_path = config_prefix + "." + tag;
-            const String setting_value = config.getString(setting_path);
-
-            /// "log_level" has a valid underscore; the remaining librdkafka settings use dot.separated.format, which isn't acceptable for XML.
-            /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-            const String setting_name_in_kafka_config = (tag == "log_level") ? tag : boost::replace_all_copy(tag, "_", ".");
-            kafka_config.set(setting_name_in_kafka_config, setting_value);
+            const String setting_path = fmt::format("{}.{}", config_prefix, tag);
+            setKafkaConfigValue(kafka_config, tag, config.getString(setting_path));
         }
     }
 
     /// Read server configuration into cppkafka configuration, used by the new per-topic configuration
-    void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const String & topic)
+    void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix, const String & topic)
     {
-        /// Read all tags one level below <kafka>
-        Poco::Util::AbstractConfiguration::Keys tags;
-        config.keys(config_prefix, tags);
-
-        for (const auto & tag : tags)
-        {
-            /// Only consider tag <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
-            if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG))
-                continue;
-
-            /// Read topic name between <name>...</name>
-            const String kafka_topic_path = config_prefix + "." + tag;
-            const String kafpa_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
-
-            const String topic_name = config.getString(kafpa_topic_name_path);
-            if (topic_name == topic)
-            {
-                /// Found it! Now read the per-topic configuration into cppkafka.
-                Poco::Util::AbstractConfiguration::Keys inner_tags;
-                config.keys(kafka_topic_path, inner_tags);
-                for (const auto & inner_tag : inner_tags)
-                {
-                    if (inner_tag == CONFIG_NAME_TAG)
-                        continue; // ignore <name>
-
-                    /// "log_level" has a valid underscore; the remaining librdkafka settings use dot.separated.format, which isn't acceptable for XML.
-                    /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-                    const String setting_path = kafka_topic_path + "." + inner_tag;
-                    const String setting_value = config.getString(setting_path);
-
-                    const String setting_name_in_kafka_config = (inner_tag == "log_level") ? inner_tag : boost::replace_all_copy(inner_tag, "_", ".");
-                    kafka_config.set(setting_name_in_kafka_config, setting_value);
-                }
-            }
-        }
+        if (!collection_name.empty())
+        {
+            const auto topic_prefix = fmt::format("{}.{}", config_prefix, CONFIG_KAFKA_TOPIC_TAG);
+            const auto & collection = NamedCollectionFactory::instance().get(collection_name);
+            for (const auto & key : collection->getKeys(1, config_prefix))
+            {
+                /// Only consider key <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+                if (!key.starts_with(topic_prefix))
+                    continue;
+
+                /// Read topic name between <name>...</name>
+                const String kafka_topic_path = config_prefix + "." + key;
+                const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
+                if (topic == collection->get<String>(kafka_topic_name_path))
+                    /// Found it! Now read the per-topic configuration into cppkafka.
+                    loadFromConfig(kafka_config, config, collection_name, kafka_topic_path);
+            }
+        }
+        else
+        {
+            /// Read all tags one level below <kafka>
+            Poco::Util::AbstractConfiguration::Keys tags;
+            config.keys(config_prefix, tags);
+
+            for (const auto & tag : tags)
+            {
+                /// Only consider tag <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+                if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG))
+                    continue;
+
+                /// Read topic name between <name>...</name>
+                const String kafka_topic_path = fmt::format("{}.{}", config_prefix, tag);
+                const String kafka_topic_name_path = fmt::format("{}.{}", kafka_topic_path, CONFIG_NAME_TAG);
+
+                const String topic_name = config.getString(kafka_topic_name_path);
+                if (topic_name == topic)
+                    /// Found it! Now read the per-topic configuration into cppkafka.
+                    loadFromConfig(kafka_config, config, collection_name, kafka_topic_path);
+            }
+        }
     }
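For orientation, the two XML layouts these functions parse look roughly like the sketch below. This is an illustrative example assembled from the comments in this file: the topic name and values are made up, but fetch_min_bytes and retry_backoff_ms map to the real librdkafka settings fetch.min.bytes and retry.backoff.ms after the underscore-to-dot rewrite:

    <kafka>
        <!-- global librdkafka settings -->
        <fetch_min_bytes>100000</fetch_min_bytes>
        <!-- new-style per-topic overrides, handled by loadTopicConfig -->
        <kafka_topic>
            <name>football</name>
            <retry_backoff_ms>250</retry_backoff_ms>
        </kafka_topic>
    </kafka>

    <!-- legacy per-topic overrides: a separate <kafka_TOPICNAME> tag -->
    <kafka_football>
        <retry_backoff_ms>250</retry_backoff_ms>
    </kafka_football>

With a SQL-created named collection, the same information arrives through NamedCollectionFactory instead of the Poco configuration: plain settings as keys like kafka.fetch_min_bytes, and per-topic blocks as kafka.kafka_topic.name / kafka.kafka_topic.retry_backoff_ms, which is what the collection branches above read.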
@@ -728,13 +747,6 @@ size_t StorageKafka::getPollTimeoutMillisecond() const
         : getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
 }
 
-String StorageKafka::getConfigPrefix() const
-{
-    if (!collection_name.empty())
-        return "named_collections." + collection_name + "." + CONFIG_KAFKA_TAG; /// Add one more level to separate librdkafka configuration.
-    return CONFIG_KAFKA_TAG;
-}
-
 void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
 {
     // Update consumer configuration from the configuration. Example:
@@ -743,9 +755,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
     //     <fetch_min_bytes>100000</fetch_min_bytes>
     // </kafka>
     const auto & config = getContext()->getConfigRef();
-    auto config_prefix = getConfigPrefix();
-    if (config.has(config_prefix))
-        loadFromConfig(kafka_config, config, config_prefix);
+    loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG);
 
 #if USE_KRB5
     if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
@@ -784,9 +794,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
     // as <kafka> are ugly.
     for (const auto & topic : topics)
     {
-        const auto topic_config_key = config_prefix + "_" + topic;
-        if (config.has(topic_config_key))
-            loadFromConfig(kafka_config, config, topic_config_key);
+        loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG + "_" + topic);
     }
 
     // Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
@@ -805,8 +813,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
     // Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), everything
    // Kafka-related is below <kafka>.
     for (const auto & topic : topics)
-        if (config.has(config_prefix))
-            loadTopicConfig(kafka_config, config, config_prefix, topic);
+        loadTopicConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topic);
 
     // No need to add any prefix, messages can be distinguished
     kafka_config.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
@@ -817,7 +824,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
 
     /// NOTE: statistics should be consumed, otherwise it creates too many
     /// entries in the queue, which leads to a memory leak and slow shutdown.
-    if (!config.has(config_prefix + "." + "statistics_interval_ms"))
+    if (!kafka_config.has_property("statistics.interval.ms"))
     {
         // every 3 seconds by default. set to 0 to disable.
         kafka_config.set("statistics.interval.ms", "3000");
1 change: 0 additions & 1 deletion src/Storages/Kafka/StorageKafka.h
@@ -145,7 +145,6 @@ class StorageKafka final : public IStorage, WithContext
     // Update Kafka configuration with values from CH user configuration.
     void updateConfiguration(cppkafka::Configuration & kafka_config);
 
-    String getConfigPrefix() const;
     void threadFunc(size_t idx);
 
     size_t getPollMaxBatchSize() const;
4 changes: 2 additions & 2 deletions tests/integration/test_storage_kafka/configs/kafka.xml
@@ -11,8 +11,8 @@
     <debug>cgrp,consumer,topic,protocol</debug>
 
     <!-- librdkafka stat in system.kafka_consumers -->
-    <!-- default 3000 (every three second) seems too long for test -->
-    <statistics_interval_ms>600</statistics_interval_ms>
+    <!-- default 3000 (every three second) seems too long for test -->
+    <statistics_interval_ms>600</statistics_interval_ms>
 
     <kafka_topic>
         <name>consumer_hang</name>
@@ -105,6 +105,9 @@ create_keytabs() {
     kadmin.local -q "addprinc -randkey kafkauser/instance@${REALM}"
     kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@${REALM}"
 
+    kadmin.local -q "addprinc -randkey anotherkafkauser/instance@${REALM}"
+    kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab anotherkafkauser/instance@${REALM}"
+
     chmod g+r /tmp/keytab/clickhouse.keytab
 
 }
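The second principal appears to exist so that the new test below can point sasl_kerberos_principal at an identity other than the default kafkauser, which verifies that the value supplied through the named collection is what actually reaches librdkafka.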
52 changes: 52 additions & 0 deletions tests/integration/test_storage_kerberized_kafka/test.py
@@ -227,6 +227,58 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
     assert instance.contains_in_log("KerberosInit failure:")
 
 
+def test_kafka_config_from_sql_named_collection(kafka_cluster):
+    kafka_produce(
+        kafka_cluster,
+        "kafka_json_as_string",
+        [
+            '{"t": 123, "e": {"x": "woof"} }',
+            "",
+            '{"t": 124, "e": {"x": "test"} }',
+            '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
+        ],
+    )
+
+    instance.query(
+        """
+        CREATE NAMED COLLECTION kafka_config AS
+            kafka.security_protocol = 'SASL_PLAINTEXT',
+            kafka.sasl_mechanism = 'GSSAPI',
+            kafka.sasl_kerberos_service_name = 'kafka',
+            kafka.sasl_kerberos_keytab = '/tmp/keytab/clickhouse.keytab',
+            kafka.sasl_kerberos_principal = 'anotherkafkauser/instance@TEST.CLICKHOUSE.TECH',
+            kafka.debug = 'security',
+            kafka.api_version_request = 'false',
+
+            kafka_broker_list = 'kerberized_kafka1:19092',
+            kafka_topic_list = 'kafka_json_as_string',
+            kafka_commit_on_select = 1,
+            kafka_group_name = 'kafka_json_as_string',
+            kafka_format = 'JSONAsString',
+            kafka_flush_interval_ms = 1000;
+        """
+    )
+    instance.query(
+        """
+        CREATE TABLE test.kafka (field String)
+            ENGINE = Kafka(kafka_config);
+        """
+    )
+
+    time.sleep(3)
+
+    result = instance.query("SELECT * FROM test.kafka;")
+    expected = """\
+{"t": 123, "e": {"x": "woof"} }
+{"t": 124, "e": {"x": "test"} }
+{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
+"""
+    assert TSV(result) == TSV(expected)
+    assert instance.contains_in_log(
+        "Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
+    )
+
+
 if __name__ == "__main__":
     cluster.start()
     input("Cluster created, press any key to destroy...")
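Note the two kinds of keys in the collection above: the kafka.-prefixed ones are forwarded to librdkafka by loadFromConfig with underscores rewritten to dots (kafka.security_protocol becomes security.protocol), while the bare kafka_* keys (kafka_broker_list, kafka_format, and so on) are the usual Kafka engine settings consumed by the table engine itself.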