Skip to content
This repository has been archived by the owner on Feb 16, 2022. It is now read-only.

Commit

Permalink
BIT-1586 Fixed thread-safety issues with Kafka Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
nickwallen committed May 10, 2016
1 parent 6bd2ac4 commit b9f1f35
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
64 changes: 40 additions & 24 deletions kafka/src/KafkaWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,35 @@ using namespace writer;

KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), rd_producer(NULL)
{
topic_name.assign(
(const char*)BifConst::Kafka::topic_name->Bytes(),
BifConst::Kafka::topic_name->Len());
// need thread-local copy of all user-defined settings coming from
// bro scripting land. accessing these is not thread-safe and 'DoInit'
// is potentially accessed from multiple threads.

// tag_json - thread local copy
tag_json = BifConst::Kafka::tag_json;

// topic name - thread local copy
topic_name.assign(
(const char*)BifConst::Kafka::topic_name->Bytes(),
BifConst::Kafka::topic_name->Len());

// kafka_conf - thread local copy
Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
IterCookie* c = val->AsTable()->InitForIteration();
HashKey* k;
TableEntryVal* v;
while ((v = val->AsTable()->NextEntry(k, c))) {

// fetch the key and value
ListVal* index = val->AsTableVal()->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString();
string val = v->Value()->AsString()->CheckString();
kafka_conf.insert (kafka_conf.begin(), pair<string, string> (key, val));

// cleanup
Unref(index);
delete k;
}
}

KafkaWriter::~KafkaWriter()
Expand All @@ -27,7 +53,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
}

// initialize the formatter
if(BifConst::Kafka::tag_json) {
if(tag_json) {
formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
} else {
formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
Expand All @@ -38,26 +64,16 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
rd_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

// apply the user-defined settings to kafka
Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
IterCookie* c = val->AsTable()->InitForIteration();
HashKey* k;
TableEntryVal* v;
while ((v = val->AsTable()->NextEntry(k, c))) {

// fetch the key and value
ListVal* index = val->AsTableVal()->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString();
string val = v->Value()->AsString()->CheckString();

// apply setting to kafka
if (RdKafka::Conf::CONF_OK != rd_conf->set(key, val, err)) {
reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
return false;
}

// cleanup
Unref(index);
delete k;
map<string,string>::iterator i;
for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
string key = i->first;
string val = i->second;

// apply setting to kafka
if (RdKafka::Conf::CONF_OK != rd_conf->set(key, val, err)) {
reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
return false;
}
}

// create kafka producer
Expand Down
2 changes: 2 additions & 0 deletions kafka/src/KafkaWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class KafkaWriter : public WriterBackend {
static const string default_topic_key;
string stream_id;
string topic_name;
bool tag_json;
map<string, string> kafka_conf;
threading::formatter::Formatter *formatter;
RdKafka::Producer* rd_producer;
RdKafka::Topic* rd_topic;
Expand Down

0 comments on commit b9f1f35

Please sign in to comment.