Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Add a Kafka "metadata.broker.list" for each log writer filter.
Browse files Browse the repository at this point in the history
   If a new log filter is added in bro and a specific kafka broker list is defined as:
       $config = table(["metadata.broker.list"] = "host:port")
   this will override the default broker list (only for this specific writer).

   If no specific "metadata.broker.list" for the writer is defined in the log filter, the
   default will be applied as in
      redef Kafka::kafka_conf = table(["metadata.broker.list"] = "host:port");

   Note: all other configuration settings will not be changed.
  • Loading branch information
mauropalumbo75 committed Nov 7, 2019
1 parent abbbc9b commit 7955304
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/KafkaWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,18 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
}
}

// Allow overriding of the kafka list of brokers via the Bro script constant 'metadata.broker.list'
// which can be applied when adding a new Bro log filter as $config = table(["metadata.broker.list"] = "host:port").
metadata_broker_list_override = GetConfigValue(info, "metadata.broker.list");
if ( !metadata_broker_list_override.empty() ) {
MsgThread::Info(Fmt("Overriding default metadata.broker.list with %s for writer %s.", metadata_broker_list_override.c_str(), info.path));
// apply overriding setting metadata.broker.list to kafka
if (RdKafka::Conf::CONF_OK != conf->set("metadata.broker.list", metadata_broker_list_override, err)) {
Error(Fmt("Failed to set '%s'='%s': %s", "metadata.broker.list", metadata_broker_list_override.c_str(), err.c_str()));
return false;
}
}

if(is_debug) {
string key("debug");
string val(debug);
Expand Down
1 change: 1 addition & 0 deletions src/KafkaWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class KafkaWriter : public WriterBackend {
map<string, string> kafka_conf;
string topic_name;
string topic_name_override;
string metadata_broker_list_override;
threading::formatter::Formatter *formatter;
RdKafka::Producer* producer;
RdKafka::Topic* topic;
Expand Down

0 comments on commit 7955304

Please sign in to comment.