Skip to content

Commit

Permalink
Change: move default implemented method from trait RaftLogReader to…
Browse files Browse the repository at this point in the history
… `StorageHelper`

Function `get_log_entries()` and `try_get_log_entry()` are provided by
trait `RaftLogReader` with default implementations. However, they do not
need to be part of this trait and an application does not have to
implement them.

Therefore in this patch they are moved to `StorageHelper` struct, which
provides additional storage access methods that are built based on the
`RaftStorage` trait.
  • Loading branch information
drmingdrmer committed Feb 21, 2023
1 parent 7838890 commit 55217aa
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 57 deletions.
3 changes: 2 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use crate::RaftStorage;
use crate::RaftTypeConfig;
use crate::SnapshotId;
use crate::StorageError;
use crate::StorageHelper;
use crate::Update;
use crate::Vote;

Expand Down Expand Up @@ -705,7 +706,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
return Ok(());
}

let entries = self.storage.get_log_entries(since..end).await?;
let entries = StorageHelper::new(&mut self.storage).get_log_entries(since..end).await?;
tracing::debug!(entries=%entries.as_slice().summary(), "about to apply");

let entry_refs = entries.iter().collect::<Vec<_>>();
Expand Down
11 changes: 10 additions & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
let logs = if start == end {
vec![]
} else {
self.log_reader.get_log_entries(start..end).await?
let logs = self.log_reader.try_get_log_entries(start..end).await?;
debug_assert_eq!(
logs.len(),
(end - start) as usize,
"expect logs {}..{} but got only {} entries",
start,
end,
logs.len()
);
logs
};

// Build the heartbeat frame to be sent to the follower.
Expand Down
29 changes: 28 additions & 1 deletion openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::RangeBounds;
use std::sync::Arc;

use crate::defensive::check_range_matches_entries;
use crate::engine::LogIdList;
use crate::EffectiveMembership;
use crate::Entry;
use crate::EntryPayload;
use crate::LogId;
use crate::LogIdOptionExt;
Expand Down Expand Up @@ -85,7 +89,7 @@ where
return Ok(st.last_purged_log_id.unwrap());
}

let entries = self.sto.get_log_entries(log_index..=log_index).await?;
let entries = self.get_log_entries(log_index..=log_index).await?;

Ok(entries[0].log_id)
}
Expand Down Expand Up @@ -170,4 +174,27 @@ where

Ok(res)
}

/// Try to get an log entry.
///
/// It does not return an error if the log entry at `log_index` is not found.
pub async fn try_get_log_entry(&mut self, log_index: u64) -> Result<Option<Entry<C>>, StorageError<C::NodeId>> {
let mut res = self.sto.try_get_log_entries(log_index..(log_index + 1)).await?;
Ok(res.pop())
}

/// Get a series of log entries from storage.
///
/// Similar to `try_get_log_entries` except an error will be returned if there is an entry not
/// found in the specified range.
pub async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
range: RB,
) -> Result<Vec<Entry<C>>, StorageError<C::NodeId>> {
let res = self.sto.try_get_log_entries(range.clone()).await?;

check_range_matches_entries(range, &res)?;

Ok(res)
}
}
24 changes: 0 additions & 24 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tokio::io::AsyncRead;
use tokio::io::AsyncSeek;
use tokio::io::AsyncWrite;

