From c6c9d2ec1ff219c62f3856e8c884559c48ad7fc4 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 26 Feb 2024 19:21:07 +0100 Subject: [PATCH] master: Fix duplicate message with cooperative assignor --- src/rdkafka_partition.c | 17 +++++++++++++++-- tests/0050-subscribe_adds.c | 19 ++++++++++++++++++- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index e4da331b24..96bd74ed85 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -2299,7 +2299,21 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp, int flag, rd_kafka_replyq_t replyq) { int32_t version; - rd_kafka_op_t *rko; + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE); + + if (!pause) { + /* If partitions isn't paused, avoid bumping its version, + * as it'll result in resuming fetches from a stale + * next_fetch_start */ + rd_bool_t paused = rd_false; + rd_kafka_toppar_lock(rktp); + paused = RD_KAFKA_TOPPAR_IS_PAUSED(rktp); + rd_kafka_toppar_unlock(rktp); + if (!paused) { + rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } /* Bump version barrier. */ version = rd_kafka_toppar_version_new_barrier(rktp); @@ -2310,7 +2324,6 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), rktp->rktp_partition, version); - rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE); rko->rko_version = version; rko->rko_u.pause.pause = pause; rko->rko_u.pause.flag = flag; diff --git a/tests/0050-subscribe_adds.c b/tests/0050-subscribe_adds.c index d1e9fc3059..5aeb6541ec 100644 --- a/tests/0050-subscribe_adds.c +++ b/tests/0050-subscribe_adds.c @@ -41,9 +41,13 @@ * * Verify that all messages from all three topics are consumed * * Subscribe to T1,T3 * * Verify that there were no duplicate messages. + * + * @param partition_assignment_strategy Assignment strategy to test. */ +static void +test_no_duplicate_messages(const char *partition_assignment_strategy) { -int main_0050_subscribe_adds(int argc, char **argv) { + SUB_TEST("%s", partition_assignment_strategy); rd_kafka_t *rk; #define TOPIC_CNT 3 char *topic[TOPIC_CNT] = { @@ -81,6 +85,8 @@ int main_0050_subscribe_adds(int argc, char **argv) { test_conf_init(&conf, &tconf, 60); test_topic_conf_set(tconf, "auto.offset.reset", "smallest"); + test_conf_set(conf, "partition.assignment.strategy", + partition_assignment_strategy); rk = test_create_consumer(topic[0], NULL, conf, tconf); @@ -121,5 +127,16 @@ int main_0050_subscribe_adds(int argc, char **argv) { for (i = 0; i < TOPIC_CNT; i++) rd_free(topic[i]); + SUB_TEST_PASS(); +} + +int main_0050_subscribe_adds(int argc, char **argv) { + + test_no_duplicate_messages("range"); + + test_no_duplicate_messages("roundrobin"); + + test_no_duplicate_messages("cooperative-sticky"); + return 0; }