Skip to content

Commit

Permalink
feat(experimental): introduce new storage engine implement (#456)
Browse files Browse the repository at this point in the history
* feat(experimental): introduce new storage abstraction and implement

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* feat: introduce direct fs config builder

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* test: add unit test for direct fs device

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* test: add unit test for async batch pipeline

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: introduce Countdown and fix permits initialization

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* chore: remove prints for debug

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* feat: add remove & delete, optimize out updated entry, fix updated judge

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor: rename updated to outdated

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* feat: impl admission picker

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: fix reclaim index removal

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* test: add more tests

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* chore: update ci

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: try fix ci

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

---------

Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed May 8, 2024
1 parent f23f12a commit a638f7a
Show file tree
Hide file tree
Showing 30 changed files with 3,029 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/template/template.yml
Expand Up @@ -143,7 +143,7 @@ jobs:
run: |
cargo llvm-cov report --lcov --output-path lcov.info
- uses: codecov/codecov-action@v4
if: matrix.os == 'ubuntu-latest'
if: matrix.os == 'ubuntu-latest' && matrix.rust_toolchain == 'stable'
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Expand Up @@ -149,7 +149,7 @@ jobs:
run: |
cargo llvm-cov report --lcov --output-path lcov.info
- uses: codecov/codecov-action@v4
if: matrix.os == 'ubuntu-latest'
if: matrix.os == 'ubuntu-latest' && matrix.rust_toolchain == 'stable'
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pull-request.yml
Expand Up @@ -148,7 +148,7 @@ jobs:
run: |
cargo llvm-cov report --lcov --output-path lcov.info
- uses: codecov/codecov-action@v4
if: matrix.os == 'ubuntu-latest'
if: matrix.os == 'ubuntu-latest' && matrix.rust_toolchain == 'stable'
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
Expand Down
1 change: 1 addition & 0 deletions foyer-common/Cargo.toml
Expand Up @@ -24,4 +24,5 @@ serde = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
futures = "0.3"
rand = "0.8.5"
167 changes: 167 additions & 0 deletions foyer-common/src/async_batch_pipeline.rs
@@ -0,0 +1,167 @@
// Copyright 2024 Foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{future::Future, sync::Arc};

use parking_lot::Mutex;
use tokio::task::JoinHandle;

#[derive(Debug)]
pub struct AsyncBatchPipeline<T, R> {
inner: Arc<Mutex<AsyncBatchPipelineInner<T, R>>>,
}

impl<T, R> Clone for AsyncBatchPipeline<T, R> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}

#[derive(Debug)]
struct AsyncBatchPipelineInner<T, R> {
state: T,
has_leader: bool,
handle: Option<JoinHandle<R>>,
}

pub struct LeaderToken<T, R> {
batch: AsyncBatchPipeline<T, R>,
handle: Option<JoinHandle<R>>,
}

impl<T, R> AsyncBatchPipeline<T, R> {
pub fn new(state: T) -> Self {
Self {
inner: Arc::new(Mutex::new(AsyncBatchPipelineInner {
state,
has_leader: false,
handle: None,
})),
}
}

pub fn accumulate<F>(&self, f: F) -> Option<LeaderToken<T, R>>
where
F: FnOnce(&mut T),
{
let mut inner = self.inner.lock();

let token = if !inner.has_leader {
inner.has_leader = true;
Some(LeaderToken {
batch: self.clone(),
handle: inner.handle.take(),
})
} else {
None
};

f(&mut inner.state);

token
}

pub fn wait(&self) -> Option<JoinHandle<R>> {
self.inner.lock().handle.take()
}
}

impl<T, R> LeaderToken<T, R> {
/// Pipeline execute futures.
///
/// `new_state`
/// - Receives the reference of the old state and returns the new state.
///
/// `f`
/// - Receives the owned old state and returns a future.
/// - The future will be polled after handling the previous result.
/// - The future is guaranteed to be execute one by one in order.
///
/// `fr`
/// - Handle the previous result.
pub fn pipeline<FR, F, FU, NS>(mut self, new_state: NS, fr: FR, f: F)
where
R: Send + 'static,
FR: FnOnce(R) + Send + 'static,
F: FnOnce(T) -> FU,
FU: Future<Output = R> + Send + 'static,
NS: FnOnce(&T) -> T,
{
let handle = self.handle.take();

let mut inner = self.batch.inner.lock();

let mut state = new_state(&inner.state);
std::mem::swap(&mut inner.state, &mut state);

let future = f(state);
let handle = tokio::spawn(async move {
if let Some(handle) = handle {
fr(handle.await.unwrap());
}
future.await
});

inner.handle = Some(handle);
inner.has_leader = false;
}
}

