From 71a9d8568edf3044af28756d22b44b7bcb1ac7ca Mon Sep 17 00:00:00 2001
From: Magnus Edenhill
Date: Mon, 23 Mar 2020 12:21:58 +0100
Subject: [PATCH] max.poll.interval.ms should only be enforced when using
 subscribe()

For https://github.com/confluentinc/confluent-kafka-dotnet/issues/1220
---
 src/rdkafka.c                          |   2 +-
 src/rdkafka_cgrp.c                     |  27 +++---
 tests/0091-max_poll_interval_timeout.c | 115 +++++++++++++++++++++++--
 3 files changed, 126 insertions(+), 18 deletions(-)

diff --git a/src/rdkafka.c b/src/rdkafka.c
index 7fdf3f095..9980bdcb8 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2036,7 +2036,7 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
         rd_interval_init(&rk->rk_suppress.sparse_connect_random);
         mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);
 
-        rd_atomic64_init(&rk->rk_ts_last_poll, INT64_MAX);
+        rd_atomic64_init(&rk->rk_ts_last_poll, rd_clock());
 
         rk->rk_rep = rd_kafka_q_new(rk);
         rk->rk_ops = rd_kafka_q_new(rk);
diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c
index a6c4b2f7a..54aad9360 100644
--- a/src/rdkafka_cgrp.c
+++ b/src/rdkafka_cgrp.c
@@ -1884,18 +1884,21 @@ rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
         rd_kafka_cgrp_set_join_state(rkcg,
                                      RD_KAFKA_CGRP_JOIN_STATE_STARTED);
 
-        /* Start a timer to enforce `max.poll.interval.ms`.
-         * Instead of restarting the timer on each ...poll() call,
-         * which would be costly (once per message), set up an
-         * intervalled timer that checks a timestamp
-         * (that is updated on ..poll()).
-         * The timer interval is 2 hz. */
-
-        rd_kafka_timer_start(&rkcg->rkcg_rk->rk_timers,
-                             &rkcg->rkcg_max_poll_interval_tmr,
-                             500 * 1000ll /* 500ms */,
-                             rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
-                             rkcg);
+        if (rkcg->rkcg_subscription) {
+                /* If using subscribe(), start a timer to enforce
+                 * `max.poll.interval.ms`.
+                 * Instead of restarting the timer on each ...poll()
+                 * call, which would be costly (once per message),
+                 * set up an intervalled timer that checks a timestamp
+                 * (that is updated on ..poll()).
+                 * The timer interval is 2 hz. */
+                rd_kafka_timer_start(
+                        &rkcg->rkcg_rk->rk_timers,
+                        &rkcg->rkcg_max_poll_interval_tmr,
+                        500 * 1000ll /* 500ms */,
+                        rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
+                        rkcg);
+        }
 
         for (i = 0 ; i < assignment->cnt ; i++) {
                 rd_kafka_topic_partition_t *rktpar =
diff --git a/tests/0091-max_poll_interval_timeout.c b/tests/0091-max_poll_interval_timeout.c
index 00dc049f2..b624c2f8e 100644
--- a/tests/0091-max_poll_interval_timeout.c
+++ b/tests/0091-max_poll_interval_timeout.c
@@ -113,21 +113,19 @@ static void rebalance_cb (rd_kafka_t *rk,
 #define _CONSUMER_CNT 2
 
-int main_0091_max_poll_interval_timeout (int argc, char **argv) {
-        const char *topic = test_mk_topic_name("0091_max_poll_interval_tmout",
-                                                1);
+static void do_test_with_subscribe (const char *topic) {
         int64_t testid;
         const int msgcnt = 3;
         struct _consumer c[_CONSUMER_CNT] = RD_ZERO_INIT;
         rd_kafka_conf_t *conf;
 
+        TEST_SAY(_C_MAG "[ Test max.poll.interval.ms with subscribe() ]\n");
+
         testid = test_id_generate();
 
         test_conf_init(&conf, NULL,
                        10 + (int)(processing_time/1000000) * msgcnt);
 
-        test_create_topic(NULL, topic, 2, 1);
-
         /* Produce extra messages since we can't fully rely on the
          * random partitioner to provide exact distribution. */
         test_produce_msgs_easy(topic, testid, -1,
                                msgcnt * _CONSUMER_CNT * 2);
@@ -194,5 +192,112 @@ int main_0091_max_poll_interval_timeout (int argc, char **argv) {
         rd_kafka_destroy(c[0].rk);
         rd_kafka_destroy(c[1].rk);
 
+        TEST_SAY(_C_GRN
+                 "[ Test max.poll.interval.ms with subscribe(): PASS ]\n");
+}
+
+
+/**
+ * @brief Verify that max.poll.interval.ms does NOT kick in
+ *        when just using assign() and not subscribe().
+ */
+static void do_test_with_assign (const char *topic) {
+        rd_kafka_t *rk;
+        rd_kafka_conf_t *conf;
+        rd_kafka_message_t *rkm;
+
+        TEST_SAY(_C_MAG "[ Test max.poll.interval.ms with assign() ]\n");
+
+        test_conf_init(&conf, NULL, 60);
+
+        test_create_topic(NULL, topic, 2, 1);
+
+        test_conf_set(conf, "session.timeout.ms", "6000");
+        test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);
+
+        rk = test_create_consumer(topic, NULL, conf, NULL);
+
+        test_consumer_assign_partition("ASSIGN", rk, topic, 0,
+                                       RD_KAFKA_OFFSET_END);
+
+
+        /* Sleep for longer than max.poll.interval.ms */
+        rd_sleep(10);
+
+        /* Make sure no error was raised */
+        while ((rkm = rd_kafka_consumer_poll(rk, 0))) {
+                TEST_ASSERT(!rkm->err,
+                            "Unexpected consumer error: %s: %s",
+                            rd_kafka_err2name(rkm->err),
+                            rd_kafka_message_errstr(rkm));
+
+                rd_kafka_message_destroy(rkm);
+        }
+
+
+        test_consumer_close(rk);
+        rd_kafka_destroy(rk);
+
+        TEST_SAY(_C_GRN
+                 "[ Test max.poll.interval.ms with assign(): PASS ]\n");
+}
+
+
+/**
+ * @brief Verify that max.poll.interval.ms kicks in even if
+ *        the application hasn't called poll once.
+ */
+static void do_test_no_poll (const char *topic) {
+        rd_kafka_t *rk;
+        rd_kafka_conf_t *conf;
+        rd_kafka_message_t *rkm;
+        rd_bool_t raised = rd_false;
+
+        TEST_SAY(_C_MAG "[ Test max.poll.interval.ms without calling poll ]\n");
+
+        test_conf_init(&conf, NULL, 60);
+
+        test_create_topic(NULL, topic, 2, 1);
+
+        test_conf_set(conf, "session.timeout.ms", "6000");
+        test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);
+
+        rk = test_create_consumer(topic, NULL, conf, NULL);
+
+        test_consumer_subscribe(rk, topic);
+
+        /* Sleep for longer than max.poll.interval.ms */
+        rd_sleep(10);
+
+        /* Make sure the error is raised */
+        while ((rkm = rd_kafka_consumer_poll(rk, 0))) {
+                if (rkm->err == RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED)
+                        raised = rd_true;
+
+                rd_kafka_message_destroy(rkm);
+        }
+
+        TEST_ASSERT(raised, "Expected to have seen ERR__MAX_POLL_EXCEEDED");
+
+        test_consumer_close(rk);
+        rd_kafka_destroy(rk);
+
+        TEST_SAY(_C_GRN
+                 "[ Test max.poll.interval.ms without calling poll: PASS ]\n");
+}
+
+
+int main_0091_max_poll_interval_timeout (int argc, char **argv) {
+        const char *topic = test_mk_topic_name("0091_max_poll_interval_tmout",
+                                                1);
+
+        test_create_topic(NULL, topic, 2, 1);
+
+        do_test_with_subscribe(topic);
+
+        do_test_with_assign(topic);
+
+        do_test_no_poll(topic);
+
         return 0;
 }
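Note (not part of the patch): the sketch below illustrates the application-side effect of this change using librdkafka's public C API. It is a minimal, hedged example rather than the project's own test code; the broker address ("localhost:9092"), topic name ("my_topic"), group id ("example-group") and the 10 second pause are placeholder assumptions, and error handling is omitted for brevity. With this patch, a consumer that only assign()s partitions and never calls subscribe() can stop polling for longer than max.poll.interval.ms without being flagged, whereas a subscribing consumer would still see RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED.

/* Illustrative sketch only: an assign()-only consumer that pauses polling.
 * Assumed (not taken from the patch): broker localhost:9092, topic
 * "my_topic", group.id "example-group". Build: cc demo.c -lrdkafka */
#include <stdio.h>
#include <unistd.h>
#include <librdkafka/rdkafka.h>

int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "example-group",
                          errstr, sizeof(errstr));
        /* Same values as the added tests use. */
        rd_kafka_conf_set(conf, "session.timeout.ms", "6000",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "max.poll.interval.ms", "7000",
                          errstr, sizeof(errstr));

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                      errstr, sizeof(errstr));
        rd_kafka_poll_set_consumer(rk);

        /* Manual assignment (no subscribe()): with this patch the
         * max.poll.interval.ms enforcement timer is not started. */
        rd_kafka_topic_partition_list_t *parts =
                rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(parts, "my_topic", 0);
        rd_kafka_assign(rk, parts);
        rd_kafka_topic_partition_list_destroy(parts);

        /* Stop polling for longer than max.poll.interval.ms (7s). */
        sleep(10);

        /* Before the patch this loop could surface
         * RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED even though the consumer
         * group protocol is not in use; after the patch it should not. */
        rd_kafka_message_t *rkm;
        while ((rkm = rd_kafka_consumer_poll(rk, 100))) {
                if (rkm->err)
                        fprintf(stderr, "Consumer error: %s\n",
                                rd_kafka_message_errstr(rkm));
                rd_kafka_message_destroy(rkm);
        }

        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);
        return 0;
}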