Skip to content

Commit

Permalink
Strip changes from cooperative rebalance fix
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Jun 5, 2023
1 parent cb809f7 commit fb2c94f
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 167 deletions.
21 changes: 9 additions & 12 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -3126,12 +3126,11 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets);

if (err != RD_KAFKA_RESP_ERR__DESTROY &&
!((err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
err == RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS) &&
!(err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
rko_orig->rko_u.offset_commit.silent_empty)) {
/* Propagate commit results (success or permanent error)
* unless we're shutting down or commit was empty, or if there
* was a rebalance in progress. */
* unless we're shutting down or commit was empty, or if
* there was a rebalance in progress. */
rd_kafka_cgrp_propagate_commit_result(rkcg, rko_orig, err,
errcnt, offsets);
}
Expand Down Expand Up @@ -3185,14 +3184,6 @@ static void rd_kafka_cgrp_offsets_commit(rd_kafka_cgrp_t *rkcg,
rkcg->rkcg_rk->rk_consumer.wait_commit_cnt++;
}

/* Don't attempt commit when rebalancing or initializing since
* the rkcg_generation_id is most likely in flux. */
if (rkcg->rkcg_subscription &&
rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
err = RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS;
goto err;
}

/* If offsets is NULL we shall use the current assignment
* (not the group assignment). */
if (!rko->rko_u.offset_commit.partitions &&
Expand Down Expand Up @@ -3356,6 +3347,12 @@ static void rd_kafka_cgrp_offset_commit_tmr_cb(rd_kafka_timers_t *rkts,
void *arg) {
rd_kafka_cgrp_t *rkcg = arg;

/* Don't attempt auto commit when rebalancing or initializing since
* the rkcg_generation_id is most likely in flux. */
if (rkcg->rkcg_subscription &&
rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY)
return;

rd_kafka_cgrp_assigned_offsets_commit(
rkcg, NULL, rd_true /*set offsets*/, "cgrp auto commit timer");
}
Expand Down
6 changes: 3 additions & 3 deletions tests/0106-cgrp_sess_timeout.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ static void do_test_session_timeout(const char *use_commit_type) {
/* Consume a couple of messages so that we have something to commit */
test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);

/* The commit in the rebalance callback should fail because we're not
* allowed to commit while rebalancing. */
commit_exp_err = RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS;
/* The commit in the rebalance callback should fail when the
* member has timed out from the group. */
commit_exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;

expect_rebalance("session timeout revoke", c,
RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, 2 + 5 + 2);
Expand Down
148 changes: 0 additions & 148 deletions tests/0141-cooperative_commit_rebalance.c

This file was deleted.

1 change: 0 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ set(
0138-admin_mock.c
0139-offset_validation_mock.c
0140-commit_metadata.cpp
0141-cooperative_commit_rebalance.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
Expand Down
2 changes: 0 additions & 2 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ _TEST_DECL(0137_barrier_batch_consume);
_TEST_DECL(0138_admin_mock);
_TEST_DECL(0139_offset_validation_mock);
_TEST_DECL(0140_commit_metadata);
_TEST_DECL(0141_cooperative_commit_rebalance);


/* Manual tests */
Expand Down Expand Up @@ -499,7 +498,6 @@ struct test tests[] = {
_TEST(0138_admin_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)),
_TEST(0139_offset_validation_mock, 0),
_TEST(0140_commit_metadata, 0),
_TEST(0141_cooperative_commit_rebalance, 0),

/* Manual tests */
_TEST(8000_idle, TEST_F_MANUAL),
Expand Down
1 change: 0 additions & 1 deletion win32/tests/tests.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@
<ClCompile Include="..\..\tests\0138-admin_mock.c" />
<ClCompile Include="..\..\tests\0139-offset_validation_mock.c" />
<ClCompile Include="..\..\tests\0140-commit_metadata.cpp" />
<ClCompile Include="..\..\tests\0141_cooperative_commit_rebalance.c" />
<ClCompile Include="..\..\tests\8000-idle.cpp" />
<ClCompile Include="..\..\tests\8001-fetch_from_follower_mock_manual.c" />
<ClCompile Include="..\..\tests\test.c" />
Expand Down

0 comments on commit fb2c94f

Please sign in to comment.