Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl Logstore::read by LogFile::create_stream #124

Merged
merged 2 commits into from Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/log-store/src/fs/file.rs
Expand Up @@ -274,10 +274,10 @@ impl LogFile {
pub async fn replay(&mut self) -> Result<(usize, Id)> {
let log_name = self.name.to_string();
let previous_offset = self.flush_offset.load(Ordering::Relaxed);
let ns = LocalNamespace::default();
let mut stream = self.create_stream(
// TODO(hl): LocalNamespace should be filled
LocalNamespace::default(),
0,
&ns, 0,
);

let mut last_offset = 0usize;
Expand Down Expand Up @@ -313,7 +313,11 @@ impl LogFile {
/// ### Notice
/// If the entry with start entry id is not present, the first generated entry will start with
/// the first entry with an id greater than `start_entry_id`.
pub fn create_stream(&self, _ns: impl Namespace, start_entry_id: u64) -> impl EntryStream + '_ {
pub fn create_stream(
&self,
_ns: &impl Namespace,
evenyag marked this conversation as resolved.
Show resolved Hide resolved
start_entry_id: u64,
) -> impl EntryStream<Entry = EntryImpl, Error = Error> + '_ {
let length = self.flush_offset.load(Ordering::Relaxed);

let s = stream!({
Expand Down
49 changes: 29 additions & 20 deletions src/log-store/src/fs/log.rs
Expand Up @@ -3,9 +3,13 @@ use std::path::Path;
use std::sync::Arc;

use arc_swap::ArcSwap;
use async_stream::stream;
use common_telemetry::{error, info, warn};
use futures::pin_mut;
use futures::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{Encode, Id};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::LogStore;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -167,7 +171,7 @@ impl LogStore for LocalFileLogStore {

async fn append(
&self,
_ns: Self::Namespace,
_ns: &Self::Namespace,
mut entry: Self::Entry,
) -> Result<Self::AppendResponse> {
// TODO(hl): configurable retry times
Expand Down Expand Up @@ -208,11 +212,24 @@ impl LogStore for LocalFileLogStore {

async fn read(
&self,
_ns: Self::Namespace,
_id: Id,
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
{
todo!()
ns: Self::Namespace,
id: Id,
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
let files = self.files.read().await;

let s = stream!({
for (start_id, file) in files.iter() {
if *start_id >= id {
let s = file.create_stream(&ns, *start_id);
pin_mut!(s);
while let Some(entries) = s.next().await {
yield entries;
}
}
}
});

Ok(Box::pin(s))
}

async fn create_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
Expand Down Expand Up @@ -248,13 +265,11 @@ mod tests {
};

let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::default();
assert_eq!(
0,
logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.append(&ns, EntryImpl::new(generate_data(100)),)
.await
.unwrap()
.entry_id
Expand All @@ -263,10 +278,7 @@ mod tests {
assert_eq!(
1,
logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.append(&ns, EntryImpl::new(generate_data(100)),)
.await
.unwrap()
.entry_id
Expand Down Expand Up @@ -296,18 +308,15 @@ mod tests {
log_file_dir: dir.path().to_str().unwrap().to_string(),
};
let logstore = LocalFileLogStore::open(&config).await.unwrap();
let ns = LocalNamespace::default();
let id = logstore
.append(
LocalNamespace::default(),
EntryImpl::new(generate_data(100)),
)
.append(&ns, EntryImpl::new(generate_data(100)))
.await
.unwrap()
.entry_id;
assert_eq!(0, id);

let active_file = logstore.active_file();
let stream = active_file.create_stream(LocalNamespace::default(), 0);
let stream = logstore.read(ns, 0).await.unwrap();
tokio::pin!(stream);

let entries = stream.next().await.unwrap().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/log-store/src/fs/noop.rs
Expand Up @@ -17,7 +17,7 @@ impl LogStore for NoopLogStore {

async fn append(
&self,
_ns: Self::Namespace,
_ns: &Self::Namespace,
mut _e: Self::Entry,
) -> Result<Self::AppendResponse> {
Ok(AppendResponseImpl {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/wal.rs
Expand Up @@ -127,7 +127,7 @@ impl<S: LogStore> Wal<S> {

let res = self
.store
.append(ns, e)
.append(&ns, e)
.await
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
Expand Down
4 changes: 2 additions & 2 deletions src/store-api/src/logstore.rs
Expand Up @@ -21,7 +21,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Append an `Entry` to WAL with given namespace
async fn append(
&self,
ns: Self::Namespace,
ns: &Self::Namespace,
mut e: Self::Entry,
) -> Result<Self::AppendResponse, Self::Error>;

Expand All @@ -32,7 +32,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
e: Vec<Self::Entry>,
) -> Result<Id, Self::Error>;

// Create a new `EntryStream` to asynchronously generates `Entry`.
// Create a new `EntryStream` to asynchronously generates `Entry` with ids starting from `id`.
async fn read(
&self,
ns: Self::Namespace,
Expand Down