Skip to content

Commit

Permalink
test: add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 24, 2024
1 parent 6f35f89 commit 92bf20a
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 0 deletions.
176 changes: 176 additions & 0 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ pub struct RaftEngineNamespace {
region_id: RegionId,
}

impl RaftEngineNamespace {
pub fn new(region_id: RegionId) -> Self {
Self { region_id }
}
}

/// The namespace of [RawEntryReader].
pub(crate) enum LogStoreNamespace<'a> {
RaftEngine(RaftEngineNamespace),
Expand Down Expand Up @@ -150,6 +156,16 @@ pub struct RawEntryReaderFilter<R, F> {
filter: F,
}

impl<R, F> RawEntryReaderFilter<R, F>
where
R: RawEntryReader,
F: Fn(&RawEntry) -> bool + Sync + Send,
{
pub fn new(reader: R, filter: F) -> Self {
Self { reader, filter }
}
}

impl<R, F> RawEntryReader for RawEntryReaderFilter<R, F>
where
R: RawEntryReader,
Expand All @@ -174,3 +190,163 @@ where
Ok(Box::pin(stream))
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_wal::options::WalOptions;
use futures::stream;
use store_api::logstore::entry::{Entry, RawEntry};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Namespace;
use store_api::logstore::{
AppendBatchResponse, AppendResponse, EntryId, LogStore, NamespaceId,
};
use store_api::storage::RegionId;

use super::*;
use crate::error;

#[derive(Debug)]
struct MockLogStore {
entires: Vec<RawEntry>,

Check warning on line 213 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".
}

#[derive(Debug, Eq, PartialEq, Clone, Copy, Default, Hash)]
struct MockNamespace;

impl Namespace for MockNamespace {
fn id(&self) -> NamespaceId {
0
}
}

#[async_trait::async_trait]
impl LogStore for MockLogStore {
type Entry = RawEntry;
type Error = error::Error;
type Namespace = MockNamespace;

async fn stop(&self) -> Result<(), Self::Error> {
unreachable!()
}

async fn append(&self, entry: Self::Entry) -> Result<AppendResponse, Self::Error> {
unreachable!()
}

async fn append_batch(
&self,
entries: Vec<Self::Entry>,
) -> Result<AppendBatchResponse, Self::Error> {
unreachable!()
}

async fn read(
&self,
ns: &Self::Namespace,
id: EntryId,
) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error> {
Ok(Box::pin(stream::iter(vec![Ok(self.entires.clone())])))

Check warning on line 251 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".
}

async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> {
unreachable!()
}

async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> {
unreachable!()
}

async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error> {
unreachable!()
}

async fn obsolete(
&self,
ns: Self::Namespace,
entry_id: EntryId,
) -> Result<(), Self::Error> {
unreachable!()
}

fn entry(&self, data: &mut Vec<u8>, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry {
unreachable!()
}

fn namespace(&self, _ns_id: NamespaceId, _wal_options: &WalOptions) -> Self::Namespace {
MockNamespace
}
}

#[tokio::test]
async fn test_raw_entry_reader() {
let expected_entires = vec![RawEntry {

Check warning on line 285 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".
region_id: RegionId::new(1024, 1),
entry_id: 1,
data: vec![],
}];
let store = MockLogStore {
entires: expected_entires.clone(),

Check warning on line 291 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".

Check warning on line 291 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".
};

let reader = LogStoreRawEntryReader::new(Arc::new(store));
let entires = reader

Check warning on line 295 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".
.read(
LogStoreNamespace::RaftEngine(RaftEngineNamespace::new(RegionId::new(1024, 1))),
0,
)
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(expected_entires, entires);

Check warning on line 304 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".

Check warning on line 304 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".
}

#[tokio::test]
async fn test_raw_entry_reader_filter() {
let all_entires = vec![

Check warning on line 309 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".
RawEntry {
region_id: RegionId::new(1024, 1),
entry_id: 1,
data: vec![1],
},
RawEntry {
region_id: RegionId::new(1024, 2),
entry_id: 2,
data: vec![2],
},
RawEntry {
region_id: RegionId::new(1024, 3),
entry_id: 3,
data: vec![3],
},
];
let store = MockLogStore {
entires: all_entires.clone(),

Check warning on line 327 in src/mito2/src/wal/raw_entry_reader.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"entires" should be "entries".
};

let expected_region_id = RegionId::new(1024, 3);
let reader =
RawEntryReaderFilter::new(LogStoreRawEntryReader::new(Arc::new(store)), |entry| {
entry.region_id == expected_region_id
});
let entires = reader
.read(
LogStoreNamespace::RaftEngine(RaftEngineNamespace::new(RegionId::new(1024, 1))),
0,
)
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
all_entires
.into_iter()
.filter(|entry| entry.region_id == expected_region_id)
.collect::<Vec<_>>(),
entires
);
}
}
23 changes: 23 additions & 0 deletions src/store-api/src/logstore/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,35 @@ use crate::storage::RegionId;
pub type Id = u64;

/// The raw Wal entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RawEntry {
pub region_id: RegionId,
pub entry_id: Id,
pub data: Vec<u8>,
}

impl Entry for RawEntry {
fn into_raw_entry(self) -> RawEntry {
self
}

fn data(&self) -> &[u8] {
&self.data
}

fn id(&self) -> Id {
self.entry_id
}

fn region_id(&self) -> RegionId {
self.region_id
}

fn estimated_size(&self) -> usize {
std::mem::size_of_val(self)
}
}

/// Entry is the minimal data storage unit through which users interact with the log store.
/// The log store implementation may have larger or smaller data storage unit than an entry.
pub trait Entry: Send + Sync {
Expand Down

0 comments on commit 92bf20a

Please sign in to comment.