Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Apr 15, 2024
1 parent e97028d commit e517019
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ partition.assignment.strategy | C | | range,roundro
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases. <br>*Type: enum value*
group.remote.assignor | C | | | medium | Server side assignor to use. Keep it null to make server select a suitable assignor for the group. Available assignors: uniform or range. Default is null. <br>*Type: string*
coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. <br>*Type: integer*
max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information. <br>*Type: integer*
enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). <br>*Type: boolean*
Expand Down
56 changes: 39 additions & 17 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2184,7 +2184,6 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
int ret_errno = 0;
const char *conf_err;
char *group_remote_assignor_override = NULL;
rd_kafka_assignor_t *cooperative_assignor;
#ifndef _WIN32
sigset_t newset, oldset;
#endif
Expand Down Expand Up @@ -2376,28 +2375,51 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
goto fail;
}

/* Detect if chosen assignor is cooperative */
cooperative_assignor = rd_kafka_assignor_find(rk, "cooperative-sticky");
rk->rk_conf.partition_assignors_cooperative =
!rk->rk_conf.partition_assignors.rl_cnt ||
(cooperative_assignor && cooperative_assignor->rkas_enabled);

if (!rk->rk_conf.group_remote_assignor) {
/* Default remote assignor to the chosen local one. */
if (rk->rk_conf.partition_assignors_cooperative) {
group_remote_assignor_override = rd_strdup("uniform");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
} else {
rd_kafka_assignor_t *range_assignor =
rd_kafka_assignor_find(rk, "range");
if (range_assignor && range_assignor->rkas_enabled) {
rd_kafka_assignor_t *cooperative_assignor;

/* Detect if chosen assignor is cooperative */
cooperative_assignor =
rd_kafka_assignor_find(rk, "cooperative-sticky");
rk->rk_conf.partition_assignors_cooperative =
!rk->rk_conf.partition_assignors.rl_cnt ||
(cooperative_assignor &&
cooperative_assignor->rkas_enabled);

if (rk->rk_conf.group_protocol ==
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
/* Default remote assignor to the chosen local one. */
if (rk->rk_conf.partition_assignors_cooperative) {
group_remote_assignor_override =
rd_strdup("range");
rd_strdup("uniform");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
} else {
rd_kafka_assignor_t *range_assignor =
rd_kafka_assignor_find(rk, "range");
if (range_assignor &&
range_assignor->rkas_enabled) {
group_remote_assignor_override =
rd_strdup("range");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
} else {
rd_kafka_log(
rk, LOG_WARNING, "ASSIGNOR",
"roundrobin assignor isn't "
"available"
"with group protocol CONSUMER, "
"reverting group protocol "
"to CLASSIC");
rk->rk_conf.group_protocol =
RD_KAFKA_GROUP_PROTOCOL_CLASSIC;
}
}
}
} else {
/* When users starts setting properties of the new protocol,
* they can only use incremental_assign/unassign. */
rk->rk_conf.partition_assignors_cooperative = rd_true;
}

/* Create Mock cluster */
Expand Down
3 changes: 0 additions & 3 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -6015,9 +6015,6 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {

case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
/* If an updated/next subscription is available, schedule it. */
if (rd_kafka_trigger_waiting_subscribe_maybe(rkcg))
break;

if (rkcg->rkcg_rebalance_rejoin) {
rkcg->rkcg_rebalance_rejoin = rd_false;
Expand Down
19 changes: 9 additions & 10 deletions src/rdkafka_cgrp.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,24 +288,23 @@ typedef struct rd_kafka_cgrp_s {
* target assignment. Cleared when an HB succeeds
* after reconciliation finishes. */
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK 0x1
/** Member is sending an acknowledgement for a reconciled assignment */
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK 0x2
/** A new subscription needs to be sent to the Coordinator. */
#define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x2
#define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x4
/** A new subscription is being sent to the Coordinator. */
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION 0x4
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION 0x8
/** Consumer has subscribed at least once,
* if it didn't happen rebalance protocol is still
* considered NONE, otherwise it depends on the
* configured partition assignors. */
#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8
/** Send a complete request in next heartbeat,
* but don't send the acknowledgement if it's not required */
#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x10
#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x10
/** Send a complete request in next heartbeat */
#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x20
/** Member is fenced, need to rejoin */
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x20
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x40
/** Member is fenced, rejoining */
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x40
/** Member is sending an acknowledgement for a reconciled assignment */
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK 0x80
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x80


/** Rejoin the group following a currently in-progress
Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"the only supported group "
"protocol type is `consumer`.",
.sdef = "consumer"},
{_RK_GLOBAL | _RK_CGRP | _RK_HIGH | _RK_HIDDEN, "group.protocol", _RK_C_S2I,
{_RK_GLOBAL | _RK_CGRP | _RK_HIGH, "group.protocol", _RK_C_S2I,
_RK(group_protocol),
"Group protocol to use. Use `classic` for the original protocol and "
"`consumer` for the new "
Expand All @@ -1146,8 +1146,8 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
.vdef = RD_KAFKA_GROUP_PROTOCOL_CLASSIC,
.s2i = {{RD_KAFKA_GROUP_PROTOCOL_CLASSIC, "classic"},
{RD_KAFKA_GROUP_PROTOCOL_CONSUMER, "consumer"}}},
{_RK_GLOBAL | _RK_CGRP | _RK_MED | _RK_HIDDEN, "group.remote.assignor",
_RK_C_STR, _RK(group_remote_assignor),
{_RK_GLOBAL | _RK_CGRP | _RK_MED, "group.remote.assignor", _RK_C_STR,
_RK(group_remote_assignor),
"Server side assignor to use. Keep it null to make server select a "
"suitable assignor for the group. "
"Available assignors: uniform or range. Default is null",
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -3252,7 +3252,7 @@ int rd_kafka_topic_partition_list_find_idx(
* @brief Search 'rktparlist' for \p topic_id and \p partition.
* @returns the elems[] index or -1 on miss.
*/
int rd_kafka_topic_partition_list_find_by_id_idx(
int rd_kafka_topic_partition_list_find_idx_by_id(
const rd_kafka_topic_partition_list_t *rktparlist,
rd_kafka_Uuid_t topic_id,
int32_t partition) {
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_by_id(
rd_kafka_Uuid_t topic_id,
int32_t partition);

int rd_kafka_topic_partition_list_find_by_id_idx(
int rd_kafka_topic_partition_list_find_idx_by_id(
const rd_kafka_topic_partition_list_t *rktparlist,
rd_kafka_Uuid_t topic_id,
int32_t partition);
Expand Down

0 comments on commit e517019

Please sign in to comment.