Skip to content

Commit

Permalink
SEX
Browse files Browse the repository at this point in the history
  • Loading branch information
namse committed Jun 25, 2024
1 parent 5677056 commit 7c6f4f3
Show file tree
Hide file tree
Showing 56 changed files with 3,326 additions and 1,668 deletions.
2 changes: 1 addition & 1 deletion luda-editor/new-server/database/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ edition = "2021"
anyhow = "1.0.86"
aws-sdk-s3 = "1.34.0"
quick_cache = "0.5.1"
rkyv = { version = "0.7.44", features = ["validation"] }
rusqlite = { path = "../rusqlite-0.31.0", features = ["backup", "bundled"] }
tokio = { version = "1.38.0", features = ["rt", "time"] }
migration = { path = "../schema/migration" }
document = { path = "../schema/document" }
rkyv = { version = "0.7.44", features = ["validation"] }
26 changes: 22 additions & 4 deletions luda-editor/new-server/database/database/src/kv_store/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::KvStore;
use anyhow::Result;
use crate::Result;
use quick_cache::sync::Cache;
use std::{
sync::{atomic::AtomicBool, Arc},
Expand Down Expand Up @@ -115,14 +115,32 @@ impl<Store: KvStore + Clone> KvStore for InMemoryCachedKsStore<Store> {
ttl: Option<Duration>,
) -> Result<()> {
self.store.create(key.as_ref(), value_fn, ttl).await?;
if !self.enabled() {
return Ok(());
}
self.cache.remove(key.as_ref());

Ok(())
}

async fn transact(
&self,
transact_items: impl IntoIterator<Item = crate::TransactItem>,
) -> Result<()> {
let transact_items = transact_items.into_iter().collect::<Vec<_>>();
let keys = transact_items
.iter()
.map(|item| match item {
crate::TransactItem::Put { key, .. } => key.clone(),
crate::TransactItem::Create { key, .. } => key.clone(),
})
.collect::<Vec<_>>();

self.store.transact(transact_items).await?;
for key in keys {
self.cache.remove(&key);
}

Ok(())
}

// async fn update<T, Fut>(
// &self,
// key: impl AsRef<str>,
Expand Down
5 changes: 4 additions & 1 deletion luda-editor/new-server/database/database/src/kv_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ mod in_memory;
mod sqlite;

use crate::*;
use anyhow::Result;
pub(crate) use in_memory::*;
pub(crate) use sqlite::*;
use std::time::{Duration, SystemTime};
Expand Down Expand Up @@ -38,4 +37,8 @@ pub trait KvStore {
value_fn: impl FnOnce() -> Result<Bytes>,
ttl: Option<Duration>,
) -> Result<()>;
async fn transact(
&self,
transact_items: impl IntoIterator<Item = crate::TransactItem>,
) -> Result<()>;
}
159 changes: 106 additions & 53 deletions luda-editor/new-server/database/database/src/kv_store/sqlite.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::KvStore;
use anyhow::Result;
use crate::Result;
use aws_sdk_s3::primitives::ByteStream;
use rusqlite::{Connection, OpenFlags, OptionalExtension};
use rusqlite::{Connection, OpenFlags, OptionalExtension, Transaction};
use std::{
sync::{atomic::AtomicPtr, Arc, Mutex, MutexGuard},
time::{Duration, SystemTime},
Expand All @@ -16,7 +16,7 @@ const DB_PATH: &str = "db.sqlite";
const BACKUP_PATH: &str = "db.sqlite.backup";

impl SqliteKvStore {
pub async fn new(s3_client: aws_sdk_s3::Client, bucket_name: String) -> Result<Self> {
pub async fn new(s3_client: aws_sdk_s3::Client, bucket_name: String) -> anyhow::Result<Self> {
try_fetch_db_file_from_s3(&s3_client, &bucket_name).await?;

let conn = Connection::open(DB_PATH).unwrap();
Expand Down Expand Up @@ -148,27 +148,18 @@ impl KvStore for SqliteKvStore {
value: &impl AsRef<[u8]>,
ttl: Option<Duration>,
) -> Result<()> {
let write_conn = self.write_conn();
let mut stmt = write_conn.prepare(
"
INSERT OR REPLACE INTO kv_store
(key, value, version, expired_at)
VALUES (?, ?, 0, ?)",
)?;

assert_eq!(
stmt.execute((key.as_ref(), value.as_ref(), ttl_to_expired_at(ttl)))?,
1
);

let mut write_conn = self.write_conn();
let trx = write_conn.transaction()?;
put(&trx, key, value, ttl)?;
trx.commit()?;
Ok(())
}

async fn delete(&self, key: impl AsRef<str>) -> Result<()> {
let write_conn = self.write_conn();
let mut stmt = write_conn.prepare("DELETE FROM kv_store WHERE key = ?")?;
stmt.execute([key.as_ref()])?;

let mut write_conn = self.write_conn();
let trx = write_conn.transaction()?;
delete(&trx, key)?;
trx.commit()?;
Ok(())
}

Expand All @@ -178,36 +169,31 @@ impl KvStore for SqliteKvStore {
value_fn: impl FnOnce() -> Result<Bytes>,
ttl: Option<Duration>,
) -> Result<()> {
let mut write_conn = self.write_conn();
let tx = write_conn.transaction()?;

let mut stmt = tx.prepare(
"
SELECT expired_at
FROM kv_store
WHERE key = ?",
)?;
let expired_at: Option<u64> = stmt
.query_row([key.as_ref()], |row| row.get(0))
.optional()?;
if let Some(expired_at) = expired_at {
if expired_at == 0 || now_epoch_time_secs() < expired_at {
return Ok(());
}
}
let mut write_conn: MutexGuard<Connection> = self.write_conn();
let trx = write_conn.transaction()?;
create(&trx, key, value_fn, ttl)?;
trx.commit()?;
Ok(())
}

let value = value_fn()?;
let mut stmt = tx.prepare(
"
INSERT OR REPLACE INTO kv_store
(key, value, version, expired_at)
VALUES (?, ?, 0, ?)",
)?;
assert_eq!(
stmt.execute((key.as_ref(), value.as_ref(), ttl_to_expired_at(ttl)))?,
1
);
async fn transact(
&self,
transact_items: impl IntoIterator<Item = crate::TransactItem>,
) -> Result<()> {
let mut write_conn: MutexGuard<Connection> = self.write_conn();
let trx = write_conn.transaction()?;

for item in transact_items.into_iter() {
match item {
crate::TransactItem::Put { key, value, ttl } => {
put(&trx, key, &value, ttl)?;
}
crate::TransactItem::Create { key, value, ttl } => {
create(&trx, key, || Ok(value), ttl)?;
}
}
}
trx.commit()?;
Ok(())
}

Expand Down Expand Up @@ -276,7 +262,7 @@ impl KvStore for SqliteKvStore {
async fn try_fetch_db_file_from_s3(
s3_client: &aws_sdk_s3::Client,
bucket_name: &str,
) -> Result<()> {
) -> anyhow::Result<()> {
if std::fs::metadata(DB_PATH).is_ok() {
return Ok(());
}
Expand All @@ -298,7 +284,7 @@ async fn backup(
sqlite3: &AtomicPtr<rusqlite::ffi::sqlite3>,
s3_client: &aws_sdk_s3::Client,
bucket_name: &str,
) -> Result<()> {
) -> anyhow::Result<()> {
println!("Start Backup db.sqlite");
let now = std::time::SystemTime::now();

Expand All @@ -319,7 +305,10 @@ async fn backup(
Ok(())
}

async fn save_db_backup_to_s3(s3_client: &aws_sdk_s3::Client, bucket_name: &str) -> Result<()> {
async fn save_db_backup_to_s3(
s3_client: &aws_sdk_s3::Client,
bucket_name: &str,
) -> anyhow::Result<()> {
// TODO: multipart
s3_client
.put_object()
Expand All @@ -332,7 +321,7 @@ async fn save_db_backup_to_s3(s3_client: &aws_sdk_s3::Client, bucket_name: &str)
Ok(())
}

async fn migrate() -> Result<()> {
async fn migrate() -> anyhow::Result<()> {
let pramga_conn = Connection::open(DB_PATH)?;
let current_version = {
let result = pramga_conn.query_row("PRAGMA kv_store.user_version", [], |row| {
Expand All @@ -352,7 +341,7 @@ async fn migrate() -> Result<()> {
fn map<From: document::Document, To: document::Document>(
&self,
mut f: impl FnMut(From) -> To,
) -> Result<()> {
) -> anyhow::Result<()> {
let read_conn =
Connection::open_with_flags(DB_PATH, OpenFlags::SQLITE_OPEN_READ_ONLY).unwrap();
let mut write_conn = Connection::open(DB_PATH)?;
Expand Down Expand Up @@ -406,3 +395,67 @@ fn now_epoch_time_secs() -> u64 {
.unwrap()
.as_secs()
}

fn put(
trx: &Transaction<'_>,
key: impl AsRef<str>,
value: &impl AsRef<[u8]>,
ttl: Option<Duration>,
) -> Result<()> {
let mut stmt = trx.prepare(
"
INSERT OR REPLACE INTO kv_store
(key, value, version, expired_at)
VALUES (?, ?, 0, ?)",
)?;

assert_eq!(
stmt.execute((key.as_ref(), value.as_ref(), ttl_to_expired_at(ttl)))?,
1
);

Ok(())
}

fn delete(trx: &Transaction<'_>, key: impl AsRef<str>) -> Result<()> {
let mut stmt = trx.prepare("DELETE FROM kv_store WHERE key = ?")?;
stmt.execute([key.as_ref()])?;

Ok(())
}

fn create<Bytes: AsRef<[u8]>>(
trx: &Transaction<'_>,
key: impl AsRef<str>,
value_fn: impl FnOnce() -> Result<Bytes>,
ttl: Option<Duration>,
) -> Result<()> {
let mut stmt = trx.prepare(
"
SELECT expired_at
FROM kv_store
WHERE key = ?",
)?;
let expired_at: Option<u64> = stmt
.query_row([key.as_ref()], |row| row.get(0))
.optional()?;
if let Some(expired_at) = expired_at {
if expired_at == 0 || now_epoch_time_secs() < expired_at {
return Ok(());
}
}

let value = value_fn()?;
let mut stmt = trx.prepare(
"
INSERT OR REPLACE INTO kv_store
(key, value, version, expired_at)
VALUES (?, ?, 0, ?)",
)?;
assert_eq!(
stmt.execute((key.as_ref(), value.as_ref(), ttl_to_expired_at(ttl)))?,
1
);

Ok(())
}
54 changes: 48 additions & 6 deletions luda-editor/new-server/database/database/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
mod heap_archived;
mod kv_store;
mod value_buffer;

use anyhow::Result;
pub use heap_archived::*;
use document::*;
pub use kv_store::KvStore;
pub use value_buffer::*;
pub use migration::schema;

pub async fn init(
s3_client: aws_sdk_s3::Client,
bucket_name: String,
turn_on_in_memory_cache: bool,
) -> Result<Database> {
) -> anyhow::Result<Database> {
let sqlite = kv_store::SqliteKvStore::new(s3_client, bucket_name).await?;
let store = kv_store::InMemoryCachedKsStore::new(sqlite, turn_on_in_memory_cache);

Expand All @@ -26,7 +23,45 @@ impl Database {
pub fn set_memory_cache(&self, turn_on: bool) {
self.store.set_cache_enabled(turn_on)
}
pub async fn get<T: Document>(
&self,
document_get: impl DocumentGet<Output = T>,
) -> Result<Option<HeapArchived<T>>> {
let key = document_get.key();
let value_buffer = self.store.get(key).await?;
Ok(value_buffer.map(|value_buffer| T::heap_archived(value_buffer)))
}

pub async fn transact(&self, transact: impl Transact) -> Result<()> {
let transact_items = transact.try_into_transact_items()?;
KvStore::transact(self, transact_items).await
}
}

#[derive(Debug)]
pub enum Error {
SqliteError(rusqlite::Error),
SerializationError(SerErr),
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for Error {}

impl From<rusqlite::Error> for Error {
fn from(e: rusqlite::Error) -> Self {
Error::SqliteError(e)
}
}
impl From<SerErr> for Error {
fn from(e: SerErr) -> Self {
Error::SerializationError(e)
}
}

pub type Result<T> = std::result::Result<T, Error>;

impl KvStore for Database {
async fn get(&self, key: impl AsRef<str>) -> Result<Option<ValueBuffer>> {
Expand Down Expand Up @@ -61,4 +96,11 @@ impl KvStore for Database {
) -> Result<()> {
self.store.create(key, value_fn, ttl).await
}

async fn transact(
&self,
transact_items: impl IntoIterator<Item = crate::TransactItem>,
) -> Result<()> {
self.store.transact(transact_items).await
}
}
Loading

0 comments on commit 7c6f4f3

Please sign in to comment.