Metadata cache by topic id and fixes for failing metadata tests:
- cache is updated on full metadata refresh, not cleared
- unknown topic errors no longer happen before joining the consumer group
- fast metadata refresh stops, without a leader change, when there are no stale leader epochs
- the leader broker is not updated on a stale leader epoch
emasab committed Apr 3, 2024
1 parent 14dd831 commit f405243
Showing 7 changed files with 360 additions and 138 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,33 @@ librdkafka v2.3.1 is a maintenance release:
check the [release notes](https://www.openssl.org/news/cl30.txt).
* Integration tests can be started in KRaft mode and run against any
GitHub Kafka branch other than the released versions.
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4660)
* Fixes to metadata cache expiration and metadata refresh interruption, and
  to avoid use of stale metadata (#4660).


## Fixes

### General fixes

* The metadata cache was cleared on full metadata refresh, leading to
  unnecessary refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved
  by updating the cache for existing or hinted entries instead of clearing
  them (see the first sketch after this list).
  Happening since 2.1.0 (#4660).
* A metadata call made before the member joined its consumer group could
  lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating the consumer
  group from a metadata refresh only in safe join states.
  Happening since 2.1.0 (#4660).
* Metadata refreshes without a partition leader change could lead to a loop
  of metadata calls at fixed intervals. Solved by stopping the metadata
  refresh once all existing metadata is non-stale.
  Happening since 2.3.0 (#4660).
* A partition migration could happen, using stale metadata, when the
  partition was undergoing validation and being retried because of an error.
  Solved by performing a partition migration only with a non-stale leader
  epoch (see the second sketch below). Happening since 2.1.0 (#4660).
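
The first fix above boils down to treating a full refresh as an in-place update. Below is a minimal, self-contained sketch of that idea; the toy `cache_entry` struct and helper names are invented for illustration, and only the `only_existing` flag and the final purge of unsatisfied hints mirror `rd_kafka_metadata_cache_topic_update()` and `rd_kafka_metadata_cache_purge_all_hints()` from this commit:

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Toy cache entry; a real entry carries full topic metadata. */
struct cache_entry {
        char topic[64];
        bool hint;    /* placeholder inserted while a request is in flight */
        bool updated; /* touched by the current refresh */
};

/* Update an entry in place if it already exists (possibly as a hint).
 * With only_existing set, never insert new entries. */
static int cache_topic_update(struct cache_entry *cache, int cnt,
                              const char *topic, bool only_existing) {
        int i;
        for (i = 0; i < cnt; i++) {
                if (strcmp(cache[i].topic, topic) != 0)
                        continue;
                cache[i].hint    = false;
                cache[i].updated = true;
                return 1; /* cache changed */
        }
        /* Not found: a full refresh (only_existing=true) skips insertion. */
        return 0;
}

/* After a full refresh, drop only the hints that were never satisfied;
 * every other entry survives, so no refresh storm follows. */
static void cache_purge_all_hints(struct cache_entry *cache, int cnt) {
        int i;
        for (i = 0; i < cnt; i++)
                if (cache[i].hint && !cache[i].updated)
                        cache[i].topic[0] = '\0'; /* tombstone the entry */
}

int main(void) {
        struct cache_entry cache[2] = {
            {"orders", false, false}, /* existing entry, stays valid */
            {"audit", true, false},   /* hint for a topic the cluster lacks */
        };
        cache_topic_update(cache, 2, "orders", true);
        cache_purge_all_hints(cache, 2);
        printf("orders kept:  %s\n", cache[0].topic[0] ? "yes" : "no");
        printf("audit purged: %s\n", cache[1].topic[0] ? "no" : "yes");
        return 0;
}
```

Because surviving entries are updated rather than dropped, no topic suddenly disappears from the cache between refreshes, which is what previously triggered the spurious `UNKNOWN_TOPIC_OR_PART` errors.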

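The last two fixes share one rule: leader updates, partition migrations and fast metadata refresh are driven only by non-stale leader epochs. A sketch of such a guard follows, where the function name and the `-1` "epoch unknown" sentinel are assumptions for illustration rather than librdkafka's actual API:

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Leader updates and partition migrations are applied only when the
 * received leader epoch is not older than the cached one. */
static bool leader_epoch_is_usable(int32_t cached_epoch,
                                   int32_t received_epoch) {
        if (cached_epoch == -1 || received_epoch == -1)
                return true; /* brokers without epochs: always accept */
        return received_epoch >= cached_epoch; /* reject stale epochs */
}

int main(void) {
        /* Stale epoch: skip the migration, and stop fast refresh once no
         * partition has a stale epoch left. */
        printf("epoch 4 vs cached 5: %d\n", leader_epoch_is_usable(5, 4));
        /* Newer epoch: safe to migrate the partition. */
        printf("epoch 6 vs cached 5: %d\n", leader_epoch_is_usable(5, 6));
        return 0;
}
```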


# librdkafka v2.3.0
18 changes: 17 additions & 1 deletion src/rdkafka_cgrp.c
@@ -6462,11 +6462,27 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,

rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC)
return;

if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
return;

if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC)
if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) &&
rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA &&
rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
/* When consumer group isn't in join state WAIT_METADATA
* or STEADY, it's possible that some topics aren't present
* in cache and not hinted.
* It uses metadata cache only if it's not a wildcard
* subscription, so skip only those cases. See issue #4589. */
rd_kafka_dbg(
rkcg->rkcg_rk,
CGRP | RD_KAFKA_DBG_METADATA | RD_KAFKA_DBG_CONSUMER,
"REJOIN", "Skipping metadata update checks in state %s",
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
return;
}

/*
* Unmatched topics will be added to the errored list.
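
A condensed model of the join-state gate added in the hunk above: the state names follow the diff, but the enum, its other values and the helper are illustrative assumptions, not librdkafka's internals.

```c
#include <stdbool.h>
#include <stdio.h>

enum join_state {
        JOIN_STATE_INIT,
        JOIN_STATE_WAIT_METADATA,
        JOIN_STATE_WAIT_SYNC,
        JOIN_STATE_STEADY,
};

/* Metadata may rebuild the member's topic view only when every
 * subscribed topic is guaranteed to be cached or hinted, or when the
 * subscription is a wildcard one (which doesn't use the cache). */
static bool metadata_update_allowed(bool wildcard_subscription,
                                    enum join_state state) {
        if (wildcard_subscription)
                return true;
        return state == JOIN_STATE_WAIT_METADATA ||
               state == JOIN_STATE_STEADY;
}

int main(void) {
        printf("%d\n", metadata_update_allowed(false, JOIN_STATE_INIT));   /* 0 */
        printf("%d\n", metadata_update_allowed(false, JOIN_STATE_STEADY)); /* 1 */
        return 0;
}
```

The wildcard escape hatch exists because wildcard subscriptions match topics against full metadata rather than the cache, so the cache-completeness concern does not apply to them.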
146 changes: 100 additions & 46 deletions src/rdkafka_metadata.c
@@ -474,12 +474,14 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_internal_t *mdi = NULL;
rd_kafka_metadata_t *md = NULL;
size_t rkb_namelen;
const int log_decode_errors = LOG_ERR;
rd_list_t *missing_topics = NULL;

const rd_list_t *requested_topics = request_topics;
rd_bool_t all_topics = rd_false;
rd_bool_t cgrp_update = rd_false;
const int log_decode_errors = LOG_ERR;
rd_list_t *missing_topics = NULL;
rd_list_t *missing_topic_ids = NULL;

const rd_list_t *requested_topics = request_topics;
const rd_list_t *requested_topic_ids = NULL;
rd_bool_t all_topics = rd_false;
rd_bool_t cgrp_update = rd_false;
rd_bool_t has_reliable_leader_epochs =
rd_kafka_has_reliable_leader_epochs(rkb);
int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
@@ -488,16 +490,17 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int broker_changes = 0;
int cache_changes = 0;
rd_ts_t ts_start = rd_clock();

/* If client rack is present, the metadata cache (topic or full) needs
* to contain the partition to rack map. */
rd_bool_t has_client_rack = rk->rk_conf.client_rack &&
RD_KAFKAP_STR_LEN(rk->rk_conf.client_rack);
rd_bool_t compute_racks = has_client_rack;

if (request) {
requested_topics = request->rkbuf_u.Metadata.topics;
all_topics = request->rkbuf_u.Metadata.all_topics;
requested_topics = request->rkbuf_u.Metadata.topics;
requested_topic_ids = request->rkbuf_u.Metadata.topic_ids;
all_topics = request->rkbuf_u.Metadata.all_topics;
cgrp_update =
request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp;
compute_racks |= request->rkbuf_u.Metadata.force_racks;
@@ -519,6 +522,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
if (requested_topics)
missing_topics =
rd_list_copy(requested_topics, rd_list_string_copy, NULL);
if (requested_topic_ids)
missing_topic_ids =
rd_list_copy(requested_topic_ids, rd_list_Uuid_copy, NULL);

rd_kafka_broker_lock(rkb);
rkb_namelen = strlen(rkb->rkb_name) + 1;
@@ -833,34 +839,37 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i],
&mdi->topics[i]);

// TODO: Should be done for requested_topic_ids as well.
if (requested_topics) {
if (requested_topics)
rd_list_free_cb(missing_topics,
rd_list_remove_cmp(missing_topics,
md->topics[i].topic,
(void *)strcmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
}
if (requested_topic_ids)
rd_list_free_cb(
missing_topic_ids,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
/* Only update cache when not asking
* for all topics or cache entry
* already exists. */
rd_kafka_wrlock(rk);
cache_changes +=
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt,
all_topics /*cache entry needs to exist
*if all_topics*/);
rd_kafka_wrunlock(rk);
}

// TODO: Should be done for missing_topic_ids as well.
/* Requested topics not seen in metadata? Propagate to topic code. */
if (missing_topics) {
char *topic;
@@ -892,6 +901,41 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}
}
}
if (missing_topic_ids) {
rd_kafka_Uuid_t *topic_id;
rd_rkb_dbg(rkb, TOPIC, "METADATA",
"%d/%d requested topic(s) seen in metadata",
rd_list_cnt(requested_topic_ids) -
rd_list_cnt(missing_topic_ids),
rd_list_cnt(requested_topic_ids));
for (i = 0; i < rd_list_cnt(missing_topic_ids); i++) {
rd_kafka_Uuid_t *missing_topic_id =
missing_topic_ids->rl_elems[i];
rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s",
rd_kafka_Uuid_base64str(missing_topic_id));
}
RD_LIST_FOREACH(topic_id, missing_topic_ids, i) {
rd_kafka_topic_t *rkt;

rd_kafka_rdlock(rk);
rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk,
*topic_id);
rd_kafka_rdunlock(rk);
if (rkt) {
/* Received metadata response contained no
* information about topic 'rkt' and thus
* indicates the topic is not available in the
* cluster.
* Mark the topic as non-existent */
rd_kafka_topic_wrlock(rkt);
rd_kafka_topic_set_notexists(
rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
rd_kafka_topic_wrunlock(rkt);

rd_kafka_topic_destroy0(rkt);
}
}
}


rd_kafka_wrlock(rkb->rkb_rk);
@@ -934,9 +978,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}

if (all_topics) {
/* Expire all cache entries that were not updated. */
rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start);

/* All hints have been replaced by the corresponding entry.
* Rest of hints can be removed as topics aren't present
* in full metadata. */
rd_kafka_metadata_cache_purge_all_hints(rkb->rkb_rk);
if (rkb->rkb_rk->rk_full_metadata)
rd_kafka_metadata_destroy(
&rkb->rkb_rk->rk_full_metadata->metadata);
@@ -956,17 +1001,17 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
"Caching full metadata with "
"%d broker(s) and %d topic(s): %s",
md->broker_cnt, md->topic_cnt, reason);
} else {
if (cache_changes)
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}


// TODO: Should be done for requested_topic_ids as well.
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
if (requested_topic_ids)
rd_kafka_metadata_cache_purge_hints(rk, requested_topic_ids);

if (cache_changes) {
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}

rd_kafka_wrunlock(rkb->rkb_rk);

@@ -982,7 +1027,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
* which may contain only a sub-set of the subscribed topics (namely
* the effective subscription of available topics) as to not
* propagate non-included topics as non-existent. */
if (cgrp_update && (requested_topics || all_topics))
if (cgrp_update &&
(requested_topics || requested_topic_ids || all_topics))
rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp,
rd_true /*do join*/);

@@ -995,10 +1041,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}

done:

// TODO: Should be done for requested_topic_ids as well.
if (missing_topics)
rd_list_destroy(missing_topics);
if (missing_topic_ids)
rd_list_destroy(missing_topic_ids);

/* This metadata request was triggered by someone wanting
* the metadata information back as a reply, so send that reply now.
@@ -1013,18 +1059,26 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
err_parse:
err = rkbuf->rkbuf_err;
err:
// TODO: Should be done for requested_topic_ids as well.
if (requested_topics) {
/* Failed requests shall purge cache hints for
* the requested topics. */
rd_kafka_wrlock(rkb->rkb_rk);
rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
rd_kafka_wrunlock(rkb->rkb_rk);
}
if (requested_topic_ids) {
/* Failed requests shall purge cache hints for
* the requested topics. */
rd_kafka_wrlock(rkb->rkb_rk);
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);
rd_kafka_wrunlock(rkb->rkb_rk);
}

// TODO: Should be done for requested_topic_ids as well.
if (missing_topics)
rd_list_destroy(missing_topics);
if (missing_topic_ids)
rd_list_destroy(missing_topic_ids);
rd_tmpabuf_destroy(&tbuf);

return err;
19 changes: 14 additions & 5 deletions src/rdkafka_metadata.h
@@ -219,7 +219,8 @@ rd_kafka_metadata_new_topic_with_partition_replicas_mock(int replication_factor,
*/

struct rd_kafka_metadata_cache_entry {
rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
rd_avl_node_t rkmce_avlnode_by_id; /* rkmc_avl_by_id */
TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
rd_ts_t rkmce_ts_expires; /* Expire time */
rd_ts_t rkmce_ts_insert; /* Insert time */
Expand All @@ -243,6 +244,7 @@ struct rd_kafka_metadata_cache_entry {

struct rd_kafka_metadata_cache {
rd_avl_t rkmc_avl;
rd_avl_t rkmc_avl_by_id;
TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry;
rd_kafka_timer_t rkmc_expiry_tmr;
int rkmc_cnt;
@@ -270,20 +272,27 @@

int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
void rd_kafka_metadata_cache_topic_update(
int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk);
int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_metadata,
rd_kafka_metadata_broker_internal_t *brokers,
size_t broker_cnt);
size_t broker_cnt,
rd_bool_t only_existing);
void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t,
int valid);
void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk,
const rd_list_t *topics);
void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk,
const rd_list_t *topics);
int rd_kafka_metadata_cache_hint(rd_kafka_t *rk,
const rd_list_t *topics,
rd_list_t *dst,
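
A sketch of the dual-keyed lookup this header introduces: every cache entry becomes reachable both by topic name (`rkmc_avl`) and by topic id (`rkmc_avl_by_id`). A flat array and linear scans stand in for the two AVL trees, and the toy types and helpers are assumptions; only the two-index idea mirrors the header.

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct {
        uint64_t msb, lsb; /* simplified stand-in for rd_kafka_Uuid_t */
} toy_uuid_t;

struct toy_entry {
        char name[64];
        toy_uuid_t id;
};

static struct toy_entry cache[16];
static int cache_cnt;

/* Lookup by topic name, as the existing rkmc_avl index allows. */
static struct toy_entry *find_by_name(const char *name) {
        int i;
        for (i = 0; i < cache_cnt; i++)
                if (strcmp(cache[i].name, name) == 0)
                        return &cache[i];
        return NULL;
}

/* Lookup by topic id, the new rkmc_avl_by_id-style access path. */
static struct toy_entry *find_by_id(toy_uuid_t id) {
        int i;
        for (i = 0; i < cache_cnt; i++)
                if (cache[i].id.msb == id.msb && cache[i].id.lsb == id.lsb)
                        return &cache[i];
        return NULL;
}

int main(void) {
        struct toy_entry e = {"orders", {1, 42}};
        cache[cache_cnt++] = e;
        printf("by name: %s\n", find_by_name("orders")->name);
        printf("by id:   %s\n", find_by_id(e.id)->name);
        return 0;
}
```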