Kip 345: Static group membership #2525
Oddly enough I'm unable to reproduce the

[0050_subscribe_adds / 20.764s] Closing consumer
TEST 20190913130820 (bare) SUMMARY
#==================================================================#
| | PASSED | 21.044s |
| 0050_subscribe_adds | PASSED | 20.769s |
#==================================================================#
[ / 21.045s] # Test report written to test_report_20190913130820.json
[ / 21.045s] 0 thread(s) in use by librdkafka
[ / 21.045s] ============== ALL TESTS PASSED ==============
./merged in bare mode PASSED!
re 0050_subscribe_adds: if you find an intermittent test failure you can use
Since there's quite a bit of protocol updates you need to run the test suite on an older unsupported broker version (preferably matching the maximum ApiVersion that was used previously) to make sure there is no regression.
Add a small chapter in INTRODUCTION.md on how static membership works.
Looks great!
mostly a bunch of nitty comments
Almost there!
tests/0102-static_group_rebalance.c
Outdated
    rd_kafka_message_t *rkm;

    rkm = rd_kafka_consumer_poll(cons->rk, 100+(timeout_s*1000));
    if (!rkm)
I think you want to be more stringent here, if you're expecting a message and don't get one, then fail the test.
Tests should not be forgiving.
I'm actually not interested in the messages at all; rather I want to ensure my callbacks are served from the op queue.
But the test should also verify that messages are received as expected, not only that rebalances happen.
Otherwise we don't have any test coverage for a static member consumer that consumes messages, which is the functionality that we're offering.
tests/0102-static_group_rebalance.c
Outdated
    /* 3x heartbeat interval to give time for c[0] to recognize rebalance */
    rd_sleep(9);

    rd_kafka_flush(c[0].rk, 5000);
Flush is really a producer API.
src/rdkafka_cgrp.c
Outdated
@@ -2227,7 +2227,10 @@ static void rd_kafka_cgrp_unassign_done (rd_kafka_cgrp_t *rkcg,
    if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN;

    /* KIP-345: Static group members do not send LeaveGroupRequests */
    /*
     * KIP-345: Static group members will call unsubscribe to release their partions
I wasn't referring to the comment itself, but what I meant was if unsubscribe() should trigger a Leave?
Since unsubscribe is an explicit action I think it might make sense to actually send a Leave to speed up reassignment of this consumer's partitions. What does the Java client do, or does the KIP mention anything about this case?
I don't think the new comment is helpful: static group members may or may not call unsubscribe, we don't know and it doesn't matter here, so I like the previous comment better.
I see, that makes much more sense. I was curious why we would want to comment about this after the fact but wasn't sure it was worth questioning.
I agree we should only disable this flag when RD_KAFKA_CGRP_F_TERMINATE is set. I'll add an additional check and retest.
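A rough sketch of the flag handling agreed on in this thread, for illustration only. It is not the actual librdkafka code: the `DEMO_F_*` bits and `demo_flags_after_unassign()` are hypothetical stand-ins for `RD_KAFKA_CGRP_F_TERMINATE`, `RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN` and the logic in `rd_kafka_cgrp_unassign_done()`, and it assumes the caller already knows whether `group.instance.id` is set.

```c
#include <assert.h>

/* Hypothetical stand-ins for the real rkcg_flags bits. */
#define DEMO_F_TERMINATE         0x1u
#define DEMO_F_LEAVE_ON_UNASSIGN 0x2u

/*
 * Returns the flags to use once the unassign is done.
 * A static member only skips the Leave when the consumer is terminating;
 * an explicit unsubscribe keeps the Leave so partitions are reassigned
 * quickly. Dynamic members are left untouched.
 */
unsigned demo_flags_after_unassign(unsigned flags, int is_static_member) {
        /* Only the two demo bits are understood by this sketch. */
        assert((flags & ~(DEMO_F_TERMINATE | DEMO_F_LEAVE_ON_UNASSIGN)) == 0);

        if (is_static_member && (flags & DEMO_F_TERMINATE))
                flags &= ~DEMO_F_LEAVE_ON_UNASSIGN;

        return flags;
}
```

With this shape, a static member that is shutting down keeps its slot in the group until `session.timeout.ms` expires, while an explicit unsubscribe still behaves like a dynamic member's leave.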
tests/0102-static_group_rebalance.c
Outdated
@@ -177,7 +177,8 @@ int main_0102_static_group_rebalance (int argc, char **argv) {
    /* 3x heartbeat interval to give time for c[0] to recognize rebalance */
    rd_sleep(9);

    rd_kafka_flush(c[0].rk, 5000);
    do_consume(c[0].rk, 5/*5s*/);
Since the function name is not suitable for what we are doing here I suggest you add a comment saying: Wait for rebalance, or similar.
This looks very good!
Just a bunch of small things
src/rdkafka_conf.c
Outdated
@@ -843,6 +843,14 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
      _RK(group_id_str),
      "Client group id string. All clients sharing the same group.id "
      "belong to the same group." },
    { _RK_GLOBAL|_RK_CGRP|_RK_MED, "group.instance.id", _RK_C_STR,
      _RK(group_instance_id),
      "Enables static group membership. "
"Enable", to be consistent with existing properties
This is not resolved
Sort of, I took the liberty of adding an "s" when changing this to use "Enable" as it sounded better to me. Fixed to your exact specification this time.
INTRODUCTION.md
Outdated
By default Kafka consumers are rebalanced each time a new consumer joins
the group or an existing member leaves. This is what is known as a dynamic
membership. Apache Kafka >= 2.3.0 introduces static membership.
Unlike dynamic membership, members can leave and rejoin a group
maybe "..membership, static members can leave .." to make it clearer
INTRODUCTION.md
Outdated
the group or an existing member leaves. This is what is known as a dynamic
membership. Apache Kafka >= 2.3.0 introduces static membership.
Unlike dynamic membership, members can leave and rejoin a group
within the `session.timeout.ms` without triggering a rebalance.
"and without losing their partition assignment", but in proper English, which is the net effect
src/rdkafka.c
Outdated
@@ -3744,7 +3744,6 @@ char *rd_kafka_memberid (const rd_kafka_t *rk) {
    return memberid;
}

remove whitespace diff
        &c[0].assigned_at, -1);

    TEST_SAY("== Testing max poll violation ==\n");
    /* max.poll.interval.ms should still be enforced by the consumer */
It is usually better to move this into its own test (in the same file), which allows the tests to run in parallel (faster!) and makes this test more focused.
But don't change it, just wanted to point it out for future tests.
noted
tests/0102-static_group_rebalance.c
Outdated
        int64_t *target, int timeout_ms) {
    int64_t tmout = test_clock() + (timeout_ms * 1000);

    while(timeout_ms < 0 ? 1 : test_clock() <= tmout) {
whitespace
tests/0102-static_group_rebalance.c
Outdated
        break;

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        TEST_ASSERT(++c->rebalance_cnt == c->max_rebalance_cnt,
Never pass arguments with side-effects to (evident) macros, since they may be evaluated multiple times.
Instead do:
c->rebalance_cnt++;
TEST_ASSERT(c->rebalance_cnt == ...
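To illustrate why this matters, here is a small self-contained sketch. `DEMO_ASSERT` is a hypothetical macro written for this example (it is not the real `TEST_ASSERT`, whose expansion is not shown in this thread); it deliberately evaluates its argument a second time on failure, which is one common way assertion macros end up double-evaluating.

```c
#include <assert.h>
#include <stddef.h>

static int demo_fail_cnt;   /* number of failed checks */
static int demo_last_value; /* value recorded by the second evaluation */

/* Hypothetical assertion-style macro: expands `expr` twice on failure. */
#define DEMO_ASSERT(expr)                                        \
        do {                                                     \
                if (!(expr)) {           /* 1st evaluation */    \
                        demo_fail_cnt++;                         \
                        demo_last_value = (int)(expr); /* 2nd */ \
                }                                                \
        } while (0)

/* Side effect inside the macro argument: the increment may run twice. */
int demo_bump_unsafe(int *cnt, int expected) {
        assert(cnt != NULL);
        DEMO_ASSERT(++(*cnt) == expected);
        return *cnt;
}

/* Side effect performed first, the macro only sees a pure expression. */
int demo_bump_safe(int *cnt, int expected) {
        assert(cnt != NULL);
        (*cnt)++;
        DEMO_ASSERT(*cnt == expected);
        return *cnt;
}
```

Starting from a counter of 0 and an expected value that does not match, the unsafe variant leaves the counter at 2 while the safe variant leaves it at 1, so the unsafe form silently corrupts the very state being checked.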
Also update the protocol versions and supported KIPs in INTRODUCTION.md
Bump to latest Kafka version in .travis.yml since your test depends on >=2.3.0
CONFIGURATION.md
Outdated
@@ -88,6 +88,7 @@ oauthbearer_token_refresh_cb | * | | |
plugin.library.paths | * | | | low | List of plugin libraries to load (; separated). The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the platform-specific extension (such as .dll or .so) will be appended automatically. <br>*Type: string*
interceptors | * | | | low | Interceptors added through rd_kafka_conf_interceptor_add_..() and any configuration handled by interceptors. <br>*Type: *
group.id | C | | | high | Client group id string. All clients sharing the same group.id belong to the same group. <br>*Type: string*
group.instance.id | C | | | medium | Enables static group membership. Static group members are able to leave and rejoin a group within the configured `session.timeout.ms` without prompting a group rebalance. Each member of the group must have a unique group instance id.Requires broker version >= 2.3.0 <br>*Type: string*
Not re-generated after conf.c change
This is still not addressed.
Please only use "Resolve conversation" when things are fixed since it puts more work on the reviewer to untangle the current state of things.
tests/0102-static_group_rebalance.c
Outdated

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        c->rebalance_cnt++;
        TEST_ASSERT(c->rebalance_cnt == c->max_rebalance_cnt,
Shouldn't this be rebalance_cnt <= max_rebalance_cnt?
Otherwise max is not a max
what about renaming it expected_rebalance_cnt? We want to ensure we do not rebalance too often or not often enough.
practically it can only be set to 1 (or rebalance_cnt+1), right?
So maybe change the logic to check which rebalance err you are getting, e.g. above the switch do:
TEST_ASSERT(c->expected_rebalance_event == err)
which is either ASSIGN_PART.. or REVOKE_PA..
This also makes the test more rigid since it will verify the rebalance event ordering (revoke, assign, revoke, assign)
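A minimal sketch of that idea, assuming a per-consumer state struct. Everything here is hypothetical and self-contained: `demo_rebalance_ev_t` stands in for `RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS` / `RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS`, and `demo_on_rebalance()` returns an error code where the real test would fail via `TEST_ASSERT`.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the two rebalance event codes. */
typedef enum { DEMO_EV_ASSIGN, DEMO_EV_REVOKE } demo_rebalance_ev_t;

typedef struct {
        demo_rebalance_ev_t expected_rebalance_event; /* next event allowed */
        int assign_cnt;
        int revoke_cnt;
} demo_consumer_t;

/*
 * Called from a rebalance callback. Returns 0 and flips the expectation
 * when the event arrives in order, or -1 (a test failure) when it does not.
 */
int demo_on_rebalance(demo_consumer_t *c, demo_rebalance_ev_t ev) {
        assert(c != NULL);

        if (ev != c->expected_rebalance_event)
                return -1;

        if (ev == DEMO_EV_ASSIGN) {
                c->assign_cnt++;
                c->expected_rebalance_event = DEMO_EV_REVOKE;
        } else {
                c->revoke_cnt++;
                c->expected_rebalance_event = DEMO_EV_ASSIGN;
        }
        return 0;
}
```

The test body then only has to set the initial expectation and, at each checkpoint, compare the counters with the number of rebalances it intended to provoke; both a missed and an extra rebalance show up as a failure.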
If I miss a rebalance that I expect to happen but it didn't I could end up with a rebalance_cnt of 1 which is less than the expected value of 3. This would falsely advertise that static group membership is working as expected.
We could track assign and revoke although it seems like overkill since I won't have an assignment without a revocation and I won't actually be a member of that group if I don't have an assignment.
That's sort of my point: we don't want the test to assume and skim over important details such as the order of rebalance events, the more rigid the test the better the test coverage.
consider it done
tests/0102-static_group_rebalance.c
Outdated

    test_conf_init(&conf, NULL, 60);
    test_conf_set(conf, "session.timeout.ms", "5000");
    test_conf_set(conf, "max.poll.interval.ms", "10001");
If that is a bug in the finalization, fix it now or it won't happen.
src/rdkafka_conf.c
Outdated
    "Static group members are able to leave and rejoin a group "
    "within the configured `session.timeout.ms` without prompting a "
    "group rebalance. Each member of the group must have a "
    "unique group instance id. "
Is the recommendation (in the KIP?) to also increase the session timeout when this is used?
The current default is quite low and optimized for dynamic group membership.
The timeout should allow for the application to restart (through whatever sluggish orchestration environment that runs it).
good point.
tests/0102-static_group_rebalance.c
Outdated
        1);
    char *topics = rd_strdup(tsprintf("^%s.*", topic));

    test_conf_init(&conf, NULL, 60);
What's the runtime of the test today? Is it close to 60?
It is usually a good idea to allow an extra 20-30s for long tests like this to make it more robust.
Typically 34-40 seconds locally. Which would be 20-30s less than 60. I can check the travis output though to ensure it aligns with that
Bumped to 70 since the additional session timeout test brings the total test run time closer to 40 seconds consistently.
src/rdkafka_conf.c
Outdated
    "group rebalance. In order to take advantage of this new behavior "
    "`session.timeout.ms` should be raised to a larger value. How large "
    "you set this value will largely be contingent on your orchestration "
    "environment's ability to detect and react to process failure."
missing space after .
src/rdkafka_conf.c
Outdated
    "group rebalance. Each member of the group must have a "
    "group rebalance. In order to take advantage of this new behavior "
    "`session.timeout.ms` should be raised to a larger value. How large "
    "you set this value will largely be contingent on your orchestration "
Don't beat about the bush, this is too vague. Tell them what the timeout should be set to.
E.g., the maximum allowed time for a group member application to restart without triggering a rebalance, or whatever it is.
We're not using "you" form in any other configuration properties, formalize.
tests/0102-static_group_rebalance.c
Outdated
@@ -182,9 +183,10 @@ int main_0102_static_group_rebalance (int argc, char **argv) {

    TEST_SAY("== Testing consumer restart ==\n");
    conf = rd_kafka_conf_dup(rd_kafka_conf(c[1].rk));

remove whitespaces from line
    TIMING_STOP(&t_close);

    /* Catch slow close issues earlier (don't wait to call poll on c[0]) */
    TIMING_ASSERT(&t_close, 0, 6000);
What's 6000? Is it an arbitrary value or based off some sleep or config?
tests/0102-static_group_rebalance.c
Outdated
    static_member_wait_rebalance(&c[1], rebalance_start,
                                 &c[1].assigned_at, 2000);

    /* Should take at least as long as `session.timeout.ms but less than
unbalanced backtick
src/rdkafka_conf.c
Outdated
    "environment's ability to detect and react to process failure."
    "Each member of the group must have a "
    "unique group instance id. "
    "group rebalance. This be used in combination with a larger "
"This be used", is there a missing can, should, may, must?
CONFIGURATION.md
Outdated
@@ -88,6 +88,7 @@ oauthbearer_token_refresh_cb | * | | |
plugin.library.paths | * | | | low | List of plugin libraries to load (; separated). The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the platform-specific extension (such as .dll or .so) will be appended automatically. <br>*Type: string*
interceptors | * | | | low | Interceptors added through rd_kafka_conf_interceptor_add_..() and any configuration handled by interceptors. <br>*Type: *
group.id | C | | | high | Client group id string. All clients sharing the same group.id belong to the same group. <br>*Type: string*
group.instance.id | C | | | medium | Enables static group membership. Static group members are able to leave and rejoin a group within the configured `session.timeout.ms` without prompting a group rebalance. In order to take advantage of this new behavior `session.timeout.ms` should be raised to a larger value. How large you set this value will largely be contingent on your orchestration environment's ability to detect and react to process failure.Each member of the group must have a unique group instance id. Requires broker version >= 2.3.0. <br>*Type: string*
not regenerated
LGTM!
We'll wait with merging this until v2.2.1 is out.
Great work, Ryan!
The new KAFKA_GROUP_INSTANCE_ID env variable can be set on journal clients to set group.instance.id and enable support for Kafka Static Consumer Groups.

confluentinc/librdkafka#2525
https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances

Combined with larger values for the session.timeout.ms and max.poll.interval.ms settings, this setting informs the Group Coordinator broker that the consumer group has static membership, and that the disappearance of a given member of the consumer group should not immediately trigger a rebalance. This allows crashing consumers to re-join the consumer group and start consuming from their assigned partitions immediately.

This setting is implemented as an environment variable so that several consumers in the same group can share a configuration file and still override the value (e.g. by setting `Environment=KAFKA_GROUP_INSTANCE_ID=groupname-%i` in a systemd template unit).

When this setting is enabled, we also raise the relevant values for session.timeout.ms and max.poll.interval.ms.
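A hedged sketch of how such an override could be resolved on the client side. The helper names are hypothetical and the precedence (a non-empty environment value wins over the shared configuration file) is an assumption based on the description above, not code taken from that project.

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Pick the group.instance.id: a non-empty environment value wins over the
 * value from the shared configuration file. Returns NULL when neither is
 * set, in which case the consumer stays a dynamic member.
 */
const char *demo_pick_instance_id(const char *env_value,
                                  const char *conf_value) {
        if (env_value && *env_value)
                return env_value;
        if (conf_value && *conf_value)
                return conf_value;
        return NULL;
}

/* Convenience wrapper reading the KAFKA_GROUP_INSTANCE_ID variable. */
const char *demo_instance_id_from_env(const char *conf_value) {
        return demo_pick_instance_id(getenv("KAFKA_GROUP_INSTANCE_ID"),
                                     conf_value);
}
```

A non-NULL result would then be handed to `rd_kafka_conf_set()` under the `group.instance.id` property, together with the raised `session.timeout.ms` and `max.poll.interval.ms` values.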