raft: server: add trigger_snapshot API

This allows the user of `raft::server` to ask it to create a snapshot
and truncate the Raft log. In a later commit we'll add a REST endpoint
to Scylla to trigger group 0 snapshots.

One use case for this API is to create group 0 snapshots in Scylla
deployments that upgraded to Raft in version 5.2 and started with an
empty Raft log and no snapshot at the beginning. This causes problems:
for example, when a new node bootstraps into the cluster, it will not
receive a snapshot containing both the schema and group 0 history,
which leads to an inconsistent schema state and triggers assertion
failures, as observed in scylladb#16683.

In 5.4 the logic of initial group 0 setup was changed to start the Raft
log with a snapshot at index 1 (ff386e7),
but a problem remains for existing deployments coming from 5.2: we need
a way to trigger a snapshot in them (other than performing 1000
arbitrary schema changes).

Another potential use case in the future would be to trigger snapshots
based on external memory pressure in tablet Raft groups (for strongly
consistent tables).
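
As an illustration only (not part of this commit), a caller such as the
REST endpoint mentioned above could drive the new API roughly as in the
sketch below; `group0_raft_server()` is a hypothetical placeholder, not
an existing Scylla function:

    #include <seastar/core/abort_source.hh>
    #include <seastar/core/coroutine.hh>
    #include "raft/server.hh"

    // Hypothetical accessor for the raft::server instance managing group 0.
    raft::server& group0_raft_server();

    seastar::future<bool> trigger_group0_snapshot(seastar::abort_source& as) {
        // Resolves to false if the persisted snapshot descriptor already
        // covers everything applied; resolves to true once a fresh snapshot
        // descriptor has been persisted.
        co_return co_await group0_raft_server().trigger_snapshot(&as);
    }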
kbr-scylla authored and dgarcia360 committed Apr 30, 2024
1 parent 5568e0c commit 9123d64
Showing 2 changed files with 98 additions and 6 deletions.
88 changes: 82 additions & 6 deletions raft/server.cc
@@ -142,10 +142,14 @@ class server_impl : public rpc_server, public server {
struct stop_apply_fiber{}; // exception to send when the apply fiber needs to be stopped

struct removed_from_config{}; // sent to applier_fiber when we're not a leader and we're outside the current configuration

struct trigger_snapshot_msg{};

using applier_fiber_message = std::variant<
std::vector<log_entry_ptr>,
snapshot_descriptor,
- removed_from_config>;
+ removed_from_config,
+ trigger_snapshot_msg>;
queue<applier_fiber_message> _apply_entries = queue<applier_fiber_message>(10);

struct stats {
@@ -220,8 +224,10 @@ class server_impl : public rpc_server, public server {
absl::flat_hash_map<server_id, append_request_queue> _append_request_status;

struct server_requests {
bool snapshot = false;

bool empty() const {
- return true;
+ return !snapshot;
}
};

@@ -298,6 +304,8 @@ class server_impl : public rpc_server, public server {

future<> wait_for_state_change(seastar::abort_source* as) override;

virtual future<bool> trigger_snapshot(seastar::abort_source* as) override;

// Get "safe to read" index from a leader
future<read_barrier_reply> get_read_idx(server_id leader, seastar::abort_source* as);
// Wait for an entry with a specific term to get committed or
@@ -454,6 +462,54 @@ future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
}
}

future<bool> server_impl::trigger_snapshot(seastar::abort_source* as) {
check_not_aborted();

if (_applied_idx <= _snapshot_desc_idx) {
logger.debug(
"[{}] trigger_snapshot: last persisted snapshot descriptor index is up-to-date"
", applied index: {}, persisted snapshot descriptor index: {}, last fsm log index: {}"
", last fsm snapshot index: {}", _id, _applied_idx, _snapshot_desc_idx,
_fsm->log_last_idx(), _fsm->log_last_snapshot_idx());
co_return false;
}

_new_server_requests.snapshot = true;
_events.signal();

// Wait for persisted snapshot index to catch up to this index.
auto awaited_idx = _applied_idx;

logger.debug("[{}] snapshot request waiting for index {}", _id, awaited_idx);

try {
optimized_optional<abort_source::subscription> sub;
if (as) {
as->check();
sub = as->subscribe([this] () noexcept { _snapshot_desc_idx_changed.broadcast(); });
assert(sub); // due to `check()` above
}
co_await _snapshot_desc_idx_changed.when([this, as, awaited_idx] {
return (as && as->abort_requested()) || awaited_idx <= _snapshot_desc_idx;
});
if (as) {
as->check();
}
} catch (abort_requested_exception&) {
throw request_aborted();
} catch (seastar::broken_condition_variable&) {
throw request_aborted();
}

logger.debug(
"[{}] snapshot request satisfied, awaited index {}, persisted snapshot descriptor index: {}"
", current applied index {}, last fsm log index {}, last fsm snapshot index {}",
_id, awaited_idx, _snapshot_desc_idx, _applied_idx,
_fsm->log_last_idx(), _fsm->log_last_snapshot_idx());

co_return true;
}

future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as) {
// The entry may have been already committed and even applied
// in case it was forwarded to the leader. In this case
@@ -1146,7 +1202,9 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
}

future<> server_impl::process_server_requests(server_requests&& requests) {
- co_return;
+ if (requests.snapshot) {
+ co_await _apply_entries.push_eventually(trigger_snapshot_msg{});
+ }
}

future<> server_impl::io_fiber(index_t last_stable) {
@@ -1311,9 +1369,9 @@ future<> server_impl::applier_fiber() {
// Note that at this point (after the `co_await`), _fsm may already have applied a later snapshot.
// That's fine, `_fsm->apply_snapshot` will simply ignore our current attempt; we will soon receive
// a later snapshot from the queue.
- if (!_fsm->apply_snapshot(snp,
- force_snapshot ? 0 : _config.snapshot_trailing,
- force_snapshot ? 0 : _config.snapshot_trailing_size, true)) {
+ auto max_trailing = force_snapshot ? 0 : _config.snapshot_trailing;
+ auto max_trailing_bytes = force_snapshot ? 0 : _config.snapshot_trailing_size;
+ if (!_fsm->apply_snapshot(snp, max_trailing, max_trailing_bytes, true)) {
logger.trace("[{}] applier fiber: while taking snapshot term={} idx={} id={},"
" fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx());
}
@@ -1335,6 +1393,23 @@ future<> server_impl::applier_fiber() {
// it may never know the status of entries it submitted.
drop_waiters();
co_return;
},
[this] (const trigger_snapshot_msg&) -> future<> {
auto applied_term = _fsm->log_term_for(_applied_idx);
// last truncation index <= snapshot index <= applied index
assert(applied_term);

snapshot_descriptor snp;
snp.term = *applied_term;
snp.idx = _applied_idx;
snp.config = _fsm->log_last_conf_for(_applied_idx);
logger.trace("[{}] taking snapshot at term={}, idx={} due to request", _id, snp.term, snp.idx);
snp.id = co_await _state_machine->take_snapshot();
if (!_fsm->apply_snapshot(snp, 0, 0, true)) {
logger.trace("[{}] while taking snapshot term={} idx={} id={} due to request,"
" fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx());
}
_stats.snapshots_taken++;
}
), v);

@@ -1489,6 +1564,7 @@ future<> server_impl::abort(sstring reason) {
logger.trace("[{}]: abort() called", _id);
_fsm->stop();
_events.broken();
_snapshot_desc_idx_changed.broken();

// IO and applier fibers may update waiters and start new snapshot
// transfers, so abort them first
16 changes: 16 additions & 0 deletions raft/server.hh
@@ -254,6 +254,22 @@ public:
// If it passes nullptr, the function is unabortable.
virtual future<> wait_for_state_change(seastar::abort_source* as) = 0;

// Manually trigger snapshot creation and log truncation.
//
// Does nothing and returns `false` if the current applied index is less than or equal
// to the last persisted snapshot descriptor index.
//
// Otherwise returns `true`; when the future resolves, it is guaranteed that the snapshot descriptor
// is persisted, but not that the snapshot is loaded to the state machine yet (it will be eventually).
//
// The request may be resolved by the regular snapshotting mechanisms (e.g. a snapshot
// is created because the Raft log grows too large). In this case there is no guarantee
// how many entries will be left trailing behind the snapshot. However,
// if there are no operations running on the server concurrently with the request and all
// committed entries are already applied, the created snapshot is guaranteed to leave
// zero trailing entries.
virtual future<bool> trigger_snapshot(seastar::abort_source* as) = 0;

// Ad hoc functions for testing
virtual void wait_until_candidate() = 0;
virtual future<> wait_election_done() = 0;
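The contract documented in the `trigger_snapshot` comment can be exercised,
for example, by bounding the wait with a timeout through an abort source.
The sketch below is hypothetical and not part of this commit; it assumes
`raft::request_aborted` is the exception surfaced on abort, as in the
server implementation above:

    #include <chrono>
    #include <seastar/core/abort_source.hh>
    #include <seastar/core/coroutine.hh>
    #include <seastar/core/timer.hh>
    #include "raft/server.hh"

    // Hypothetical helper: request a snapshot but give up after `timeout`.
    seastar::future<bool> trigger_snapshot_with_timeout(raft::server& srv, std::chrono::seconds timeout) {
        seastar::abort_source as;
        seastar::timer<> t([&as] { as.request_abort(); });
        t.arm(timeout);
        try {
            // false: applied index <= persisted snapshot descriptor index, nothing to do.
            // true: a new snapshot descriptor has been persisted.
            co_return co_await srv.trigger_snapshot(&as);
        } catch (const raft::request_aborted&) {
            co_return false; // aborted before the snapshot descriptor was persisted
        }
    }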
