Skip to content

Commit

Permalink
fix: when append-entries, deleting entries after prev-log-id causes c…
Browse files Browse the repository at this point in the history
…ommitted entry to be lost

Problem:

When append-entries, raft core removes old entries after
`prev_log_id.index`, then append new logs sent from leader.

Since deleting then appending entries are not atomic(two calls to `RaftStore`),
deleting consistent entries may cause loss of committed entries, if
server crashes after the delete.

E.g., an example cluster state with logs as following and R1 now is the leader:

```
R1 1,1  1,2  1,3
R2 1,1  1,2
R3
```

Committed entry `{1,2}` gets lost after the following steps:

- R1 to R2: `append_entries(entries=[{1,2}, {1,3}], prev_log_id={1,1})`
- R2 deletes 1,2
- R2 crash
- R2 is elected as leader with R3, and only see 1,1; the committed entry 1,2 is lost.

Solution:

The safe way is to skip every entry that are consistent to the leader.
And delete only the inconsistent entries.

Another issue with this solution is that:

Because we can not just delete `log[prev_log_id.index..]`, the commit index:
- must be update only after append-entries,
- and must point to a log entry that is consistent to leader.

Or there could be chance applying an uncommitted entry:

```
R0 1,1  1,2  3,3
R1 1,1  1,2  2,3
R2 1,1  1,2  3,3
```

- R0 to R1 `append_entries: entries=[{1,2}], prev_log_id = {1,1}, commit_index = 3`
- R1 accepted this `append_entries` request but was not aware of that entry {2,3} is inconsistent to leader.
  Updating commit index to 3 allows it to apply an uncommitted entrie `{2,3}`.
  • Loading branch information
drmingdrmer committed Aug 31, 2021
1 parent 05ecf7e commit 9540c90
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 114 deletions.
292 changes: 185 additions & 107 deletions async-raft/src/core/append_entries.rs
Expand Up @@ -29,6 +29,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
) -> RaftResult<AppendEntriesResponse> {
tracing::debug!(%self.last_log_id);

let mut msg_entries = msg.entries.as_slice();
let mut prev_log_id = msg.prev_log_id;

// If message's term is less than most recent term, then we do not honor the request.
if msg.term < self.current_term {
tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term");
Expand All @@ -39,63 +42,61 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
});
}

// Update election timeout.
// TODO(xp): only update commit_index if the log present. e.g., append entries first, then update commit_index.
self.update_next_election_timeout(true);
let mut report_metrics = false;

// The value for `self.commit_index` is only updated here when not the leader.
self.commit_index = msg.leader_commit;
// Caveat: Because we can not just delete `log[prev_log_id.index..]`, (which results in loss of committed
// entry), the commit index must be update only after append-entries
// and must point to a log entry that is consistent to leader.
// Or there would be chance applying an uncommitted entry:
//
// ```
// R0 1,1 1,2 3,3
// R1 1,1 1,2 2,3
// R2 1,1 1,2 3,3
// ```
//
// - R0 to R1 append_entries: entries=[{1,2}], prev_log_id = {1,1}, commit_index = 3
// - R1 accepted this append_entries request but was not aware of that entry {2,3} is inconsistent to leader.
// Then it will update commit_index to 3 and apply {2,3}

let valid_commit_index = msg_entries.last().map(|x| x.log_id.index).unwrap_or(prev_log_id.index);
let valid_commit_index = std::cmp::min(msg.leader_commit, valid_commit_index);

