Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add on_install_snapshot callback for statemachine #446

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,10 @@ void FSMCaller::do_stop_following(const LeaderChangeContext& stop_following_cont
_fsm->on_stop_following(stop_following_context);
}

void FSMCaller::on_pre_send_snapshot(const PeerId& peer_id) {
_fsm->on_pre_send_snapshot(peer_id);
}

void FSMCaller::describe(std::ostream &os, bool use_html) {
const char* newline = (use_html) ? "<br>" : "\n";
TaskType cur_task = _cur_task;
Expand Down
1 change: 1 addition & 0 deletions src/braft/fsm_caller.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class BAIDU_CACHELINE_ALIGNMENT FSMCaller {
int on_leader_start(int64_t term, int64_t lease_epoch);
int on_start_following(const LeaderChangeContext& start_following_context);
int on_stop_following(const LeaderChangeContext& stop_following_context);
void on_pre_send_snapshot(const PeerId& peer_id);
BRAFT_MOCK int on_error(const Error& e);
int64_t last_applied_index() const {
return _last_applied_index.load(butil::memory_order_relaxed);
Expand Down
4 changes: 4 additions & 0 deletions src/braft/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,10 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
// We have last snapshot index
_virtual_first_log_id = last_but_one_snapshot_id;
truncate_prefix(last_but_one_snapshot_id.index + 1, lck);
} else {
// after restart, no followers, we can truncate log safely
_virtual_first_log_id = _last_snapshot_id;
truncate_prefix(meta->last_included_index() + 1, lck);
}
return;
} else {
Expand Down
5 changes: 5 additions & 0 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,11 @@ void NodeImpl::on_error(const Error& e) {
lck.unlock();
}

void NodeImpl::pre_send_snapshot(const PeerId& peer_id) {
_fsm_caller->on_pre_send_snapshot(peer_id);
}


void NodeImpl::handle_vote_timeout() {
std::unique_lock<raft_mutex_t> lck(_mutex);

Expand Down
3 changes: 3 additions & 0 deletions src/braft/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ friend class VoteBallotCtx;

bool disable_cli() const { return _options.disable_cli; }
bool is_witness() const { return _options.witness; }

// Called when leader start to send snapshot to remote peer
void pre_send_snapshot(const PeerId& peer_id);
private:
friend class butil::RefCountedThreadSafe<NodeImpl>;

Expand Down
1 change: 1 addition & 0 deletions src/braft/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ void StateMachine::on_configuration_committed(const Configuration& conf, int64_t

void StateMachine::on_stop_following(const LeaderChangeContext&) {}
void StateMachine::on_start_following(const LeaderChangeContext&) {}
void StateMachine::on_pre_send_snapshot(const PeerId& peer_id) {}

BootstrapOptions::BootstrapOptions()
: last_log_index(0)
Expand Down
5 changes: 5 additions & 0 deletions src/braft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ class StateMachine {
// the very leader whom the follower starts to follow.
// User can reset the node's information as it starts to follow some leader.
virtual void on_start_following(const ::braft::LeaderChangeContext& ctx);

// Invoked when the leader start to send snapshot to |peer_id|
// Default: Do nothing
virtual void on_pre_send_snapshot(const PeerId& peer_id);

};

enum State {
Expand Down
2 changes: 2 additions & 0 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,8 @@ void Replicator::_install_snapshot() {
add_one_more_task(true)) {
return _block(butil::gettimeofday_us(), EBUSY);
}

node_impl->pre_send_snapshot(_options.peer_id);

// pre-set replicator state to INSTALLING_SNAPSHOT, so replicator could be
// blocked if something is wrong, such as throttled for a period of time
Expand Down
Loading