diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index c13219e..d400fe1 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -705,25 +705,7 @@ where return; } - let mut state_machine = self.state_machine.lock().unwrap(); - for entry in entries { - // Drop local inconsistent logs. - if entry.index <= self.get_last_log_index() - && entry.term != self.get_term_at_index(entry.index).unwrap() { - for i in entry.index..self.log.len() { - state_machine.register_transition_state( - self.log[i].transition.get_id(), - TransitionState::Abandoned(TransitionAbandonedReason::ConflictWithLeader) - ); - } - self.log.truncate(entry.index); - } - - // Push received logs. - if entry.index == self.log.len() + self.index_offset { - self.log.push(entry); - } - } + self.process_entries(entries); // Update local commit index to either the received commit index or the // latest local log position, whichever is smaller. @@ -745,6 +727,29 @@ where ); } + fn process_entries(&mut self, entries: Vec>) { + let mut state_machine = self.state_machine.lock().unwrap(); + for entry in entries { + // Drop local inconsistent logs. + if entry.index <= self.get_last_log_index() + && entry.term != self.get_term_at_index(entry.index).unwrap() + { + for i in entry.index..self.log.len() { + state_machine.register_transition_state( + self.log[i].transition.get_id(), + TransitionState::Abandoned(TransitionAbandonedReason::ConflictWithLeader), + ); + } + self.log.truncate(entry.index); + } + + // Push received logs. + if entry.index == self.log.len() + self.index_offset { + self.log.push(entry); + } + } + } + fn process_message_as_follower(&mut self, message: Message) { match message { Message::VoteRequest {