diff --git a/subgraph-radio/benches/attestations.rs b/subgraph-radio/benches/attestations.rs index 8da502c..cbf310d 100644 --- a/subgraph-radio/benches/attestations.rs +++ b/subgraph-radio/benches/attestations.rs @@ -89,12 +89,11 @@ mod attestation { .expect("Failed to connect to the in-memory database"), ); - black_box( - sqlx::migrate!("../migrations") - .run(&pool) - .await - .expect("Could not run migration"), - ); + sqlx::migrate!("../migrations") + .run(&pool) + .await + .expect("Could not run migration"); + black_box(()); let attestations = vec![ black_box(Attestation::new( diff --git a/subgraph-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs index ec67600..1b2ecaf 100644 --- a/subgraph-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use std::env; -use std::path::Path; + +use std::str::FromStr; use std::sync::{atomic::Ordering, mpsc::Receiver, Arc}; use std::time::Duration; @@ -15,6 +15,7 @@ use graphcast_sdk::{ WakuMessage, }; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::SqlitePool; use tokio::time::{interval, timeout}; use tracing::{debug, error, info, trace, warn}; @@ -58,39 +59,55 @@ pub struct RadioOperator { pub db: SqlitePool, } -impl RadioOperator { - /// Create a radio operator with radio configurations, persisted data, - /// graphcast agent, and control flow - pub async fn new(config: &Config, agent: GraphcastAgent) -> RadioOperator { - debug!("Connecting to database"); +async fn setup_database_connection(db_url: &str) -> Result { + let connect_options = SqliteConnectOptions::from_str(db_url)? + .create_if_missing(true) + .journal_mode(SqliteJournalMode::Wal) + .busy_timeout(std::time::Duration::from_secs(5)); - let db = match &config.radio_setup().sqlite_file_path { - Some(path) => { - let cwd = env::current_dir().unwrap(); - let absolute_path = cwd.join(path); + let pool = SqlitePool::connect_with(connect_options).await?; - if !Path::new(&absolute_path).exists() { - std::fs::File::create(&absolute_path) - .expect("Failed to create the database file"); - debug!("Database file created at {}", absolute_path.display()); - } + Ok(pool) +} - let db_url = format!("sqlite://{}", absolute_path.display()); +async fn connect_to_database(config: &Config) -> Result> { + debug!("Connecting to database"); - SqlitePool::connect(&db_url) - .await - .expect("Could not connect to the SQLite database") + let db_url = match &config.radio_setup().sqlite_file_path { + Some(path) => { + let cwd = std::env::current_dir().unwrap(); + let absolute_path = cwd.join(path); + + if !std::path::Path::new(&absolute_path).exists() { + std::fs::File::create(&absolute_path).expect("Failed to create the database file"); + debug!("Database file created at {}", absolute_path.display()); } - None => SqlitePool::connect("sqlite::memory:") - .await - .expect("Failed to connect to the in-memory database"), - }; - debug!("Check for database migration"); - sqlx::migrate!("../migrations") - .run(&db) + format!("sqlite://{}", absolute_path.display()) + } + None => String::from("sqlite::memory:"), + }; + + let db_pool = setup_database_connection(&db_url).await?; + + debug!("Check for database migration"); + sqlx::migrate!("../migrations") + .run(&db_pool) + .await + .expect("Could not run migration"); + + Ok(db_pool) +} + +impl RadioOperator { + /// Create a radio operator with radio configurations, persisted data, + /// graphcast agent, and control flow + pub async fn new(config: &Config, agent: GraphcastAgent) -> RadioOperator { + debug!("Connecting to database"); + + let db = connect_to_database(config) .await - .expect("Could not run migration"); + .expect("Failed to connect to database"); debug!("Initializing Graphcast Agent"); let graphcast_agent = Arc::new(agent);