Skip to content

Commit

Permalink
gossiper: add_saved_endpoint: set dc and rack
Browse files Browse the repository at this point in the history
When loading endpoint_state from system.peers,
pass the loaded nodes dc/rack info from
storage_service::join_token_ring to gossiper::add_saved_endpoint.

Load the endpoint's DC/RACK information into the endpoint_state,
if available, so it can propagate to bootstrapping nodes
via gossip, even if those nodes are DOWN after a full cluster restart.

Note that this change makes the presence of host_id
mandatory, following scylladb#16376.
The reason to do so is that the other states — tokens, dc, and rack —
are useless without the host_id.
This change is backward compatible since the HOST_ID application state
has been written to system.peers since its inception in Scylla,
and it could be missing only due to a potential exception
in older versions that failed to write it.
In that case, manual intervention is needed:
the correct HOST_ID must be manually set in system.peers.

Refs scylladb#15787

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Feb 26, 2024
1 parent 07edf6d commit 94835e6
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 16 deletions.
8 changes: 5 additions & 3 deletions gms/gossiper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2134,18 +2134,20 @@ void gossiper::build_seeds_list() {
}
}

future<> gossiper::add_saved_endpoint(locator::host_id host_id, inet_address ep, permit_id pid) {
future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endpoint_state st, permit_id pid) {
if (host_id == my_host_id()) {
logger.debug("Attempt to add self as saved endpoint");
co_return;
}
const auto& ep = st.endpoint;
if (!host_id) {
on_internal_error(logger, format("Attempt to add {} with my null host_id as saved endpoint", ep));
co_return;
}
if (ep == inet_address{}) {
on_internal_error(logger, format("Attempt to add {} with my null inet_address as saved endpoint", host_id));
}
if (ep == get_broadcast_address()) {
on_internal_error(logger, format("Attempt to add {} with my broadcast_address {} as saved endpoint", host_id, ep));
co_return;
}

auto permit = co_await lock_endpoint(ep, pid);
Expand Down
2 changes: 1 addition & 1 deletion gms/gossiper.hh
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ public:
/**
* Add an endpoint we knew about previously, but whose state is unknown
*/
future<> add_saved_endpoint(locator::host_id host_id, inet_address ep, permit_id);
future<> add_saved_endpoint(locator::host_id host_id, loaded_endpoint_state st, permit_id);

future<> add_local_application_state(application_state state, versioned_value value);

Expand Down
28 changes: 19 additions & 9 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "db/consistency_level.hh"
#include "seastar/core/when_all.hh"
#include "service/tablet_allocator.hh"
#include "locator/types.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
#include "replica/tablet_mutation_builder.hh"
Expand Down Expand Up @@ -660,7 +661,13 @@ future<> storage_service::topology_state_load() {
if (is_me(e)) {
continue;
}
const auto ep = tmptr->get_endpoint_for_host_id(e);
const auto& topo = tmptr->get_topology();
const auto* node = topo.find_node(e);
// node must exist in topology if it's in tmptr->get_all_endpoints
if (!node) {
on_internal_error(slogger, format("Found no node for {} in topology", e));
}
const auto& ep = node->endpoint();
if (ep == inet_address{}) {
continue;
}
Expand All @@ -669,7 +676,11 @@ future<> storage_service::topology_state_load() {
// since it is not loaded in join_cluster in the
// raft_topology_change_enabled() case.
if (!_gossiper.get_endpoint_state_ptr(ep)) {
co_await _gossiper.add_saved_endpoint(e, ep, permit.id());
gms::loaded_endpoint_state st;
st.endpoint = ep;
st.tokens = boost::copy_range<std::unordered_set<dht::token>>(tmptr->get_tokens(e));
st.opt_dc_rack = node->dc_rack();
co_await _gossiper.add_saved_endpoint(e, std::move(st), permit.id());
}
}

Expand Down Expand Up @@ -1362,7 +1373,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
for (const auto& [host_id, st] : loaded_endpoints) {
// gossiping hasn't started yet
// so no need to lock the endpoint
co_await _gossiper.add_saved_endpoint(host_id, st.endpoint, gms::null_permit_id);
co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
}
}
auto features = _feature_service.supported_feature_set();
Expand Down Expand Up @@ -2657,20 +2668,19 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = co_await get_mutable_token_metadata_ptr();
for (const auto& [host_id, st] : loaded_endpoints) {
if (st.endpoint == get_broadcast_address()) {
if (host_id == my_host_id()) {
// entry has been mistakenly added, delete it
co_await _sys_ks.local().remove_endpoint(st.endpoint);
} else if (st.endpoint == get_broadcast_address()) {
on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
} else {
if (host_id == my_host_id()) {
on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
}
const auto dc_rack = get_dc_rack(st);
tmptr->update_topology(host_id, dc_rack, locator::node::state::normal);
co_await tmptr->update_normal_tokens(st.tokens, host_id);
tmptr->update_host_id(host_id, st.endpoint);
// gossiping hasn't started yet
// so no need to lock the endpoint
co_await _gossiper.add_saved_endpoint(host_id, st.endpoint, gms::null_permit_id);
co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
}
}
co_await replicate_to_all_cores(std::move(tmptr));
Expand All @@ -2688,7 +2698,7 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
}));
auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
initial_contact_nodes, loaded_endpoints | boost::adaptors::map_keys, loaded_peer_features.size());
for (auto& x : loaded_peer_features) {
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
Expand Down
6 changes: 3 additions & 3 deletions service/storage_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ public:
}

private:
inet_address get_broadcast_address() const noexcept {
return get_token_metadata_ptr()->get_topology().my_address();
}
locator::host_id my_host_id() const noexcept {
return get_token_metadata_ptr()->get_topology().my_host_id();
}
inet_address get_broadcast_address() const noexcept {
return get_token_metadata_ptr()->get_topology().my_address();
}
bool is_me(inet_address addr) const noexcept {
return get_token_metadata_ptr()->get_topology().is_me(addr);
}
Expand Down

0 comments on commit 94835e6

Please sign in to comment.