Skip to content

Commit

Permalink
max.poll.interval.ms should only be enforced when using subscribe()
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Mar 25, 2020
1 parent f810e8b commit 71a9d85
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/rdkafka.c
Expand Up @@ -2036,7 +2036,7 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
rd_interval_init(&rk->rk_suppress.sparse_connect_random);
mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);

rd_atomic64_init(&rk->rk_ts_last_poll, INT64_MAX);
rd_atomic64_init(&rk->rk_ts_last_poll, rd_clock());

rk->rk_rep = rd_kafka_q_new(rk);
rk->rk_ops = rd_kafka_q_new(rk);
Expand Down
27 changes: 15 additions & 12 deletions src/rdkafka_cgrp.c
Expand Up @@ -1884,18 +1884,21 @@ rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
rd_kafka_cgrp_set_join_state(rkcg,
RD_KAFKA_CGRP_JOIN_STATE_STARTED);

/* Start a timer to enforce `max.poll.interval.ms`.
* Instead of restarting the timer on each ...poll() call,
* which would be costly (once per message), set up an
* intervalled timer that checks a timestamp
* (that is updated on ..poll()).
* The timer interval is 2 hz. */

rd_kafka_timer_start(&rkcg->rkcg_rk->rk_timers,
&rkcg->rkcg_max_poll_interval_tmr,
500 * 1000ll /* 500ms */,
rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
rkcg);
if (rkcg->rkcg_subscription) {
/* If using subscribe(), start a timer to enforce
* `max.poll.interval.ms`.
* Instead of restarting the timer on each ...poll()
* call, which would be costly (once per message),
* set up an intervalled timer that checks a timestamp
* (that is updated on ..poll()).
* The timer interval is 2 hz. */
rd_kafka_timer_start(
&rkcg->rkcg_rk->rk_timers,
&rkcg->rkcg_max_poll_interval_tmr,
500 * 1000ll /* 500ms */,
rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
rkcg);
}

for (i = 0 ; i < assignment->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar =
Expand Down
115 changes: 110 additions & 5 deletions tests/0091-max_poll_interval_timeout.c
Expand Up @@ -113,21 +113,19 @@ static void rebalance_cb (rd_kafka_t *rk,


#define _CONSUMER_CNT 2
int main_0091_max_poll_interval_timeout (int argc, char **argv) {
const char *topic = test_mk_topic_name("0091_max_poll_interval_tmout",
1);
static void do_test_with_subscribe (const char *topic) {
int64_t testid;
const int msgcnt = 3;
struct _consumer c[_CONSUMER_CNT] = RD_ZERO_INIT;
rd_kafka_conf_t *conf;

TEST_SAY(_C_MAG "[ Test max.poll.interval.ms with subscribe() ]\n");

testid = test_id_generate();

test_conf_init(&conf, NULL,
10 + (int)(processing_time/1000000) * msgcnt);

test_create_topic(NULL, topic, 2, 1);

/* Produce extra messages since we can't fully rely on the
* random partitioner to provide exact distribution. */
test_produce_msgs_easy(topic, testid, -1, msgcnt * _CONSUMER_CNT * 2);
Expand Down Expand Up @@ -194,5 +192,112 @@ int main_0091_max_poll_interval_timeout (int argc, char **argv) {
rd_kafka_destroy(c[0].rk);
rd_kafka_destroy(c[1].rk);

TEST_SAY(_C_GRN
"[ Test max.poll.interval.ms with subscribe(): PASS ]\n");
}


/**
* @brief Verify that max.poll.interval.ms does NOT kick in
* when just using assign() and not subscribe().
*/
static void do_test_with_assign (const char *topic) {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_message_t *rkm;

TEST_SAY(_C_MAG "[ Test max.poll.interval.ms with assign() ]\n");

test_conf_init(&conf, NULL, 60);

test_create_topic(NULL, topic, 2, 1);

test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);

rk = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_assign_partition("ASSIGN", rk, topic, 0,
RD_KAFKA_OFFSET_END);


/* Sleep for longer than max.poll.interval.ms */
rd_sleep(10);

/* Make sure no error was raised */
while ((rkm = rd_kafka_consumer_poll(rk, 0))) {
TEST_ASSERT(!rkm->err,
"Unexpected consumer error: %s: %s",
rd_kafka_err2name(rkm->err),
rd_kafka_message_errstr(rkm));

rd_kafka_message_destroy(rkm);
}


test_consumer_close(rk);
rd_kafka_destroy(rk);

TEST_SAY(_C_GRN
"[ Test max.poll.interval.ms with assign(): PASS ]\n");
}


/**
* @brief Verify that max.poll.interval.ms kicks in even if
* the application hasn't called poll once.
*/
static void do_test_no_poll (const char *topic) {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_message_t *rkm;
rd_bool_t raised = rd_false;

TEST_SAY(_C_MAG "[ Test max.poll.interval.ms without calling poll ]\n");

test_conf_init(&conf, NULL, 60);

test_create_topic(NULL, topic, 2, 1);

test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);

rk = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_subscribe(rk, topic);

/* Sleep for longer than max.poll.interval.ms */
rd_sleep(10);

/* Make sure the error is raised */
while ((rkm = rd_kafka_consumer_poll(rk, 0))) {
if (rkm->err == RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED)
raised = rd_true;

rd_kafka_message_destroy(rkm);
}

TEST_ASSERT(raised, "Expected to have seen ERR__MAX_POLL_EXCEEDED");

test_consumer_close(rk);
rd_kafka_destroy(rk);

TEST_SAY(_C_GRN
"[ Test max.poll.interval.ms without calling poll: PASS ]\n");
}


int main_0091_max_poll_interval_timeout (int argc, char **argv) {
const char *topic = test_mk_topic_name("0091_max_poll_interval_tmout",
1);

test_create_topic(NULL, topic, 2, 1);

do_test_with_subscribe(topic);

do_test_with_assign(topic);

do_test_no_poll(topic);

return 0;
}

0 comments on commit 71a9d85

Please sign in to comment.