diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 89e385ebe12..d8cd4884bcc 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView, }; -use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId}; +use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId}; use spacetimedb_datastore::system_tables::{ system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID, }; @@ -1617,62 +1617,20 @@ impl RelationalDB { } } -fn apply_history(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError> -where - H: durability::History, -{ - log::info!("[{database_identity}] DATABASE: applying transaction history..."); - - // TODO: Revisit once we actually replay history suffixes, ie. starting - // from an offset larger than the history's min offset. - // TODO: We may want to require that a `tokio::runtime::Handle` is - // always supplied when constructing a `RelationalDB`. This would allow - // to spawn a timer task here which just prints the progress periodically - // in case the history is finite but very long. - let (_, max_tx_offset) = history.tx_range_hint(); - let mut last_logged_percentage = 0; - let progress = |tx_offset: u64| { - if let Some(max_tx_offset) = max_tx_offset { - let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32; - if percentage > last_logged_percentage && percentage % 10 == 0 { - log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})"); - last_logged_percentage = percentage; - } - // Print _something_ even if we don't know what's still ahead. 
- } else if tx_offset.is_multiple_of(10_000) { - log::info!("[{database_identity}] Loading transaction {tx_offset}"); - } +fn apply_history( + datastore: &Locking, + database_identity: Identity, + history: impl durability::History, +) -> Result<(), DBError> { + let counters = ApplyHistoryCounters { + replay_commitlog_time_seconds: WORKER_METRICS + .replay_commitlog_time_seconds + .with_label_values(&database_identity), + replay_commitlog_num_commits: WORKER_METRICS + .replay_commitlog_num_commits + .with_label_values(&database_identity), }; - - let time_before = std::time::Instant::now(); - - let mut replay = datastore.replay( - progress, - // We don't want to instantiate an incorrect state; - // if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data. - spacetimedb_datastore::locking_tx_datastore::ErrorBehavior::FailFast, - ); - let start_tx_offset = replay.next_tx_offset(); - history - .fold_transactions_from(start_tx_offset, &mut replay) - .map_err(anyhow::Error::from)?; - - let time_elapsed = time_before.elapsed(); - WORKER_METRICS - .replay_commitlog_time_seconds - .with_label_values(&database_identity) - .set(time_elapsed.as_secs_f64()); - - let end_tx_offset = replay.next_tx_offset(); - WORKER_METRICS - .replay_commitlog_num_commits - .with_label_values(&database_identity) - .set((end_tx_offset - start_tx_offset) as _); - - log::info!("[{database_identity}] DATABASE: applied transaction history"); - datastore.rebuild_state_after_replay()?; - log::info!("[{database_identity}] DATABASE: rebuilt state after replay"); - + spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?; Ok(()) } diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index cde37aeff08..bb872de5ffa 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ 
b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -24,7 +24,7 @@ use crate::{ }, }; use anyhow::anyhow; -use core::{cell::RefCell, ops::RangeBounds}; +use core::ops::RangeBounds; use parking_lot::{Mutex, RwLock}; use spacetimedb_data_structures::map::{HashCollectionExt, HashMap}; use spacetimedb_durability::TxOffset; @@ -33,7 +33,7 @@ use spacetimedb_lib::{ConnectionId, Identity}; use spacetimedb_paths::server::SnapshotDirPath; use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId}; use spacetimedb_sats::memory_usage::MemoryUsage; -use spacetimedb_sats::{bsatn, AlgebraicValue, ProductValue}; +use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_schema::table_name::TableName; use spacetimedb_schema::{ reducer_name::ReducerName, @@ -48,7 +48,6 @@ use spacetimedb_table::{ use std::borrow::Cow; use std::sync::Arc; use std::time::{Duration, Instant}; -use thiserror::Error; pub type Result = std::result::Result; @@ -171,13 +170,9 @@ impl Locking { /// The provided closure will be called for each transaction found in the /// history, the parameter is the transaction's offset. The closure is called /// _before_ the transaction is applied to the database state. - pub fn replay(&self, progress: F, error_behavior: ErrorBehavior) -> Replay { - Replay { - database_identity: self.database_identity, - committed_state: self.committed_state.clone(), - progress: RefCell::new(progress), - error_behavior, - } + pub fn replay(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<'_, F> { + let committed_state = self.committed_state.write(); + Replay::new(self.database_identity, committed_state, progress, error_behavior) } /// Construct a new [`Locking`] datastore containing the state stored in `snapshot`. 
@@ -999,18 +994,6 @@ impl Locking { } } -#[derive(Debug, Error)] -pub enum ReplayError { - #[error("Expected tx offset {expected}, encountered {encountered}")] - InvalidOffset { expected: u64, encountered: u64 }, - #[error(transparent)] - Decode(#[from] bsatn::DecodeError), - #[error(transparent)] - Db(#[from] DatastoreError), - #[error(transparent)] - Any(#[from] anyhow::Error), -} - /// Construct a [`Metadata`] from the given [`RowRef`], /// reading only the columns necessary to construct the value. fn metadata_from_row(row: RowRef<'_>) -> Result { @@ -1041,7 +1024,6 @@ pub(crate) mod tests { }; use crate::traits::{IsolationLevel, MutTx}; use crate::Result; - use bsatn::to_vec; use core::{fmt, mem}; use itertools::Itertools; use pretty_assertions::{assert_eq, assert_matches}; @@ -1053,7 +1035,7 @@ pub(crate) mod tests { use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration}; use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId}; use spacetimedb_sats::algebraic_value::ser::value_serialize; - use spacetimedb_sats::bsatn::ToBsatn; + use spacetimedb_sats::bsatn::{to_vec, ToBsatn}; use spacetimedb_sats::layout::RowTypeLayout; use spacetimedb_sats::raw_identifier::RawIdentifier; use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType, SumTypeVariant, SumValue}; diff --git a/crates/datastore/src/locking_tx_datastore/mod.rs b/crates/datastore/src/locking_tx_datastore/mod.rs index 21954e703df..8f77b462bdd 100644 --- a/crates/datastore/src/locking_tx_datastore/mod.rs +++ b/crates/datastore/src/locking_tx_datastore/mod.rs @@ -9,7 +9,7 @@ pub mod state_view; pub use state_view::{IterByColEqTx, IterByColRangeTx}; pub mod delete_table; mod replay; -pub use replay::{ErrorBehavior, Replay}; +pub use replay::{apply_history, ApplyHistoryCounters, ErrorBehavior, Replay}; mod tx; pub use tx::{NumDistinctValues, TxId}; mod tx_state; diff --git a/crates/datastore/src/locking_tx_datastore/replay.rs 
b/crates/datastore/src/locking_tx_datastore/replay.rs index 76611c776c5..0ba3f3f8f78 100644 --- a/crates/datastore/src/locking_tx_datastore/replay.rs +++ b/crates/datastore/src/locking_tx_datastore/replay.rs @@ -1,46 +1,137 @@ use super::committed_state::CommittedState; -use super::datastore::Result; +use super::datastore::{Locking, Result}; use crate::db_metrics::DB_METRICS; -use crate::error::{IndexError, TableError}; -use crate::locking_tx_datastore::datastore::ReplayError; -use crate::locking_tx_datastore::state_view::iter_st_column_for_table; -use crate::locking_tx_datastore::state_view::StateView; -use crate::system_tables::{is_built_in_meta_row, StFields as _}; -use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID}; +use crate::error::{DatastoreError, IndexError, TableError}; +use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView}; +use crate::system_tables::{ + is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID, +}; use anyhow::{anyhow, Context}; +use core::cell::RefMut; use core::ops::{Deref, DerefMut, RangeBounds}; -use parking_lot::{RwLock, RwLockReadGuard}; +use parking_lot::RwLockWriteGuard; +use prometheus::core::{AtomicF64, GenericGauge}; +use prometheus::IntGauge; use spacetimedb_commitlog::payload::txdata; use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet}; +use spacetimedb_durability::History; +use spacetimedb_durability::Txdata; use spacetimedb_lib::Identity; use spacetimedb_primitives::{ColId, ColList, TableId}; use spacetimedb_sats::algebraic_value::de::ValueDeserializer; use spacetimedb_sats::buffer::BufReader; -use spacetimedb_sats::{AlgebraicValue, Deserialize, ProductValue}; +use spacetimedb_sats::{bsatn, AlgebraicValue, Deserialize, ProductValue}; use spacetimedb_schema::schema::{ColumnSchema, TableSchema}; use spacetimedb_schema::table_name::TableName; use spacetimedb_table::indexes::RowPointer; use 
spacetimedb_table::table::{InsertError, RowRef}; use std::cell::RefCell; use std::sync::Arc; +use thiserror::Error; + +pub fn apply_history( + datastore: &Locking, + database_identity: Identity, + history: impl History>, + counters: ApplyHistoryCounters, +) -> Result<()> { + log::info!("[{database_identity}] DATABASE: applying transaction history..."); + + // TODO: Revisit once we actually replay history suffixes, ie. starting + // from an offset larger than the history's min offset. + // TODO: We may want to require that a `tokio::runtime::Handle` is + // always supplied when constructing a `RelationalDB`. This would allow + // to spawn a timer task here which just prints the progress periodically + // in case the history is finite but very long. + let (_, max_tx_offset) = history.tx_range_hint(); + let mut last_logged_percentage = 0; + let progress = |tx_offset: u64| { + if let Some(max_tx_offset) = max_tx_offset { + let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32; + if percentage > last_logged_percentage && percentage % 10 == 0 { + log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})"); + last_logged_percentage = percentage; + } + // Print _something_ even if we don't know what's still ahead. + } else if tx_offset.is_multiple_of(10_000) { + log::info!("[{database_identity}] Loading transaction {tx_offset}"); + } + }; + + let time_before = std::time::Instant::now(); + + let mut replay = datastore.replay( + progress, + // We don't want to instantiate an incorrect state; + // if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data. 
+ ErrorBehavior::FailFast, + ); + let start_tx_offset = replay.next_tx_offset(); + history + .fold_transactions_from(start_tx_offset, &mut replay) + .map_err(anyhow::Error::from)?; + + let time_elapsed = time_before.elapsed(); + counters.replay_commitlog_time_seconds.set(time_elapsed.as_secs_f64()); + + let end_tx_offset = replay.next_tx_offset(); + counters + .replay_commitlog_num_commits + .set((end_tx_offset - start_tx_offset) as _); + + log::info!("[{database_identity}] DATABASE: applied transaction history"); + drop(replay); // Necessary to avoid a deadlock. + datastore.rebuild_state_after_replay()?; + log::info!("[{database_identity}] DATABASE: rebuilt state after replay"); + + Ok(()) +} + +pub struct ApplyHistoryCounters { + pub replay_commitlog_time_seconds: GenericGauge<AtomicF64>, + pub replay_commitlog_num_commits: IntGauge, +} + +#[derive(Debug, Error)] +pub enum ReplayError { + #[error("Expected tx offset {expected}, encountered {encountered}")] + InvalidOffset { expected: u64, encountered: u64 }, + #[error(transparent)] + Decode(#[from] bsatn::DecodeError), + #[error(transparent)] + Db(#[from] DatastoreError), + #[error(transparent)] + Any(#[from] anyhow::Error), +} /// A [`spacetimedb_commitlog::Decoder`] suitable for replaying a transaction /// history into the database state.
-pub struct Replay { - pub(super) database_identity: Identity, - pub(super) committed_state: Arc>, - pub(super) progress: RefCell, - pub(super) error_behavior: ErrorBehavior, +pub struct Replay<'a, F> { + database_identity: Identity, + committed_state: RefCell>, + progress: RefCell, + error_behavior: ErrorBehavior, } -impl Replay { - fn using_visitor(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T { - let mut committed_state = self.committed_state.write(); - let state = &mut *committed_state; - let committed_state = ReplayCommittedState::new(state); +impl<'a, F> Replay<'a, F> { + pub fn new( + database_identity: Identity, + committed_state: RwLockWriteGuard<'a, CommittedState>, + progress: F, + error_behavior: ErrorBehavior, + ) -> Self { + Self { + database_identity, + committed_state: RefCell::new(ReplayCommittedState::new(committed_state)), + progress: RefCell::new(progress), + error_behavior, + } + } + + fn using_visitor(&self, f: impl FnOnce(&mut ReplayVisitor<'_, '_, F>) -> T) -> T { let mut visitor = ReplayVisitor { database_identity: &self.database_identity, - committed_state, + committed_state: &mut self.committed_state.borrow_mut(), progress: &mut *self.progress.borrow_mut(), dropped_table_names: IntMap::default(), error_behavior: self.error_behavior, @@ -49,16 +140,16 @@ impl Replay { } pub fn next_tx_offset(&self) -> u64 { - self.committed_state.read_arc().next_tx_offset + self.committed_state.borrow().next_tx_offset } // NOTE: This is not unused. 
- pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> { - self.committed_state.read() + pub fn committed_state(&self) -> RefMut<'_, ReplayCommittedState<'a>> { + self.committed_state.borrow_mut() } } -impl spacetimedb_commitlog::Decoder for &mut Replay { +impl spacetimedb_commitlog::Decoder for &mut Replay<'_, F> { type Record = txdata::Txdata; type Error = txdata::DecoderError; @@ -139,9 +230,9 @@ pub enum ErrorBehavior { Warn, } -struct ReplayVisitor<'a, F> { +struct ReplayVisitor<'a, 'cs, F> { database_identity: &'a Identity, - committed_state: ReplayCommittedState<'a>, + committed_state: &'a mut ReplayCommittedState<'cs>, progress: &'a mut F, // Since deletes are handled before truncation / drop, sometimes the schema // info is gone. We save the name on the first delete of that table so metrics @@ -150,7 +241,7 @@ struct ReplayVisitor<'a, F> { error_behavior: ErrorBehavior, } -impl ReplayVisitor<'_, F> { +impl ReplayVisitor<'_, '_, F> { /// Process `err` according to `self.error_behavior`, /// either warning about it or returning it. /// @@ -166,7 +257,7 @@ impl ReplayVisitor<'_, F> { } } -impl spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> { +impl spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, '_, F> { type Error = ReplayError; // NOTE: Technically, this could be `()` if and when we can extract the // row data without going through `ProductValue` (PV). @@ -314,9 +405,9 @@ impl spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi } /// A `CommittedState` under construction during replay. -struct ReplayCommittedState<'cs> { - /// The committed state being contructed. - state: &'cs mut CommittedState, +pub struct ReplayCommittedState<'cs> { + /// The committed state being constructed. + state: RwLockWriteGuard<'cs, CommittedState>, /// Whether the table was dropped within the current transaction during replay. 
/// @@ -361,25 +452,25 @@ struct ReplayCommittedState<'cs> { /// /// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`], /// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows. - pub(super) replay_table_updated: IntMap, + replay_table_updated: IntMap, } impl Deref for ReplayCommittedState<'_> { type Target = CommittedState; fn deref(&self) -> &Self::Target { - self.state + &self.state } } impl DerefMut for ReplayCommittedState<'_> { fn deref_mut(&mut self) -> &mut Self::Target { - &mut *self.state + &mut self.state } } impl<'cs> ReplayCommittedState<'cs> { - fn new(state: &'cs mut CommittedState) -> Self { + fn new(state: RwLockWriteGuard<'cs, CommittedState>) -> Self { Self { state, replay_table_dropped: <_>::default(), @@ -512,7 +603,7 @@ impl<'cs> ReplayCommittedState<'cs> { let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&st_column_row.elements[1])) .expect("second field in `st_column` should decode to a `ColId`"); - let outdated_st_column_rows = iter_st_column_for_table(self.state, &target_table_id.into())? + let outdated_st_column_rows = iter_st_column_for_table(self, &target_table_id.into())? .filter_map(|row_ref| { StColumnRow::try_from(row_ref) .map(|c| (c.col_pos == target_col_id && row_ref.pointer() != row_ptr).then(|| row_ref.pointer())) @@ -542,7 +633,7 @@ impl<'cs> ReplayCommittedState<'cs> { // and not the other one, as it is being replaced. // `Self::ignore_previous_version_of_column` has marked the old version as ignored, // so filter only the non-ignored columns. - let mut columns = iter_st_column_for_table(self.state, &table_id.into())? + let mut columns = iter_st_column_for_table(self, &table_id.into())? .filter(|row_ref| !self.replay_columns_to_ignore.contains(&row_ref.pointer())) .map(|row_ref| { let row = StColumnRow::try_from(row_ref)?; @@ -766,7 +857,7 @@ mod tests { // Directly call replay_insert on committed state. 
let row = u32_str_u32(1, "Carol", 40); { - let state = &mut *datastore.committed_state.write(); + let state = datastore.committed_state.write(); let mut committed_state = ReplayCommittedState::new(state); committed_state.replay_insert(table_id, &schema, &row)?; }