From 6cbb3302a6f9e31fa2e191cff225b04e68b3d08a Mon Sep 17 00:00:00 2001
From: Obiajulu-gif
Date: Wed, 27 May 2026 22:35:07 +0100
Subject: [PATCH] Add interrupted write recovery schema

---
 .../20260527000002_write_recovery_records.sql | 30 ++++++
 backend/src/db.rs                             | 76 +++++++++++++++-
 backend/src/main.rs                           |  6 +-
 backend/src/routes/mod.rs                     |  2 +
 backend/src/routes/state.rs                   | 90 +++++++++++++++++++
 5 files changed, 198 insertions(+), 6 deletions(-)
 create mode 100644 backend/migrations/20260527000002_write_recovery_records.sql
 create mode 100644 backend/src/routes/state.rs

diff --git a/backend/migrations/20260527000002_write_recovery_records.sql b/backend/migrations/20260527000002_write_recovery_records.sql
new file mode 100644
index 00000000..1ea9d83a
--- /dev/null
+++ b/backend/migrations/20260527000002_write_recovery_records.sql
@@ -0,0 +1,30 @@
+-- Durable ledger for database writes that may be interrupted after the request is accepted.
+-- A pending/failed row gives operators and retry workers a stable idempotency key to inspect.
+CREATE TABLE IF NOT EXISTS write_recovery_records (
+    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
+    idempotency_key TEXT NOT NULL UNIQUE,
+    operation TEXT NOT NULL,
+    entity_type TEXT NOT NULL,
+    entity_id UUID,
+    status TEXT NOT NULL DEFAULT 'pending',
+    attempts INT NOT NULL DEFAULT 0,
+    last_error TEXT,
+    recovery_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    CONSTRAINT write_recovery_status_check
+        CHECK (status IN ('pending', 'committed', 'failed', 'abandoned'))
+);
+
+CREATE INDEX IF NOT EXISTS idx_write_recovery_status_updated
+    ON write_recovery_records (status, updated_at DESC, id DESC);
+
+CREATE INDEX IF NOT EXISTS idx_write_recovery_entity
+    ON write_recovery_records (entity_type, entity_id)
+    WHERE entity_id IS NOT NULL;
+
+-- Drop any existing trigger first so this statement is re-runnable like the guarded ones above.
+DROP TRIGGER IF EXISTS write_recovery_records_updated_at ON write_recovery_records;
+CREATE TRIGGER write_recovery_records_updated_at
+    BEFORE UPDATE ON write_recovery_records
+    FOR EACH ROW EXECUTE FUNCTION set_updated_at();
diff --git a/backend/src/db.rs b/backend/src/db.rs
index 09602aad..9c095b52 100644
--- a/backend/src/db.rs
+++ b/backend/src/db.rs
@@ -1,6 +1,66 @@
 use crate::services::judge::JudgeService;
 use crate::services::stellar::StellarService;
-use sqlx::PgPool;
+use sqlx::{postgres::PgPoolOptions, PgPool};
+use std::time::Duration;
+
+/// Connection-pool settings consumed by [`connect_pool`].
+#[derive(Debug, Clone)]
+pub struct DbPoolConfig {
+    /// Passed to `PgPoolOptions::max_connections`.
+    pub max_connections: u32,
+    /// Passed to `PgPoolOptions::min_connections`.
+    pub min_connections: u32,
+    /// Passed to `PgPoolOptions::acquire_timeout`, in seconds.
+    pub acquire_timeout_secs: u64,
+    /// Passed to `PgPoolOptions::idle_timeout`, in seconds.
+    pub idle_timeout_secs: u64,
+    /// Passed to `PgPoolOptions::max_lifetime`, in seconds.
+    pub max_lifetime_secs: u64,
+}
+
+impl DbPoolConfig {
+    /// Builds the configuration from the `DB_*` environment variables.
+    ///
+    /// Unset variables use the defaults below; values that fail to parse are logged and
+    /// replaced by the default. `max_connections` is raised to at least 1 and
+    /// `min_connections` is capped at `max_connections`.
+    pub fn from_env() -> Self {
+        let max_connections = env_or::<u32>("DB_MAX_CONNECTIONS", 10).max(1);
+        let min_connections = env_or::<u32>("DB_MIN_CONNECTIONS", 1).min(max_connections);
+        Self {
+            max_connections,
+            min_connections,
+            acquire_timeout_secs: env_or("DB_ACQUIRE_TIMEOUT_SECS", 5),
+            idle_timeout_secs: env_or("DB_IDLE_TIMEOUT_SECS", 300),
+            max_lifetime_secs: env_or("DB_MAX_LIFETIME_SECS", 1800),
+        }
+    }
+}
+
+/// Opens a Postgres pool for `database_url` using the limits in `config`.
+///
+/// Each new connection runs `SET SESSION CHARACTERISTICS ...` so that its transactions
+/// default to READ COMMITTED isolation.
+pub async fn connect_pool(database_url: &str, config: DbPoolConfig) -> sqlx::Result<PgPool> {
+    PgPoolOptions::new()
+        .max_connections(config.max_connections)
+        .min_connections(config.min_connections)
+        .acquire_timeout(Duration::from_secs(config.acquire_timeout_secs))
+        .idle_timeout(Duration::from_secs(config.idle_timeout_secs))
+        .max_lifetime(Duration::from_secs(config.max_lifetime_secs))
+        .after_connect(|conn, _meta| {
+            Box::pin(async move {
+                sqlx::query(
+                    "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED",
+                )
+                .execute(conn)
+                .await?;
+                Ok(())
+            })
+        })
+        .connect(database_url)
+        .await
+}
 
 #[derive(Clone)]
 pub struct AppState {
@@ -18,3 +78,17 @@ impl AppState {
         }
     }
 }
