From f40524306d2159b86d197d29a8a95a6e78efba6b Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 3 Apr 2024 15:18:11 +0200 Subject: [PATCH] Metadata cache by topic id and fixes for failing metadata tests: - cache is updated on full metadata refresh and not cleared - Unknown topic happening before joining the consumer group - fast metadata refresh stops without leader change when there are no stale leader epochs - handling broker isn't updated on stale leader epoch --- CHANGELOG.md | 27 ++++++ src/rdkafka_cgrp.c | 18 +++- src/rdkafka_metadata.c | 146 ++++++++++++++++++++++---------- src/rdkafka_metadata.h | 19 +++-- src/rdkafka_metadata_cache.c | 159 ++++++++++++++++++++++++++++------- src/rdkafka_proto.h | 25 +++--- src/rdkafka_topic.c | 104 +++++++++++++---------- 7 files changed, 360 insertions(+), 138 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf3ee31396..304add1504 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,33 @@ librdkafka v2.3.1 is a maintenance release: check the [release notes](https://www.openssl.org/news/cl30.txt). * Integration tests can be started in KRaft mode and run against any GitHub Kafka branch other than the released versions. + * [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) + Continue partial implementation by adding a metadata cache by topic id + and updating the topic id corresponding to the partition name (#4660) + * Fixes to metadata cache expiration, metadata refresh interruption and + to avoid usage of stale metadata (#4660). + + +## Fixes + +### General fixes + + * Metadata cache was cleared on full metadata refresh, leading to unnecessary + refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating + cache for existing or hinted entries instead of clearing them. + Happening since 2.1.0 (#4660). + * A metadata call before member joins consumer group, + could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating + the consumer group following a metadata refresh only in safe states. + Happening since 2.1.0 (#4660). + * Metadata refreshes without partition leader change could lead to a loop of + metadata calls at fixed intervals. Solved by stopping metadata refresh when + all existing metadata is non-stale. Happening since 2.3.0 (#4660). + * A partition migration could happen, using stale metadata, when the partition + was undergoing a validation and being retried because of an error. + Solved by doing a partition migration only with a non-stale leader epoch. + Happening since 2.1.0 (#4660). + # librdkafka v2.3.0 diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index d969d63927..893c1645bd 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -6462,11 +6462,27 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); + if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC) + return; + if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0) return; - if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC) + if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) && + rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA && + rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY) { + /* When consumer group isn't in join state WAIT_METADATA + * or STEADY, it's possible that some topics aren't present + * in cache and not hinted. + * It uses metadata cache only if it's not a wildcard + * subscription, so skip only those cases. 
See issue #4589. */ + rd_kafka_dbg( + rkcg->rkcg_rk, + CGRP | RD_KAFKA_DBG_METADATA | RD_KAFKA_DBG_CONSUMER, + "REJOIN", "Skipping metadata update checks in state %s", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); return; + } /* * Unmatched topics will be added to the errored list. diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index 7365be645f..3f04cbbbfd 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -474,12 +474,14 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_metadata_internal_t *mdi = NULL; rd_kafka_metadata_t *md = NULL; size_t rkb_namelen; - const int log_decode_errors = LOG_ERR; - rd_list_t *missing_topics = NULL; - - const rd_list_t *requested_topics = request_topics; - rd_bool_t all_topics = rd_false; - rd_bool_t cgrp_update = rd_false; + const int log_decode_errors = LOG_ERR; + rd_list_t *missing_topics = NULL; + rd_list_t *missing_topic_ids = NULL; + + const rd_list_t *requested_topics = request_topics; + const rd_list_t *requested_topic_ids = NULL; + rd_bool_t all_topics = rd_false; + rd_bool_t cgrp_update = rd_false; rd_bool_t has_reliable_leader_epochs = rd_kafka_has_reliable_leader_epochs(rkb); int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion; @@ -488,7 +490,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; int broker_changes = 0; int cache_changes = 0; - rd_ts_t ts_start = rd_clock(); + /* If client rack is present, the metadata cache (topic or full) needs * to contain the partition to rack map. */ rd_bool_t has_client_rack = rk->rk_conf.client_rack && @@ -496,8 +498,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_bool_t compute_racks = has_client_rack; if (request) { - requested_topics = request->rkbuf_u.Metadata.topics; - all_topics = request->rkbuf_u.Metadata.all_topics; + requested_topics = request->rkbuf_u.Metadata.topics; + requested_topic_ids = request->rkbuf_u.Metadata.topic_ids; + all_topics = request->rkbuf_u.Metadata.all_topics; cgrp_update = request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp; compute_racks |= request->rkbuf_u.Metadata.force_racks; @@ -519,6 +522,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, if (requested_topics) missing_topics = rd_list_copy(requested_topics, rd_list_string_copy, NULL); + if (requested_topic_ids) + missing_topic_ids = + rd_list_copy(requested_topic_ids, rd_list_Uuid_copy, NULL); rd_kafka_broker_lock(rkb); rkb_namelen = strlen(rkb->rkb_name) + 1; @@ -833,34 +839,37 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], &mdi->topics[i]); - // TODO: Should be done for requested_topic_ids as well. - if (requested_topics) { + if (requested_topics) rd_list_free_cb(missing_topics, rd_list_remove_cmp(missing_topics, md->topics[i].topic, (void *)strcmp)); - if (!all_topics) { - /* Only update cache when not asking - * for all topics. */ - - rd_kafka_wrlock(rk); - rd_kafka_metadata_cache_topic_update( - rk, &md->topics[i], &mdi->topics[i], - rd_false /*propagate later*/, - /* use has_client_rack rather than - compute_racks. We need cached rack ids - only in case we need to rejoin the group - if they change and client.rack is set - (KIP-881). 
*/ - has_client_rack, mdi->brokers, - md->broker_cnt); - cache_changes++; - rd_kafka_wrunlock(rk); - } - } + if (requested_topic_ids) + rd_list_free_cb( + missing_topic_ids, + rd_list_remove_cmp(missing_topic_ids, + &mdi->topics[i].topic_id, + (void *)rd_kafka_Uuid_ptr_cmp)); + /* Only update cache when not asking + * for all topics or cache entry + * already exists. */ + rd_kafka_wrlock(rk); + cache_changes += + rd_kafka_metadata_cache_topic_update( + rk, &md->topics[i], &mdi->topics[i], + rd_false /*propagate later*/, + /* use has_client_rack rather than + compute_racks. We need cached rack ids + only in case we need to rejoin the group + if they change and client.rack is set + (KIP-881). */ + has_client_rack, mdi->brokers, + md->broker_cnt, + all_topics /*cache entry needs to exist + *if all_topics*/); + rd_kafka_wrunlock(rk); } - // TODO: Should be done for missing_topic_ids as well. /* Requested topics not seen in metadata? Propogate to topic code. */ if (missing_topics) { char *topic; @@ -892,6 +901,41 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } } } + if (missing_topic_ids) { + rd_kafka_Uuid_t *topic_id; + rd_rkb_dbg(rkb, TOPIC, "METADATA", + "%d/%d requested topic(s) seen in metadata", + rd_list_cnt(requested_topic_ids) - + rd_list_cnt(missing_topic_ids), + rd_list_cnt(requested_topic_ids)); + for (i = 0; i < rd_list_cnt(missing_topic_ids); i++) { + rd_kafka_Uuid_t *missing_topic_id = + missing_topic_ids->rl_elems[i]; + rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s", + rd_kafka_Uuid_base64str(missing_topic_id)); + } + RD_LIST_FOREACH(topic_id, missing_topic_ids, i) { + rd_kafka_topic_t *rkt; + + rd_kafka_rdlock(rk); + rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk, + *topic_id); + rd_kafka_rdunlock(rk); + if (rkt) { + /* Received metadata response contained no + * information about topic 'rkt' and thus + * indicates the topic is not available in the + * cluster. + * Mark the topic as non-existent */ + rd_kafka_topic_wrlock(rkt); + rd_kafka_topic_set_notexists( + rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); + rd_kafka_topic_wrunlock(rkt); + + rd_kafka_topic_destroy0(rkt); + } + } + } rd_kafka_wrlock(rkb->rkb_rk); @@ -934,9 +978,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } if (all_topics) { - /* Expire all cache entries that were not updated. */ - rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start); - + /* All hints have been replaced by the corresponding entry. + * Rest of hints can be removed as topics aren't present + * in full metadata. */ + rd_kafka_metadata_cache_purge_all_hints(rkb->rkb_rk); if (rkb->rkb_rk->rk_full_metadata) rd_kafka_metadata_destroy( &rkb->rkb_rk->rk_full_metadata->metadata); @@ -956,17 +1001,17 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, "Caching full metadata with " "%d broker(s) and %d topic(s): %s", md->broker_cnt, md->topic_cnt, reason); - } else { - if (cache_changes) - rd_kafka_metadata_cache_propagate_changes(rk); - rd_kafka_metadata_cache_expiry_start(rk); } - - - // TODO: Should be done for requested_topic_ids as well. /* Remove cache hints for the originally requested topics. 
*/ if (requested_topics) rd_kafka_metadata_cache_purge_hints(rk, requested_topics); + if (requested_topic_ids) + rd_kafka_metadata_cache_purge_hints(rk, requested_topic_ids); + + if (cache_changes) { + rd_kafka_metadata_cache_propagate_changes(rk); + rd_kafka_metadata_cache_expiry_start(rk); + } rd_kafka_wrunlock(rkb->rkb_rk); @@ -982,7 +1027,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, * which may contain only a sub-set of the subscribed topics (namely * the effective subscription of available topics) as to not * propagate non-included topics as non-existent. */ - if (cgrp_update && (requested_topics || all_topics)) + if (cgrp_update && + (requested_topics || requested_topic_ids || all_topics)) rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp, rd_true /*do join*/); @@ -995,10 +1041,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } done: - - // TODO: Should be done for requested_topic_ids as well. if (missing_topics) rd_list_destroy(missing_topics); + if (missing_topic_ids) + rd_list_destroy(missing_topic_ids); /* This metadata request was triggered by someone wanting * the metadata information back as a reply, so send that reply now. @@ -1013,7 +1059,6 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, err_parse: err = rkbuf->rkbuf_err; err: - // TODO: Should be done for requested_topic_ids as well. if (requested_topics) { /* Failed requests shall purge cache hints for * the requested topics. */ @@ -1021,10 +1066,19 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_metadata_cache_purge_hints(rk, requested_topics); rd_kafka_wrunlock(rkb->rkb_rk); } + if (requested_topic_ids) { + /* Failed requests shall purge cache hints for + * the requested topics. */ + rd_kafka_wrlock(rkb->rkb_rk); + rd_kafka_metadata_cache_purge_hints_by_id(rk, + requested_topic_ids); + rd_kafka_wrunlock(rkb->rkb_rk); + } - // TODO: Should be done for requested_topic_ids as well. 
if (missing_topics) rd_list_destroy(missing_topics); + if (missing_topic_ids) + rd_list_destroy(missing_topic_ids); rd_tmpabuf_destroy(&tbuf); return err; diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 213bf2b896..9a30fe5f7f 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -219,7 +219,8 @@ rd_kafka_metadata_new_topic_with_partition_replicas_mock(int replication_factor, */ struct rd_kafka_metadata_cache_entry { - rd_avl_node_t rkmce_avlnode; /* rkmc_avl */ + rd_avl_node_t rkmce_avlnode; /* rkmc_avl */ + rd_avl_node_t rkmce_avlnode_by_id; /* rkmc_avl_by_id */ TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */ rd_ts_t rkmce_ts_expires; /* Expire time */ rd_ts_t rkmce_ts_insert; /* Insert time */ @@ -243,6 +244,7 @@ struct rd_kafka_metadata_cache_entry { struct rd_kafka_metadata_cache { rd_avl_t rkmc_avl; + rd_avl_t rkmc_avl_by_id; TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry; rd_kafka_timer_t rkmc_expiry_tmr; int rkmc_cnt; @@ -270,20 +272,27 @@ struct rd_kafka_metadata_cache { int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic); void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk); -int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts); -void rd_kafka_metadata_cache_topic_update( +int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk); +int rd_kafka_metadata_cache_topic_update( rd_kafka_t *rk, - const rd_kafka_metadata_topic_t *mdt, + rd_kafka_metadata_topic_t *mdt, const rd_kafka_metadata_topic_internal_t *mdit, rd_bool_t propagate, rd_bool_t include_metadata, rd_kafka_metadata_broker_internal_t *brokers, - size_t broker_cnt); + size_t broker_cnt, + rd_bool_t only_existing); void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk); struct rd_kafka_metadata_cache_entry * rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid); +struct rd_kafka_metadata_cache_entry * +rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk, + const rd_kafka_Uuid_t, + int valid); void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk, const rd_list_t *topics); +void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk, + const rd_list_t *topics); int rd_kafka_metadata_cache_hint(rd_kafka_t *rk, const rd_list_t *topics, rd_list_t *dst, diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c index b3bad4de8d..f37ebf5797 100644 --- a/src/rdkafka_metadata_cache.c +++ b/src/rdkafka_metadata_cache.c @@ -80,8 +80,14 @@ static RD_INLINE void rd_kafka_metadata_cache_delete(rd_kafka_t *rk, struct rd_kafka_metadata_cache_entry *rkmce, int unlink_avl) { - if (unlink_avl) + if (unlink_avl) { RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, rkmce); + if (!RD_KAFKA_UUID_IS_ZERO( + rkmce->rkmce_metadata_internal_topic.topic_id)) { + RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl_by_id, + rkmce); + } + } TAILQ_REMOVE(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link); rd_kafka_assert(NULL, rk->rk_metadata_cache.rkmc_cnt > 0); rk->rk_metadata_cache.rkmc_cnt--; @@ -161,45 +167,27 @@ static int rd_kafka_metadata_cache_evict(rd_kafka_t *rk) { /** - * @brief Evict timed out entries from cache based on their insert/update time - * rather than expiry time. Any entries older than \p ts will be evicted. + * @brief Remove all cache hints,. + * This is done when the Metadata response has been parsed and + * replaced hints with existing topic information, thus this will + * only remove unmatched topics from the cache. * - * @returns the number of entries evicted. 
+ * @returns the number of purged hints * * @locks_required rd_kafka_wrlock() */ -int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts) { +int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk) { int cnt = 0; struct rd_kafka_metadata_cache_entry *rkmce, *tmp; TAILQ_FOREACH_SAFE(rkmce, &rk->rk_metadata_cache.rkmc_expiry, rkmce_link, tmp) { - if (rkmce->rkmce_ts_insert <= ts) { + if (!RD_KAFKA_METADATA_CACHE_VALID(rkmce)) { rd_kafka_metadata_cache_delete(rk, rkmce, 1); cnt++; } } - /* Update expiry timer */ - rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry); - if (rkmce) - rd_kafka_timer_start(&rk->rk_timers, - &rk->rk_metadata_cache.rkmc_expiry_tmr, - rkmce->rkmce_ts_expires - rd_clock(), - rd_kafka_metadata_cache_evict_tmr_cb, rk); - else - rd_kafka_timer_stop(&rk->rk_timers, - &rk->rk_metadata_cache.rkmc_expiry_tmr, 1); - - rd_kafka_dbg(rk, METADATA, "METADATA", - "Expired %d entries older than %dms from metadata cache " - "(%d entries remain)", - cnt, (int)((rd_clock() - ts) / 1000), - rk->rk_metadata_cache.rkmc_cnt); - - if (cnt) - rd_kafka_metadata_cache_propagate_changes(rk); - return cnt; } @@ -221,6 +209,25 @@ rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid) { return NULL; } +/** + * @brief Find cache entry by topic id + * + * @param valid: entry must be valid (not hint) + * + * @locks rd_kafka_*lock() + */ +struct rd_kafka_metadata_cache_entry * +rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk, + const rd_kafka_Uuid_t topic_id, + int valid) { + struct rd_kafka_metadata_cache_entry skel, *rkmce; + skel.rkmce_metadata_internal_topic.topic_id = topic_id; + rkmce = RD_AVL_FIND(&rk->rk_metadata_cache.rkmc_avl_by_id, &skel); + if (rkmce && (!valid || RD_KAFKA_METADATA_CACHE_VALID(rkmce))) + return rkmce; + return NULL; +} + /** * @brief Partition (id) comparator @@ -247,7 +254,7 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert( rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t *brokers_internal, size_t broker_cnt) { - struct rd_kafka_metadata_cache_entry *rkmce, *old; + struct rd_kafka_metadata_cache_entry *rkmce, *old, *old_by_id = NULL; rd_tmpabuf_t tbuf; int i; @@ -350,8 +357,28 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert( /* Insert (and replace existing) entry. */ old = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, rkmce, rkmce_avlnode); - if (old) + /* Insert (and replace existing) entry into the AVL tree sorted + * by topic id. */ + if (!RD_KAFKA_UUID_IS_ZERO( + rkmce->rkmce_metadata_internal_topic.topic_id)) { + /* If topic id isn't zero insert cache entry into this tree */ + old_by_id = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl_by_id, + rkmce, rkmce_avlnode_by_id); + } else if (old && !RD_KAFKA_UUID_IS_ZERO( + old->rkmce_metadata_internal_topic.topic_id)) { + /* If it had a topic id, remove it from the tree */ + RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl_by_id, old); + } + if (old) { + /* Delete and free old cache entry */ rd_kafka_metadata_cache_delete(rk, old, 0); + } + if (old_by_id && old_by_id != old) { + /* If there was a different cache entry in this tree, + * remove and free it. */ + RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, old_by_id); + rd_kafka_metadata_cache_delete(rk, old_by_id, 0); + } /* Explicitly not freeing the tmpabuf since rkmce points to its * memory. 
*/ @@ -414,22 +441,41 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) { * For permanent errors (authorization failures), we keep * the entry cached for metadata.max.age.ms. * + * @return 1 on metadata change, 0 when no change was applied + * * @remark The cache expiry timer will not be updated/started, * call rd_kafka_metadata_cache_expiry_start() instead. * * @locks rd_kafka_wrlock() */ -void rd_kafka_metadata_cache_topic_update( +int rd_kafka_metadata_cache_topic_update( rd_kafka_t *rk, - const rd_kafka_metadata_topic_t *mdt, + rd_kafka_metadata_topic_t *mdt, const rd_kafka_metadata_topic_internal_t *mdit, rd_bool_t propagate, rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t *brokers, - size_t broker_cnt) { - rd_ts_t now = rd_clock(); + size_t broker_cnt, + rd_bool_t only_existing) { + struct rd_kafka_metadata_cache_entry *rkmce = NULL; + rd_ts_t now = rd_clock(); rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); int changed = 1; + if (!mdt->topic) { + rkmce = + rd_kafka_metadata_cache_find_by_id(rk, mdit->topic_id, 1); + if (!rkmce) + return 0; + + /* Borrowed pointer from rkmce tmpabuf */ + mdt->topic = rkmce->rkmce_mtopic.topic; + } + if (unlikely(mdt->topic && !rkmce && only_existing)) { + rkmce = rd_kafka_metadata_cache_find(rk, mdt->topic, 0); + } + if (unlikely(!mdt->topic || (only_existing && !rkmce))) { + return 0; + } /* Cache unknown topics for a short while (100ms) to allow the cgrp * logic to find negative cache hits. */ @@ -448,6 +494,8 @@ void rd_kafka_metadata_cache_topic_update( if (changed && propagate) rd_kafka_metadata_cache_propagate_changes(rk); + + return changed; } @@ -485,6 +533,40 @@ void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk, } } +/** + * @brief Remove cache hints for topic ids in \p topic_ids + * This is done when the Metadata response has been parsed and + * replaced hints with existing topic information, thus this will + * only remove unmatched topics from the cache. 
+ * + * @locks rd_kafka_wrlock() + */ +void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk, + const rd_list_t *topic_ids) { + const rd_kafka_Uuid_t *topic_id; + int i; + int cnt = 0; + + RD_LIST_FOREACH(topic_id, topic_ids, i) { + struct rd_kafka_metadata_cache_entry *rkmce; + + if (!(rkmce = rd_kafka_metadata_cache_find_by_id(rk, *topic_id, + 0 /*any*/)) || + RD_KAFKA_METADATA_CACHE_VALID(rkmce)) + continue; + + rd_kafka_metadata_cache_delete(rk, rkmce, 1 /*unlink avl*/); + cnt++; + } + + if (cnt > 0) { + rd_kafka_dbg(rk, METADATA, "METADATA", + "Purged %d/%d cached topic hint(s)", cnt, + rd_list_cnt(topic_ids)); + rd_kafka_metadata_cache_propagate_changes(rk); + } +} + /** * @brief Inserts a non-valid entry for topics in \p topics indicating @@ -589,6 +671,16 @@ static int rd_kafka_metadata_cache_entry_cmp(const void *_a, const void *_b) { return strcmp(a->rkmce_mtopic.topic, b->rkmce_mtopic.topic); } +/** + * @brief Cache entry comparator (on topic id) + */ +static int rd_kafka_metadata_cache_entry_by_id_cmp(const void *_a, + const void *_b) { + const struct rd_kafka_metadata_cache_entry *a = _a, *b = _b; + return rd_kafka_Uuid_cmp(a->rkmce_metadata_internal_topic.topic_id, + b->rkmce_metadata_internal_topic.topic_id); +} + /** * @brief Initialize the metadata cache @@ -598,6 +690,8 @@ static int rd_kafka_metadata_cache_entry_cmp(const void *_a, const void *_b) { void rd_kafka_metadata_cache_init(rd_kafka_t *rk) { rd_avl_init(&rk->rk_metadata_cache.rkmc_avl, rd_kafka_metadata_cache_entry_cmp, 0); + rd_avl_init(&rk->rk_metadata_cache.rkmc_avl_by_id, + rd_kafka_metadata_cache_entry_by_id_cmp, 0); TAILQ_INIT(&rk->rk_metadata_cache.rkmc_expiry); mtx_init(&rk->rk_metadata_cache.rkmc_full_lock, mtx_plain); mtx_init(&rk->rk_metadata_cache.rkmc_cnd_lock, mtx_plain); @@ -620,6 +714,7 @@ void rd_kafka_metadata_cache_destroy(rd_kafka_t *rk) { mtx_destroy(&rk->rk_metadata_cache.rkmc_cnd_lock); cnd_destroy(&rk->rk_metadata_cache.rkmc_cnd); rd_avl_destroy(&rk->rk_metadata_cache.rkmc_avl); + rd_avl_destroy(&rk->rk_metadata_cache.rkmc_avl_by_id); } diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 7f202a5e29..686e9c7b62 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -597,19 +597,22 @@ typedef struct rd_kafka_Uuid_s { 0, 1, "" \ } -/** - * Initialize given UUID to zero UUID. - * - * @param uuid UUID to initialize. 
- */ -static RD_INLINE RD_UNUSED void rd_kafka_Uuid_init(rd_kafka_Uuid_t *uuid) { - memset(uuid, 0, sizeof(*uuid)); -} - static RD_INLINE RD_UNUSED int rd_kafka_Uuid_cmp(rd_kafka_Uuid_t a, rd_kafka_Uuid_t b) { - return (a.most_significant_bits - b.most_significant_bits) || - (a.least_significant_bits - b.least_significant_bits); + if (a.most_significant_bits < b.most_significant_bits) + return -1; + if (a.most_significant_bits > b.most_significant_bits) + return 1; + if (a.least_significant_bits < b.least_significant_bits) + return -1; + if (a.least_significant_bits > b.least_significant_bits) + return 1; + return 0; +} + +static RD_INLINE RD_UNUSED int rd_kafka_Uuid_ptr_cmp(void *a, void *b) { + rd_kafka_Uuid_t *a_uuid = a, *b_uuid = b; + return rd_kafka_Uuid_cmp(*a_uuid, *b_uuid); } rd_kafka_Uuid_t rd_kafka_Uuid_random(); diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index ccaf535a92..bd1239d501 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -662,8 +662,8 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rd_kafka_broker_t *leader, int32_t leader_epoch) { rd_kafka_toppar_t *rktp; - rd_bool_t fetching_from_follower, need_epoch_validation = rd_false; - int r = 0; + rd_bool_t need_epoch_validation = rd_false; + int r = 0; rktp = rd_kafka_toppar_get(rkt, partition, 0); if (unlikely(!rktp)) { @@ -691,14 +691,17 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, leader_epoch, rktp->rktp_leader_epoch); - if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) { + if (rktp->rktp_fetch_state != + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) { rd_kafka_toppar_unlock(rktp); rd_kafka_toppar_destroy(rktp); /* from get() */ return 0; } } - if (leader_epoch > rktp->rktp_leader_epoch) { + if (rktp->rktp_leader_epoch == -1 || + leader_epoch > rktp->rktp_leader_epoch) { + rd_bool_t fetching_from_follower; rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", "%s [%" PRId32 "]: leader %" PRId32 " epoch %" PRId32 " -> leader %" PRId32 @@ -706,44 +709,50 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, rktp->rktp_leader_id, rktp->rktp_leader_epoch, leader_id, leader_epoch); - rktp->rktp_leader_epoch = leader_epoch; - need_epoch_validation = rd_true; - } else if (rktp->rktp_fetch_state == - RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + if (leader_epoch > rktp->rktp_leader_epoch) + rktp->rktp_leader_epoch = leader_epoch; need_epoch_validation = rd_true; - fetching_from_follower = - leader != NULL && rktp->rktp_broker != NULL && - rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && - rktp->rktp_broker != leader; - if (fetching_from_follower && rktp->rktp_leader_id == leader_id) { - rd_kafka_dbg( - rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", - "Topic %s [%" PRId32 "]: leader %" PRId32 - " unchanged, " - "not migrating away from preferred replica %" PRId32, - rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, - leader_id, rktp->rktp_broker_id); - r = 0; + fetching_from_follower = + leader != NULL && rktp->rktp_broker != NULL && + rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && + rktp->rktp_broker != leader; - } else { + if (fetching_from_follower && + rktp->rktp_leader_id == leader_id) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "Topic %s [%" PRId32 "]: leader %" PRId32 + " unchanged, " + "not migrating away from preferred " + "replica %" PRId32, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, 
leader_id, + rktp->rktp_broker_id); + r = 0; - if (rktp->rktp_leader_id != leader_id || - rktp->rktp_leader != leader) { - /* Update leader if it has changed */ - rktp->rktp_leader_id = leader_id; - if (rktp->rktp_leader) - rd_kafka_broker_destroy(rktp->rktp_leader); - if (leader) - rd_kafka_broker_keep(leader); - rktp->rktp_leader = leader; + } else { + + if (rktp->rktp_leader_id != leader_id || + rktp->rktp_leader != leader) { + /* Update leader if it has changed */ + rktp->rktp_leader_id = leader_id; + if (rktp->rktp_leader) + rd_kafka_broker_destroy( + rktp->rktp_leader); + if (leader) + rd_kafka_broker_keep(leader); + rktp->rktp_leader = leader; + } + + /* Update handling broker */ + r = rd_kafka_toppar_broker_update( + rktp, leader_id, leader, "leader updated"); } - /* Update handling broker */ - r = rd_kafka_toppar_broker_update(rktp, leader_id, leader, - "leader updated"); - } + } else if (rktp->rktp_fetch_state == + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + need_epoch_validation = rd_true; if (need_epoch_validation) { /* Set offset validation position, @@ -1277,8 +1286,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, rd_kafka_broker_t **partbrokers; int leader_cnt = 0; int old_state; - rd_bool_t partition_exists_with_no_leader_epoch = rd_false; - rd_bool_t partition_exists_with_updated_leader_epoch = rd_false; + rd_bool_t partition_exists_with_no_leader_epoch = rd_false; + rd_bool_t partition_exists_with_stale_leader_epoch = rd_false; if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR) rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", @@ -1328,8 +1337,17 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { upd += rd_kafka_topic_partition_cnt_update(rkt, mdt->partition_cnt); - if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO)) + if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO)) { + /* FIXME: an offset reset must be triggered. + * when rkt_topic_id wasn't zero. + * There are no problems + * in test 0107_topic_recreate if offsets in new + * topic are lower than in previous one, + * causing an out of range and an offset reset, + * but the rarer case where they're higher needs + * to be checked. */ rkt->rkt_topic_id = mdit->topic_id; + } /* If the metadata times out for a topic (because all brokers * are down) the state will transition to S_UNKNOWN. * When updated metadata is eventually received there might @@ -1343,7 +1361,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, /* Update leader for each partition */ for (j = 0; j < mdt->partition_cnt; j++) { - int r; + int r = 0; rd_kafka_broker_t *leader; int32_t leader_epoch = mdit->partitions[j].leader_epoch; rd_kafka_toppar_t *rktp = @@ -1362,8 +1380,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, * set to -1, we assume that metadata is not stale. */ if (leader_epoch == -1) partition_exists_with_no_leader_epoch = rd_true; - else if (rktp->rktp_leader_epoch < leader_epoch) - partition_exists_with_updated_leader_epoch = rd_true; + else if (leader_epoch < rktp->rktp_leader_epoch) + partition_exists_with_stale_leader_epoch = rd_true; /* Update leader for partition */ @@ -1386,7 +1404,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, * stale, we can turn off fast leader query. 
*/ if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt && (partition_exists_with_no_leader_epoch || - partition_exists_with_updated_leader_epoch)) + !partition_exists_with_stale_leader_epoch)) rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) { @@ -2046,7 +2064,7 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, rd_kafka_wrlock(rkt->rkt_rk); rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true, - rd_false, NULL, 0); + rd_false, NULL, 0, rd_false); rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock()); rd_kafka_wrunlock(rkt->rkt_rk); rd_free(partitions);
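Reviewer note (not part of the patch): the rd_kafka_Uuid_cmp() rewrite in src/rdkafka_proto.h deserves a highlight. The previous implementation returned (a.most_significant_bits - b.most_significant_bits) || (a.least_significant_bits - b.least_significant_bits), which the logical OR collapses to 0 or 1, so the comparator can never report a negative result (and the 64-bit subtractions can overflow); it therefore cannot order the new rkmc_avl_by_id tree. Below is a minimal standalone sketch with hypothetical type and function names (kuuid_t, kuuid_cmp_*), shown only to illustrate why the explicit three-way comparison is needed; it is not librdkafka code.

#include <stdint.h>
#include <stdio.h>

/* Simplified stand-in for rd_kafka_Uuid_t (hypothetical, illustration only). */
typedef struct {
        int64_t msb; /* most significant bits */
        int64_t lsb; /* least significant bits */
} kuuid_t;

/* Old-style comparator: the logical OR yields only 0 or 1, so "less than"
 * is never reported and the result is unusable for ordering a tree. */
static int kuuid_cmp_broken(kuuid_t a, kuuid_t b) {
        return (a.msb - b.msb) || (a.lsb - b.lsb);
}

/* Fixed comparator: explicit three-way comparison, mirroring the patch. */
static int kuuid_cmp_fixed(kuuid_t a, kuuid_t b) {
        if (a.msb < b.msb)
                return -1;
        if (a.msb > b.msb)
                return 1;
        if (a.lsb < b.lsb)
                return -1;
        if (a.lsb > b.lsb)
                return 1;
        return 0;
}

int main(void) {
        kuuid_t lo = {1, 0}, hi = {2, 0};

        /* Broken version returns 1 for both orderings: unusable as a tree key. */
        printf("broken: cmp(lo,hi)=%d cmp(hi,lo)=%d\n",
               kuuid_cmp_broken(lo, hi), kuuid_cmp_broken(hi, lo));
        /* Fixed version returns -1 and 1, as an AVL comparator requires. */
        printf("fixed:  cmp(lo,hi)=%d cmp(hi,lo)=%d\n",
               kuuid_cmp_fixed(lo, hi), kuuid_cmp_fixed(hi, lo));
        return 0;
}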
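A second reviewer note, also outside the patch itself: the reworked rd_kafka_toppar_leader_update() in src/rdkafka_topic.c only migrates a partition when the leader epoch carried by the metadata is not stale. The condition it uses can be summarized by the small predicate below; the helper name is hypothetical and exists only to restate the check from the patch (cached epoch unknown, i.e. -1, or a strictly newer epoch in the metadata).

#include <stdbool.h>
#include <stdint.h>

/* Hypothetical illustration: a leader update from metadata is applied only
 * when the cached epoch is unknown (-1) or the metadata carries a strictly
 * newer epoch; otherwise the metadata is considered stale and must not
 * trigger a partition migration. */
static bool leader_update_is_applicable(int32_t cached_epoch,
                                        int32_t metadata_epoch) {
        return cached_epoch == -1 || metadata_epoch > cached_epoch;
}

Note that in the patch, when this condition does not hold but the partition is in fetch state RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT, offset/epoch validation is still triggered, so a pending validation is not left hanging.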