use crate::defensive::check_range_matches_entries;
use crate::membership::EffectiveMembership;
use crate::node::Node;
use crate::raft_types::SnapshotId;
Expand Down Expand Up @@ -116,29 +115,6 @@ pub struct LogState<C: RaftTypeConfig> {
pub trait RaftLogReader<C>: Send + Sync + 'static
where C: RaftTypeConfig
{
/// Get a series of log entries from storage.
///
/// Similar to `try_get_log_entries` except an error will be returned if there is an entry not
/// found in the specified range.
async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
range: RB,
) -> Result<Vec<Entry<C>>, StorageError<C::NodeId>> {
let res = self.try_get_log_entries(range.clone()).await?;

check_range_matches_entries(range, &res)?;

Ok(res)
}

/// Try to get an log entry.
///
/// It does not return an error if the log entry at `log_index` is not found.
async fn try_get_log_entry(&mut self, log_index: u64) -> Result<Option<Entry<C>>, StorageError<C::NodeId>> {
let mut res = self.try_get_log_entries(log_index..(log_index + 1)).await?;
Ok(res.pop())
}

/// Returns the last deleted log id and the last log id.
///
/// The impl should not consider the applied log id in state machine.
Expand Down
33 changes: 18 additions & 15 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,13 @@ where

tracing::info!("--- get start == stop");
{
let logs = store.get_log_entries(3..3).await?;
let logs = StorageHelper::new(&mut store).get_log_entries(3..3).await?;
assert_eq!(logs.len(), 0, "expected no logs to be returned");
}

tracing::info!("--- get start < stop");
{
let logs = store.get_log_entries(5..7).await?;
let logs = StorageHelper::new(&mut store).get_log_entries(5..7).await?;

assert_eq!(logs.len(), 2);
assert_eq!(logs[0].log_id, LogId::new(CommittedLeaderId::new(1, NODE_ID.into()), 5));
Expand All @@ -720,23 +720,26 @@ where

store.purge_logs_upto(LogId::new(CommittedLeaderId::new(0, C::NodeId::default()), 0)).await?;

let ent = store.try_get_log_entry(3).await?;
let mut sh = StorageHelper::new(&mut store);

let ent = sh.try_get_log_entry(3).await?;
assert_eq!(
Some(LogId::new(CommittedLeaderId::new(1, NODE_ID.into()), 3)),
ent.map(|x| x.log_id)
);

let ent = store.try_get_log_entry(0).await?;
let ent = sh.try_get_log_entry(0).await?;
assert_eq!(None, ent.map(|x| x.log_id));

let ent = store.try_get_log_entry(11).await?;
let ent = sh.try_get_log_entry(11).await?;
assert_eq!(None, ent.map(|x| x.log_id));

Ok(())
}

pub async fn initial_logs(mut store: S) -> Result<(), StorageError<C::NodeId>> {
let ent = store.try_get_log_entry(0).await?;
let mut sh = StorageHelper::new(&mut store);
let ent = sh.try_get_log_entry(0).await?;
assert!(ent.is_none(), "store initialized");

Ok(())
Expand Down Expand Up @@ -1390,22 +1393,22 @@ where

store.purge_logs_upto(LogId::new(CommittedLeaderId::new(0, C::NodeId::default()), 0)).await?;

store.get_log_entries(..).await?;
store.get_log_entries(5..).await?;
store.get_log_entries(..5).await?;
store.get_log_entries(5..7).await?;
StorageHelper::new(&mut store).get_log_entries(..).await?;
StorageHelper::new(&mut store).get_log_entries(5..).await?;
StorageHelper::new(&mut store).get_log_entries(..5).await?;
StorageHelper::new(&mut store).get_log_entries(5..7).await?;

// mismatched bound.

let res = store.get_log_entries(11..).await;
let res = StorageHelper::new(&mut store).get_log_entries(11..).await;
let e = res.unwrap_err().into_defensive().unwrap();
assert!(matches!(e, DefensiveError {
subject: ErrorSubject::LogIndex(11),
violation: Violation::LogIndexNotFound { want: 11, got: None },
..
}));

let res = store.get_log_entries(1..1).await;
let res = StorageHelper::new(&mut store).get_log_entries(1..1).await;
let e = res.unwrap_err().into_defensive().unwrap();
assert!(matches!(e, DefensiveError {
subject: ErrorSubject::Logs,
Expand All @@ -1416,23 +1419,23 @@ where
..
}));

let res = store.get_log_entries(0..1).await;
let res = StorageHelper::new(&mut store).get_log_entries(0..1).await;
let e = res.unwrap_err().into_defensive().unwrap();
assert!(matches!(e, DefensiveError {
subject: ErrorSubject::LogIndex(0),
violation: Violation::LogIndexNotFound { want: 0, got: None },
..
}));

let res = store.get_log_entries(0..2).await;
let res = StorageHelper::new(&mut store).get_log_entries(0..2).await;
let e = res.unwrap_err().into_defensive().unwrap();
assert!(matches!(e, DefensiveError {
subject: ErrorSubject::LogIndex(0),
violation: Violation::LogIndexNotFound { want: 0, got: Some(1) },
..
}));

let res = store.get_log_entries(10..12).await;
let res = StorageHelper::new(&mut store).get_log_entries(10..12).await;
let e = res.unwrap_err().into_defensive().unwrap();
assert!(matches!(e, DefensiveError {
subject: ErrorSubject::LogIndex(11),
Expand Down
3 changes: 2 additions & 1 deletion openraft/tests/append_entries/t20_append_conflicts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use openraft::MessageSummary;
use openraft::RaftStorage;
use openraft::RaftTypeConfig;
use openraft::ServerState;
use openraft::StorageHelper;
use openraft::Vote;

use crate::fixtures::blank;
Expand Down Expand Up @@ -222,7 +223,7 @@ where
C: RaftTypeConfig,
Sto: RaftStorage<C>,
{
let logs = sto.get_log_entries(..).await?;
let logs = StorageHelper::new(sto).get_log_entries(..).await?;
let skip = 0;
let want: Vec<Entry<memstore::Config>> = terms
.iter()
Expand Down
4 changes: 2 additions & 2 deletions openraft/tests/append_entries/t30_append_inconsistent_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use openraft::Config;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::RaftLogReader;
use openraft::RaftStorage;
use openraft::ServerState;
use openraft::StorageHelper;
use openraft::Vote;

use crate::fixtures::init_default_ut_tracing;
Expand Down Expand Up @@ -119,7 +119,7 @@ async fn append_inconsistent_log() -> Result<()> {
.log_at_least(Some(log_index), "sync log to node 0")
.await?;

let logs = sto0.get_log_entries(60..=60).await?;
let logs = StorageHelper::new(&mut sto0).get_log_entries(60..=60).await?;
assert_eq!(
3,
logs.first().unwrap().log_id.leader_id.term,
Expand Down
4 changes: 2 additions & 2 deletions openraft/tests/life_cycle/t20_initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use openraft::EffectiveMembership;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::Membership;
use openraft::RaftLogReader;
use openraft::RaftStorage;
use openraft::ServerState;
use openraft::StorageHelper;
use openraft::Vote;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -113,7 +113,7 @@ async fn initialization() -> anyhow::Result<()> {

for i in [0, 1, 2] {
let mut sto = router.get_storage_handle(&1)?;
let first = sto.get_log_entries(0..2).await?.first().cloned();
let first = StorageHelper::new(&mut sto).get_log_entries(0..2).await?.first().cloned();

tracing::info!("--- check membership is replicated: id: {}, first log: {:?}", i, first);
let mem = match first.unwrap().payload {
Expand Down
4 changes: 2 additions & 2 deletions openraft/tests/log_compaction/t10_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::Membership;
use openraft::RaftLogReader;
use openraft::RaftNetwork;
use openraft::RaftNetworkFactory;
use openraft::RaftStorage;
use openraft::ServerState;
use openraft::SnapshotPolicy;
use openraft::StorageHelper;
use openraft::Vote;

use crate::fixtures::blank;
Expand Down Expand Up @@ -115,7 +115,7 @@ async fn compaction() -> Result<()> {
tracing::info!("--- logs should be deleted after installing snapshot; left only the last one");
{
let mut sto = router.get_storage_handle(&1)?;
let logs = sto.get_log_entries(..).await?;
let logs = StorageHelper::new(&mut sto).get_log_entries(..).await?;
assert_eq!(2, logs.len());
assert_eq!(LogId::new(CommittedLeaderId::new(1, 0), log_index - 1), logs[0].log_id)
}
Expand Down
3 changes: 1 addition & 2 deletions openraft/tests/membership/t10_add_learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use openraft::CommittedLeaderId;
use openraft::Config;
use openraft::LogId;
use openraft::Membership;
use openraft::RaftLogReader;
use openraft::StorageHelper;
use tokio::time::sleep;

Expand Down Expand Up @@ -68,7 +67,7 @@ async fn add_learner_basic() -> Result<()> {
{
let mut sto1 = router.get_storage_handle(&1)?;

let logs = sto1.get_log_entries(..).await?;
let logs = StorageHelper::new(&mut sto1).get_log_entries(..).await?;

assert_eq!(log_index, logs[logs.len() - 1].log_id.index);
// 0-th log
Expand Down
4 changes: 2 additions & 2 deletions openraft/tests/membership/t20_change_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use openraft::error::ChangeMembershipError;
use openraft::error::ClientWriteError;
use openraft::Config;
use openraft::LogIdOptionExt;
use openraft::RaftLogReader;
use openraft::ServerState;
use openraft::StorageHelper;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;
Expand Down Expand Up @@ -87,7 +87,7 @@ async fn change_with_new_learner_blocking() -> anyhow::Result<()> {

for node_id in 0..2 {
let mut sto = router.get_storage_handle(&node_id)?;
let logs = sto.get_log_entries(..).await?;
let logs = StorageHelper::new(&mut sto).get_log_entries(..).await?;
assert_eq!(log_index, logs[logs.len() - 1].log_id.index, "node: {}", node_id);
// 0-th log
assert_eq!(log_index + 1, logs.len() as u64, "node: {}", node_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn new_leader_auto_commit_uniform_config() -> Result<()> {
// )
// .await?;
//
// let final_log = sto.get_log_entries(want..=want).await?[0].clone();
// let final_log = StorageHelper::new(&mut sto).get_log_entries(want..=want).await?[0].clone();
//
// let m = match final_log.payload {
// EntryPayload::Membership(ref m) => m.membership.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use openraft::CommittedLeaderId;
use openraft::Config;
use openraft::LogId;
use openraft::Membership;
use openraft::RaftLogReader;
use openraft::SnapshotPolicy;
use openraft::StorageHelper;

Expand Down Expand Up @@ -73,7 +72,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
.await?;

{
let logs = sto0.get_log_entries(..).await?;
let logs = StorageHelper::new(&mut sto0).get_log_entries(..).await?;
assert_eq!(3, logs.len(), "only one applied log is kept");
}
let m = StorageHelper::new(&mut sto0).get_membership().await?;
Expand Down Expand Up @@ -109,7 +108,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
tracing::info!("--- check membership");
{
{
let logs = sto0.get_log_entries(..).await?;
let logs = StorageHelper::new(&mut sto0).get_log_entries(..).await?;
assert_eq!(3, logs.len(), "only one applied log");
}
let m = StorageHelper::new(&mut sto0).get_membership().await?;
Expand Down

0 comments on commit 55217aa

Please sign in to comment.