+
+/// Reads `key` from the environment and parses it as `T`.
+///
+/// Returns `default` when the variable is unset or not valid Unicode. When it is set but
+/// does not parse, a warning is logged and `default` is returned.
+fn env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
+    match std::env::var(key) {
+        Ok(raw) => raw.parse().unwrap_or_else(|_| {
+            tracing::warn!(env = key, value = %raw, "invalid value for env var; using default");
+            default
+        }),
+        Err(_) => default,
+    }
+}
diff --git a/backend/src/main.rs b/backend/src/main.rs
index 7c2aeca9..19b9aa0c 100644
--- a/backend/src/main.rs
+++ b/backend/src/main.rs
@@ -1,6 +1,5 @@
 use axum::Router;
 use dotenvy::dotenv;
-use sqlx::postgres::PgPoolOptions;
 use std::net::SocketAddr;
 use tower_http::{cors::CorsLayer, trace::TraceLayer};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -27,10 +26,7 @@ async fn main() -> anyhow::Result<()> {
 
     let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
 
-    let pool = PgPoolOptions::new()
-        .max_connections(10)
-        .connect(&database_url)
-        .await?;
+    let pool = db::connect_pool(&database_url, db::DbPoolConfig::from_env()).await?;
 
     sqlx::migrate!("./migrations").run(&pool).await?;
 
diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs
index 233b820c..5c0e7fae 100644
--- a/backend/src/routes/mod.rs
+++ b/backend/src/routes/mod.rs
@@ -6,6 +6,7 @@ pub mod evidence;
 pub mod health;
 pub mod jobs;
 pub mod milestones;
+pub mod state;
 pub mod uploads;
 pub mod users;
 pub mod verdicts;
@@ -24,6 +25,7 @@ pub fn api_router() -> Router {
                 .nest("/jobs", jobs::router())
                 .nest("/disputes", disputes::router())
                 .nest("/appeals", appeals::router())
+                .nest("/state", state::router())
                 .nest("/users", users::router())
                 .nest("/uploads", uploads::router()),
         )
diff --git a/backend/src/routes/state.rs b/backend/src/routes/state.rs
new file mode 100644
index 00000000..8fa890c8
--- /dev/null
+++ b/backend/src/routes/state.rs
@@ -0,0 +1,90 @@
+use axum::{
+    extract::{Query, State},
+    routing::get,
+    Json, Router,
+};
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+use crate::{db::AppState, error::Result};
+
+/// Page size used when the request omits `limit`.
+const DEFAULT_RECOVERY_LIMIT: i64 = 50;
+/// Largest page size a request may ask for; larger values are clamped to this.
+const MAX_RECOVERY_LIMIT: i64 = 200;
+
+/// Routes for inspecting write-recovery state (currently a single read-only listing).
+pub fn router() -> Router<AppState> {
+    Router::new().route("/write-recovery", get(list_write_recovery))
+}
+
+/// Query-string parameters accepted by `GET /write-recovery`.
+#[derive(Debug, Deserialize)]
+struct RecoveryQuery {
+    /// Exact `status` value to filter on; every status is returned when absent.
+    status: Option<String>,
+    /// Requested page size; clamped to `1..=MAX_RECOVERY_LIMIT`.
+    limit: Option<i64>,
+}
+
+/// Row of `write_recovery_records` as serialized to clients; `recovery_payload` is not selected.
+#[derive(Debug, Serialize, sqlx::FromRow)]
+struct WriteRecoveryRecord {
+    id: Uuid,
+    idempotency_key: String,
+    operation: String,
+    entity_type: String,
+    entity_id: Option<Uuid>,
+    status: String,
+    attempts: i32,
+    last_error: Option<String>,
+    created_at: DateTime<Utc>,
+    updated_at: DateTime<Utc>,
+}
+
+/// Lists write-recovery records, most recently updated first.
+///
+/// `status` filters on an exact match when present; `limit` defaults to
+/// `DEFAULT_RECOVERY_LIMIT` and is clamped to `1..=MAX_RECOVERY_LIMIT`.
+// NOTE(review): no authentication is visible on this route and rows expose `last_error`;
+// confirm an auth layer is applied where this router is mounted.
+#[tracing::instrument(skip(state), fields(status = query.status.as_deref().unwrap_or("any")))]
+async fn list_write_recovery(
+    State(state): State<AppState>,
+    Query(query): Query<RecoveryQuery>,
+) -> Result<Json<Vec<WriteRecoveryRecord>>> {
+    let limit = query
+        .limit
+        .unwrap_or(DEFAULT_RECOVERY_LIMIT)
+        .clamp(1, MAX_RECOVERY_LIMIT);
+
+    let rows = if let Some(status) = query.status {
+        sqlx::query_as::<_, WriteRecoveryRecord>(
+            r#"SELECT id, idempotency_key, operation, entity_type, entity_id, status,
+                      attempts, last_error, created_at, updated_at
+               FROM write_recovery_records
+               WHERE status = $1
+               ORDER BY updated_at DESC, id DESC
+               LIMIT $2"#,
+        )
+        .bind(status)
+        .bind(limit)
+        .fetch_all(&state.pool)
+        .await?
+    } else {
+        sqlx::query_as::<_, WriteRecoveryRecord>(
+            r#"SELECT id, idempotency_key, operation, entity_type, entity_id, status,
+                      attempts, last_error, created_at, updated_at
+               FROM write_recovery_records
+               ORDER BY updated_at DESC, id DESC
+               LIMIT $1"#,
+        )
+        .bind(limit)
+        .fetch_all(&state.pool)
+        .await?
+    };
+
+    tracing::debug!(records = rows.len(), "listed write recovery records");
+    Ok(Json(rows))
+}