diff --git a/CONFIGURATION.md b/CONFIGURATION.md index bceacd208..ae01d16dd 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -110,6 +110,8 @@ partition.assignment.strategy | C | | range,roundro session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`.
*Type: integer* heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval.
*Type: integer* group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`.
*Type: string* +group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases.
*Type: enum value* +group.remote.assignor | C | | | medium | Server side assignor to use. Keep it null to make server select a suitable assignor for the group. Available assignors: uniform or range. Default is null
*Type: string* coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
*Type: integer* max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information.
*Type: integer* enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign().
*Type: boolean* diff --git a/src/rdkafka.c b/src/rdkafka.c index 280dd624f..ea7912dd9 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2184,7 +2184,6 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, int ret_errno = 0; const char *conf_err; char *group_remote_assignor_override = NULL; - rd_kafka_assignor_t *cooperative_assignor; #ifndef _WIN32 sigset_t newset, oldset; #endif @@ -2376,28 +2375,51 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, goto fail; } - /* Detect if chosen assignor is cooperative */ - cooperative_assignor = rd_kafka_assignor_find(rk, "cooperative-sticky"); - rk->rk_conf.partition_assignors_cooperative = - !rk->rk_conf.partition_assignors.rl_cnt || - (cooperative_assignor && cooperative_assignor->rkas_enabled); - if (!rk->rk_conf.group_remote_assignor) { - /* Default remote assignor to the chosen local one. */ - if (rk->rk_conf.partition_assignors_cooperative) { - group_remote_assignor_override = rd_strdup("uniform"); - rk->rk_conf.group_remote_assignor = - group_remote_assignor_override; - } else { - rd_kafka_assignor_t *range_assignor = - rd_kafka_assignor_find(rk, "range"); - if (range_assignor && range_assignor->rkas_enabled) { + rd_kafka_assignor_t *cooperative_assignor; + + /* Detect if chosen assignor is cooperative */ + cooperative_assignor = + rd_kafka_assignor_find(rk, "cooperative-sticky"); + rk->rk_conf.partition_assignors_cooperative = + !rk->rk_conf.partition_assignors.rl_cnt || + (cooperative_assignor && + cooperative_assignor->rkas_enabled); + + if (rk->rk_conf.group_protocol == + RD_KAFKA_GROUP_PROTOCOL_CONSUMER) { + /* Default remote assignor to the chosen local one. */ + if (rk->rk_conf.partition_assignors_cooperative) { group_remote_assignor_override = - rd_strdup("range"); + rd_strdup("uniform"); rk->rk_conf.group_remote_assignor = group_remote_assignor_override; + } else { + rd_kafka_assignor_t *range_assignor = + rd_kafka_assignor_find(rk, "range"); + if (range_assignor && + range_assignor->rkas_enabled) { + group_remote_assignor_override = + rd_strdup("range"); + rk->rk_conf.group_remote_assignor = + group_remote_assignor_override; + } else { + rd_kafka_log( + rk, LOG_WARNING, "ASSIGNOR", + "roundrobin assignor isn't " + "available" + "with group protocol CONSUMER, " + "reverting group protocol " + "to CLASSIC"); + rk->rk_conf.group_protocol = + RD_KAFKA_GROUP_PROTOCOL_CLASSIC; + } } } + } else { + /* When users starts setting properties of the new protocol, + * they can only use incremental_assign/unassign. */ + rk->rk_conf.partition_assignors_cooperative = rd_true; } /* Create Mock cluster */ diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index a59b64255..0a62fa39f 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -6015,9 +6015,6 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) { case RD_KAFKA_CGRP_JOIN_STATE_STEADY: rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); - /* If an updated/next subscription is available, schedule it. */ - if (rd_kafka_trigger_waiting_subscribe_maybe(rkcg)) - break; if (rkcg->rkcg_rebalance_rejoin) { rkcg->rkcg_rebalance_rejoin = rd_false; diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 28e772018..73d1335e2 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -288,24 +288,23 @@ typedef struct rd_kafka_cgrp_s { * target assignment. Cleared when an HB succeeds * after reconciliation finishes. */ #define RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK 0x1 +/** Member is sending an acknowledgement for a reconciled assignment */ +#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK 0x2 /** A new subscription needs to be sent to the Coordinator. */ -#define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x2 +#define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x4 /** A new subscription is being sent to the Coordinator. */ -#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION 0x4 +#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION 0x8 /** Consumer has subscribed at least once, * if it didn't happen rebalance protocol is still * considered NONE, otherwise it depends on the * configured partition assignors. */ -#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 -/** Send a complete request in next heartbeat, - * but don't send the acknowledgement if it's not required */ -#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x10 +#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x10 +/** Send a complete request in next heartbeat */ +#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x20 /** Member is fenced, need to rejoin */ -#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x20 +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x40 /** Member is fenced, rejoining */ -#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x40 -/** Member is sending an acknowledgement for a reconciled assignment */ -#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK 0x80 +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x80 /** Rejoin the group following a currently in-progress diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 2e1531522..8244b4a04 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1136,7 +1136,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "the only supported group " "protocol type is `consumer`.", .sdef = "consumer"}, - {_RK_GLOBAL | _RK_CGRP | _RK_HIGH | _RK_HIDDEN, "group.protocol", _RK_C_S2I, + {_RK_GLOBAL | _RK_CGRP | _RK_HIGH, "group.protocol", _RK_C_S2I, _RK(group_protocol), "Group protocol to use. Use `classic` for the original protocol and " "`consumer` for the new " @@ -1146,8 +1146,8 @@ static const struct rd_kafka_property rd_kafka_properties[] = { .vdef = RD_KAFKA_GROUP_PROTOCOL_CLASSIC, .s2i = {{RD_KAFKA_GROUP_PROTOCOL_CLASSIC, "classic"}, {RD_KAFKA_GROUP_PROTOCOL_CONSUMER, "consumer"}}}, - {_RK_GLOBAL | _RK_CGRP | _RK_MED | _RK_HIDDEN, "group.remote.assignor", - _RK_C_STR, _RK(group_remote_assignor), + {_RK_GLOBAL | _RK_CGRP | _RK_MED, "group.remote.assignor", _RK_C_STR, + _RK(group_remote_assignor), "Server side assignor to use. Keep it null to make server select a " "suitable assignor for the group. " "Available assignors: uniform or range. Default is null", diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 875e31624..3f62b820e 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -3252,7 +3252,7 @@ int rd_kafka_topic_partition_list_find_idx( * @brief Search 'rktparlist' for \p topic_id and \p partition. * @returns the elems[] index or -1 on miss. */ -int rd_kafka_topic_partition_list_find_by_id_idx( +int rd_kafka_topic_partition_list_find_idx_by_id( const rd_kafka_topic_partition_list_t *rktparlist, rd_kafka_Uuid_t topic_id, int32_t partition) { diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index 618cd7895..de3b3036f 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -773,7 +773,7 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_by_id( rd_kafka_Uuid_t topic_id, int32_t partition); -int rd_kafka_topic_partition_list_find_by_id_idx( +int rd_kafka_topic_partition_list_find_idx_by_id( const rd_kafka_topic_partition_list_t *rktparlist, rd_kafka_Uuid_t topic_id, int32_t partition);