diff --git a/gms/gossiper.cc b/gms/gossiper.cc index a4682d79c2f3..a345f1770d43 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -2134,18 +2134,20 @@ void gossiper::build_seeds_list() { } } -future<> gossiper::add_saved_endpoint(locator::host_id host_id, inet_address ep, permit_id pid) { +future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endpoint_state st, permit_id pid) { if (host_id == my_host_id()) { logger.debug("Attempt to add self as saved endpoint"); co_return; } + const auto& ep = st.endpoint; if (!host_id) { on_internal_error(logger, format("Attempt to add {} with my null host_id as saved endpoint", ep)); - co_return; + } + if (ep == inet_address{}) { + on_internal_error(logger, format("Attempt to add {} with my null inet_address as saved endpoint", host_id)); } if (ep == get_broadcast_address()) { on_internal_error(logger, format("Attempt to add {} with my broadcast_address {} as saved endpoint", host_id, ep)); - co_return; } auto permit = co_await lock_endpoint(ep, pid); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index a583b2ee1df0..77518081f03f 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -620,7 +620,7 @@ public: /** * Add an endpoint we knew about previously, but whose state is unknown */ - future<> add_saved_endpoint(locator::host_id host_id, inet_address ep, permit_id); + future<> add_saved_endpoint(locator::host_id host_id, loaded_endpoint_state st, permit_id); future<> add_local_application_state(application_state state, versioned_value value); diff --git a/service/storage_service.cc b/service/storage_service.cc index 721628daff6f..da3dbb3ca0b7 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -28,6 +28,7 @@ #include "db/consistency_level.hh" #include "seastar/core/when_all.hh" #include "service/tablet_allocator.hh" +#include "locator/types.hh" #include "locator/tablets.hh" #include "locator/tablet_metadata_guard.hh" #include "replica/tablet_mutation_builder.hh" @@ -660,7 +661,13 @@ future<> storage_service::topology_state_load() { if (is_me(e)) { continue; } - const auto ep = tmptr->get_endpoint_for_host_id(e); + const auto& topo = tmptr->get_topology(); + const auto* node = topo.find_node(e); + // node must exist in topology if it's in tmptr->get_all_endpoints + if (!node) { + on_internal_error(slogger, format("Found no node for {} in topology", e)); + } + const auto& ep = node->endpoint(); if (ep == inet_address{}) { continue; } @@ -669,7 +676,11 @@ future<> storage_service::topology_state_load() { // since it is not loaded in join_cluster in the // raft_topology_change_enabled() case. if (!_gossiper.get_endpoint_state_ptr(ep)) { - co_await _gossiper.add_saved_endpoint(e, ep, permit.id()); + gms::loaded_endpoint_state st; + st.endpoint = ep; + st.tokens = boost::copy_range>(tmptr->get_tokens(e)); + st.opt_dc_rack = node->dc_rack(); + co_await _gossiper.add_saved_endpoint(e, std::move(st), permit.id()); } } @@ -1362,7 +1373,7 @@ future<> storage_service::join_token_ring(sharded storage_service::join_cluster(sharded& auto tmlock = co_await get_token_metadata_lock(); auto tmptr = co_await get_mutable_token_metadata_ptr(); for (const auto& [host_id, st] : loaded_endpoints) { - if (st.endpoint == get_broadcast_address()) { + if (host_id == my_host_id()) { // entry has been mistakenly added, delete it co_await _sys_ks.local().remove_endpoint(st.endpoint); + } else if (st.endpoint == get_broadcast_address()) { + on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id)); } else { - if (host_id == my_host_id()) { - on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id)); - } const auto dc_rack = get_dc_rack(st); tmptr->update_topology(host_id, dc_rack, locator::node::state::normal); co_await tmptr->update_normal_tokens(st.tokens, host_id); tmptr->update_host_id(host_id, st.endpoint); // gossiping hasn't started yet // so no need to lock the endpoint - co_await _gossiper.add_saved_endpoint(host_id, st.endpoint, gms::null_permit_id); + co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id); } } co_await replicate_to_all_cores(std::move(tmptr)); @@ -2688,7 +2698,7 @@ future<> storage_service::join_cluster(sharded& })); auto loaded_peer_features = co_await _sys_ks.local().load_peer_features(); slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", - initial_contact_nodes, loaded_endpoints, loaded_peer_features.size()); + initial_contact_nodes, loaded_endpoints | boost::adaptors::map_keys, loaded_peer_features.size()); for (auto& x : loaded_peer_features) { slogger.info("peer={}, supported_features={}", x.first, x.second); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 073992ed653e..ef2182edae47 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -281,12 +281,12 @@ public: } private: - inet_address get_broadcast_address() const noexcept { - return get_token_metadata_ptr()->get_topology().my_address(); - } locator::host_id my_host_id() const noexcept { return get_token_metadata_ptr()->get_topology().my_host_id(); } + inet_address get_broadcast_address() const noexcept { + return get_token_metadata_ptr()->get_topology().my_address(); + } bool is_me(inet_address addr) const noexcept { return get_token_metadata_ptr()->get_topology().is_me(addr); }