From d0f021673b5f846b7344f96d90c53acfb96e2fc7 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 21 Apr 2026 16:21:51 +0200 Subject: [PATCH] move rebuild_state_after_replay to ReplayCommittedState move fixup_delete_duplicate_system_sequence_rows to mod replay extract build_sequence_state to mod replay move more build_* methods to mod replay --- .../locking_tx_datastore/committed_state.rs | 200 +-------------- .../src/locking_tx_datastore/datastore.rs | 48 +--- .../src/locking_tx_datastore/replay.rs | 237 +++++++++++++++++- 3 files changed, 241 insertions(+), 244 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 03b02b6958f..c479be085d9 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -8,12 +8,11 @@ use super::{ }; use crate::{ db_metrics::DB_METRICS, - error::{DatastoreError, TableError, ViewError}, + error::TableError, execution_context::ExecutionContext, locking_tx_datastore::{mut_tx::ViewReadSets, state_view::ScanOrIndex, IterByColRangeTx}, system_tables::{ - system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, - StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID, + system_tables, StColumnRow, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID, @@ -33,13 +32,13 @@ use crate::{ }; use anyhow::anyhow; use core::{convert::Infallible, ops::RangeBounds}; -use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet}; +use 
spacetimedb_data_structures::map::{IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StTableType, Identity}; -use spacetimedb_primitives::{ColList, ColSet, IndexId, SequenceId, TableId, ViewId}; +use spacetimedb_primitives::{ColList, IndexId, TableId, ViewId}; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::{AlgebraicValue, ProductValue}; -use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema}; +use spacetimedb_schema::schema::TableSchema; use spacetimedb_table::{ blob_store::{BlobStore, HashMapBlobStore}, indexes::{RowPointer, SquashedOffset}, @@ -200,93 +199,6 @@ impl CommittedState { } } - /// Delete all but the highest-allocation `st_sequence` row for each system sequence. - /// - /// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate) - /// initialized newly-created system sequences to `allocation: 4097`, - /// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`. - /// This affected the system table migration which added - /// `st_view_view_id_seq` and `st_view_arg_id_seq`. - /// As a result, when replaying these databases' commitlogs without a snapshot, - /// we will end up with two rows in `st_sequence` for each of these sequences, - /// resulting in a unique constraint violation in `CommittedState::build_indexes`. - /// We call this method in [`super::datastore::Locking::rebuild_state_after_replay`] - /// to avoid that unique constraint violation. - pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) { - struct StSequenceRowInfo { - sequence_id: SequenceId, - allocated: i128, - row_pointer: RowPointer, - } - - // Get all the `st_sequence` rows which refer to sequences on system tables, - // including any duplicates caused by the bug described above. 
- let sequence_rows = self - .table_scan(ST_SEQUENCE_ID) - .expect("`st_sequence` should exist") - .filter_map(|row_ref| { - // Read the table ID to which the sequence refers, - // in order to determine if this is a system sequence or not. - let table_id = row_ref - .read_col::(StSequenceFields::TableId) - .expect("`st_sequence` row should conform to `st_sequence` schema"); - - // If this sequence refers to a system table, it may need a fixup. - // User tables' sequences will never need fixups. - table_id_is_reserved(table_id).then(|| { - let allocated = row_ref - .read_col::(StSequenceFields::Allocated) - .expect("`st_sequence` row should conform to `st_sequence` schema"); - let sequence_id = row_ref - .read_col::(StSequenceFields::SequenceId) - .expect("`st_sequence` row should conform to `st_sequence` schema"); - StSequenceRowInfo { - allocated, - sequence_id, - row_pointer: row_ref.pointer(), - } - }) - }) - .collect::>(); - - let (st_sequence, blob_store, ..) = self - .get_table_and_blob_store_mut(ST_SEQUENCE_ID) - .expect("`st_sequence` should exist"); - - // Track the row with the highest allocation for each sequence. - let mut highest_allocations: HashMap = HashMap::default(); - - for StSequenceRowInfo { - sequence_id, - allocated, - row_pointer, - } in sequence_rows - { - // For each `st_sequence` row which refers to a system table, - // if we've already seen a row for the same sequence, - // keep only the row with the higher allocation. - if let Some((prev_allocated, prev_row_pointer)) = - highest_allocations.insert(sequence_id, (allocated, row_pointer)) - { - // We have a duplicate row. We want to keep whichever has the higher `allocated`, - // and delete the other. - let row_pointer_to_delete = if prev_allocated > allocated { - // The previous row has a higher allocation than the new row, - // so delete the new row and restore `previous` to `highest_allocations`. 
- highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer)); - row_pointer - } else { - // The previous row does not have a higher allocation than the new, - // so delete the previous row and keep the new one. - prev_row_pointer - }; - - st_sequence.delete(blob_store, row_pointer_to_delete, |_| ()) - .expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup"); - } - } - } - /// Extremely delicate function to bootstrap the system tables. /// Don't update this unless you know what you're doing. pub(super) fn bootstrap_system_tables(&mut self, database_identity: Identity) -> Result<()> { @@ -484,106 +396,6 @@ impl CommittedState { Ok(sequence_state) } - pub(super) fn build_indexes(&mut self) -> Result<()> { - let st_indexes = self.tables.get(&ST_INDEX_ID).unwrap(); - let rows = st_indexes - .scan_rows(&self.blob_store) - .map(StIndexRow::try_from) - .collect::>>()?; - - let st_constraints = self.tables.get(&ST_CONSTRAINT_ID).unwrap(); - let unique_constraints: HashSet<(TableId, ColSet)> = st_constraints - .scan_rows(&self.blob_store) - .map(StConstraintRow::try_from) - .filter_map(Result::ok) - .filter_map(|constraint| match constraint.constraint_data { - StConstraintData::Unique { columns } => Some((constraint.table_id, columns)), - _ => None, - }) - .collect(); - - for index_row in rows { - let index_id = index_row.index_id; - let table_id = index_row.table_id; - let (table, blob_store, index_id_map, _) = self - .get_table_and_blob_store_mut(table_id) - .expect("index should exist in committed state; cannot create it"); - let algo: IndexAlgorithm = index_row.index_algorithm.into(); - let columns: ColSet = algo.columns().into(); - let is_unique = unique_constraints.contains(&(table_id, columns)); - - let index = table.new_index(&algo, is_unique)?; - // SAFETY: `index` was derived from `table`. 
- unsafe { table.insert_index(blob_store, index_id, index) } - .expect("rebuilding should not cause constraint violations"); - index_id_map.insert(index_id, table_id); - } - Ok(()) - } - - pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> { - self.ephemeral_tables = self.ephemeral_tables()?.into_iter().collect(); - Ok(()) - } - - fn ephemeral_tables(&self) -> Result> { - let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID]; - - let Some(st_view) = self.tables.get(&ST_VIEW_ID) else { - return Ok(tables); - }; - let backing_tables = st_view - .scan_rows(&self.blob_store) - .map(|row_ref| { - let view_row = StViewRow::try_from(row_ref)?; - view_row - .table_id - .ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_row.view_id))) - }) - .collect::>>()?; - - tables.extend(backing_tables); - - Ok(tables) - } - - /// After replaying all old transactions, - /// inserts and deletes into the system tables - /// might not be reflected in the schemas of the built tables. - /// So we must re-schema every built table. - pub(super) fn reschema_tables(&mut self) -> Result<()> { - // For already built tables, we need to reschema them to account for constraints et al. - let mut schemas = Vec::with_capacity(self.tables.len()); - for table_id in self.tables.keys().copied() { - schemas.push(self.schema_for_table_raw(table_id)?); - } - for (table, schema) in self.tables.values_mut().zip(schemas) { - table.with_mut_schema(|s| *s = schema); - } - Ok(()) - } - - /// After replaying all old transactions, tables which have rows will - /// have been created in memory, but tables with no rows will not have - /// been created. This function ensures that they are created. - pub(super) fn build_missing_tables(&mut self) -> Result<()> { - // Find all ids of tables that are in `st_tables` but haven't been built. 
- let table_ids = self - .get_table(ST_TABLE_ID) - .unwrap() - .scan_rows(&self.blob_store) - .map(|r| r.read_col(StTableFields::TableId).unwrap()) - .filter(|table_id| self.get_table(*table_id).is_none()) - .collect::>(); - - // Construct their schemas and insert tables for them. - for table_id in table_ids { - let schema = self.schema_for_table(table_id)?; - self.create_table(table_id, schema); - } - Ok(()) - } - /// Returns an iterator doing a full table scan on `table_id`. pub(super) fn table_scan<'a>(&'a self, table_id: TableId) -> Option> { Some(self.get_table(table_id)?.scan_rows(&self.blob_store)) @@ -1008,7 +820,7 @@ impl CommittedState { Table::new(schema, SquashedOffset::COMMITTED_STATE) } - fn create_table(&mut self, table_id: TableId, schema: Arc) { + pub(super) fn create_table(&mut self, table_id: TableId, schema: Arc) { self.tables.insert(table_id, Self::make_table(schema)); } diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index bb872de5ffa..7dd915847d2 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -3,7 +3,7 @@ use super::{ tx_state::TxState, }; use crate::execution_context::{Workload, WorkloadType}; -use crate::locking_tx_datastore::replay::{ErrorBehavior, Replay}; +use crate::locking_tx_datastore::replay::{build_sequence_state, ErrorBehavior, Replay}; use crate::{ db_metrics::DB_METRICS, error::{DatastoreError, TableError}, @@ -68,7 +68,7 @@ pub struct Locking { // made private again. pub committed_state: Arc>, /// The state of sequence generation in this database. - sequence_state: Arc>, + pub(super) sequence_state: Arc>, /// The identity of this database. pub(crate) database_identity: Identity, } @@ -117,11 +117,7 @@ impl Locking { commit_state.bootstrap_system_tables(database_identity)?; // The database tables are now initialized with the correct data. 
// Now we have to build our in memory structures. - { - let sequence_state = commit_state.build_sequence_state()?; - // Reset our sequence state so that they start in the right places. - *datastore.sequence_state.lock() = sequence_state; - } + build_sequence_state(&datastore, &mut commit_state)?; // We don't want to build indexes here; we'll build those later, // in `rebuild_state_after_replay`. @@ -132,38 +128,6 @@ impl Locking { Ok(datastore) } - /// The purpose of this is to rebuild the state of the datastore - /// after having inserted all of rows from the message log. - /// This is necessary because, for example, inserting a row into `st_table` - /// is not equivalent to calling `create_table`. - /// There may eventually be better way to do this, but this will have to do for now. - pub fn rebuild_state_after_replay(&self) -> Result<()> { - let mut committed_state = self.committed_state.write_arc(); - - // Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate) - // initialized newly-created system sequences to `allocation: 4097`, - // while `committed_state::bootstrap_system_tables` sets `allocation: 4096`. - // This affected the system table migration which added - // `st_view_view_id_seq` and `st_view_arg_id_seq`. - // As a result, when replaying these databases' commitlogs without a snapshot, - // we will end up with two rows in `st_sequence` for each of these sequences, - // resulting in a unique constraint violation in `CommittedState::build_indexes`. - // We fix this by, for each system sequence, deleting all but the row with the highest allocation. - committed_state.fixup_delete_duplicate_system_sequence_rows(); - - // `build_missing_tables` must be called before indexes. - // Honestly this should maybe just be one big procedure. - // See John Carmack's philosophy on this. 
- committed_state.reschema_tables()?; - committed_state.build_missing_tables()?; - committed_state.build_indexes()?; - // Figure out where to pick up for each sequence. - *self.sequence_state.lock() = committed_state.build_sequence_state()?; - - committed_state.collect_ephemeral_tables()?; - Ok(()) - } - /// Obtain a [`spacetimedb_commitlog::Decoder`] suitable for replaying a /// [`spacetimedb_durability::History`] onto the currently committed state. /// @@ -242,11 +206,7 @@ impl Locking { // Set the sequence state. In practice we will end up doing this again after replaying // the commit log, but we do it here too just to avoid having an incorrectly restored // snapshot. - { - let sequence_state = committed_state.build_sequence_state()?; - // Reset our sequence state so that they start in the right places. - *datastore.sequence_state.lock() = sequence_state; - } + build_sequence_state(&datastore, &mut committed_state)?; // The next TX offset after restoring from a snapshot is one greater than the snapshotted offset. 
committed_state.next_tx_offset = tx_offset + 1; diff --git a/crates/datastore/src/locking_tx_datastore/replay.rs b/crates/datastore/src/locking_tx_datastore/replay.rs index 0ba3f3f8f78..cf0fd357f39 100644 --- a/crates/datastore/src/locking_tx_datastore/replay.rs +++ b/crates/datastore/src/locking_tx_datastore/replay.rs @@ -1,10 +1,12 @@ use super::committed_state::CommittedState; use super::datastore::{Locking, Result}; use crate::db_metrics::DB_METRICS; -use crate::error::{DatastoreError, IndexError, TableError}; +use crate::error::{DatastoreError, IndexError, TableError, ViewError}; use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView}; use crate::system_tables::{ - is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID, + is_built_in_meta_row, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StFields as _, + StIndexRow, StSequenceFields, StTableFields, StTableRow, StViewRow, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, + ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ARG_ID, ST_VIEW_ID, ST_VIEW_SUB_ID, }; use anyhow::{anyhow, Context}; use core::cell::RefMut; @@ -13,14 +15,15 @@ use parking_lot::RwLockWriteGuard; use prometheus::core::{AtomicF64, GenericGauge}; use prometheus::IntGauge; use spacetimedb_commitlog::payload::txdata; -use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet}; +use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet}; use spacetimedb_durability::History; use spacetimedb_durability::Txdata; use spacetimedb_lib::Identity; -use spacetimedb_primitives::{ColId, ColList, TableId}; +use spacetimedb_primitives::{ColId, ColList, ColSet, SequenceId, TableId}; use spacetimedb_sats::algebraic_value::de::ValueDeserializer; use spacetimedb_sats::buffer::BufReader; use spacetimedb_sats::{bsatn, AlgebraicValue, Deserialize, ProductValue}; +use spacetimedb_schema::def::IndexAlgorithm; use spacetimedb_schema::schema::{ColumnSchema, 
TableSchema}; use spacetimedb_schema::table_name::TableName; use spacetimedb_table::indexes::RowPointer; @@ -80,8 +83,7 @@ pub fn apply_history( .set((end_tx_offset - start_tx_offset) as _); log::info!("[{database_identity}] DATABASE: applied transaction history"); - drop(replay); // Neccessary to avoid a deadlock. - datastore.rebuild_state_after_replay()?; + replay.committed_state().rebuild_state_after_replay(datastore)?; log::info!("[{database_identity}] DATABASE: rebuilt state after replay"); Ok(()) @@ -479,6 +481,222 @@ impl<'cs> ReplayCommittedState<'cs> { } } + /// The purpose of this is to rebuild the state of the datastore + /// after having inserted all of the rows from the message log. + /// This is necessary because, for example, inserting a row into `st_table` + /// is not equivalent to calling `create_table`. + /// There may eventually be a better way to do this, but this will have to do for now. + pub fn rebuild_state_after_replay(&mut self, datastore: &Locking) -> Result<()> { + // Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate) + // initialized newly-created system sequences to `allocation: 4097`, + // while `committed_state::bootstrap_system_tables` sets `allocation: 4096`. + // This affected the system table migration which added + // `st_view_view_id_seq` and `st_view_arg_id_seq`. + // As a result, when replaying these databases' commitlogs without a snapshot, + // we will end up with two rows in `st_sequence` for each of these sequences, + // resulting in a unique constraint violation in `Self::build_indexes`. + // We fix this by, for each system sequence, deleting all but the row with the highest allocation. + self.fixup_delete_duplicate_system_sequence_rows(); + + // `build_missing_tables` must be called before indexes. + // Honestly this should maybe just be one big procedure. + // See John Carmack's philosophy on this.
+ self.reschema_tables()?; + self.build_missing_tables()?; + self.build_indexes()?; + self.collect_ephemeral_tables()?; + + // Figure out where to pick up for each sequence. + build_sequence_state(datastore, self)?; + + Ok(()) + } + + /// Delete all but the highest-allocation `st_sequence` row for each system sequence. + /// + /// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate) + /// initialized newly-created system sequences to `allocation: 4097`, + /// while [`CommittedState::bootstrap_system_tables`] sets `allocation: 4096`. + /// This affected the system table migration which added + /// `st_view_view_id_seq` and `st_view_arg_id_seq`. + /// As a result, when replaying these databases' commitlogs without a snapshot, + /// we will end up with two rows in `st_sequence` for each of these sequences, + /// resulting in a unique constraint violation in `Self::build_indexes`. + /// We call this method in [`ReplayCommittedState::rebuild_state_after_replay`] + /// to avoid that unique constraint violation. + pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) { + struct StSequenceRowInfo { + sequence_id: SequenceId, + allocated: i128, + row_pointer: RowPointer, + } + + // Get all the `st_sequence` rows which refer to sequences on system tables, + // including any duplicates caused by the bug described above. + let sequence_rows = self + .table_scan(ST_SEQUENCE_ID) + .expect("`st_sequence` should exist") + .filter_map(|row_ref| { + // Read the table ID to which the sequence refers, + // in order to determine if this is a system sequence or not. + let table_id = row_ref + .read_col::(StSequenceFields::TableId) + .expect("`st_sequence` row should conform to `st_sequence` schema"); + + // If this sequence refers to a system table, it may need a fixup. + // User tables' sequences will never need fixups.
+ table_id_is_reserved(table_id).then(|| { + let allocated = row_ref + .read_col::(StSequenceFields::Allocated) + .expect("`st_sequence` row should conform to `st_sequence` schema"); + let sequence_id = row_ref + .read_col::(StSequenceFields::SequenceId) + .expect("`st_sequence` row should conform to `st_sequence` schema"); + StSequenceRowInfo { + allocated, + sequence_id, + row_pointer: row_ref.pointer(), + } + }) + }) + .collect::>(); + + let (st_sequence, blob_store, ..) = self + .get_table_and_blob_store_mut(ST_SEQUENCE_ID) + .expect("`st_sequence` should exist"); + + // Track the row with the highest allocation for each sequence. + let mut highest_allocations: HashMap = HashMap::default(); + + for StSequenceRowInfo { + sequence_id, + allocated, + row_pointer, + } in sequence_rows + { + // For each `st_sequence` row which refers to a system table, + // if we've already seen a row for the same sequence, + // keep only the row with the higher allocation. + if let Some((prev_allocated, prev_row_pointer)) = + highest_allocations.insert(sequence_id, (allocated, row_pointer)) + { + // We have a duplicate row. We want to keep whichever has the higher `allocated`, + // and delete the other. + let row_pointer_to_delete = if prev_allocated > allocated { + // The previous row has a higher allocation than the new row, + // so delete the new row and restore `previous` to `highest_allocations`. + highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer)); + row_pointer + } else { + // The previous row does not have a higher allocation than the new, + // so delete the previous row and keep the new one. 
+ prev_row_pointer + }; + + st_sequence.delete(blob_store, row_pointer_to_delete, |_| ()) + .expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup"); + } + } + } + + pub(super) fn build_indexes(&mut self) -> Result<()> { + let st_indexes = self.tables.get(&ST_INDEX_ID).unwrap(); + let rows = st_indexes + .scan_rows(&self.blob_store) + .map(StIndexRow::try_from) + .collect::>>()?; + + let st_constraints = self.tables.get(&ST_CONSTRAINT_ID).unwrap(); + let unique_constraints: HashSet<(TableId, ColSet)> = st_constraints + .scan_rows(&self.blob_store) + .map(StConstraintRow::try_from) + .filter_map(Result::ok) + .filter_map(|constraint| match constraint.constraint_data { + StConstraintData::Unique { columns } => Some((constraint.table_id, columns)), + _ => None, + }) + .collect(); + + for index_row in rows { + let index_id = index_row.index_id; + let table_id = index_row.table_id; + let (table, blob_store, index_id_map, _) = self + .get_table_and_blob_store_mut(table_id) + .expect("index should exist in committed state; cannot create it"); + let algo: IndexAlgorithm = index_row.index_algorithm.into(); + let columns: ColSet = algo.columns().into(); + let is_unique = unique_constraints.contains(&(table_id, columns)); + + let index = table.new_index(&algo, is_unique)?; + // SAFETY: `index` was derived from `table`. 
+ unsafe { table.insert_index(blob_store, index_id, index) } + .expect("rebuilding should not cause constraint violations"); + index_id_map.insert(index_id, table_id); + } + Ok(()) + } + + pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> { + self.ephemeral_tables = self.ephemeral_tables()?.into_iter().collect(); + Ok(()) + } + + fn ephemeral_tables(&self) -> Result> { + let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID]; + + let Some(st_view) = self.tables.get(&ST_VIEW_ID) else { + return Ok(tables); + }; + let backing_tables = st_view + .scan_rows(&self.blob_store) + .map(|row_ref| { + let StViewRow { table_id, view_id, .. } = StViewRow::try_from(row_ref)?; + table_id.ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_id))) + }) + .collect::>>()?; + + tables.extend(backing_tables); + + Ok(tables) + } + + /// After replaying all old transactions, + /// inserts and deletes into the system tables + /// might not be reflected in the schemas of the built tables. + /// So we must re-schema every built table. + pub(super) fn reschema_tables(&mut self) -> Result<()> { + // For already built tables, we need to reschema them to account for constraints et al. + let mut schemas = Vec::with_capacity(self.tables.len()); + for table_id in self.tables.keys().copied() { + schemas.push(self.schema_for_table_raw(table_id)?); + } + for (table, schema) in self.tables.values_mut().zip(schemas) { + table.with_mut_schema(|s| *s = schema); + } + Ok(()) + } + + /// After replaying all old transactions, tables which have rows will + /// have been created in memory, but tables with no rows will not have + /// been created. This function ensures that they are created. + pub(super) fn build_missing_tables(&mut self) -> Result<()> { + // Find all ids of tables that are in `st_tables` but haven't been built. 
+ let table_ids = self + .get_table(ST_TABLE_ID) + .unwrap() + .scan_rows(&self.blob_store) + .map(|r| r.read_col(StTableFields::TableId).unwrap()) + .filter(|table_id| self.get_table(*table_id).is_none()) + .collect::>(); + + // Construct their schemas and insert tables for them. + for table_id in table_ids { + let schema = self.schema_for_table(table_id)?; + self.create_table(table_id, schema); + } + Ok(()) + } + fn replay_insert(&mut self, table_id: TableId, schema: &Arc, row: &ProductValue) -> Result<()> { // Event table rows in the commitlog are preserved for future replay features // but don't rebuild state — event tables have no committed state. @@ -770,6 +988,13 @@ impl<'cs> ReplayCommittedState<'cs> { } } +pub(super) fn build_sequence_state(datastore: &Locking, cs: &mut CommittedState) -> Result<()> { + let sequence_state = cs.build_sequence_state()?; + // Reset our sequence state so that they start in the right places. + *datastore.sequence_state.lock() = sequence_state; + Ok(()) +} + impl StateView for ReplayCommittedState<'_> { /// Find the `st_table` row for `table_id`, /// first inspecting [`Self::replay_table_updated`],