Skip to content

Commit

Permalink
[KIP-848] Use metadata cache by topic id,
Browse files Browse the repository at this point in the history
- rename 'generic' protocol to 'classic'
- consumer group serve timer to awake the loop earlier
- compare and find into topic partition list by topic id only
- fix memory leak when instance creation fails and app_conf is provided
- fix cases where HB response is received after unsubscription
- use topic name from current assignment if it's missing from metadata
  • Loading branch information
emasab committed Apr 1, 2024
1 parent 117d317 commit 5789307
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 216 deletions.
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ group.instance.id | C | |
partition.assignment.strategy | C | | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. <br>*Type: string*
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. <br>*Type: integer*
max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information. <br>*Type: integer*
enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). <br>*Type: boolean*
Expand Down
14 changes: 11 additions & 3 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2185,6 +2185,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
int ret_errno = 0;
const char *conf_err;
char *group_remote_assignor_override = NULL;
rd_kafka_assignor_t *cooperative_assignor;
#ifndef _WIN32
sigset_t newset, oldset;
Expand Down Expand Up @@ -2386,14 +2387,18 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
if (!rk->rk_conf.group_remote_assignor) {
/* Default remote assignor to the chosen local one. */
if (rk->rk_conf.partition_assignors_cooperative) {
group_remote_assignor_override = rd_strdup("uniform");
rk->rk_conf.group_remote_assignor =
rd_strdup("uniform");
group_remote_assignor_override;
} else {
rd_kafka_assignor_t *range_assignor =
rd_kafka_assignor_find(rk, "range");
if (range_assignor && range_assignor->rkas_enabled)
rk->rk_conf.group_remote_assignor =
if (range_assignor && range_assignor->rkas_enabled) {
group_remote_assignor_override =
rd_strdup("range");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
}
}
}

Expand Down Expand Up @@ -2666,8 +2671,11 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
* that belong to rk_conf and thus needs to be cleaned up.
* Legacy APIs, sigh.. */
if (app_conf) {
if (group_remote_assignor_override)
rd_free(group_remote_assignor_override);
rd_kafka_assignors_term(rk);
rd_kafka_interceptors_destroy(&rk->rk_conf);

memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
}

Expand Down

0 comments on commit 5789307

Please sign in to comment.