Skip to content

Commit

Permalink
Include metadata refactor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed May 16, 2023
1 parent 8e2f470 commit 3f50a9d
Show file tree
Hide file tree
Showing 8 changed files with 979 additions and 976 deletions.
10 changes: 7 additions & 3 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,7 @@ static int ut_assignors(void) {
/* Run through test cases */
for (i = 0; tests[i].name; i++) {
int ie, it, im;
rd_kafka_metadata_internal_t metadata_internal;
rd_kafka_metadata_t metadata;
rd_kafka_group_member_t *members;

Expand Down Expand Up @@ -1338,9 +1339,12 @@ static int ut_assignors(void) {
}

/* Run assignor */
err = rd_kafka_assignor_run(
rk->rk_cgrp, rkas, &metadata, members,
tests[i].member_cnt, errstr, sizeof(errstr));
metadata_internal.metadata = metadata;
err = rd_kafka_assignor_run(
rk->rk_cgrp, rkas,
(rd_kafka_metadata_t *)(&metadata_internal),
members, tests[i].member_cnt, errstr,
sizeof(errstr));

RD_UT_ASSERT(!err, "Assignor case %s for %s failed: %s",
tests[i].name,
Expand Down
7 changes: 4 additions & 3 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
/**
* @brief Id comparator for rd_kafka_metadata_broker_internal_t
*/
int rd_kafka_metadata_broker_internal_cmp(const void *_a,
const void *_b) {
int rd_kafka_metadata_broker_internal_cmp(const void *_a, const void *_b) {
const rd_kafka_metadata_broker_internal_t *a = _a;
const rd_kafka_metadata_broker_internal_t *b = _b;
return RD_CMP(a->id, b->id);
Expand Down Expand Up @@ -1453,6 +1452,8 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics,
memset(mdi, 0, sizeof(*mdi));
md = &mdi->metadata;

md->broker_cnt = num_brokers;

md->topic_cnt = (int)topic_cnt;
md->topics =
rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*md->topics));
Expand Down Expand Up @@ -1483,7 +1484,7 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics,
md->topics[i].partitions[j].id = j;
mdi->topics[i].partitions[j].id = j;
mdi->topics[i].partitions[j].leader_epoch = -1;
md->topics[i].partitions[j].id = j;
md->topics[i].partitions[j].id = j;

/* In case replication_factor is not given, don't set
* replicas. */
Expand Down
1,907 changes: 938 additions & 969 deletions src/rdkafka_range_assignor.c

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/rdkafka_sticky_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -3148,5 +3148,6 @@ rd_kafka_resp_err_t rd_kafka_sticky_assignor_init(rd_kafka_t *rk) {
rd_kafka_sticky_assignor_get_metadata,
rd_kafka_sticky_assignor_on_assignment_cb,
rd_kafka_sticky_assignor_state_destroy,
rd_kafka_sticky_assignor_unittest, NULL);
rd_kafka_sticky_assignor_unittest,
NULL);
}
9 changes: 9 additions & 0 deletions src/rdlist.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ void *rd_list_add(rd_list_t *rl, void *elem) {
return rl->rl_elems[rl->rl_cnt++];
}

void *rd_list_add_const(rd_list_t *rl, const void *elem) {
if (rl->rl_cnt == rl->rl_size)
rd_list_grow(rl, rl->rl_size ? rl->rl_size * 2 : 16);
rl->rl_flags &= ~RD_LIST_F_SORTED;
if (elem)
rl->rl_elems[rl->rl_cnt] = elem;
return rl->rl_elems[rl->rl_cnt++];
}

void rd_list_set(rd_list_t *rl, int idx, void *ptr) {
if (idx >= rl->rl_size)
rd_list_grow(rl, idx + 1);
Expand Down
9 changes: 9 additions & 0 deletions src/rdlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ void rd_list_free_cb(rd_list_t *rl, void *ptr);
void *rd_list_add(rd_list_t *rl, void *elem);


/**
* @brief Append const pointer element to list
*
* @returns \p elem. If \p elem is NULL the default element for that index
* will be returned (for use with set_elems).
*/
void *rd_list_add_const(rd_list_t *rl, const void *elem);


/**
* @brief Set element at \p idx to \p ptr.
*
Expand Down
8 changes: 8 additions & 0 deletions src/rdstring.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ int rd_strcmp(const char *a, const char *b) {
}


/**
* @brief Same as rd_strcmp() but works with rd_list comparator.
*/
int rd_strcmp2(const void *a, const void *b) {
return rd_strcmp((const char *)a, (const char *)b);
}



/**
* @brief Case-insensitive strstr() for platforms where strcasestr()
Expand Down
2 changes: 2 additions & 0 deletions src/rdstring.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ unsigned int rd_string_hash(const char *str, ssize_t len);

int rd_strcmp(const char *a, const char *b);

int rd_strcmp2(const void *a, const void *b);

char *_rd_strcasestr(const char *haystack, const char *needle);

char **rd_string_split(const char *input,
Expand Down

0 comments on commit 3f50a9d

Please sign in to comment.