#[cfg(test)]
mod tests {

use futures::future::join_all;
use itertools::Itertools;

use super::*;

#[tokio::test]
async fn test_async_batch_pipeline() {
let batch: AsyncBatchPipeline<Vec<u64>, Vec<u64>> = AsyncBatchPipeline::new(vec![]);
let res = join_all((0..100).map(|i| {
let batch = batch.clone();
async move { batch.accumulate(|state| state.push(i)) }
}))
.await;

let mut res = res.into_iter().flatten().collect_vec();
assert_eq!(res.len(), 1);
let token = res.remove(0);
token.pipeline(|_| vec![], |_| unreachable!(), |state| async move { state });

let res = join_all((100..200).map(|i| {
let batch = batch.clone();
async move { batch.accumulate(|state| state.push(i)) }
}))
.await;

let mut res = res.into_iter().flatten().collect_vec();
assert_eq!(res.len(), 1);
let token = res.remove(0);
token.pipeline(
|_| vec![],
|mut res| {
res.sort();
assert_eq!(res, (0..100).collect_vec());
},
|state| async move { state },
);

let mut res = batch.wait().unwrap().await.unwrap();
res.sort();
assert_eq!(res, (100..200).collect_vec());
}
}
31 changes: 31 additions & 0 deletions foyer-common/src/asyncify.rs
@@ -0,0 +1,31 @@
// Copyright 2024 Foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(not(madsim))]
pub async fn asyncify<F, T>(f: F) -> T
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
tokio::task::spawn_blocking(f).await.unwrap()
}

#[cfg(madsim)]
pub async fn asyncify<F, T>(f: F) -> T
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
f()
}
80 changes: 80 additions & 0 deletions foyer-common/src/countdown.rs
@@ -0,0 +1,80 @@
// Copyright 2024 Foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};

#[derive(Debug)]
pub struct Countdown {
finish: AtomicBool,
counter: AtomicIsize,
}

impl Countdown {
/// Countdown `counter` times.
///
/// # Safety
///
/// Panics if `counter` exceeds [`isize::MAX`].
pub fn new(counter: usize) -> Self {
Self {
finish: AtomicBool::new(false),
counter: AtomicIsize::new(isize::try_from(counter).expect("`counter` must NOT exceed `isize::MAX`.")),
}
}

/// Returns `false` for the first `counter` times, then always returns `true`.
pub fn countdown(&self) -> bool {
if self.finish.load(Ordering::Relaxed) {
return true;
}
self.counter.fetch_sub(1, Ordering::Relaxed) <= 0
}

/// Reset [`Countdown`] with `counter`.
pub fn reset(&self, counter: usize) {
self.finish.store(false, Ordering::Relaxed);
self.counter.store(
isize::try_from(counter).expect("`counter` must NOT exceed `isize::MAX`."),
Ordering::Relaxed,
);
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use futures::future::join_all;

use super::*;

async fn case(counter: usize, concurrency: usize) {
let cd = Countdown::new(counter);
let res = join_all((0..concurrency).map(|_| async {
tokio::time::sleep(Duration::from_millis(10)).await;
cd.countdown()
}))
.await;
assert_eq!(counter, res.into_iter().filter(|b| !b).count());
}

#[tokio::test]
async fn test_countdown() {
for counter in [1, 4, 8, 16] {
for concurrency in [16, 32, 64, 128] {
case(counter, concurrency).await;
}
}
}
}
3 changes: 3 additions & 0 deletions foyer-common/src/lib.rs
Expand Up @@ -15,12 +15,15 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

pub mod arc_key_hash_map;
pub mod async_batch_pipeline;
pub mod async_queue;
pub mod asyncify;
pub mod batch;
pub mod bits;
pub mod buf;
pub mod code;
pub mod continuum;
pub mod countdown;
pub mod erwlock;
pub mod object_pool;
pub mod range;
Expand Down
28 changes: 28 additions & 0 deletions foyer-memory/src/cache.rs
Expand Up @@ -70,6 +70,7 @@ pub type S3FifoCacheEntry<K, V, L = DefaultCacheEventListener<K, V>, S = RandomS
pub type S3FifoEntry<K, V, ER, L = DefaultCacheEventListener<K, V>, S = RandomState> =
GenericEntry<K, V, S3Fifo<(Arc<K>, Arc<V>)>, ArcKeyHashMapIndexer<K, S3FifoHandle<(Arc<K>, Arc<V>)>>, L, S, ER>;

#[derive(Debug)]
pub enum CacheEntry<K, V, L, S = RandomState>
where
K: Key,
Expand Down Expand Up @@ -174,6 +175,15 @@ where
L: CacheEventListener<K, V>,
S: BuildHasher + Send + Sync + 'static,
{
pub fn hash(&self) -> u64 {
match self {
CacheEntry::Fifo(entry) => entry.hash(),
CacheEntry::Lru(entry) => entry.hash(),
CacheEntry::Lfu(entry) => entry.hash(),
CacheEntry::S3Fifo(entry) => entry.hash(),
}
}

pub fn key(&self) -> &K {
match self {
CacheEntry::Fifo(entry) => entry.key(),
Expand Down Expand Up @@ -218,6 +228,15 @@ where
CacheEntry::S3Fifo(entry) => entry.refs(),
}
}

pub fn is_outdated(&self) -> bool {
match self {
CacheEntry::Fifo(entry) => entry.is_outdated(),
CacheEntry::Lru(entry) => entry.is_outdated(),
CacheEntry::Lfu(entry) => entry.is_outdated(),
CacheEntry::S3Fifo(entry) => entry.is_outdated(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -590,6 +609,15 @@ where
Cache::S3Fifo(cache) => cache.metrics(),
}
}

pub fn hash_builder(&self) -> &S {
match self {
Cache::Fifo(cache) => cache.hash_builder(),
Cache::Lru(cache) => cache.hash_builder(),
Cache::Lfu(cache) => cache.hash_builder(),
Cache::S3Fifo(cache) => cache.hash_builder(),
}
}
}

pub enum Entry<K, V, ER, L = DefaultCacheEventListener<K, V>, S = RandomState>
Expand Down

0 comments on commit a638f7a

Please sign in to comment.