Skip to content

Commit

Permalink
master: Fix duplicate message with
Browse files Browse the repository at this point in the history
cooperative assignor
  • Loading branch information
emasab committed Apr 3, 2024
1 parent 595189c commit c6c9d2e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
17 changes: 15 additions & 2 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -2299,7 +2299,21 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp,
int flag,
rd_kafka_replyq_t replyq) {
int32_t version;
rd_kafka_op_t *rko;
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);

if (!pause) {
/* If partitions isn't paused, avoid bumping its version,
* as it'll result in resuming fetches from a stale
* next_fetch_start */
rd_bool_t paused = rd_false;
rd_kafka_toppar_lock(rktp);
paused = RD_KAFKA_TOPPAR_IS_PAUSED(rktp);
rd_kafka_toppar_unlock(rktp);
if (!paused) {
rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
}

/* Bump version barrier. */
version = rd_kafka_toppar_version_new_barrier(rktp);
Expand All @@ -2310,7 +2324,6 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp,
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, version);

rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
rko->rko_version = version;
rko->rko_u.pause.pause = pause;
rko->rko_u.pause.flag = flag;
Expand Down
19 changes: 18 additions & 1 deletion tests/0050-subscribe_adds.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@
* * Verify that all messages from all three topics are consumed
* * Subscribe to T1,T3
* * Verify that there were no duplicate messages.
*
* @param partition_assignment_strategy Assignment strategy to test.
*/
static void
test_no_duplicate_messages(const char *partition_assignment_strategy) {

int main_0050_subscribe_adds(int argc, char **argv) {
SUB_TEST("%s", partition_assignment_strategy);
rd_kafka_t *rk;
#define TOPIC_CNT 3
char *topic[TOPIC_CNT] = {
Expand Down Expand Up @@ -81,6 +85,8 @@ int main_0050_subscribe_adds(int argc, char **argv) {

test_conf_init(&conf, &tconf, 60);
test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
test_conf_set(conf, "partition.assignment.strategy",
partition_assignment_strategy);

rk = test_create_consumer(topic[0], NULL, conf, tconf);

Expand Down Expand Up @@ -121,5 +127,16 @@ int main_0050_subscribe_adds(int argc, char **argv) {
for (i = 0; i < TOPIC_CNT; i++)
rd_free(topic[i]);

SUB_TEST_PASS();
}

int main_0050_subscribe_adds(int argc, char **argv) {

test_no_duplicate_messages("range");

test_no_duplicate_messages("roundrobin");

test_no_duplicate_messages("cooperative-sticky");

return 0;
}

0 comments on commit c6c9d2e

Please sign in to comment.