diff --git a/Cargo.lock b/Cargo.lock index d0cd5e2..ffae73a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9205,6 +9205,7 @@ dependencies = [ "aes-gcm", "arrow", "async-std", + "async-trait", "dashmap", "duckdb", "futures", diff --git a/tinycloud-core/Cargo.toml b/tinycloud-core/Cargo.toml index 7d128cc..e20f76d 100644 --- a/tinycloud-core/Cargo.toml +++ b/tinycloud-core/Cargo.toml @@ -16,6 +16,7 @@ async-std = ["sea-orm/runtime-async-std-rustls"] [dependencies] dashmap = "5.5" +async-trait.workspace = true sea-orm = { version = "1.1", default-features = false, features = ["macros", "with-time", "with-json"] } sea-orm-migration = { version = "1.1", default-features = false } futures.workspace = true diff --git a/tinycloud-core/src/database_artifacts.rs b/tinycloud-core/src/database_artifacts.rs new file mode 100644 index 0000000..5f00018 --- /dev/null +++ b/tinycloud-core/src/database_artifacts.rs @@ -0,0 +1,147 @@ +use async_trait::async_trait; +use sea_orm::{ActiveModelTrait, ActiveValue::Set, DatabaseConnection, DbErr, EntityTrait}; +use time::{format_description::well_known::Rfc3339, OffsetDateTime}; + +use crate::{hash::hash, models::database_artifact}; + +#[derive(Debug, Clone)] +pub struct DatabaseArtifact { + pub payload: Vec, + pub content_hash: String, + pub revision: i64, + pub size_bytes: i64, + pub updated_at: String, + pub backend: String, + pub storage_mode: String, +} + +#[derive(Debug, thiserror::Error)] +pub enum DatabaseArtifactError { + #[error("database artifact storage error: {0}")] + Db(#[from] DbErr), + #[error("database artifact payload too large: {0} bytes")] + PayloadTooLarge(u64), + #[error("database artifact backend error: {0}")] + Backend(String), +} + +#[async_trait] +pub trait DatabaseArtifactRepository: Send + Sync { + async fn load( + &self, + service: &str, + space: &str, + name: &str, + ) -> Result, DatabaseArtifactError>; + + async fn save( + &self, + service: &str, + space: &str, + name: &str, + payload: Vec, + ) -> Result; +} + +#[derive(Clone)] +pub struct SeaOrmDatabaseArtifactRepository { + conn: DatabaseConnection, +} + +impl SeaOrmDatabaseArtifactRepository { + pub fn new(conn: DatabaseConnection) -> Self { + Self { conn } + } +} + +#[async_trait] +impl DatabaseArtifactRepository for SeaOrmDatabaseArtifactRepository { + async fn load( + &self, + service: &str, + space: &str, + name: &str, + ) -> Result, DatabaseArtifactError> { + database_artifact::Entity::find_by_id(( + service.to_string(), + space.to_string(), + name.to_string(), + )) + .one(&self.conn) + .await + .map(|row| { + row.map(|model| DatabaseArtifact { + payload: model.payload, + content_hash: model.content_hash, + revision: model.revision, + size_bytes: model.size_bytes, + updated_at: model.updated_at, + backend: model.backend, + storage_mode: model.storage_mode, + }) + }) + .map_err(DatabaseArtifactError::Db) + } + + async fn save( + &self, + service: &str, + space: &str, + name: &str, + payload: Vec, + ) -> Result { + let size_bytes = i64::try_from(payload.len()) + .map_err(|_| DatabaseArtifactError::PayloadTooLarge(payload.len() as u64))?; + let content_hash = hash(&payload).to_cid(0x55).to_string(); + let now = OffsetDateTime::now_utc() + .format(&Rfc3339) + .expect("current timestamps should format as RFC3339"); + + let existing = database_artifact::Entity::find_by_id(( + service.to_string(), + space.to_string(), + name.to_string(), + )) + .one(&self.conn) + .await?; + + let revision = existing + .as_ref() + .map(|model| model.revision + 1) + .unwrap_or(1); + let created_at = existing + .as_ref() + .map(|model| model.created_at.clone()) + .unwrap_or_else(|| now.clone()); + + let active = database_artifact::ActiveModel { + service: Set(service.to_string()), + space: Set(space.to_string()), + name: Set(name.to_string()), + revision: Set(revision), + content_hash: Set(content_hash.clone()), + payload: Set(payload.clone()), + size_bytes: Set(size_bytes), + backend: Set("storage.database".to_string()), + storage_mode: Set("database-blob".to_string()), + created_at: Set(created_at), + updated_at: Set(now.clone()), + }; + + let model = if existing.is_some() { + active.update(&self.conn).await? + } else { + active.insert(&self.conn).await? + }; + + Ok(DatabaseArtifact { + payload, + content_hash, + revision: model.revision, + size_bytes: model.size_bytes, + updated_at: model.updated_at, + backend: model.backend, + storage_mode: model.storage_mode, + }) + } +} diff --git a/tinycloud-core/src/duckdb/database.rs b/tinycloud-core/src/duckdb/database.rs index 14cca37..ec182ce 100644 --- a/tinycloud-core/src/duckdb/database.rs +++ b/tinycloud-core/src/duckdb/database.rs @@ -177,7 +177,8 @@ fn handle_export( ) -> Result, DuckDbError> { match mode { StorageMode::File(_) => { - // File-backed: read the file directly + conn.execute_batch("CHECKPOINT;") + .map_err(|e| DuckDbError::Internal(e.to_string()))?; std::fs::read(file_path).map_err(|e| DuckDbError::Internal(e.to_string())) } StorageMode::InMemory => { diff --git a/tinycloud-core/src/duckdb/service.rs b/tinycloud-core/src/duckdb/service.rs index f9b1809..e560f74 100644 --- a/tinycloud-core/src/duckdb/service.rs +++ b/tinycloud-core/src/duckdb/service.rs @@ -1,8 +1,13 @@ -use std::sync::Arc; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; use dashmap::DashMap; use tinycloud_auth::resource::SpaceId; +use crate::database_artifacts::{DatabaseArtifactError, DatabaseArtifactRepository}; + use super::{ caveats::DuckDbCaveats, database::{spawn_actor, DatabaseHandle}, @@ -16,6 +21,7 @@ pub struct DuckDbService { memory_threshold: u64, idle_timeout_secs: u64, max_memory_per_connection: String, + artifact_repository: Arc, } fn validate_db_name(name: &str) -> Result<(), DuckDbError> { @@ -38,6 +44,7 @@ impl DuckDbService { memory_threshold: u64, idle_timeout_secs: u64, max_memory_per_connection: String, + artifact_repository: Arc, ) -> Self { Self { databases: Arc::new(DashMap::new()), @@ -45,6 +52,7 @@ impl DuckDbService { memory_threshold, idle_timeout_secs, max_memory_per_connection, + artifact_repository, } } @@ -60,25 +68,31 @@ impl DuckDbService { validate_db_name(db_name)?; let key = (space.to_string(), db_name.to_string()); - let handle = self - .databases - .entry(key) - .or_insert_with(|| { - spawn_actor( - space.to_string(), - db_name.to_string(), - self.base_path.clone(), - self.memory_threshold, - self.idle_timeout_secs, - self.max_memory_per_connection.clone(), - self.databases.clone(), - ) - }) - .clone(); + let handle = self.handle(space, db_name).await?; - handle + let result = handle .execute(request, caveats, ability, arrow_format) - .await + .await?; + + if !result.write_targets.is_empty() { + let payload = match handle.export().await { + Ok(payload) => payload, + Err(e) => { + let _ = self.discard_local_state(&key).await; + return Err(e); + } + }; + if let Err(e) = self + .artifact_repository + .save("duckdb", &space.to_string(), db_name, payload) + .await + { + let _ = self.discard_local_state(&key).await; + return Err(artifact_error_to_duckdb(e)); + } + } + + Ok(result) } pub async fn export(&self, space: &SpaceId, db_name: &str) -> Result, DuckDbError> { @@ -91,18 +105,19 @@ impl DuckDbService { return handle.export().await; } - // No live actor — try reading the file directly (cold database) - let path = std::path::PathBuf::from(&self.base_path) - .join(space.to_string()) - .join(format!("{}.duckdb", db_name)); - - if !tokio::fs::try_exists(&path).await.unwrap_or(false) { - return Err(DuckDbError::DatabaseNotFound); - } - - tokio::fs::read(&path) + match self + .artifact_repository + .load("duckdb", &space.to_string(), db_name) .await - .map_err(|e| DuckDbError::Internal(e.to_string())) + .map_err(artifact_error_to_duckdb)? + { + Some(artifact) => { + remove_duckdb_cache_files(&self.cache_path(space, db_name)).await?; + write_cache_file(&self.cache_path(space, db_name), &artifact.payload).await?; + Ok(artifact.payload) + } + None => Err(DuckDbError::DatabaseNotFound), + } } pub async fn import_db( @@ -156,6 +171,15 @@ impl DuckDbService { let key = (space.to_string(), db_name.to_string()); self.databases.remove(&key); + if let Err(e) = self + .artifact_repository + .save("duckdb", &space.to_string(), db_name, data.to_vec()) + .await + { + let _ = self.discard_local_state(&key).await; + return Err(artifact_error_to_duckdb(e)); + } + Ok(()) } @@ -170,4 +194,255 @@ impl DuckDbService { }) .unwrap_or_else(|| "default".to_string()) } + + async fn handle(&self, space: &SpaceId, db_name: &str) -> Result { + let key = (space.to_string(), db_name.to_string()); + if let Some(handle) = self.databases.get(&key).map(|h| h.clone()) { + return Ok(handle); + } + + self.hydrate_cache(space, db_name).await?; + + Ok(self + .databases + .entry(key) + .or_insert_with(|| { + spawn_actor( + space.to_string(), + db_name.to_string(), + self.base_path.clone(), + self.memory_threshold, + self.idle_timeout_secs, + self.max_memory_per_connection.clone(), + self.databases.clone(), + ) + }) + .clone()) + } + + async fn hydrate_cache(&self, space: &SpaceId, db_name: &str) -> Result<(), DuckDbError> { + let cache_path = self.cache_path(space, db_name); + match self + .artifact_repository + .load("duckdb", &space.to_string(), db_name) + .await + .map_err(artifact_error_to_duckdb)? + { + Some(artifact) => { + remove_duckdb_cache_files(&cache_path).await?; + write_cache_file(&cache_path, &artifact.payload).await + } + None => remove_duckdb_cache_files(&cache_path).await, + } + } + + fn cache_path(&self, space: &SpaceId, db_name: &str) -> PathBuf { + PathBuf::from(&self.base_path) + .join(space.to_string()) + .join(format!("{}.duckdb", db_name)) + } + + async fn discard_local_state(&self, key: &(String, String)) -> Result<(), DuckDbError> { + self.databases.remove(key); + let cache_path = PathBuf::from(&self.base_path) + .join(&key.0) + .join(format!("{}.duckdb", key.1)); + remove_duckdb_cache_files(&cache_path).await + } +} + +async fn write_cache_file(path: &Path, payload: &[u8]) -> Result<(), DuckDbError> { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent) + .await + .map_err(|e| DuckDbError::Internal(e.to_string()))?; + } + + let temp_path = path.with_extension("duckdb.tmp"); + tokio::fs::write(&temp_path, payload) + .await + .map_err(|e| DuckDbError::Internal(e.to_string()))?; + tokio::fs::rename(&temp_path, path) + .await + .map_err(|e| DuckDbError::Internal(e.to_string())) +} + +async fn remove_duckdb_cache_files(path: &Path) -> Result<(), DuckDbError> { + for candidate in [ + path.to_path_buf(), + PathBuf::from(format!("{}.tmp", path.display())), + PathBuf::from(format!("{}.wal", path.display())), + ] { + match tokio::fs::remove_file(&candidate).await { + Ok(_) => {} + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => return Err(DuckDbError::Internal(e.to_string())), + } + } + Ok(()) +} + +fn artifact_error_to_duckdb(err: DatabaseArtifactError) -> DuckDbError { + DuckDbError::Internal(err.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + database_artifacts::SeaOrmDatabaseArtifactRepository, + migrations::Migrator, + sea_orm::{ConnectOptions, Database}, + sea_orm_migration::MigratorTrait, + }; + use tempfile::TempDir; + use tinycloud_auth::{ + resolver::DID_METHODS, + ssi::{dids::DIDBuf, jwk::JWK}, + }; + + fn test_space_id(name: &str) -> SpaceId { + let jwk = JWK::generate_ed25519().unwrap(); + let did: DIDBuf = DID_METHODS.generate(&jwk, "key").unwrap(); + SpaceId::new(did, name.parse().unwrap()) + } + + async fn artifact_repository() -> Arc { + let db = Database::connect(ConnectOptions::new("sqlite::memory:".to_string())) + .await + .unwrap(); + Migrator::up(&db, None).await.unwrap(); + Arc::new(SeaOrmDatabaseArtifactRepository::new(db)) + } + + fn service(cache: &TempDir, repo: Arc) -> DuckDbService { + DuckDbService::new( + cache.path().to_string_lossy().to_string(), + u64::MAX, + 300, + "128MB".to_string(), + repo, + ) + } + + #[tokio::test] + async fn duckdb_write_survives_service_recreation_with_empty_cache() { + let repo = artifact_repository().await; + let cache_one = TempDir::new().unwrap(); + let cache_two = TempDir::new().unwrap(); + let space = test_space_id("duckdb-hydrate"); + + service(&cache_one, repo.clone()) + .execute( + &space, + "analytics", + DuckDbRequest::Execute { + schema: Some(vec![ + "CREATE TABLE events (id INTEGER, name VARCHAR)".to_string() + ]), + sql: "INSERT INTO events VALUES (1, 'durable')".to_string(), + params: Vec::new(), + }, + None, + "tinycloud.duckdb/write".to_string(), + false, + ) + .await + .unwrap(); + + let recreated = service(&cache_two, repo); + let result = recreated + .execute( + &space, + "analytics", + DuckDbRequest::Query { + sql: "SELECT name FROM events ORDER BY id".to_string(), + params: Vec::new(), + }, + None, + "tinycloud.duckdb/read".to_string(), + false, + ) + .await + .unwrap(); + + match result.response { + DuckDbResponse::Query(query) => { + assert_eq!(query.row_count, 1); + assert_eq!(query.rows[0][0], DuckDbValue::Text("durable".to_string())); + } + other => panic!("expected query response, got {:?}", other), + } + + let exported = recreated.export(&space, "analytics").await.unwrap(); + assert!(!exported.is_empty(), "hydrated DuckDB should export"); + + let hydrated_path = cache_two + .path() + .join(space.to_string()) + .join("analytics.duckdb"); + assert!( + hydrated_path.exists(), + "durable artifact should hydrate cache" + ); + } + + #[tokio::test] + async fn duckdb_import_survives_service_recreation_with_empty_cache() { + let source_repo = artifact_repository().await; + let source_cache = TempDir::new().unwrap(); + let space = test_space_id("duckdb-import"); + + let source = service(&source_cache, source_repo); + source + .execute( + &space, + "source", + DuckDbRequest::Execute { + schema: Some(vec![ + "CREATE TABLE events (id INTEGER, name VARCHAR)".to_string() + ]), + sql: "INSERT INTO events VALUES (1, 'imported')".to_string(), + params: Vec::new(), + }, + None, + "tinycloud.duckdb/write".to_string(), + false, + ) + .await + .unwrap(); + let exported = source.export(&space, "source").await.unwrap(); + + let repo = artifact_repository().await; + let import_cache = TempDir::new().unwrap(); + service(&import_cache, repo.clone()) + .import_db(&space, "imported", &exported) + .await + .unwrap(); + + let empty_cache = TempDir::new().unwrap(); + let recreated = service(&empty_cache, repo); + let result = recreated + .execute( + &space, + "imported", + DuckDbRequest::Query { + sql: "SELECT name FROM events ORDER BY id".to_string(), + params: Vec::new(), + }, + None, + "tinycloud.duckdb/read".to_string(), + false, + ) + .await + .unwrap(); + + match result.response { + DuckDbResponse::Query(query) => { + assert_eq!(query.row_count, 1); + assert_eq!(query.rows[0][0], DuckDbValue::Text("imported".to_string())); + } + other => panic!("expected query response, got {:?}", other), + } + } } diff --git a/tinycloud-core/src/duckdb/types.rs b/tinycloud-core/src/duckdb/types.rs index 74d8c0d..a9026fa 100644 --- a/tinycloud-core/src/duckdb/types.rs +++ b/tinycloud-core/src/duckdb/types.rs @@ -80,7 +80,7 @@ fn default_ingest_mode() -> String { "create".to_string() } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum DuckDbValue { Null, Boolean(bool), diff --git a/tinycloud-core/src/lib.rs b/tinycloud-core/src/lib.rs index d994c67..e4cff9a 100644 --- a/tinycloud-core/src/lib.rs +++ b/tinycloud-core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod database_artifacts; pub mod db; pub mod duckdb; pub mod encryption; diff --git a/tinycloud-core/src/migrations/m20260516_000000_database_artifacts.rs b/tinycloud-core/src/migrations/m20260516_000000_database_artifacts.rs new file mode 100644 index 0000000..b4ba1b4 --- /dev/null +++ b/tinycloud-core/src/migrations/m20260516_000000_database_artifacts.rs @@ -0,0 +1,87 @@ +use sea_orm_migration::prelude::*; + +use crate::models::database_artifact; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(database_artifact::Entity) + .if_not_exists() + .col( + ColumnDef::new(database_artifact::Column::Service) + .string() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::Space) + .string() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::Name) + .string() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::Revision) + .big_integer() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::ContentHash) + .string() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::Payload) + .blob() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::SizeBytes) + .big_integer() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::Backend) + .string() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::StorageMode) + .string() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::CreatedAt) + .string() + .not_null(), + ) + .col( + ColumnDef::new(database_artifact::Column::UpdatedAt) + .string() + .not_null(), + ) + .primary_key( + Index::create() + .col(database_artifact::Column::Service) + .col(database_artifact::Column::Space) + .col(database_artifact::Column::Name), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(database_artifact::Entity).to_owned()) + .await + } +} diff --git a/tinycloud-core/src/migrations/mod.rs b/tinycloud-core/src/migrations/mod.rs index f9d0cca..e03e7e1 100644 --- a/tinycloud-core/src/migrations/mod.rs +++ b/tinycloud-core/src/migrations/mod.rs @@ -3,6 +3,7 @@ pub mod m20230510_101010_init_tables; pub mod m20260218_sql_database; pub mod m20260409_000000_hook_tables; pub mod m20260512_000000_signed_kv_tickets; +pub mod m20260516_000000_database_artifacts; pub struct Migrator; @@ -14,6 +15,7 @@ impl MigratorTrait for Migrator { Box::new(m20260218_sql_database::Migration), Box::new(m20260409_000000_hook_tables::Migration), Box::new(m20260512_000000_signed_kv_tickets::Migration), + Box::new(m20260516_000000_database_artifacts::Migration), ] } } diff --git a/tinycloud-core/src/models/database_artifact.rs b/tinycloud-core/src/models/database_artifact.rs new file mode 100644 index 0000000..ef4d061 --- /dev/null +++ b/tinycloud-core/src/models/database_artifact.rs @@ -0,0 +1,25 @@ +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "database_artifact")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub service: String, + #[sea_orm(primary_key, auto_increment = false)] + pub space: String, + #[sea_orm(primary_key, auto_increment = false)] + pub name: String, + pub revision: i64, + pub content_hash: String, + pub payload: Vec, + pub size_bytes: i64, + pub backend: String, + pub storage_mode: String, + pub created_at: String, + pub updated_at: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/tinycloud-core/src/models/mod.rs b/tinycloud-core/src/models/mod.rs index 644d3ac..722215e 100644 --- a/tinycloud-core/src/models/mod.rs +++ b/tinycloud-core/src/models/mod.rs @@ -1,5 +1,6 @@ pub mod abilities; pub mod actor; +pub mod database_artifact; pub mod delegation; pub mod epoch; pub mod hook_delivery; diff --git a/tinycloud-core/src/sql/database.rs b/tinycloud-core/src/sql/database.rs index 7cbc4d5..5ef2a3a 100644 --- a/tinycloud-core/src/sql/database.rs +++ b/tinycloud-core/src/sql/database.rs @@ -135,22 +135,6 @@ pub fn spawn_actor( } } - // Flush in-memory database to file before shutdown so data is not lost - if matches!(mode, StorageMode::InMemory) { - if let Ok(size) = storage::database_size(&conn) { - if size > 0 { - match storage::promote_to_file(&conn, &file_path) { - Ok(_) => { - tracing::info!(space=%space_id, db=%db_name, "Flushed in-memory database to file on shutdown"); - } - Err(e) => { - tracing::error!(space=%space_id, db=%db_name, error=%e, "Failed to flush in-memory database on shutdown"); - } - } - } - } - } - databases.remove(&(space_id.clone(), db_name.clone())); tracing::debug!(space=%space_id, db=%db_name, "Database actor shutting down"); }); @@ -160,36 +144,28 @@ pub fn spawn_actor( fn handle_export( conn: &rusqlite::Connection, - mode: &StorageMode, - file_path: &PathBuf, + _mode: &StorageMode, + _file_path: &PathBuf, ) -> Result, SqlError> { - match mode { - StorageMode::File(_) => { - // File-backed: read the file directly - std::fs::read(file_path).map_err(|e| SqlError::Internal(e.to_string())) - } - StorageMode::InMemory => { - // In-memory: serialize via SQLite backup API to a temp file - let temp_dir = tempfile::tempdir().map_err(|e| SqlError::Internal(e.to_string()))?; - let temp_path = temp_dir.path().join("export.db"); + // Serialize through SQLite's backup API for both in-memory and WAL-backed + // file databases so the exported artifact contains a complete checkpoint. + let temp_dir = tempfile::tempdir().map_err(|e| SqlError::Internal(e.to_string()))?; + let temp_path = temp_dir.path().join("export.db"); - let mut dest = rusqlite::Connection::open(&temp_path) - .map_err(|e| SqlError::Internal(e.to_string()))?; + let mut dest = + rusqlite::Connection::open(&temp_path).map_err(|e| SqlError::Internal(e.to_string()))?; - { - let backup = rusqlite::backup::Backup::new(conn, &mut dest) - .map_err(|e| SqlError::Internal(e.to_string()))?; - backup - .run_to_completion(5, std::time::Duration::from_millis(250), None) - .map_err(|e| SqlError::Internal(e.to_string()))?; - } + { + let backup = rusqlite::backup::Backup::new(conn, &mut dest) + .map_err(|e| SqlError::Internal(e.to_string()))?; + backup + .run_to_completion(5, std::time::Duration::from_millis(250), None) + .map_err(|e| SqlError::Internal(e.to_string()))?; + } - // Close the connection so the file is flushed - drop(dest); + drop(dest); - std::fs::read(&temp_path).map_err(|e| SqlError::Internal(e.to_string())) - } - } + std::fs::read(&temp_path).map_err(|e| SqlError::Internal(e.to_string())) } fn handle_message( diff --git a/tinycloud-core/src/sql/service.rs b/tinycloud-core/src/sql/service.rs index 1def9ff..ef67ec3 100644 --- a/tinycloud-core/src/sql/service.rs +++ b/tinycloud-core/src/sql/service.rs @@ -1,8 +1,13 @@ -use std::sync::Arc; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; use dashmap::DashMap; use tinycloud_auth::resource::SpaceId; +use crate::database_artifacts::{DatabaseArtifactError, DatabaseArtifactRepository}; + use super::{ caveats::SqlCaveats, database::{spawn_actor, DatabaseHandle}, @@ -13,14 +18,20 @@ pub struct SqlService { databases: Arc>, base_path: String, memory_threshold: u64, + artifact_repository: Arc, } impl SqlService { - pub fn new(base_path: String, memory_threshold: u64) -> Self { + pub fn new( + base_path: String, + memory_threshold: u64, + artifact_repository: Arc, + ) -> Self { Self { databases: Arc::new(DashMap::new()), base_path, memory_threshold, + artifact_repository, } } @@ -33,21 +44,9 @@ impl SqlService { ability: String, ) -> Result { let key = (space.to_string(), db_name.to_string()); - let handle = self - .databases - .entry(key.clone()) - .or_insert_with(|| { - spawn_actor( - space.to_string(), - db_name.to_string(), - self.base_path.clone(), - self.memory_threshold, - self.databases.clone(), - ) - }) - .clone(); + let mut handle = self.handle(space, db_name).await?; - match handle + let result = match handle .execute(request.clone(), caveats.clone(), ability.clone()) .await { @@ -55,23 +54,31 @@ impl SqlService { // Actor is dead — remove stale entry and respawn tracing::warn!(space=%space, db=%db_name, "Dead SQL actor detected, respawning"); self.databases.remove(&key); - let new_handle = self - .databases - .entry(key) - .or_insert_with(|| { - spawn_actor( - space.to_string(), - db_name.to_string(), - self.base_path.clone(), - self.memory_threshold, - self.databases.clone(), - ) - }) - .clone(); - new_handle.execute(request, caveats, ability).await + handle = self.handle(space, db_name).await?; + handle.execute(request, caveats, ability).await } other => other, + }?; + + if !result.write_targets.is_empty() { + let payload = match handle.export().await { + Ok(payload) => payload, + Err(e) => { + let _ = self.discard_local_state(&key).await; + return Err(e); + } + }; + if let Err(e) = self + .artifact_repository + .save("sql", &space.to_string(), db_name, payload) + .await + { + let _ = self.discard_local_state(&key).await; + return Err(artifact_error_to_sql(e)); + } } + + Ok(result) } pub async fn export(&self, space: &SpaceId, db_name: &str) -> Result, SqlError> { @@ -91,20 +98,278 @@ impl SqlService { } } - // No live actor — try reading the file directly (cold database) - let path = std::path::PathBuf::from(&self.base_path) - .join(space.to_string()) - .join(format!("{}.db", db_name)); - - if !path.exists() { - return Err(SqlError::DatabaseNotFound); + match self + .artifact_repository + .load("sql", &space.to_string(), db_name) + .await + .map_err(artifact_error_to_sql)? + { + Some(artifact) => { + remove_sql_cache_files(&self.cache_path(space, db_name)).await?; + write_cache_file(&self.cache_path(space, db_name), &artifact.payload).await?; + Ok(artifact.payload) + } + None => Err(SqlError::DatabaseNotFound), } - - std::fs::read(&path).map_err(|e| SqlError::Internal(e.to_string())) } pub fn db_name_from_path(path: Option<&str>) -> String { path.map(|p| p.split('/').next_back().unwrap_or("default").to_string()) .unwrap_or_else(|| "default".to_string()) } + + async fn handle(&self, space: &SpaceId, db_name: &str) -> Result { + let key = (space.to_string(), db_name.to_string()); + if let Some(handle) = self.databases.get(&key).map(|h| h.clone()) { + return Ok(handle); + } + + self.hydrate_cache(space, db_name).await?; + + Ok(self + .databases + .entry(key) + .or_insert_with(|| { + spawn_actor( + space.to_string(), + db_name.to_string(), + self.base_path.clone(), + self.memory_threshold, + self.databases.clone(), + ) + }) + .clone()) + } + + async fn hydrate_cache(&self, space: &SpaceId, db_name: &str) -> Result<(), SqlError> { + let cache_path = self.cache_path(space, db_name); + match self + .artifact_repository + .load("sql", &space.to_string(), db_name) + .await + .map_err(artifact_error_to_sql)? + { + Some(artifact) => { + remove_sql_cache_files(&cache_path).await?; + write_cache_file(&cache_path, &artifact.payload).await + } + None => remove_sql_cache_files(&cache_path).await, + } + } + + fn cache_path(&self, space: &SpaceId, db_name: &str) -> PathBuf { + PathBuf::from(&self.base_path) + .join(space.to_string()) + .join(format!("{}.db", db_name)) + } + + async fn discard_local_state(&self, key: &(String, String)) -> Result<(), SqlError> { + self.databases.remove(key); + let cache_path = PathBuf::from(&self.base_path) + .join(&key.0) + .join(format!("{}.db", key.1)); + remove_sql_cache_files(&cache_path).await + } +} + +async fn write_cache_file(path: &Path, payload: &[u8]) -> Result<(), SqlError> { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent) + .await + .map_err(|e| SqlError::Internal(e.to_string()))?; + } + + let temp_path = path.with_extension("db.tmp"); + tokio::fs::write(&temp_path, payload) + .await + .map_err(|e| SqlError::Internal(e.to_string()))?; + tokio::fs::rename(&temp_path, path) + .await + .map_err(|e| SqlError::Internal(e.to_string())) +} + +async fn remove_sql_cache_files(path: &Path) -> Result<(), SqlError> { + for candidate in [ + path.to_path_buf(), + PathBuf::from(format!("{}-wal", path.display())), + PathBuf::from(format!("{}-shm", path.display())), + ] { + match tokio::fs::remove_file(&candidate).await { + Ok(_) => {} + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => return Err(SqlError::Internal(e.to_string())), + } + } + Ok(()) +} + +fn artifact_error_to_sql(err: DatabaseArtifactError) -> SqlError { + SqlError::Internal(err.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + database_artifacts::{DatabaseArtifact, SeaOrmDatabaseArtifactRepository}, + migrations::Migrator, + sea_orm::{ConnectOptions, Database}, + sea_orm_migration::MigratorTrait, + }; + use async_trait::async_trait; + use tempfile::TempDir; + use tinycloud_auth::{ + resolver::DID_METHODS, + ssi::{dids::DIDBuf, jwk::JWK}, + }; + + fn test_space_id(name: &str) -> SpaceId { + let jwk = JWK::generate_ed25519().unwrap(); + let did: DIDBuf = DID_METHODS.generate(&jwk, "key").unwrap(); + SpaceId::new(did, name.parse().unwrap()) + } + + async fn artifact_repository() -> Arc { + let db = Database::connect(ConnectOptions::new("sqlite::memory:".to_string())) + .await + .unwrap(); + Migrator::up(&db, None).await.unwrap(); + Arc::new(SeaOrmDatabaseArtifactRepository::new(db)) + } + + #[tokio::test] + async fn sql_write_survives_service_recreation_with_empty_cache() { + let repo = artifact_repository().await; + let cache_one = TempDir::new().unwrap(); + let cache_two = TempDir::new().unwrap(); + let space = test_space_id("sql-hydrate"); + + let service = SqlService::new( + cache_one.path().to_string_lossy().to_string(), + u64::MAX, + repo.clone(), + ); + service + .execute( + &space, + "main", + SqlRequest::Execute { + schema: Some(vec![ + "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT NOT NULL)" + .to_string(), + ]), + sql: "INSERT INTO items (name) VALUES (?)".to_string(), + params: vec![SqlValue::Text("durable".to_string())], + }, + None, + "tinycloud.sql/write".to_string(), + ) + .await + .unwrap(); + service + .execute( + &space, + "main", + SqlRequest::Execute { + schema: None, + sql: "INSERT INTO items (name) VALUES (?)".to_string(), + params: vec![SqlValue::Text("updated".to_string())], + }, + None, + "tinycloud.sql/write".to_string(), + ) + .await + .unwrap(); + drop(service); + + let recreated = SqlService::new( + cache_two.path().to_string_lossy().to_string(), + u64::MAX, + repo, + ); + let result = recreated + .execute( + &space, + "main", + SqlRequest::Query { + sql: "SELECT name FROM items ORDER BY id".to_string(), + params: Vec::new(), + }, + None, + "tinycloud.sql/read".to_string(), + ) + .await + .unwrap(); + + match result.response { + SqlResponse::Query(query) => { + assert_eq!(query.row_count, 2); + assert_eq!(query.rows[0][0], SqlValue::Text("durable".to_string())); + assert_eq!(query.rows[1][0], SqlValue::Text("updated".to_string())); + } + other => panic!("expected query response, got {:?}", other), + } + + let hydrated_path = cache_two.path().join(space.to_string()).join("main.db"); + assert!( + hydrated_path.exists(), + "durable artifact should hydrate cache" + ); + } + + struct FailingArtifactRepository; + + #[async_trait] + impl DatabaseArtifactRepository for FailingArtifactRepository { + async fn load( + &self, + _service: &str, + _space: &str, + _name: &str, + ) -> Result, DatabaseArtifactError> { + Ok(None) + } + + async fn save( + &self, + _service: &str, + _space: &str, + _name: &str, + _payload: Vec, + ) -> Result { + Err(DatabaseArtifactError::Backend("forced failure".to_string())) + } + } + + #[tokio::test] + async fn sql_write_fails_when_durable_persistence_fails() { + let cache = TempDir::new().unwrap(); + let space = test_space_id("sql-failure"); + let service = SqlService::new( + cache.path().to_string_lossy().to_string(), + u64::MAX, + Arc::new(FailingArtifactRepository), + ); + + let err = service + .execute( + &space, + "main", + SqlRequest::Execute { + schema: Some(vec!["CREATE TABLE items (name TEXT NOT NULL)".to_string()]), + sql: "INSERT INTO items (name) VALUES ('lost')".to_string(), + params: Vec::new(), + }, + None, + "tinycloud.sql/write".to_string(), + ) + .await + .expect_err("write must fail when durable save fails"); + + assert!(matches!(err, SqlError::Internal(_))); + assert!(matches!( + service.export(&space, "main").await, + Err(SqlError::DatabaseNotFound) + )); + } } diff --git a/tinycloud-core/src/sql/types.rs b/tinycloud-core/src/sql/types.rs index e4f12e0..88e408c 100644 --- a/tinycloud-core/src/sql/types.rs +++ b/tinycloud-core/src/sql/types.rs @@ -38,7 +38,7 @@ pub struct SqlStatement { pub params: Vec, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum SqlValue { Null, Integer(i64), diff --git a/tinycloud-node-server/src/lib.rs b/tinycloud-node-server/src/lib.rs index 164a249..b0e4054 100644 --- a/tinycloud-node-server/src/lib.rs +++ b/tinycloud-node-server/src/lib.rs @@ -7,7 +7,7 @@ extern crate tokio; use anyhow::{Context, Result}; use rocket::{fairing::AdHoc, figment::Figment, http::Header, Build, Rocket}; -use std::path::Path; +use std::{path::Path, sync::Arc}; pub mod allow_list; pub mod auth_guards; @@ -45,6 +45,7 @@ use storage::{ }; use tee::TeeContext; use tinycloud_core::{ + database_artifacts::SeaOrmDatabaseArtifactRepository, duckdb::DuckDbService, keys::{SecretsSetup, StaticSecret}, sea_orm::{ConnectOptions, Database, DatabaseConnection}, @@ -194,8 +195,13 @@ pub async fn app(config: &Figment) -> Result> { connect_opts.max_connections(100); } + let database_connection = Database::connect(connect_opts).await?; + let database_artifact_repository = Arc::new(SeaOrmDatabaseArtifactRepository::new( + database_connection.clone(), + )); + let tinycloud = TinyCloud::new( - Database::connect(connect_opts).await?, + database_connection, tinycloud_config.storage.blocks.open().await?, key_setup.setup(()).await?, ) @@ -205,6 +211,7 @@ pub async fn app(config: &Figment) -> Result> { let sql_service = SqlService::new( tinycloud_config.storage.sql.path.clone().expect("resolved"), tinycloud_config.storage.sql.memory_threshold.as_u64(), + database_artifact_repository.clone(), ); let duckdb_service = DuckDbService::new( @@ -221,6 +228,7 @@ pub async fn app(config: &Figment) -> Result> { .duckdb .max_memory_per_connection .clone(), + database_artifact_repository, ); let quota_cache = QuotaCache::new(