Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor cache #1560

Merged
merged 9 commits into from Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 19 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -12,9 +12,9 @@ members = [
"common/http_protocol",
"common/error_code",
"common/error_code/error_code_macro",
"common/lru_cache",
"common/limiter_bucket",
"common/memory_pool",
"common/cache",
"query_server/spi",
"query_server/query",
"query_server/test",
Expand Down Expand Up @@ -72,6 +72,7 @@ futures = { version = "0.3", default-features = false }
integer-encoding = "4.0.0"
lazy_static = "1.4"
libc = { version = "0.2", default-features = false }
lru = "0.11.1"
md-5 = "0.10"
minivec = "0.4.0"
mio = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion client/src/exec.rs
Expand Up @@ -180,7 +180,7 @@ fn parse_use_database(sql: &str) -> Option<String> {

/// Returns `true` if `db` names one of the built-in system schema
/// databases: `cluster_schema`, `information_schema` or `usage_schema`.
///
/// The comparison is case-insensitive (the name is lowercased first).
pub fn is_system_table_db(db: &str) -> bool {
    let db = db.to_ascii_lowercase();
    matches!(
        db.as_str(),
        "cluster_schema" | "information_schema" | "usage_schema"
    )
}

pub async fn connect_database(database: &str, ctx: &mut SessionContext) -> Result<()> {
Expand Down
21 changes: 21 additions & 0 deletions common/cache/Cargo.toml
@@ -0,0 +1,21 @@
[package]
name = "cache"
edition.workspace = true
version.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
parking_lot = { workspace = true }
lru = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
futures = { workspace = true, features = ["std"] }
async-trait = { workspace = true }
rand = { workspace = true }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio"] }

[[bench]]
harness = false
name = "lru"
73 changes: 73 additions & 0 deletions common/cache/benches/lru.rs
@@ -0,0 +1,73 @@
#[macro_use]
extern crate criterion;

use std::sync::Arc;

use cache::{AsyncCache, ShardedAsyncCache, ShardedSyncCache, SyncCache};
use rand::random;

use crate::criterion::Criterion;

/// Criterion benchmarks for the sync and async sharded LRU caches.
///
/// `i8` keys deliberately collide often (only 256 distinct values against a
/// capacity of 128), so the "get / insert" benchmarks exercise both hits and
/// misses/evictions.
fn criterion_benchmark(c: &mut Criterion) {
    // --- synchronous cache ---
    let lru = Arc::new(ShardedSyncCache::<i8, Arc<i8>>::create_lru_sharded_cache(
        128,
    ));
    let mut group = c.benchmark_group("bench cache");
    group.bench_function("bench random insert", |b| {
        b.iter(|| {
            lru.insert(random(), Arc::new(random()));
        })
    });
    group.bench_function("bench random get", |b| {
        b.iter(|| {
            lru.get(&random());
        })
    });
    group.bench_function("bench get / insert", |b| {
        b.iter(|| {
            let k = random();
            let _ = match lru.get(&k) {
                Some(v) => v,
                None => {
                    // Clone only for the insert; `v` itself is moved out.
                    let v = Arc::new(k);
                    lru.insert(k, v.clone());
                    v
                }
            };
        })
    });
    group.finish();

    // --- asynchronous cache, driven on a shared Tokio runtime ---
    let rt = Arc::new(tokio::runtime::Runtime::new().unwrap());
    let lru = Arc::new(ShardedAsyncCache::<i8, Arc<i8>>::create_lru_sharded_cache(
        128,
    ));
    let mut group = c.benchmark_group("bench async cache");
    group.bench_function("bench random insert", |b| {
        b.to_async(rt.as_ref()).iter(|| async {
            lru.insert(random(), Arc::new(random())).await;
        })
    });
    group.bench_function("bench random get", |b| {
        b.to_async(rt.as_ref()).iter(|| async {
            lru.get(&random()).await;
        })
    });
    group.bench_function("bench get / insert", |b| {
        b.to_async(rt.as_ref()).iter(|| async {
            let k = random();
            let _ = match lru.get(&k).await {
                Some(v) => v,
                None => {
                    let v = Arc::new(k);
                    // Clone for the insert and move `v` out directly —
                    // the extra `v.clone()` the sync variant avoids was
                    // redundant here too.
                    lru.insert(k, v.clone()).await;
                    v
                }
            };
        })
    });
    group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
75 changes: 75 additions & 0 deletions common/cache/src/async_cache.rs
@@ -0,0 +1,75 @@
use std::fmt::Debug;
use std::hash::Hash;
use std::num::NonZeroUsize;

use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::Cache;

/// Asynchronous cache interface — the async counterpart of [`Cache`].
///
/// All methods take `&self`; implementations are expected to provide their
/// own internal synchronisation (see `AsyncCacheWrap`, which guards a
/// blocking [`Cache`] with an async mutex).
#[async_trait]
pub trait AsyncCache: Debug {
    /// Key type stored in the cache.
    type K;
    /// Value type; returned by value, hence the `Clone` bound.
    type V: Clone;
    /// Inserts a key-value pair; returns the previous value if `key` was
    /// already present, otherwise `None`.
    async fn insert(&self, key: Self::K, value: Self::V) -> Option<Self::V>;
    /// Returns the value for `key`, if present.
    async fn get(&self, key: &Self::K) -> Option<Self::V>;
    /// Removes `key`, returning its value if it was present.
    async fn remove(&self, key: &Self::K) -> Option<Self::V>;
    /// Removes and returns the least-recently-used entry, or `None` if the
    /// cache is empty (mirrors [`Cache::pop`]).
    async fn pop(&self) -> Option<(Self::K, Self::V)>;
    /// Sets the maximum number of entries the cache may hold.
    async fn set_capacity(&self, capacity: NonZeroUsize);
    /// Returns the configured capacity.
    async fn get_capacity(&self) -> usize;
    /// Returns the current number of entries.
    async fn get_usage(&self) -> usize;
    /// Removes all entries.
    async fn clear(&self);
}

/// Adapter that turns any blocking [`Cache`] into an [`AsyncCache`] by
/// serialising all access through a `tokio::sync::Mutex`.
#[derive(Debug)]
pub struct AsyncCacheWrap<K, V> {
    // Boxed trait object so any `Cache` implementation can be wrapped.
    cache: Mutex<Box<dyn Cache<K = K, V = V>>>,
}

impl<K, V> AsyncCacheWrap<K, V> {
    /// Takes ownership of `cache` and wraps it in an async mutex.
    pub fn new(cache: impl Cache<K = K, V = V> + 'static) -> AsyncCacheWrap<K, V> {
        let boxed: Box<dyn Cache<K = K, V = V>> = Box::new(cache);
        AsyncCacheWrap {
            cache: Mutex::new(boxed),
        }
    }
}

/// Every operation acquires the async mutex, delegates to the wrapped
/// blocking [`Cache`], and releases the lock when the guard drops.
#[async_trait]
impl<K, V> AsyncCache for AsyncCacheWrap<K, V>
where
    K: Debug + Hash + Send + Sync,
    V: Debug + Clone + Send + Sync,
{
    type K = K;
    type V = V;

    async fn insert(&self, key: Self::K, value: Self::V) -> Option<Self::V> {
        let mut guard = self.cache.lock().await;
        guard.insert(key, value)
    }

    async fn get(&self, key: &Self::K) -> Option<Self::V> {
        let mut guard = self.cache.lock().await;
        guard.get(key)
    }

    async fn remove(&self, key: &Self::K) -> Option<Self::V> {
        let mut guard = self.cache.lock().await;
        guard.remove(key)
    }

    async fn pop(&self) -> Option<(Self::K, Self::V)> {
        let mut guard = self.cache.lock().await;
        guard.pop()
    }

    async fn set_capacity(&self, capacity: NonZeroUsize) {
        let mut guard = self.cache.lock().await;
        guard.set_capacity(capacity)
    }

    async fn get_capacity(&self) -> usize {
        let guard = self.cache.lock().await;
        guard.get_capacity()
    }

    async fn get_usage(&self) -> usize {
        let guard = self.cache.lock().await;
        guard.get_usage()
    }

    async fn clear(&self) {
        let mut guard = self.cache.lock().await;
        guard.clear()
    }
}
73 changes: 73 additions & 0 deletions common/cache/src/lib.rs
@@ -0,0 +1,73 @@
mod async_cache;
mod lru_cache;
mod sharded_async_cache;
mod sharded_sync_cache;
mod sync_cache;

use std::collections::hash_map::DefaultHasher;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;

pub use crate::async_cache::*;
pub use crate::lru_cache::*;
pub use crate::sharded_async_cache::*;
pub use crate::sharded_sync_cache::*;
pub use crate::sync_cache::*;

// NOTE(review): not referenced in this file; presumably a removal/eviction
// callback (key + mutable value being dropped) — confirm against the
// lru_cache / sharded cache modules.
pub type AfterRemovedFnMut<K, V> = Box<dyn FnMut(&K, &mut V) + Send + Sync>;

// log2 of the shard count used by the sharded caches.
pub(crate) const NUM_SHARD_BITS: usize = 4;
// Number of shards (1 << 4 = 16); each shard carries its own lock,
// presumably to reduce contention — see the sharded_*_cache modules.
pub(crate) const NUM_SHARDS: usize = 1 << NUM_SHARD_BITS;

/// Computes the per-shard capacity as `ceil(capacity / shard_len)`, clamped
/// to at least 1 so the result is always a valid `NonZeroUsize`
/// (a zero-capacity shard is not representable).
///
/// `shard_len` must be non-zero (callers pass `NUM_SHARDS`).
///
// FIXME: Cannot set a precise capacity freely (such as 1000, will be 63 * 16)
pub(crate) fn per_shard(capacity: usize, shard_len: usize) -> NonZeroUsize {
    // Ceiling division without overflow concern for realistic capacities.
    let per_shard = (capacity + (shard_len - 1)) / shard_len;
    // `NonZeroUsize::MIN` is 1 — no `unsafe` needed for the fallback.
    NonZeroUsize::new(per_shard).unwrap_or(NonZeroUsize::MIN)
}

/// Maps `key` to a shard index in `0..shard_len` by hashing it with the
/// standard library's `DefaultHasher` and taking the remainder.
///
/// `shard_len` must be non-zero (callers pass `NUM_SHARDS`), otherwise the
/// `%` below divides by zero.
pub(crate) fn shard<K: Hash>(key: &K, shard_len: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    let res = hasher.finish() as usize % shard_len;
    // `x % shard_len` is strictly less than `shard_len`; the original
    // assertion used `<=`, which was off by one and could never catch a
    // `res == shard_len` bug.
    debug_assert!(res < shard_len);
    res
}

/// Synchronous cache abstraction with LRU semantics (see the doctest on
/// [`Cache::pop`]). Methods take `&mut self`, so sharing a cache across
/// threads or tasks requires an external lock — see `AsyncCacheWrap` and the
/// sharded wrappers.
pub trait Cache: Debug + Sync + Send {
    /// Key type stored in the cache.
    type K;
    /// Value type; returned by value, hence the `Clone` bound.
    type V: Clone;
    /// Insert a key-value pair,
    /// If the key already exists in the cache,
    /// then it updates the key's value and returns the old value.
    /// Otherwise, None is returned.
    fn insert(&mut self, key: Self::K, value: Self::V) -> Option<Self::V>;

    /// Returns the value for `key`, if present. Takes `&mut self` because
    /// a lookup refreshes the entry's recency (see the `pop` doctest, where
    /// `get(&3)` keeps key 3 alive longer than key 4).
    fn get(&mut self, key: &Self::K) -> Option<Self::V>;
    /// Removes `key`, returning its value if it was present.
    fn remove(&mut self, key: &Self::K) -> Option<Self::V>;
    /// Removes and returns the key and value corresponding to the least recently
    /// used item or `None` if the cache is empty.
    ///
    /// # Example
    ///
    /// ```
    /// use cache::{Cache, LruWrap};
    /// use std::num::NonZeroUsize;
    /// let mut cache = LruWrap::new(NonZeroUsize::new(2).unwrap());
    ///
    /// cache.insert(2, "a");
    /// cache.insert(3, "b");
    /// cache.insert(4, "c");
    /// cache.get(&3);
    ///
    /// assert_eq!(cache.pop(), Some((4, "c")));
    /// assert_eq!(cache.pop(), Some((3, "b")));
    /// assert_eq!(cache.pop(), None);
    /// assert_eq!(cache.get_usage(), 0);
    /// ```
    fn pop(&mut self) -> Option<(Self::K, Self::V)>;
    /// Sets the maximum number of entries the cache may hold.
    /// NOTE(review): whether shrinking below the current usage evicts
    /// immediately is implementation-defined — confirm against `LruWrap`.
    fn set_capacity(&mut self, capacity: NonZeroUsize);
    /// Returns the configured capacity.
    fn get_capacity(&self) -> usize;
    /// Returns the current number of entries (0 when empty, per the doctest).
    fn get_usage(&self) -> usize;
    /// Removes all entries.
    fn clear(&mut self);
}