Skip to content

Commit

Permalink
feat: add RawEntryReader and OneshotWalEntryReader trait (#4027)
Browse files Browse the repository at this point in the history
* feat: add `RawEntryReader` and `OneShotWalEntryReader` trait

* chore: rename `OneShot` to `Oneshot`

* refacotr: remove `region_id` from `OneshotWalEntryReader`
  • Loading branch information
WenyXu committed May 24, 2024
1 parent 0101657 commit 466f7c6
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 12 deletions.
17 changes: 13 additions & 4 deletions src/log-store/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ pub(crate) mod util;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::Namespace;

use crate::error::Error;
use store_api::storage::RegionId;

/// Kafka Namespace implementation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -56,7 +55,13 @@ pub struct EntryImpl {
}

impl Entry for EntryImpl {
type Error = Error;
fn into_raw_entry(self) -> RawEntry {
RawEntry {
region_id: self.region_id(),
entry_id: self.id(),
data: self.data,
}
}

fn data(&self) -> &[u8] {
&self.data
Expand All @@ -66,6 +71,10 @@ impl Entry for EntryImpl {
self.id
}

fn region_id(&self) -> RegionId {
RegionId::from_u64(self.ns.region_id)
}

fn estimated_size(&self) -> usize {
size_of::<Self>() + self.data.capacity() * size_of::<u8>() + self.ns.topic.capacity()
}
Expand Down
15 changes: 13 additions & 2 deletions src/log-store/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
// limitations under the License.

use common_wal::options::WalOptions;
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
use store_api::storage::RegionId;

use crate::error::{Error, Result};

Expand All @@ -36,7 +37,13 @@ impl Namespace for NamespaceImpl {
}

impl Entry for EntryImpl {
type Error = Error;
fn into_raw_entry(self) -> RawEntry {
RawEntry {
region_id: self.region_id(),
entry_id: self.id(),
data: vec![],
}
}

fn data(&self) -> &[u8] {
&[]
Expand All @@ -46,6 +53,10 @@ impl Entry for EntryImpl {
0
}

fn region_id(&self) -> RegionId {
RegionId::from_u64(0)
}

fn estimated_size(&self) -> usize {
0
}
Expand Down
16 changes: 13 additions & 3 deletions src/log-store/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use std::hash::{Hash, Hasher};
use std::mem::size_of;

use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::storage::RegionId;

use crate::error::Error;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};

mod backend;
Expand Down Expand Up @@ -67,7 +67,13 @@ impl Namespace for NamespaceImpl {
}

impl Entry for EntryImpl {
type Error = Error;
fn into_raw_entry(self) -> RawEntry {
RawEntry {
region_id: self.region_id(),
entry_id: self.id(),
data: self.data,
}
}

fn data(&self) -> &[u8] {
self.data.as_slice()
Expand All @@ -77,6 +83,10 @@ impl Entry for EntryImpl {
self.id
}

fn region_id(&self) -> RegionId {
RegionId::from_u64(self.id)
}

fn estimated_size(&self) -> usize {
self.data.len() + size_of::<u64>() + size_of::<u64>()
}
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

//! Write ahead log of the engine.

/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod raw_entry_reader;
/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod wal_entry_reader;

use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
Expand Down
44 changes: 44 additions & 0 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::stream::BoxStream;
use store_api::logstore::entry::RawEntry;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::wal::EntryId;

/// A stream that yields [RawEntry].
pub type RawEntryStream<'a> = BoxStream<'a, Result<RawEntry>>;

// The namespace of kafka log store
pub struct KafkaNamespace<'a> {
topic: &'a str,
}

// The namespace of raft engine log store
pub struct RaftEngineNamespace {
region_id: RegionId,
}

/// The namespace of [RawEntryReader].
pub(crate) enum LogStoreNamespace<'a> {
RaftEngine(RaftEngineNamespace),
Kafka(KafkaNamespace<'a>),
}

/// [RawEntryReader] provides the ability to read [RawEntry] from the underlying [LogStore].
pub(crate) trait RawEntryReader: Send + Sync {
fn read(&self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<RawEntryStream<'static>>;
}
24 changes: 24 additions & 0 deletions src/mito2/src/wal/wal_entry_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use store_api::storage::RegionId;

use crate::error::Result;
use crate::wal::raw_entry_reader::LogStoreNamespace;
use crate::wal::{EntryId, WalEntryStream};

/// [OneshotWalEntryReader] provides the ability to read and decode entries from the underlying store.
pub(crate) trait OneshotWalEntryReader: Send + Sync {
fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<WalEntryStream>;
}
15 changes: 13 additions & 2 deletions src/store-api/src/logstore/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::ErrorExt;
use crate::storage::RegionId;

/// An entry's id.
/// Different log store implementations may interpret the id to different meanings.
pub type Id = u64;

/// The raw Wal entry.
pub struct RawEntry {
pub region_id: RegionId,
pub entry_id: Id,
pub data: Vec<u8>,
}

/// Entry is the minimal data storage unit through which users interact with the log store.
/// The log store implementation may have larger or smaller data storage unit than an entry.
pub trait Entry: Send + Sync {
type Error: ErrorExt + Send + Sync;
/// Consumes [Entry] and converts to [RawEntry].
fn into_raw_entry(self) -> RawEntry;

/// Returns the contained data of the entry.
fn data(&self) -> &[u8];
Expand All @@ -30,6 +38,9 @@ pub trait Entry: Send + Sync {
/// Usually the namespace id is identical with the region id.
fn id(&self) -> Id;

/// Returns the [RegionId]
fn region_id(&self) -> RegionId;

/// Computes the estimated encoded size.
fn estimated_size(&self) -> usize;
}
14 changes: 13 additions & 1 deletion src/store-api/src/logstore/entry_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ mod tests {

use super::*;
pub use crate::logstore::entry::Id;
use crate::logstore::entry::RawEntry;
use crate::storage::RegionId;

pub struct SimpleEntry {
/// Binary data of current entry
Expand All @@ -64,7 +66,13 @@ mod tests {
}

impl Entry for SimpleEntry {
type Error = Error;
fn into_raw_entry(self) -> RawEntry {
RawEntry {
region_id: RegionId::from_u64(0),
entry_id: 0,
data: vec![],
}
}

fn data(&self) -> &[u8] {
&self.data
Expand All @@ -74,6 +82,10 @@ mod tests {
0u64
}

fn region_id(&self) -> RegionId {
RegionId::from_u64(0)
}

fn estimated_size(&self) -> usize {
self.data.len()
}
Expand Down

0 comments on commit 466f7c6

Please sign in to comment.