Skip to content

Commit

Permalink
gossiper: load_saved_endpoint: set dc and rack
Browse files Browse the repository at this point in the history
When loading endpoint_state from system.peers,
pass the loaded nodes dc/rack info from
storage_service::join_token_ring to gossiper::add_saved_endpoint.

Load the endpoint's DC/RACK information into the endpoint_state,
if available, so that it can propagate to bootstrapping nodes
via gossip, even if those nodes are DOWN after a full cluster restart.

Note that this change makes the host_id presence
mandatory following scylladb#16376.
The reason to do so is that the other states — tokens, dc, and rack —
are useless without the host_id.
This change is backward compatible since the HOST_ID application state
was written to system.peers since inception in scylla
and it would be missing only due to potential exception
in older versions that failed to write it.
In this case, manual intervention is needed and
the correct HOST_ID needs to be manually updated in system.peers.

Refs scylladb#15787

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Feb 18, 2024
1 parent 9426a4e commit a5b3dea
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 50 deletions.
28 changes: 13 additions & 15 deletions gms/gossiper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2134,12 +2134,16 @@ void gossiper::build_seeds_list() {
}
}

future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) {
future<> gossiper::add_saved_endpoint(inet_address ep, loaded_endpoint_state loaded_state, permit_id pid) {
if (ep == get_broadcast_address()) {
logger.debug("Attempt to add self as saved endpoint");
co_return;
}

if (!loaded_state.host_id) {
on_internal_error(logger, format("can't find host_id for ep {} for adding saved endpoint", ep));
}

auto permit = co_await lock_endpoint(ep, pid);

//preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
Expand All @@ -2154,24 +2158,18 @@ future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) {
logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state);
ep_state.update_timestamp();
}
// It's okay to use the local version generator for the loaded application state values
// As long as the endpoint_state has zero generation.
// It will get updated as a whole by handle_major_state_change
// via do_apply_state_locally when (remote_generation > local_generation)
const auto tmptr = get_token_metadata_ptr();
auto host_id = tmptr->get_host_id_if_known(ep);
if (host_id) {
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value()));
auto tokens = tmptr->get_tokens(*host_id);
if (!tokens.empty()) {
std::unordered_set<dht::token> tokens_set(tokens.begin(), tokens.end());
ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set));
}
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(loaded_state.host_id));
if (!loaded_state.tokens.empty()) {
ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(loaded_state.tokens));
}
if (loaded_state.opt_dc_rack) {
ep_state.add_application_state(gms::application_state::DC, versioned_value::datacenter(loaded_state.opt_dc_rack->dc));
ep_state.add_application_state(gms::application_state::RACK, versioned_value::datacenter(loaded_state.opt_dc_rack->rack));
}
auto generation = ep_state.get_heart_beat_state().get_generation();
co_await replicate(ep, std::move(ep_state), permit.id());
_unreachable_endpoints[ep] = now();
logger.trace("Adding saved endpoint {} {}", ep, generation);
logger.debug("Added saved endpoint {} generation={} version={}: {}", ep, generation, ep_state.get_heart_beat_state().get_heart_beat_version(), ep_state);
}

future<> gossiper::add_local_application_state(application_state state, versioned_value value) {
Expand Down
9 changes: 8 additions & 1 deletion gms/gossiper.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/scheduling.hh>
#include "locator/token_metadata.hh"
#include "locator/types.hh"

namespace db {
class config;
Expand Down Expand Up @@ -76,6 +77,12 @@ struct gossip_config {
uint32_t skip_wait_for_gossip_to_settle = -1;
};

// Endpoint state loaded from the local system tables (system.peers),
// used to pre-populate gossip before gossiping starts or for nodes
// that are still DOWN after a full cluster restart.
struct loaded_endpoint_state {
    // Host id of the endpoint. Must be engaged: add_saved_endpoint
    // calls on_internal_error if it is null.
    locator::host_id host_id;
    // Tokens owned by the endpoint. May be empty, in which case no
    // TOKENS application state is added.
    std::unordered_set<dht::token> tokens;
    // Datacenter/rack of the endpoint, if it was persisted; when
    // disengaged, no DC/RACK application states are added.
    std::optional<locator::endpoint_dc_rack> opt_dc_rack;
};

/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
Expand Down Expand Up @@ -608,7 +615,7 @@ public:
/**
* Add an endpoint we knew about previously, but whose state is unknown
*/
future<> add_saved_endpoint(inet_address ep, permit_id);
future<> add_saved_endpoint(inet_address ep, loaded_endpoint_state, permit_id);

future<> add_local_application_state(application_state state, versioned_value value);

Expand Down
69 changes: 36 additions & 33 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "db/consistency_level.hh"
#include "seastar/core/when_all.hh"
#include "service/tablet_allocator.hh"
#include "locator/types.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
#include "replica/tablet_mutation_builder.hh"
Expand Down Expand Up @@ -669,7 +670,14 @@ future<> storage_service::topology_state_load() {
// since it is not loaded in join_cluster in the
// raft_topology_change_enabled() case.
if (!_gossiper.get_endpoint_state_ptr(ep)) {
co_await _gossiper.add_saved_endpoint(ep, permit.id());
gms::loaded_endpoint_state st;
const auto& topo = tmptr->get_topology();
// node must exist in topology if it's in tmptr->get_all_endpoints
const auto* node = topo.find_node(e);
st.host_id = node->host_id();
st.tokens = boost::copy_range<std::unordered_set<dht::token>>(tmptr->get_tokens(e));
st.opt_dc_rack = node->dc_rack();
co_await _gossiper.add_saved_endpoint(ep, std::move(st), permit.id());
}
}

Expand Down Expand Up @@ -1284,7 +1292,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
sharded<service::storage_proxy>& proxy,
sharded<gms::gossiper>& gossiper,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, gms::loaded_endpoint_state> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
std::chrono::milliseconds delay,
start_hint_manager start_hm,
Expand Down Expand Up @@ -1364,10 +1372,10 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
my_ip, local_host_id));
}
co_await _gossiper.reset_endpoint_state_map();
for (auto ep : loaded_endpoints) {
for (auto& [ep, st] : loaded_endpoints) {
// gossiping hasn't started yet
// so no need to lock the endpoint
co_await _gossiper.add_saved_endpoint(ep, gms::null_permit_id);
co_await _gossiper.add_saved_endpoint(ep, std::move(st), gms::null_permit_id);
}
}
auto features = _feature_service.supported_feature_set();
Expand Down Expand Up @@ -2641,52 +2649,47 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&

