From a5b3dea0d568c10399809adaacbfc055e442f7a9 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 20 Oct 2023 20:14:52 +0300 Subject: [PATCH] gossiper: load_saved_endpoint: set dc and rack When loading endpoint_state from system.peers, pass the loaded nodes' dc/rack info from storage_service::join_token_ring to gossiper::add_saved_endpoint. Load the endpoint DC/RACK information to the endpoint_state, if available, so it can propagate to bootstrapping nodes via gossip, even if those nodes are DOWN after a full cluster-restart. Note that this change makes the host_id presence mandatory following https://github.com/scylladb/scylladb/pull/16376. The reason to do so is that the other states: tokens, dc, and rack are useless without the host_id. This change is backward compatible since the HOST_ID application state has been written to system.peers since inception in scylla and it would be missing only due to a potential exception in older versions that failed to write it. In this case, manual intervention is needed and the correct HOST_ID needs to be manually updated in system.peers. 
Refs #15787 Signed-off-by: Benny Halevy --- gms/gossiper.cc | 28 +++++++--------- gms/gossiper.hh | 9 ++++- service/storage_service.cc | 69 ++++++++++++++++++++------------------ service/storage_service.hh | 3 +- 4 files changed, 59 insertions(+), 50 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index effea2811fe7..0ddbb015b29d 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -2134,12 +2134,16 @@ void gossiper::build_seeds_list() { } } -future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) { +future<> gossiper::add_saved_endpoint(inet_address ep, loaded_endpoint_state loaded_state, permit_id pid) { if (ep == get_broadcast_address()) { logger.debug("Attempt to add self as saved endpoint"); co_return; } + if (!loaded_state.host_id) { + on_internal_error(logger, format("can't find host_id for ep {} for adding saved endpoint", ep)); + } + auto permit = co_await lock_endpoint(ep, pid); //preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on) @@ -2154,24 +2158,18 @@ future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) { logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state); ep_state.update_timestamp(); } - // It's okay to use the local version generator for the loaded application state values - // As long as the endpoint_state has zero generation. 
- // It will get updated as a whole by handle_major_state_change - // via do_apply_state_locally when (remote_generation > local_generation) - const auto tmptr = get_token_metadata_ptr(); - auto host_id = tmptr->get_host_id_if_known(ep); - if (host_id) { - ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value())); - auto tokens = tmptr->get_tokens(*host_id); - if (!tokens.empty()) { - std::unordered_set tokens_set(tokens.begin(), tokens.end()); - ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set)); - } + ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(loaded_state.host_id)); + if (!loaded_state.tokens.empty()) { + ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(loaded_state.tokens)); + } + if (loaded_state.opt_dc_rack) { + ep_state.add_application_state(gms::application_state::DC, versioned_value::datacenter(loaded_state.opt_dc_rack->dc)); + ep_state.add_application_state(gms::application_state::RACK, versioned_value::datacenter(loaded_state.opt_dc_rack->rack)); } auto generation = ep_state.get_heart_beat_state().get_generation(); co_await replicate(ep, std::move(ep_state), permit.id()); _unreachable_endpoints[ep] = now(); - logger.trace("Adding saved endpoint {} {}", ep, generation); + logger.debug("Added saved endpoint {} generation={} version={}: {}", ep, generation, ep_state.get_heart_beat_state().get_heart_beat_version(), ep_state); } future<> gossiper::add_local_application_state(application_state state, versioned_value value) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 92bb24877078..85a547b94837 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -36,6 +36,7 @@ #include #include #include "locator/token_metadata.hh" +#include "locator/types.hh" namespace db { class config; @@ -76,6 +77,12 @@ struct gossip_config { uint32_t skip_wait_for_gossip_to_settle = -1; }; +struct 
loaded_endpoint_state { + locator::host_id host_id; + std::unordered_set tokens; + std::optional opt_dc_rack; +}; + /** * This module is responsible for Gossiping information for the local endpoint. This abstraction * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module @@ -608,7 +615,7 @@ public: /** * Add an endpoint we knew about previously, but whose state is unknown */ - future<> add_saved_endpoint(inet_address ep, permit_id); + future<> add_saved_endpoint(inet_address ep, loaded_endpoint_state, permit_id); future<> add_local_application_state(application_state state, versioned_value value); diff --git a/service/storage_service.cc b/service/storage_service.cc index deb9813daf1c..31fce01268e8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -28,6 +28,7 @@ #include "db/consistency_level.hh" #include "seastar/core/when_all.hh" #include "service/tablet_allocator.hh" +#include "locator/types.hh" #include "locator/tablets.hh" #include "locator/tablet_metadata_guard.hh" #include "replica/tablet_mutation_builder.hh" @@ -669,7 +670,14 @@ future<> storage_service::topology_state_load() { // since it is not loaded in join_cluster in the // raft_topology_change_enabled() case. 
if (!_gossiper.get_endpoint_state_ptr(ep)) { - co_await _gossiper.add_saved_endpoint(ep, permit.id()); + gms::loaded_endpoint_state st; + const auto& topo = tmptr->get_topology(); + // node must exist in topology if it's in tmptr->get_all_endpoints + const auto* node = topo.find_node(e); + st.host_id = node->host_id(); + st.tokens = boost::copy_range>(tmptr->get_tokens(e)); + st.opt_dc_rack = node->dc_rack(); + co_await _gossiper.add_saved_endpoint(ep, std::move(st), permit.id()); } } @@ -1284,7 +1292,7 @@ future<> storage_service::join_token_ring(sharded& proxy, sharded& gossiper, std::unordered_set initial_contact_nodes, - std::unordered_set loaded_endpoints, + std::unordered_map loaded_endpoints, std::unordered_map loaded_peer_features, std::chrono::milliseconds delay, start_hint_manager start_hm, @@ -1364,10 +1372,10 @@ future<> storage_service::join_token_ring(sharded storage_service::join_cluster(sharded& set_mode(mode::STARTING); - std::unordered_set loaded_endpoints; + std::unordered_map loaded_endpoints; if (_db.local().get_config().load_ring_state() && !raft_topology_change_enabled()) { slogger.info("Loading persisted ring state"); - auto loaded_tokens = co_await _sys_ks.local().load_tokens(); - auto loaded_host_ids = co_await _sys_ks.local().load_host_ids(); - auto loaded_dc_rack = co_await _sys_ks.local().load_dc_rack_info(); + for (auto& [ep, tokens] : co_await _sys_ks.local().load_tokens()) { + loaded_endpoints[ep].tokens = std::move(tokens); + } + for (auto& [ep, host_id] : co_await _sys_ks.local().load_host_ids()) { + loaded_endpoints[ep].host_id = std::move(host_id); + } + for (auto& [ep, dc_rack] : co_await _sys_ks.local().load_dc_rack_info()) { + loaded_endpoints[ep].opt_dc_rack = std::move(dc_rack); + } - auto get_dc_rack = [&loaded_dc_rack] (inet_address ep) { - if (loaded_dc_rack.contains(ep)) { - return loaded_dc_rack[ep]; - } else { - return locator::endpoint_dc_rack::default_location; - } + auto get_dc_rack = [] (const 
gms::loaded_endpoint_state& st) { + return st.opt_dc_rack.value_or(locator::endpoint_dc_rack::default_location); }; if (slogger.is_enabled(logging::log_level::debug)) { - for (auto& x : loaded_tokens) { - slogger.debug("Loaded tokens: endpoint={}, tokens={}", x.first, x.second); - } - - for (auto& x : loaded_host_ids) { - slogger.debug("Loaded host_id: endpoint={}, uuid={}", x.first, x.second); + for (const auto& [ep, st] : loaded_endpoints) { + auto dc_rack = get_dc_rack(st); + slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", ep, st.host_id, dc_rack.dc, dc_rack.rack, st.tokens); } } auto tmlock = co_await get_token_metadata_lock(); auto tmptr = co_await get_mutable_token_metadata_ptr(); - for (auto x : loaded_tokens) { - auto ep = x.first; - auto tokens = x.second; + for (const auto& [ep, st] : loaded_endpoints) { if (ep == get_broadcast_address()) { // entry has been mistakenly added, delete it co_await _sys_ks.local().remove_endpoint(ep); } else { - const auto dc_rack = get_dc_rack(ep); - const auto hostIdIt = loaded_host_ids.find(ep); - if (hostIdIt == loaded_host_ids.end()) { + const auto dc_rack = get_dc_rack(st); + if (!st.host_id) { on_internal_error(slogger, format("can't find host_id for ep {}", ep)); } - tmptr->update_topology(hostIdIt->second, dc_rack, locator::node::state::normal); - co_await tmptr->update_normal_tokens(tokens, hostIdIt->second); - tmptr->update_host_id(hostIdIt->second, ep); - loaded_endpoints.insert(ep); + tmptr->update_topology(st.host_id, dc_rack, locator::node::state::normal); + co_await tmptr->update_normal_tokens(st.tokens, st.host_id); + tmptr->update_host_id(st.host_id, ep); // gossiping hasn't started yet // so no need to lock the endpoint - co_await _gossiper.add_saved_endpoint(ep, gms::null_permit_id); + co_await _gossiper.add_saved_endpoint(ep, st, gms::null_permit_id); } } co_await replicate_to_all_cores(std::move(tmptr)); @@ -2699,10 +2702,10 @@ future<> storage_service::join_cluster(sharded& auto 
seeds = _gossiper.get_seeds(); auto initial_contact_nodes = loaded_endpoints.empty() ? std::unordered_set(seeds.begin(), seeds.end()) : - loaded_endpoints; + boost::copy_range>(loaded_endpoints | boost::adaptors::map_keys); auto loaded_peer_features = co_await _sys_ks.local().load_peer_features(); slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", - initial_contact_nodes, loaded_endpoints, loaded_peer_features.size()); + initial_contact_nodes, loaded_endpoints | boost::adaptors::map_keys, loaded_peer_features.size()); for (auto& x : loaded_peer_features) { slogger.info("peer={}, supported_features={}", x.first, x.second); } diff --git a/service/storage_service.hh b/service/storage_service.hh index d55dcbd3b7f6..bb829fbbb8cf 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -82,6 +82,7 @@ class range_streamer; namespace gms { class feature_service; class gossiper; +class loaded_endpoint_state; }; namespace service { @@ -370,7 +371,7 @@ private: sharded& proxy, sharded& gossiper, std::unordered_set initial_contact_nodes, - std::unordered_set loaded_endpoints, + std::unordered_map loaded_endpoints, std::unordered_map loaded_peer_features, std::chrono::milliseconds, start_hint_manager start_hm,