Skip to content

Commit

Permalink
feat: impl Logstore::read by LogFile::create_stream (#124)
Browse files Browse the repository at this point in the history
* feat: bridge LogStore::read to LogFile::create_stream

* fix some CR comments
  • Loading branch information
v0y4g3r committed Aug 2, 2022
1 parent 96b4ed0 commit 868098d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 27 deletions.
10 changes: 7 additions & 3 deletions src/log-store/src/fs/file.rs
Expand Up @@ -274,10 +274,10 @@ impl LogFile {
pub async fn replay(&mut self) -> Result<(usize, Id)> {
let log_name = self.name.to_string();
let previous_offset = self.flush_offset.load(Ordering::Relaxed);
let ns = LocalNamespace::default();
let mut stream = self.create_stream(
// TODO(hl): LocalNamespace should be filled
LocalNamespace::default(),
0,
&ns, 0,
);

let mut last_offset = 0usize;
Expand Down Expand Up @@ -313,7 +313,11 @@ impl LogFile {
/// ### Notice
/// If the entry with start entry id is not present, the first generated entry will start with
/// the first entry with an id greater than `start_entry_id`.
pub fn create_stream(&self, _ns: impl Namespace, start_entry_id: u64) -> impl EntryStream + '_ {
pub fn create_stream(
&self,
_ns: &impl Namespace,
start_entry_id: u64,
) -> impl EntryStream<Entry = EntryImpl, Error = Error> + '_ {
let length = self.flush_offset.load(Ordering::Relaxed);

let s = stream!({
Expand Down
49 changes: 29 additions & 20 deletions src/log-store/src/fs/log.rs
Expand Up @@ -3,9 +3,13 @@ use std::path::Path;
use std::sync::Arc;

use arc_swap::ArcSwap;
use async_stream::stream;
use common_telemetry::{error, info, warn};
use futures::pin_mut;
use futures::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{Encode, Id};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::LogStore;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -167,7 +171,7 @@ impl LogStore for LocalFileLogStore {

async fn append(
&self,
_ns: Self::Namespace,
_ns: &Self::Namespace,
mut entry: Self::Entry,
) -> Result<Self::AppendResponse> {
// TODO(hl): configurable retry times
Expand Down Expand Up @@ -208,11 +212,24 @@ impl LogStore for LocalFileLogStore {

async fn read(
&self,
_ns: Self::Namespace,
_id: Id,
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
{
todo!()
ns: Self::Namespace,
id: Id,
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
let files = self.files.read().await;

let s = stream!({
for (start_id, file) in files.iter() {
if *start_id >= id {
let s = file.create_stream(&ns, *start_id);
pin_mut!(s);
while let Some(entries) = s.next().await {
yield entries;
}
}
}
});

Ok(Box::pin(s))
}

async fn create_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
Expand Down Expand Up @@ -248,13 +265,11 @@ mod tests {
};

let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::default();
assert_eq!(
0,
logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.append(&ns, EntryImpl::new(generate_data(100)),)
.await
.unwrap()
.entry_id
Expand All @@ -263,10 +278,7 @@ mod tests {
assert_eq!(
1,
logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.append(&ns, EntryImpl::new(generate_data(100)),)
.await
.unwrap()
.entry_id
Expand Down Expand Up @@ -296,18 +308,15 @@ mod tests {
log_file_dir: dir.path().to_str().unwrap().to_string(),
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::default();
let id = logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.append(&ns, EntryImpl::new(generate_data(100)))
.await
.unwrap()
.entry_id;
assert_eq!(0, id);

let active_file = logstore.active_file();
let stream = active_file.create_stream(LocalNamespace::default(), 0);
let stream = logstore.read(ns, 0).await.unwrap();
tokio::pin!(stream);

let entries = stream.next().await.unwrap().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/log-store/src/fs/noop.rs
Expand Up @@ -17,7 +17,7 @@ impl LogStore for NoopLogStore {

async fn append(
&self,
_ns: Self::Namespace,
_ns: &Self::Namespace,
mut _e: Self::Entry,
) -> Result<Self::AppendResponse> {
Ok(AppendResponseImpl {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/wal.rs
Expand Up @@ -127,7 +127,7 @@ impl<S: LogStore> Wal<S> {

let res = self
.store
.append(ns, e)
.append(&ns, e)
.await
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
Expand Down
4 changes: 2 additions & 2 deletions src/store-api/src/logstore.rs
Expand Up @@ -21,7 +21,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Append an `Entry` to WAL with given namespace
async fn append(
&self,
ns: Self::Namespace,
ns: &Self::Namespace,
mut e: Self::Entry,
) -> Result<Self::AppendResponse, Self::Error>;

Expand All @@ -32,7 +32,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
e: Vec<Self::Entry>,
) -> Result<Id, Self::Error>;

// Create a new `EntryStream` to asynchronously generates `Entry`.
// Create a new `EntryStream` to asynchronously generates `Entry` with ids starting from `id`.
async fn read(
&self,
ns: Self::Namespace,
Expand Down

0 comments on commit 868098d

Please sign in to comment.