Skip to content

Commit

Permalink
Enforce session timeout even if coordinator connection is down
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Jan 18, 2021
1 parent 5de22a7 commit 863a50b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -76,6 +76,9 @@ librdkafka v1.6.0 is feature release:
a number of edge cases for the consumer where the behaviour was previously
undefined.
* Partition fetch state was not set to STOPPED if OffsetCommit failed.
* The session timeout is now enforced locally also when the coordinator
connection is down, which was not previously the case.


### Producer fixes

Expand Down
8 changes: 5 additions & 3 deletions src/rdkafka_cgrp.c
Expand Up @@ -4945,9 +4945,6 @@ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) {
break;

case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
if (rd_kafka_cgrp_session_timeout_check(rkcg, now))
return;
/* FALLTHRU */
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL:
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
Expand Down Expand Up @@ -4993,6 +4990,11 @@ void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg) {
if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk)))
return;

/* Check session timeout regardless of current coordinator
* connection state (rkcg_state) */
if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY)
rd_kafka_cgrp_session_timeout_check(rkcg, now);

retry:
switch (rkcg->rkcg_state)
{
Expand Down

0 comments on commit 863a50b

Please sign in to comment.