tracing::debug!("start to check and update to latest term/leader");
{
let mut report_metrics = false;

if msg.term > self.current_term {
self.update_current_term(msg.term, None);
self.save_hard_state().await?;
report_metrics = true;
}

// Update current term if needed.
if self.current_term != msg.term {
self.update_current_term(msg.term, None);
self.save_hard_state().await?;
report_metrics = true;
}
// Update current leader if needed.
if self.current_leader.as_ref() != Some(&msg.leader_id) {
self.update_current_leader(UpdateCurrentLeader::OtherNode(msg.leader_id));
report_metrics = true;
}

// Update current leader if needed.
if self.current_leader.as_ref() != Some(&msg.leader_id) {
self.update_current_leader(UpdateCurrentLeader::OtherNode(msg.leader_id));
report_metrics = true;
if report_metrics {
self.report_metrics(Update::Ignore);
}
}

// Transition to follower state if needed.
if !self.target_state.is_follower() && !self.target_state.is_non_voter() {
self.set_target_state(State::Follower);
}

// If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
// log info, then replication is g2g.
let msg_prev_index_is_min = msg.prev_log_id.index == u64::MIN;
let msg_index_and_term_match = msg.prev_log_id == self.last_log_id;

if msg_prev_index_is_min || msg_index_and_term_match {
if !msg.entries.is_empty() {
self.append_log_entries(&msg.entries).await?;
}
self.replicate_to_state_machine_if_needed().await?;

if report_metrics {
self.report_metrics(Update::Ignore);
}

return Ok(AppendEntriesResponse {
term: self.current_term,
success: true,
conflict_opt: None,
});
if prev_log_id.index == u64::MIN || prev_log_id == self.last_log_id {
// Matches! Great!
return self.append_apply_log_entries(msg_entries, valid_commit_index).await;
}

/////////////////////////////////////
//// Begin Log Consistency Check ////
tracing::debug!("begin log consistency check");

if self.last_log_id.index < msg.prev_log_id.index {
if report_metrics {
self.report_metrics(Update::Ignore);
}

// Lagging too much, let the leader to retry append_entries from my last_log.index
if self.last_log_id.index < prev_log_id.index {
return Ok(AppendEntriesResponse {
term: self.current_term,
success: false,
Expand All @@ -105,93 +106,170 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
});
}

// last_log_id.index >= prev_log_id.index
// prev_log_id.index <= last_log_id.index

// Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its
// result.
let prev_entry = self
.storage
.try_get_log_entry(msg.prev_log_id.index)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
// Log entries upto last_applied may be removed.
// The applied entries are also committed thus always be consistent with the leader.
// Align the prev_log_id to last_applied.
let local_prev_log_id = self.earliest_log_id_since(prev_log_id.index).await?;

let target_entry = match prev_entry {
Some(target_entry) => target_entry,
None => {
// This can only happen if the target entry is removed, e.g., when installing snapshot or log
// compaction.
// Use the last known index & term as a conflict opt.
if prev_log_id.index < local_prev_log_id.index {
let distance = local_prev_log_id.index - prev_log_id.index;

if report_metrics {
self.report_metrics(Update::Ignore);
}
prev_log_id = local_prev_log_id;

return Ok(AppendEntriesResponse {
term: self.current_term,
success: false,
conflict_opt: Some(ConflictOpt {
log_id: self.last_log_id,
}),
});
}
};
msg_entries = if msg_entries.len() > distance as usize {
&msg_entries[distance as usize..]
} else {
&[]
};
}

// last_applied.index <= prev_log_id.index <= last_log_id.index

// The target entry was found. Compare its term with target term to ensure everything is consistent.
if target_entry.log_id.term == msg.prev_log_id.term {
if local_prev_log_id == prev_log_id {
// We've found a point of agreement with the leader. If we have any logs present
// with an index greater than this, then we must delete them per §5.3.
if self.last_log_id.index > target_entry.log_id.index {

if self.last_log_id.index > prev_log_id.index {
self.delete_inconsistent_log(prev_log_id, msg_entries).await?;
}
tracing::debug!("end log consistency check");

return self.append_apply_log_entries(msg_entries, valid_commit_index).await;
}

let last_match = self.last_possible_matched(prev_log_id).await?;

Ok(AppendEntriesResponse {
term: self.current_term,
success: false,
conflict_opt: Some(ConflictOpt { log_id: last_match }),
})
}

#[tracing::instrument(level = "debug", skip(self))]
async fn earliest_log_id_since(&mut self, index: u64) -> RaftResult<LogId> {
if index <= self.last_applied.index {
return Ok(self.last_applied);
}

// last_applied.index < prev_log_id.index <= last_log_id.index

let x = self.storage.get_log_entries(index..=index).await.map_err(|err| self.map_fatal_storage_error(err))?;

let entry = x
.first()
.ok_or_else(|| self.map_fatal_storage_error(anyhow::anyhow!("log entry not found at: {}", index)))?;

Ok(entry.log_id)
}

#[tracing::instrument(level="debug", skip(self, msg_entries), fields(msg_entries=%msg_entries.summary()))]
async fn delete_inconsistent_log(&mut self, prev_log_id: LogId, msg_entries: &[Entry<D>]) -> RaftResult<()> {
// Caveat: Deleting then appending entries are not atomic, thus deleting consistent entries may cause loss of
// committed logs.
//
// E.g., the logs are as following and R1 now is the leader:
//
// ```
// R1 1,1 1,2 1,3
// R2 1,1 1,2
// R3
// ```
// When the following steps take place, committed entry `{1,2}` is lost:
//
// - R1 to R2: append_entries(entries=[{1,2}, {1,3}], prev_log_id={1,1})
// - R2 deletes 1,2
// - R2 crash
// - R2 elected as leader and only see 1,1; the committed entry 1,2 is lost.
//
// **The safe way is to skip every entry that present in append_entries message then delete only the
// inconsistent entries**.

let end = std::cmp::min(prev_log_id.index + msg_entries.len() as u64, self.last_log_id.index + 1);

tracing::debug!(
"find and delete inconsistent log entries [{}, {}), last_log_id: {}, entries: {}",
prev_log_id,
end,
self.last_log_id,
msg_entries.summary()
);

let entries = self
.storage
.get_log_entries(prev_log_id.index..end)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;

for (i, ent) in entries.iter().enumerate() {
if ent.log_id.term != msg_entries[i].log_id.term {
tracing::debug!("delete inconsistent log entries from: {}", ent.log_id);

self.storage
.delete_logs_from(target_entry.log_id.index + 1..)
.delete_logs_from(ent.log_id.index..)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;

let membership =
self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;

self.update_membership(membership)?;

break;
}
}
// The target entry does not have the same term. Fetch the last 50 logs, and use the last
// entry of that payload which is still in the target term for conflict optimization.
else {
let mut start = if msg.prev_log_id.index >= 50 {
msg.prev_log_id.index - 50
} else {
0
};
if start == 0 {
start = 1
}
let old_entries = self
.storage
.get_log_entries(start..msg.prev_log_id.index)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
Ok(())
}

let first = old_entries.first().map(|x| x.log_id).unwrap_or_default();
/// Walks backward 50 entries to find the last log entry that has the same `term` as `prev_log_id`, which is
/// consistent to the leader. Further replication will use this log id as `prev_log_id` to sync log.
#[tracing::instrument(level = "debug", skip(self))]
async fn last_possible_matched(&mut self, prev_log_id: LogId) -> RaftResult<LogId> {
let start = prev_log_id.index.saturating_sub(50);

let opt = match old_entries.iter().rev().find(|entry| entry.log_id.term <= msg.prev_log_id.term) {
Some(entry) => Some(ConflictOpt { log_id: entry.log_id }),
None => Some(ConflictOpt { log_id: first }),
};
if report_metrics {
self.report_metrics(Update::Ignore);
}
return Ok(AppendEntriesResponse {
term: self.current_term,
success: false,
conflict_opt: opt,
});
if start < self.last_applied.index {
// The applied log is always consistent to leader.
return Ok(self.last_applied);
}

///////////////////////////////////
//// End Log Consistency Check ////
tracing::debug!("end log consistency check");
if start == 0 {
// A simple way is to sync from the beginning.
return Ok(LogId { term: 0, index: 0 });
}

self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed().await?;
if report_metrics {
self.report_metrics(Update::Ignore);
let old_entries = self
.storage
.get_log_entries(start..=prev_log_id.index)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;

let last_matched = old_entries.iter().rev().find(|entry| entry.log_id.term == prev_log_id.term);

let first = old_entries.first().map(|x| x.log_id).unwrap();

let last_matched = last_matched.map(|x| x.log_id).unwrap_or_else(|| first);
Ok(last_matched)
}

#[tracing::instrument(level="debug", skip(self, entries), fields(entries=%entries.summary()))]
async fn append_apply_log_entries(
&mut self,
entries: &[Entry<D>],
commit_index: u64,
) -> RaftResult<AppendEntriesResponse> {
if !entries.is_empty() {
self.append_log_entries(entries).await?;
}

self.commit_index = commit_index;

self.replicate_to_state_machine_if_needed().await?;

self.report_metrics(Update::Ignore);

Ok(AppendEntriesResponse {
term: self.current_term,
success: true,
Expand Down
18 changes: 18 additions & 0 deletions async-raft/src/raft.rs
Expand Up @@ -520,6 +520,24 @@ impl<D: AppData> Entry<D> {
}
}

impl<D: AppData> MessageSummary for Entry<D> {
fn summary(&self) -> String {
format!("{}:{}", self.log_id, self.payload.summary())
}
}

impl<D: AppData> MessageSummary for &[Entry<D>] {
fn summary(&self) -> String {
let mut res = Vec::with_capacity(self.len());
for x in self.iter() {
let e = format!("{}:{}", x.log_id, x.payload.summary());
res.push(e);
}

res.join(",")
}
}

/// Log entry payload variants.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum EntryPayload<D: AppData> {
Expand Down

0 comments on commit 9540c90

Please sign in to comment.