Skip to content

Commit

Permalink
FIx style
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoryPervakov committed Feb 26, 2024
1 parent ad74e63 commit 866c01e
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions src/Storages/Kafka/StorageKafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ namespace
const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
const String CONFIG_NAME_TAG = "name";

void setKafkaConfigValue(LoggerPtr log, cppkafka::Configuration & kafka_config, const String& key, const String& value) {
void setKafkaConfigValue(LoggerPtr log, cppkafka::Configuration & kafka_config, const String & key, const String & value)
{
if (key.starts_with(CONFIG_KAFKA_TOPIC_TAG) || key == CONFIG_NAME_TAG) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
return; /// used by new per-topic configuration, ignore

Expand All @@ -260,11 +261,13 @@ namespace
/// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
void loadFromConfig(LoggerPtr log, cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String& collection_name, const String & config_prefix)
{
if (!collection_name.empty()){
if (!collection_name.empty())
{
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(-1, config_prefix)) {
for (const auto & key : collection->getKeys(-1, config_prefix))
{
// Cut prefix with '.' before actual config tag.
const auto param_name = key.substr(config_prefix.size() + 1);
auto param_name = key.substr(config_prefix.size() + 1);
setKafkaConfigValue(log, kafka_config, std::move(param_name), collection->get<String>(key));
}
return;
Expand All @@ -277,8 +280,7 @@ namespace
for (const auto & tag : tags)
{
const String setting_path = fmt::format("{}.{}", config_prefix, tag);
const String setting_value = config.getString(setting_path);
setKafkaConfigValue(log, kafka_config, tag, std::move(setting_value));
setKafkaConfigValue(log, kafka_config, tag, config.getString(setting_path));
}
}

Expand All @@ -289,14 +291,15 @@ namespace
{
const auto topic_prefix = fmt::format("{}.{}", config_prefix, CONFIG_KAFKA_TOPIC_TAG);
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(1, config_prefix)) {
for (const auto & key : collection->getKeys(1, config_prefix))
{
/// Only consider key <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
if (!key.starts_with(topic_prefix))
continue;

const String kafka_topic_path = config_prefix + "." + key;
const String kafpa_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
if (topic == collection->get<String>(kafpa_topic_name_path))
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
if (topic == collection->get<String>(kafka_topic_name_path))
/// Found it! Now read the per-topic configuration into cppkafka.
loadFromConfig(log, kafka_config, config, collection_name, kafka_topic_path);
}
Expand All @@ -315,9 +318,9 @@ namespace

/// Read topic name between <name>...</name>
const String kafka_topic_path = fmt::format("{}.{}", config_prefix, tag);
const String kafpa_topic_name_path = fmt::format("{}.{}", kafka_topic_path, CONFIG_NAME_TAG);
const String kafka_topic_name_path = fmt::format("{}.{}", kafka_topic_path, CONFIG_NAME_TAG);

const String topic_name = config.getString(kafpa_topic_name_path);
const String topic_name = config.getString(kafka_topic_name_path);
if (topic_name == topic)
/// Found it! Now read the per-topic configuration into cppkafka.
loadFromConfig(log, kafka_config, config, collection_name, kafka_topic_path);
Expand Down

0 comments on commit 866c01e

Please sign in to comment.