Skip to content

Commit

Permalink
Return broker error messages for create topic, create partitions
Browse files Browse the repository at this point in the history
On failing to create a Topic or adding partitions, if the broker returns an error message,
this is now passed back into the JS error.

fixes #538

Note : currently librdkafka is returning a fixed error message when create partitions fails,
that will being fixed with a separate PR to librdkafka see
confluentinc/librdkafka#2154

Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
  • Loading branch information
edoardocomar and mimaison committed Dec 13, 2018
1 parent a176dc9 commit f51b951
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions src/admin.cc
Expand Up @@ -232,9 +232,14 @@ Baton AdminClient::CreateTopic(rd_kafka_NewTopic_t* topic, int timeout_ms) {
for (int i = 0 ; i < static_cast<int>(created_topic_count) ; i++) {
const rd_kafka_topic_result_t *terr = restopics[i];
const rd_kafka_resp_err_t errcode = rd_kafka_topic_result_error(terr);
const char *errmsg = rd_kafka_topic_result_error_string(terr);

if (errcode != RD_KAFKA_EVENT_CREATETOPICS_RESULT) {
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
if (errmsg) {
return Baton(static_cast<RdKafka::ErrorCode>(errcode), std::string(errmsg));
} else {
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
}
}
}

Expand Down Expand Up @@ -375,9 +380,14 @@ Baton AdminClient::CreatePartitions(
for (int i = 0 ; i < static_cast<int>(created_partitions_topic_count) ; i++) { // NOLINT
const rd_kafka_topic_result_t *terr = restopics[i];
const rd_kafka_resp_err_t errcode = rd_kafka_topic_result_error(terr);
const char *errmsg = rd_kafka_topic_result_error_string(terr);

if (errcode != RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) {
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
if (errmsg) {
return Baton(static_cast<RdKafka::ErrorCode>(errcode), std::string(errmsg));
} else {
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
}
}
}

Expand Down

0 comments on commit f51b951

Please sign in to comment.