set_mode(mode::STARTING);

std::unordered_set<inet_address> loaded_endpoints;
std::unordered_map<inet_address, gms::loaded_endpoint_state> loaded_endpoints;
if (_db.local().get_config().load_ring_state() && !raft_topology_change_enabled()) {
slogger.info("Loading persisted ring state");
auto loaded_tokens = co_await _sys_ks.local().load_tokens();
auto loaded_host_ids = co_await _sys_ks.local().load_host_ids();
auto loaded_dc_rack = co_await _sys_ks.local().load_dc_rack_info();
for (auto& [ep, tokens] : co_await _sys_ks.local().load_tokens()) {
loaded_endpoints[ep].tokens = std::move(tokens);
}
for (auto& [ep, host_id] : co_await _sys_ks.local().load_host_ids()) {
loaded_endpoints[ep].host_id = std::move(host_id);
}
for (auto& [ep, dc_rack] : co_await _sys_ks.local().load_dc_rack_info()) {
loaded_endpoints[ep].opt_dc_rack = std::move(dc_rack);
}

auto get_dc_rack = [&loaded_dc_rack] (inet_address ep) {
if (loaded_dc_rack.contains(ep)) {
return loaded_dc_rack[ep];
} else {
return locator::endpoint_dc_rack::default_location;
}
auto get_dc_rack = [] (const gms::loaded_endpoint_state& st) {
return st.opt_dc_rack.value_or(locator::endpoint_dc_rack::default_location);
};

if (slogger.is_enabled(logging::log_level::debug)) {
for (auto& x : loaded_tokens) {
slogger.debug("Loaded tokens: endpoint={}, tokens={}", x.first, x.second);
}

for (auto& x : loaded_host_ids) {
slogger.debug("Loaded host_id: endpoint={}, uuid={}", x.first, x.second);
for (const auto& [ep, st] : loaded_endpoints) {
auto dc_rack = get_dc_rack(st);
slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", ep, st.host_id, dc_rack.dc, dc_rack.rack, st.tokens);
}
}

auto tmlock = co_await get_token_metadata_lock();
auto tmptr = co_await get_mutable_token_metadata_ptr();
for (auto x : loaded_tokens) {
auto ep = x.first;
auto tokens = x.second;
for (const auto& [ep, st] : loaded_endpoints) {
if (ep == get_broadcast_address()) {
// entry has been mistakenly added, delete it
co_await _sys_ks.local().remove_endpoint(ep);
} else {
const auto dc_rack = get_dc_rack(ep);
const auto hostIdIt = loaded_host_ids.find(ep);
if (hostIdIt == loaded_host_ids.end()) {
const auto dc_rack = get_dc_rack(st);
if (!st.host_id) {
on_internal_error(slogger, format("can't find host_id for ep {}", ep));
}
tmptr->update_topology(hostIdIt->second, dc_rack, locator::node::state::normal);
co_await tmptr->update_normal_tokens(tokens, hostIdIt->second);
tmptr->update_host_id(hostIdIt->second, ep);
loaded_endpoints.insert(ep);
tmptr->update_topology(st.host_id, dc_rack, locator::node::state::normal);
co_await tmptr->update_normal_tokens(st.tokens, st.host_id);
tmptr->update_host_id(st.host_id, ep);
// gossiping hasn't started yet
// so no need to lock the endpoint
co_await _gossiper.add_saved_endpoint(ep, gms::null_permit_id);
co_await _gossiper.add_saved_endpoint(ep, st, gms::null_permit_id);
}
}
co_await replicate_to_all_cores(std::move(tmptr));
Expand All @@ -2699,10 +2702,10 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
auto seeds = _gossiper.get_seeds();
auto initial_contact_nodes = loaded_endpoints.empty() ?
std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
loaded_endpoints;
boost::copy_range<std::unordered_set<gms::inet_address>>(loaded_endpoints | boost::adaptors::map_keys);
auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
initial_contact_nodes, loaded_endpoints | boost::adaptors::map_keys, loaded_peer_features.size());
for (auto& x : loaded_peer_features) {
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
Expand Down
3 changes: 2 additions & 1 deletion service/storage_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class range_streamer;
namespace gms {
class feature_service;
class gossiper;
// Forward-declared with the same class-key ("struct") as its definition
// in gms/gossiper.hh: a mismatched tag triggers -Wmismatched-tags on
// Clang and breaks name mangling on MSVC.
struct loaded_endpoint_state;
};

namespace service {
Expand Down Expand Up @@ -370,7 +371,7 @@ private:
sharded<service::storage_proxy>& proxy,
sharded<gms::gossiper>& gossiper,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, gms::loaded_endpoint_state> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
std::chrono::milliseconds,
start_hint_manager start_hm,
Expand Down

0 comments on commit a5b3dea

Please sign in to comment.