Skip to content

Commit

Permalink
change: RaftCore: merge last_log_{term,index} into last_log: LogId
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 9, 2021
1 parent 8e0b0df commit 9c5f3d7
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 38 deletions.
4 changes: 2 additions & 2 deletions async-raft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
&mut self,
mut members: HashSet<NodeId>,
) -> Result<(), InitializeError> {
if self.core.last_log_index != 0 || self.core.current_term != 0 {
tracing::error!({self.core.last_log_index, self.core.current_term}, "rejecting init_with_config request as last_log_index or current_term is 0");
if self.core.last_log.index != 0 || self.core.current_term != 0 {
tracing::error!({self.core.last_log.index, self.core.current_term}, "rejecting init_with_config request as last_log_index or current_term is 0");
return Err(InitializeError::NotAllowed);
}

Expand Down
21 changes: 12 additions & 9 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::Update;
Expand Down Expand Up @@ -62,7 +63,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// log info, then replication is g2g.
let msg_prev_index_is_min = msg.prev_log_index == u64::min_value();
let msg_index_and_term_match =
(msg.prev_log_index == self.last_log_index) && (msg.prev_log_term == self.last_log_term);
(msg.prev_log_index == self.last_log.index) && (msg.prev_log_term == self.last_log.term);
if msg_prev_index_is_min || msg_index_and_term_match {
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
Expand Down Expand Up @@ -99,8 +100,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
term: self.current_term,
success: false,
conflict_opt: Some(ConflictOpt {
term: self.last_log_term,
index: self.last_log_index,
term: self.last_log.term,
index: self.last_log.index,
}),
});
}
Expand All @@ -110,7 +111,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if target_entry.term == msg.prev_log_term {
// We've found a point of agreement with the leader. If we have any logs present
// with an index greater than this, then we must delete them per §5.3.
if self.last_log_index > target_entry.index {
if self.last_log.index > target_entry.index {
self.storage
.delete_logs_from(target_entry.index + 1, None)
.await
Expand Down Expand Up @@ -139,8 +140,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
index: entry.index,
}),
None => Some(ConflictOpt {
term: self.last_log_term,
index: self.last_log_index,
term: self.last_log.term,
index: self.last_log.index,
}),
};
if report_metrics {
Expand Down Expand Up @@ -191,8 +192,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// Replicate entries to log (same as append, but in follower mode).
self.storage.replicate_to_log(entries).await.map_err(|err| self.map_fatal_storage_error(err))?;
if let Some(entry) = entries.last() {
self.last_log_index = entry.index;
self.last_log_term = entry.term;
self.last_log = LogId {
term: entry.term,
index: entry.index,
};
}
Ok(())
}
Expand Down Expand Up @@ -272,7 +275,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// from the AppendEntries RPC handler.
#[tracing::instrument(level = "trace", skip(self))]
async fn initial_replicate_to_state_machine(&mut self) {
let stop = std::cmp::min(self.commit_index, self.last_log_index) + 1;
let stop = std::cmp::min(self.commit_index, self.last_log.index) + 1;
let start = self.last_applied + 1;
let storage = self.storage.clone();

Expand Down
12 changes: 6 additions & 6 deletions async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
pub(super) async fn commit_initial_leader_entry(&mut self) -> RaftResult<()> {
// If the cluster has just formed, and the current index is 0, then commit the current
// config, else a blank payload.
let req: ClientWriteRequest<D> = if self.core.last_log_index == 0 {
let req: ClientWriteRequest<D> = if self.core.last_log.index == 0 {
ClientWriteRequest::new_config(self.core.membership.clone())
} else {
ClientWriteRequest::new_blank_payload()
Expand All @@ -71,8 +71,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Check to see if we have any config change logs newer than our commit index. If so, then
// we need to drive the commitment of the config change to the cluster.
let mut pending_config = None; // The inner bool represents `is_in_joint_consensus`.
if self.core.last_log_index > self.core.commit_index {
let (stale_logs_start, stale_logs_stop) = (self.core.commit_index + 1, self.core.last_log_index + 1);
if self.core.last_log.index > self.core.commit_index {
let (stale_logs_start, stale_logs_stop) = (self.core.commit_index + 1, self.core.last_log.index + 1);
pending_config = self
.core
.storage
Expand All @@ -93,7 +93,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Commit the initial payload to the cluster.
let (tx_payload_committed, rx_payload_committed) = oneshot::channel();
let entry = self.append_payload_to_log(req.entry).await?;
self.core.last_log_term = self.core.current_term; // This only ever needs to be updated once per term.
self.core.last_log.term = self.core.current_term; // This only ever needs to be updated once per term.
let cr_entry = ClientRequestEntry::from_entry(entry, tx_payload_committed);
self.replicate_client_request(cr_entry).await;
self.leader_report_metrics();
Expand Down Expand Up @@ -253,7 +253,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "trace", skip(self, payload))]
pub(super) async fn append_payload_to_log(&mut self, payload: EntryPayload<D>) -> RaftResult<Entry<D>> {
let entry = Entry {
index: self.core.last_log_index + 1,
index: self.core.last_log.index + 1,
term: self.core.current_term,
payload,
};
Expand All @@ -262,7 +262,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.append_entry_to_log(&entry)
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;
self.core.last_log_index = entry.index;
self.core.last_log.index = entry.index;
Ok(entry)
}

Expand Down
5 changes: 2 additions & 3 deletions async-raft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
mut snapshot: Box<S::Snapshot>,
) -> RaftResult<()> {
snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?;
let delete_through = if self.last_log_index > req.last_included.index {
let delete_through = if self.last_log.index > req.last_included.index {
Some(req.last_included.index)
} else {
None
Expand All @@ -182,8 +182,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.map_err(|err| self.map_fatal_storage_error(err))?;
let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
self.update_membership(membership)?;
self.last_log_index = req.last_included.index;
self.last_log_term = req.last_included.term;
self.last_log = req.last_included;
self.last_applied = req.last_included.index;
self.snapshot_last_included = req.last_included;
self.report_metrics(Update::Ignore);
Expand Down
17 changes: 7 additions & 10 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
/// first-come-first-served basis. See §5.4.1 for additional restriction on votes.
voted_for: Option<NodeId>,

/// The index of the last entry to be appended to the log.
last_log_index: u64,
/// The term of the last entry to be appended to the log.
last_log_term: u64,
/// The last entry to be appended to the log.
last_log: LogId,

/// The node's current snapshot state.
snapshot_state: Option<SnapshotState<S::Snapshot>>,
Expand Down Expand Up @@ -174,8 +172,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
current_term: 0,
current_leader: None,
voted_for: None,
last_log_index: 0,
last_log_term: 0,
last_log: LogId { term: 0, index: 0 },
snapshot_state: None,
snapshot_last_included: LogId { term: 0, index: 0 },
entries_cache: Default::default(),
Expand All @@ -197,8 +194,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
async fn main(mut self) -> RaftResult<()> {
tracing::trace!("raft node is initializing");
let state = self.storage.get_initial_state().await.map_err(|err| self.map_fatal_storage_error(err))?;
self.last_log_index = state.last_log_index;
self.last_log_term = state.last_log_term;
self.last_log.index = state.last_log_index;
self.last_log.term = state.last_log_term;
self.current_term = state.hard_state.current_term;
self.voted_for = state.hard_state.voted_for;
self.membership = state.membership;
Expand All @@ -216,7 +213,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.report_metrics(Update::Ignore);
}

let has_log = self.last_log_index != u64::min_value();
let has_log = self.last_log.index != u64::min_value();
let single = self.membership.members.len() == 1;
let is_candidate = self.membership.contains(&self.id);

Expand Down Expand Up @@ -284,7 +281,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
id: self.id,
state: self.target_state,
current_term: self.current_term,
last_log_index: self.last_log_index,
last_log_index: self.last_log.index,
last_applied: self.last_applied,
current_leader: self.current_leader,
membership_config: self.membership.clone(),
Expand Down
10 changes: 5 additions & 5 deletions async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
target,
self.core.current_term,
self.core.config.clone(),
self.core.last_log_index,
self.core.last_log_term,
self.core.last_log.index,
self.core.last_log.term,
self.core.commit_index,
self.core.network.clone(),
self.core.storage.clone(),
self.replicationtx.clone(),
);
ReplicationState {
matched: (self.core.current_term, self.core.last_log_index).into(),
matched: (self.core.current_term, self.core.last_log.index).into(),
replstream,
remove_after_commit: None,
}
Expand Down Expand Up @@ -241,7 +241,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// this node is me, the leader
if *id == self.core.id {
// TODO: can it be sure that self.core.last_log_term is the term of this leader?
rst.push((self.core.last_log_index, self.core.last_log_term));
rst.push((self.core.last_log.index, self.core.last_log.term));
continue;
}

Expand Down Expand Up @@ -288,7 +288,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
if let Some(snapshot) = current_snapshot_opt {
// If snapshot exists, ensure its distance from the leader's last log index is <= half
// of the configured snapshot threshold, else create a new snapshot.
if snapshot_is_within_half_of_threshold(&snapshot.included.index, &self.core.last_log_index, &threshold) {
if snapshot_is_within_half_of_threshold(&snapshot.included.index, &self.core.last_log.index, &threshold) {
let _ = tx.send(snapshot);
return Ok(());
}
Expand Down
6 changes: 3 additions & 3 deletions async-raft/src/core/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// Check if candidate's log is at least as up-to-date as this node's.
// If candidate's log is not at least as up-to-date as this node, then reject.
let client_is_uptodate =
(msg.last_log_term >= self.last_log_term) && (msg.last_log_index >= self.last_log_index);
(msg.last_log_term >= self.last_log.term) && (msg.last_log_index >= self.last_log.index);
if !client_is_uptodate {
tracing::trace!(
{ candidate = msg.candidate_id },
Expand Down Expand Up @@ -154,8 +154,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let rpc = VoteRequest::new(
self.core.current_term,
self.core.id,
self.core.last_log_index,
self.core.last_log_term,
self.core.last_log.index,
self.core.last_log.term,
);
let (network, tx_inner) = (self.core.network.clone(), tx.clone());
let _ = tokio::spawn(
Expand Down

0 comments on commit 9c5f3d7

Please sign in to comment.