diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index effea2811fe7..0ddbb015b29d 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -2134,12 +2134,16 @@ void gossiper::build_seeds_list() {
     }
 }
 
-future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) {
+future<> gossiper::add_saved_endpoint(inet_address ep, loaded_endpoint_state loaded_state, permit_id pid) {
     if (ep == get_broadcast_address()) {
         logger.debug("Attempt to add self as saved endpoint");
         co_return;
     }
 
+    if (!loaded_state.host_id) {
+        on_internal_error(logger, format("can't find host_id for ep {} for adding saved endpoint", ep));
+    }
+
     auto permit = co_await lock_endpoint(ep, pid);
 
     //preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
@@ -2154,24 +2158,18 @@ future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) {
         logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state);
         ep_state.update_timestamp();
     }
-    // It's okay to use the local version generator for the loaded application state values
-    // As long as the endpoint_state has zero generation.
-    // It will get updated as a whole by handle_major_state_change
-    // via do_apply_state_locally when (remote_generation > local_generation)
-    const auto tmptr = get_token_metadata_ptr();
-    auto host_id = tmptr->get_host_id_if_known(ep);
-    if (host_id) {
-        ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value()));
-        auto tokens = tmptr->get_tokens(*host_id);
-        if (!tokens.empty()) {
-            std::unordered_set<dht::token> tokens_set(tokens.begin(), tokens.end());
-            ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set));
-        }
+    ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(loaded_state.host_id));
+    if (!loaded_state.tokens.empty()) {
+        ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(loaded_state.tokens));
+    }
+    if (loaded_state.opt_dc_rack) {
+        ep_state.add_application_state(gms::application_state::DC, versioned_value::datacenter(loaded_state.opt_dc_rack->dc));
+        ep_state.add_application_state(gms::application_state::RACK, versioned_value::rack(loaded_state.opt_dc_rack->rack));
     }
     auto generation = ep_state.get_heart_beat_state().get_generation();
     co_await replicate(ep, std::move(ep_state), permit.id());
     _unreachable_endpoints[ep] = now();
-    logger.trace("Adding saved endpoint {} {}", ep, generation);
+    logger.debug("Added saved endpoint {} generation={} version={}: {}", ep, generation, ep_state.get_heart_beat_state().get_heart_beat_version(), ep_state);
 }
 
 future<> gossiper::add_local_application_state(application_state state, versioned_value value) {
diff --git a/gms/gossiper.hh b/gms/gossiper.hh
index 92bb24877078..85a547b94837 100644
--- a/gms/gossiper.hh
+++ b/gms/gossiper.hh
@@ -36,6 +36,7 @@
 #include
 #include
 #include "locator/token_metadata.hh"
+#include "locator/types.hh"
 
 namespace db {
 class config;
@@ -76,6 +77,12 @@ struct gossip_config {
     uint32_t skip_wait_for_gossip_to_settle = -1;
 };
 
+struct loaded_endpoint_state {
+    locator::host_id host_id;
+    std::unordered_set<dht::token> tokens;
+    std::optional<locator::endpoint_dc_rack> opt_dc_rack;
+};
+
 /**
  * This module is responsible for Gossiping information for the local endpoint. This abstraction
  * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
@@ -608,7 +615,7 @@ public:
 
     /**
     * Add an endpoint we knew about previously, but whose state is unknown
    */
-    future<> add_saved_endpoint(inet_address ep, permit_id);
+    future<> add_saved_endpoint(inet_address ep, loaded_endpoint_state, permit_id);
 
     future<> add_local_application_state(application_state state, versioned_value value);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index deb9813daf1c..31fce01268e8 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -28,6 +28,7 @@
 #include "db/consistency_level.hh"
 #include "seastar/core/when_all.hh"
 #include "service/tablet_allocator.hh"
+#include "locator/types.hh"
 #include "locator/tablets.hh"
 #include "locator/tablet_metadata_guard.hh"
 #include "replica/tablet_mutation_builder.hh"
@@ -669,7 +670,14 @@ future<> storage_service::topology_state_load() {
             // since it is not loaded in join_cluster in the
             // raft_topology_change_enabled() case.
             if (!_gossiper.get_endpoint_state_ptr(ep)) {
-                co_await _gossiper.add_saved_endpoint(ep, permit.id());
+                gms::loaded_endpoint_state st;
+                const auto& topo = tmptr->get_topology();
+                // node must exist in topology if it's in tmptr->get_all_endpoints
+                const auto* node = topo.find_node(e);
+                st.host_id = node->host_id();
+                st.tokens = boost::copy_range<std::unordered_set<dht::token>>(tmptr->get_tokens(e));
+                st.opt_dc_rack = node->dc_rack();
+                co_await _gossiper.add_saved_endpoint(ep, std::move(st), permit.id());
             }
         }
 
@@ -1284,7 +1292,7 @@ future<> storage_service::join_token_ring(sharded
         sharded& proxy,
         sharded& gossiper,
         std::unordered_set<gms::inet_address> initial_contact_nodes,
-        std::unordered_set<gms::inet_address> loaded_endpoints,
+        std::unordered_map<gms::inet_address, gms::loaded_endpoint_state> loaded_endpoints,
         std::unordered_map loaded_peer_features,
         std::chrono::milliseconds delay,
         start_hint_manager start_hm,
@@ -1364,10 +1372,10 @@ future<> storage_service::join_token_ring(sharded
@@ ... @@ future<> storage_service::join_cluster(sharded&
     set_mode(mode::STARTING);
 
-    std::unordered_set<gms::inet_address> loaded_endpoints;
+    std::unordered_map<gms::inet_address, gms::loaded_endpoint_state> loaded_endpoints;
     if (_db.local().get_config().load_ring_state() && !raft_topology_change_enabled()) {
         slogger.info("Loading persisted ring state");
-        auto loaded_tokens = co_await _sys_ks.local().load_tokens();
-        auto loaded_host_ids = co_await _sys_ks.local().load_host_ids();
-        auto loaded_dc_rack = co_await _sys_ks.local().load_dc_rack_info();
+        for (auto& [ep, tokens] : co_await _sys_ks.local().load_tokens()) {
+            loaded_endpoints[ep].tokens = std::move(tokens);
+        }
+        for (auto& [ep, host_id] : co_await _sys_ks.local().load_host_ids()) {
+            loaded_endpoints[ep].host_id = std::move(host_id);
+        }
+        for (auto& [ep, dc_rack] : co_await _sys_ks.local().load_dc_rack_info()) {
+            loaded_endpoints[ep].opt_dc_rack = std::move(dc_rack);
+        }
 
-        auto get_dc_rack = [&loaded_dc_rack] (inet_address ep) {
-            if (loaded_dc_rack.contains(ep)) {
-                return loaded_dc_rack[ep];
-            } else {
-                return locator::endpoint_dc_rack::default_location;
-            }
+        auto get_dc_rack = [] (const gms::loaded_endpoint_state& st) {
+            return st.opt_dc_rack.value_or(locator::endpoint_dc_rack::default_location);
         };
 
         if (slogger.is_enabled(logging::log_level::debug)) {
-            for (auto& x : loaded_tokens) {
-                slogger.debug("Loaded tokens: endpoint={}, tokens={}", x.first, x.second);
-            }
-
-            for (auto& x : loaded_host_ids) {
-                slogger.debug("Loaded host_id: endpoint={}, uuid={}", x.first, x.second);
+            for (const auto& [ep, st] : loaded_endpoints) {
+                auto dc_rack = get_dc_rack(st);
+                slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", ep, st.host_id, dc_rack.dc, dc_rack.rack, st.tokens);
             }
         }
 
         auto tmlock = co_await get_token_metadata_lock();
         auto tmptr = co_await get_mutable_token_metadata_ptr();
-        for (auto x : loaded_tokens) {
-            auto ep = x.first;
-            auto tokens = x.second;
+        for (const auto& [ep, st] : loaded_endpoints) {
             if (ep == get_broadcast_address()) {
                 // entry has been mistakenly added, delete it
                 co_await _sys_ks.local().remove_endpoint(ep);
             } else {
-                const auto dc_rack = get_dc_rack(ep);
-                const auto hostIdIt = loaded_host_ids.find(ep);
-                if (hostIdIt == loaded_host_ids.end()) {
+                const auto dc_rack = get_dc_rack(st);
+                if (!st.host_id) {
                     on_internal_error(slogger, format("can't find host_id for ep {}", ep));
                 }
-                tmptr->update_topology(hostIdIt->second, dc_rack, locator::node::state::normal);
-                co_await tmptr->update_normal_tokens(tokens, hostIdIt->second);
-                tmptr->update_host_id(hostIdIt->second, ep);
-                loaded_endpoints.insert(ep);
+                tmptr->update_topology(st.host_id, dc_rack, locator::node::state::normal);
+                co_await tmptr->update_normal_tokens(st.tokens, st.host_id);
+                tmptr->update_host_id(st.host_id, ep);
                 // gossiping hasn't started yet
                 // so no need to lock the endpoint
-                co_await _gossiper.add_saved_endpoint(ep, gms::null_permit_id);
+                co_await _gossiper.add_saved_endpoint(ep, st, gms::null_permit_id);
             }
         }
         co_await replicate_to_all_cores(std::move(tmptr));
@@ -2699,10 +2702,10 @@ future<> storage_service::join_cluster(sharded&
     auto seeds = _gossiper.get_seeds();
     auto initial_contact_nodes = loaded_endpoints.empty() ?
         std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
-        loaded_endpoints;
+        boost::copy_range<std::unordered_set<gms::inet_address>>(loaded_endpoints | boost::adaptors::map_keys);
     auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
     slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
-            initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
+            initial_contact_nodes, loaded_endpoints | boost::adaptors::map_keys, loaded_peer_features.size());
     for (auto& x : loaded_peer_features) {
         slogger.info("peer={}, supported_features={}", x.first, x.second);
     }
diff --git a/service/storage_service.hh b/service/storage_service.hh
index d55dcbd3b7f6..bb829fbbb8cf 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -82,6 +82,7 @@ class range_streamer;
 namespace gms {
 class feature_service;
 class gossiper;
+struct loaded_endpoint_state;
 };
 
 namespace service {
@@ -370,7 +371,7 @@ private:
         sharded& proxy,
         sharded& gossiper,
         std::unordered_set<gms::inet_address> initial_contact_nodes,
-        std::unordered_set<gms::inet_address> loaded_endpoints,
+        std::unordered_map<gms::inet_address, gms::loaded_endpoint_state> loaded_endpoints,
         std::unordered_map loaded_peer_features,
         std::chrono::milliseconds,
         start_hint_manager start_hm,
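
For context, the storage_service::join_cluster() part of this patch folds the three separate per-endpoint loads (tokens, host IDs, DC/rack) into one map of gms::loaded_endpoint_state keyed by endpoint, and the gossiper now insists on a host_id while treating tokens and DC/rack as optional. Below is a minimal, self-contained sketch of that aggregation pattern, for illustration only: endpoint, token, host_id and dc_rack are simplified stand-ins, not the real gms::inet_address, dht::token, locator::host_id or locator::endpoint_dc_rack, and the struct merely mirrors the shape of gms::loaded_endpoint_state.

// Illustrative sketch only; stand-in types instead of the real Scylla/Seastar ones.
#include <cstdint>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>

using endpoint = std::string;   // stands in for gms::inet_address
using token = int64_t;          // stands in for dht::token
using host_id = std::string;    // stands in for locator::host_id

struct dc_rack { std::string dc, rack; };   // stands in for locator::endpoint_dc_rack

// Simplified analogue of gms::loaded_endpoint_state.
struct loaded_endpoint_state {
    host_id id;
    std::unordered_set<token> tokens;
    std::optional<dc_rack> opt_dc_rack;
};

int main() {
    // Stand-ins for the three system-table loads performed in join_cluster().
    std::unordered_map<endpoint, std::unordered_set<token>> loaded_tokens{
        {"10.0.0.1", {1, 2, 3}},
    };
    std::unordered_map<endpoint, host_id> loaded_host_ids{
        {"10.0.0.1", "2b6f3c9e"},
    };
    std::unordered_map<endpoint, dc_rack> loaded_dc_rack{
        {"10.0.0.1", {"dc1", "rack1"}},
    };

    // Fold the three maps into one map keyed by endpoint, mirroring the
    // loaded_endpoints aggregation in the patch.
    std::unordered_map<endpoint, loaded_endpoint_state> loaded_endpoints;
    for (auto& [ep, tokens] : loaded_tokens) {
        loaded_endpoints[ep].tokens = std::move(tokens);
    }
    for (auto& [ep, id] : loaded_host_ids) {
        loaded_endpoints[ep].id = std::move(id);
    }
    for (auto& [ep, dr] : loaded_dc_rack) {
        loaded_endpoints[ep].opt_dc_rack = std::move(dr);
    }

    for (const auto& [ep, st] : loaded_endpoints) {
        // A saved endpoint without a host_id is an internal error in the patch;
        // the toy version just throws.
        if (st.id.empty()) {
            throw std::runtime_error("no host_id for endpoint " + ep);
        }
        // DC/rack falls back to a default location when not persisted.
        const auto& dr = st.opt_dc_rack.value_or(dc_rack{"unknown-dc", "unknown-rack"});
        std::cout << ep << "/" << st.id << " dc=" << dr.dc << " rack=" << dr.rack
                  << " tokens=" << st.tokens.size() << "\n";
    }
}

The point of the refactoring shows up even in this toy version: once the three loads are merged into one struct per endpoint, the "missing host_id" case becomes a single explicit check instead of a lookup in a second map, and DC/rack handling reduces to value_or() on the optional.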