Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cgrp commit cnt and session timeout enforcement #3218

Merged
merged 2 commits into from Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -76,6 +76,9 @@ librdkafka v1.6.0 is feature release:
a number of edge cases for the consumer where the behaviour was previously
undefined.
* Partition fetch state was not set to STOPPED if OffsetCommit failed.
* The session timeout is now also enforced locally when the coordinator
connection is down, which was previously not the case.


### Producer fixes

Expand Down
20 changes: 11 additions & 9 deletions src/rdkafka_cgrp.c
Expand Up @@ -2878,6 +2878,12 @@ static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
rd_kafka_buf_t *rkbuf;
rd_kafka_op_t *reply;

if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
/* wait_commit_cnt has already been increased for
* reprocessed ops. */
rkcg->rkcg_rk->rk_consumer.wait_commit_cnt++;
}

/* If offsets is NULL we shall use the current assignment
* (not the group assignment). */
if (!rko->rko_u.offset_commit.partitions &&
Expand Down Expand Up @@ -2909,12 +2915,6 @@ static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_has_absolute_offset, NULL);
}

if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
/* wait_commit_cnt has already been increased for
* reprocessed ops. */
rkcg->rkcg_rk->rk_consumer.wait_commit_cnt++;
}

if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
/* Commits are not allowed when a fatal error has been raised */
err = RD_KAFKA_RESP_ERR__FATAL;
Expand Down Expand Up @@ -4945,9 +4945,6 @@ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) {
break;

case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
if (rd_kafka_cgrp_session_timeout_check(rkcg, now))
return;
/* FALLTHRU */
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL:
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
Expand Down Expand Up @@ -4993,6 +4990,11 @@ void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg) {
if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk)))
return;

/* Check session timeout regardless of current coordinator
* connection state (rkcg_state) */
if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY)
rd_kafka_cgrp_session_timeout_check(rkcg, now);

retry:
switch (rkcg->rkcg_state)
{
Expand Down
71 changes: 71 additions & 0 deletions tests/0106-cgrp_sess_timeout.c
Expand Up @@ -220,6 +220,75 @@ static void do_test_session_timeout (const char *use_commit_type) {
}


/**
 * @brief Verify that a manual commit fails after the assignment has been
 *        lost (#3217).
 *
 * The group coordinator of a mock cluster is taken down after a few
 * messages have been consumed; once the consumer reports its assignment
 * as lost, a manual commit is attempted and is expected to return an error.
 */
static void do_test_commit_on_lost (void) {
        const char *topic = "test";
        const char *groupid = "mygroup";
        const char *bootstraps;
        rd_kafka_mock_cluster_t *cluster;
        rd_kafka_conf_t *cconf;
        rd_kafka_t *consumer;
        rd_kafka_resp_err_t err;

        SUB_TEST();

        /* Install the callback that treats no error as test-fatal
         * for the duration of this sub-test. */
        test_curr->is_fatal_cb = test_error_is_not_fatal_cb;

        /* Mock cluster with broker 1 acting as the group coordinator */
        cluster = test_mock_cluster_new(3, &bootstraps);
        rd_kafka_mock_coordinator_set(cluster, "group", groupid, 1);

        /* Populate the topic so that there is something to consume */
        test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10,
                                 "bootstrap.servers", bootstraps,
                                 "batch.num.messages", "10",
                                 NULL);

        /* Consumer configuration: auto commit is disabled since the
         * commit is issued manually further down. */
        test_conf_init(&cconf, NULL, 30);
        test_conf_set(cconf, "bootstrap.servers", bootstraps);
        test_conf_set(cconf, "security.protocol", "PLAINTEXT");
        test_conf_set(cconf, "group.id", groupid);
        test_conf_set(cconf, "session.timeout.ms", "5000");
        test_conf_set(cconf, "heartbeat.interval.ms", "1000");
        test_conf_set(cconf, "auto.offset.reset", "earliest");
        test_conf_set(cconf, "enable.auto.commit", "false");

        consumer = test_create_consumer(groupid, test_rebalance_cb,
                                        cconf, NULL);
        test_consumer_subscribe(consumer, topic);

        /* Consume some messages to have offsets to commit */
        test_consumer_poll("consume", consumer, 0, -1, 0, 10, NULL);

        /* Taking the coordinator down is expected to trigger a local
         * session timeout, followed by a revoke and a lost assignment. */
        rd_kafka_mock_broker_set_down(cluster, 1);

        /* Check once per rd_sleep(1) until the assignment is
         * reported lost */
        TEST_SAY("Waiting for assignment to be lost...\n");
        for (;;) {
                if (rd_kafka_assignment_lost(consumer))
                        break;
                rd_sleep(1);
        }

        TEST_SAY("Assignment is lost, committing\n");

        /* Manual synchronous commit (async=0): must fail */
        err = rd_kafka_commit(consumer, NULL, 0);
        TEST_SAY("commit() returned: %s\n", rd_kafka_err2name(err));
        TEST_ASSERT(err, "expected commit to fail");

        /* Tear down */
        test_consumer_close(consumer);
        rd_kafka_destroy(consumer);
        test_mock_cluster_destroy(cluster);

        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}


int main_0106_cgrp_sess_timeout (int argc, char **argv) {

if (test_needs_auth()) {
Expand All @@ -231,5 +300,7 @@ int main_0106_cgrp_sess_timeout (int argc, char **argv) {
do_test_session_timeout("async");
do_test_session_timeout("auto");

do_test_commit_on_lost();

return 0;
}
9 changes: 9 additions & 0 deletions tests/test.c
Expand Up @@ -554,6 +554,13 @@ void test_socket_enable (rd_kafka_conf_t *conf) {
}
#endif /* WITH_SOCKEM */

/**
 * @brief is_fatal_cb() implementation that never classifies an error
 *        as fatal to the test.
 *
 * @returns 0 regardless of \p rk, \p err and \p reason.
 */
int test_error_is_not_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                const char *reason) {
        /* The arguments are intentionally ignored. */
        (void)rk;
        (void)err;
        (void)reason;

        return 0;
}

static void test_error_cb (rd_kafka_t *rk, int err,
const char *reason, void *opaque) {
Expand Down Expand Up @@ -6192,6 +6199,7 @@ void test_fail0 (const char *file, int line, const char *function,
TEST_LOCK();
test_curr->state = TEST_FAILED;
test_curr->failcnt += 1;
test_curr->is_fatal_cb = NULL;

if (!*test_curr->failstr) {
strncpy(test_curr->failstr, buf, sizeof(test_curr->failstr));
Expand Down Expand Up @@ -6301,4 +6309,5 @@ void test_sub_pass (void) {

TEST_SAY(_C_GRN "[ %s: PASS ]\n", test_curr->subtest);
*test_curr->subtest = '\0';
test_curr->is_fatal_cb = NULL;
}
5 changes: 5 additions & 0 deletions tests/test.h
Expand Up @@ -668,6 +668,11 @@ rd_kafka_mock_cluster_t *test_mock_cluster_new (int broker_cnt,
const char **bootstraps);



/**
 * @brief For use as a test's is_fatal_cb(): always returns 0, i.e. no
 *        error is treated as test-fatal.
 */
int test_error_is_not_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
const char *reason);


/**
* @brief Calls rdkafka function (with arguments)
* and checks its return value (must be rd_kafka_resp_err_t) for
Expand Down