Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 14 additions & 56 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
};
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId};
use spacetimedb_datastore::system_tables::{
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
};
Expand Down Expand Up @@ -1617,62 +1617,20 @@ impl RelationalDB {
}
}

fn apply_history<H>(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError>
where
H: durability::History<TxData = Txdata>,
{
log::info!("[{database_identity}] DATABASE: applying transaction history...");

// TODO: Revisit once we actually replay history suffixes, ie. starting
// from an offset larger than the history's min offset.
// TODO: We may want to require that a `tokio::runtime::Handle` is
// always supplied when constructing a `RelationalDB`. This would allow
// to spawn a timer task here which just prints the progress periodically
// in case the history is finite but very long.
let (_, max_tx_offset) = history.tx_range_hint();
let mut last_logged_percentage = 0;
let progress = |tx_offset: u64| {
if let Some(max_tx_offset) = max_tx_offset {
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
if percentage > last_logged_percentage && percentage % 10 == 0 {
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
last_logged_percentage = percentage;
}
// Print _something_ even if we don't know what's still ahead.
} else if tx_offset.is_multiple_of(10_000) {
log::info!("[{database_identity}] Loading transaction {tx_offset}");
}
fn apply_history(
datastore: &Locking,
database_identity: Identity,
history: impl durability::History<TxData = Txdata>,
) -> Result<(), DBError> {
let counters = ApplyHistoryCounters {
replay_commitlog_time_seconds: WORKER_METRICS
.replay_commitlog_time_seconds
.with_label_values(&database_identity),
replay_commitlog_num_commits: WORKER_METRICS
.replay_commitlog_num_commits
.with_label_values(&database_identity),
};

let time_before = std::time::Instant::now();

let mut replay = datastore.replay(
progress,
// We don't want to instantiate an incorrect state;
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
spacetimedb_datastore::locking_tx_datastore::ErrorBehavior::FailFast,
);
let start_tx_offset = replay.next_tx_offset();
history
.fold_transactions_from(start_tx_offset, &mut replay)
.map_err(anyhow::Error::from)?;

let time_elapsed = time_before.elapsed();
WORKER_METRICS
.replay_commitlog_time_seconds
.with_label_values(&database_identity)
.set(time_elapsed.as_secs_f64());

let end_tx_offset = replay.next_tx_offset();
WORKER_METRICS
.replay_commitlog_num_commits
.with_label_values(&database_identity)
.set((end_tx_offset - start_tx_offset) as _);

log::info!("[{database_identity}] DATABASE: applied transaction history");
datastore.rebuild_state_after_replay()?;
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");

spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?;
Ok(())
}

Expand Down
30 changes: 6 additions & 24 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
},
};
use anyhow::anyhow;
use core::{cell::RefCell, ops::RangeBounds};
use core::ops::RangeBounds;
use parking_lot::{Mutex, RwLock};
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
use spacetimedb_durability::TxOffset;
Expand All @@ -33,7 +33,7 @@ use spacetimedb_lib::{ConnectionId, Identity};
use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId};
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{bsatn, AlgebraicValue, ProductValue};
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_schema::table_name::TableName;
use spacetimedb_schema::{
reducer_name::ReducerName,
Expand All @@ -48,7 +48,6 @@ use spacetimedb_table::{
use std::borrow::Cow;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;

pub type Result<T> = std::result::Result<T, DatastoreError>;

Expand Down Expand Up @@ -171,13 +170,9 @@ impl Locking {
/// The provided closure will be called for each transaction found in the
/// history, the parameter is the transaction's offset. The closure is called
/// _before_ the transaction is applied to the database state.
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<F> {
Replay {
database_identity: self.database_identity,
committed_state: self.committed_state.clone(),
progress: RefCell::new(progress),
error_behavior,
}
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<'_, F> {
let committed_state = self.committed_state.write();
Replay::new(self.database_identity, committed_state, progress, error_behavior)
}

/// Construct a new [`Locking`] datastore containing the state stored in `snapshot`.
Expand Down Expand Up @@ -999,18 +994,6 @@ impl Locking {
}
}

#[derive(Debug, Error)]
pub enum ReplayError {
#[error("Expected tx offset {expected}, encountered {encountered}")]
InvalidOffset { expected: u64, encountered: u64 },
#[error(transparent)]
Decode(#[from] bsatn::DecodeError),
#[error(transparent)]
Db(#[from] DatastoreError),
#[error(transparent)]
Any(#[from] anyhow::Error),
}

/// Construct a [`Metadata`] from the given [`RowRef`],
/// reading only the columns necessary to construct the value.
fn metadata_from_row(row: RowRef<'_>) -> Result<Metadata> {
Expand Down Expand Up @@ -1041,7 +1024,6 @@ pub(crate) mod tests {
};
use crate::traits::{IsolationLevel, MutTx};
use crate::Result;
use bsatn::to_vec;
use core::{fmt, mem};
use itertools::Itertools;
use pretty_assertions::{assert_eq, assert_matches};
Expand All @@ -1053,7 +1035,7 @@ pub(crate) mod tests {
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::bsatn::ToBsatn;
use spacetimedb_sats::bsatn::{to_vec, ToBsatn};
use spacetimedb_sats::layout::RowTypeLayout;
use spacetimedb_sats::raw_identifier::RawIdentifier;
use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType, SumTypeVariant, SumValue};
Expand Down
2 changes: 1 addition & 1 deletion crates/datastore/src/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod state_view;
pub use state_view::{IterByColEqTx, IterByColRangeTx};
pub mod delete_table;
mod replay;
pub use replay::{ErrorBehavior, Replay};
pub use replay::{apply_history, ApplyHistoryCounters, ErrorBehavior, Replay};
mod tx;
pub use tx::{NumDistinctValues, TxId};
mod tx_state;
Expand Down
Loading
Loading