Skip to content

Commit

Permalink
Fail early on duplicate ConfigResource or
Browse files Browse the repository at this point in the history
ConfigEntry before it fails on the broker or
when parsing the result
  • Loading branch information
emasab committed Jun 1, 2023
1 parent 3c746e5 commit fddd40e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
61 changes: 60 additions & 1 deletion src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3364,6 +3364,7 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req,
return reply->rkbuf_err;
}

typedef RD_MAP_TYPE(const char *, const rd_bool_t *) map_str_bool;


void rd_kafka_IncrementalAlterConfigs(rd_kafka_t *rk,
Expand All @@ -3375,6 +3376,8 @@ void rd_kafka_IncrementalAlterConfigs(rd_kafka_t *rk,
size_t i;
rd_kafka_resp_err_t err;
char errstr[256];
rd_bool_t value = rd_true;

static const struct rd_kafka_admin_worker_cbs cbs = {
rd_kafka_IncrementalAlterConfigsRequest,
rd_kafka_IncrementalAlterConfigsResponse_parse,
Expand All @@ -3390,9 +3393,65 @@ void rd_kafka_IncrementalAlterConfigs(rd_kafka_t *rk,
rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt,
rd_kafka_ConfigResource_free);

for (i = 0; i < config_cnt; i++)
/* Check duplicate ConfigResource */
map_str_bool configs_map = RD_MAP_INITIALIZER(
config_cnt, rd_map_str_cmp, rd_map_str_hash, NULL, NULL);

for (i = 0; i < config_cnt; i++) {
size_t len = 4 + strlen(configs[i]->name);
char *key = rd_alloca(len);
const rd_kafka_ConfigEntry_t **entries;
size_t entry_cnt, j;

rd_snprintf(key, len - 1, "%d,%s", configs[i]->restype,
configs[i]->name);
if (RD_MAP_GET(&configs_map, key)) {
/* Duplicate ConfigResource found */
break;
}
RD_MAP_SET(&configs_map, key, &value);
entries =
rd_kafka_ConfigResource_configs(configs[i], &entry_cnt);

/* Check duplicate ConfigEntry */
map_str_bool entries_map = RD_MAP_INITIALIZER(
entry_cnt, rd_map_str_cmp, rd_map_str_hash, NULL, NULL);

for (j = 0; j < entry_cnt; j++) {
const rd_kafka_ConfigEntry_t *entry = entries[j];
const char *key = rd_kafka_ConfigEntry_name(entry);

if (RD_MAP_GET(&entries_map, key)) {
/* Duplicate ConfigEntry found */
break;
}
RD_MAP_SET(&entries_map, key, &value);
}
RD_MAP_DESTROY(&entries_map);

if (j != entry_cnt) {
RD_MAP_DESTROY(&configs_map);
rd_kafka_admin_result_fail(
rko, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Duplicate ConfigEntry found");
rd_kafka_admin_common_worker_destroy(
rk, rko, rd_true /*destroy*/);
return;
}

rd_list_add(&rko->rko_u.admin_request.args,
rd_kafka_ConfigResource_copy(configs[i]));
}

RD_MAP_DESTROY(&configs_map);

if (i != config_cnt) {
rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Duplicate ConfigResource found");
rd_kafka_admin_common_worker_destroy(rk, rko,
rd_true /*destroy*/);
return;
}

/* If there's a BROKER resource in the list we need to
* speak directly to that broker rather than the controller.
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@


#include "rdstring.h"
#include "rdmap.h"
#include "rdkafka_error.h"
#include "rdkafka_confval.h"

Expand Down

0 comments on commit fddd40e

Please sign in to comment.