Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 6 additions & 194 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ use super::{
};
use crate::{
db_metrics::DB_METRICS,
error::{DatastoreError, TableError, ViewError},
error::TableError,
execution_context::ExecutionContext,
locking_tx_datastore::{mut_tx::ViewReadSets, state_view::ScanOrIndex, IterByColRangeTx},
system_tables::{
system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StIndexRow,
StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID,
system_tables, StColumnRow, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, SystemTable, ST_CLIENT_ID,
ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX,
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX,
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID,
Expand All @@ -33,13 +32,13 @@ use crate::{
};
use anyhow::anyhow;
use core::{convert::Infallible, ops::RangeBounds};
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet};
use spacetimedb_data_structures::map::{IntMap, IntSet};
use spacetimedb_durability::TxOffset;
use spacetimedb_lib::{db::auth::StTableType, Identity};
use spacetimedb_primitives::{ColList, ColSet, IndexId, SequenceId, TableId, ViewId};
use spacetimedb_primitives::{ColList, IndexId, TableId, ViewId};
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
Expand Down Expand Up @@ -200,93 +199,6 @@ impl CommittedState {
}
}

/// Delete all but the highest-allocation `st_sequence` row for each system sequence.
///
/// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
/// initialized newly-created system sequences to `allocation: 4097`,
/// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
/// This affected the system table migration which added
/// `st_view_view_id_seq` and `st_view_arg_id_seq`.
/// As a result, when replaying these databases' commitlogs without a snapshot,
/// we will end up with two rows in `st_sequence` for each of these sequences,
/// resulting in a unique constraint violation in `CommittedState::build_indexes`.
/// We call this method in [`super::datastore::Locking::rebuild_state_after_replay`]
/// to avoid that unique constraint violation.
pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) {
struct StSequenceRowInfo {
sequence_id: SequenceId,
allocated: i128,
row_pointer: RowPointer,
}

// Get all the `st_sequence` rows which refer to sequences on system tables,
// including any duplicates caused by the bug described above.
let sequence_rows = self
.table_scan(ST_SEQUENCE_ID)
.expect("`st_sequence` should exist")
.filter_map(|row_ref| {
// Read the table ID to which the sequence refers,
// in order to determine if this is a system sequence or not.
let table_id = row_ref
.read_col::<TableId>(StSequenceFields::TableId)
.expect("`st_sequence` row should conform to `st_sequence` schema");

// If this sequence refers to a system table, it may need a fixup.
// User tables' sequences will never need fixups.
table_id_is_reserved(table_id).then(|| {
let allocated = row_ref
.read_col::<i128>(StSequenceFields::Allocated)
.expect("`st_sequence` row should conform to `st_sequence` schema");
let sequence_id = row_ref
.read_col::<SequenceId>(StSequenceFields::SequenceId)
.expect("`st_sequence` row should conform to `st_sequence` schema");
StSequenceRowInfo {
allocated,
sequence_id,
row_pointer: row_ref.pointer(),
}
})
})
.collect::<Vec<_>>();

let (st_sequence, blob_store, ..) = self
.get_table_and_blob_store_mut(ST_SEQUENCE_ID)
.expect("`st_sequence` should exist");

// Track the row with the highest allocation for each sequence.
let mut highest_allocations: HashMap<SequenceId, (i128, RowPointer)> = HashMap::default();

for StSequenceRowInfo {
sequence_id,
allocated,
row_pointer,
} in sequence_rows
{
// For each `st_sequence` row which refers to a system table,
// if we've already seen a row for the same sequence,
// keep only the row with the higher allocation.
if let Some((prev_allocated, prev_row_pointer)) =
highest_allocations.insert(sequence_id, (allocated, row_pointer))
{
// We have a duplicate row. We want to keep whichever has the higher `allocated`,
// and delete the other.
let row_pointer_to_delete = if prev_allocated > allocated {
// The previous row has a higher allocation than the new row,
// so delete the new row and restore `previous` to `highest_allocations`.
highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer));
row_pointer
} else {
// The previous row does not have a higher allocation than the new,
// so delete the previous row and keep the new one.
prev_row_pointer
};

st_sequence.delete(blob_store, row_pointer_to_delete, |_| ())
.expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup");
}
}
}

/// Extremely delicate function to bootstrap the system tables.
/// Don't update this unless you know what you're doing.
pub(super) fn bootstrap_system_tables(&mut self, database_identity: Identity) -> Result<()> {
Expand Down Expand Up @@ -484,106 +396,6 @@ impl CommittedState {
Ok(sequence_state)
}

pub(super) fn build_indexes(&mut self) -> Result<()> {
let st_indexes = self.tables.get(&ST_INDEX_ID).unwrap();
let rows = st_indexes
.scan_rows(&self.blob_store)
.map(StIndexRow::try_from)
.collect::<Result<Vec<_>>>()?;

let st_constraints = self.tables.get(&ST_CONSTRAINT_ID).unwrap();
let unique_constraints: HashSet<(TableId, ColSet)> = st_constraints
.scan_rows(&self.blob_store)
.map(StConstraintRow::try_from)
.filter_map(Result::ok)
.filter_map(|constraint| match constraint.constraint_data {
StConstraintData::Unique { columns } => Some((constraint.table_id, columns)),
_ => None,
})
.collect();

for index_row in rows {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let (table, blob_store, index_id_map, _) = self
.get_table_and_blob_store_mut(table_id)
.expect("index should exist in committed state; cannot create it");
let algo: IndexAlgorithm = index_row.index_algorithm.into();
let columns: ColSet = algo.columns().into();
let is_unique = unique_constraints.contains(&(table_id, columns));

let index = table.new_index(&algo, is_unique)?;
// SAFETY: `index` was derived from `table`.
unsafe { table.insert_index(blob_store, index_id, index) }
.expect("rebuilding should not cause constraint violations");
index_id_map.insert(index_id, table_id);
}
Ok(())
}

pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> {
self.ephemeral_tables = self.ephemeral_tables()?.into_iter().collect();
Ok(())
}

fn ephemeral_tables(&self) -> Result<Vec<TableId>> {
let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID];

let Some(st_view) = self.tables.get(&ST_VIEW_ID) else {
return Ok(tables);
};
let backing_tables = st_view
.scan_rows(&self.blob_store)
.map(|row_ref| {
let view_row = StViewRow::try_from(row_ref)?;
view_row
.table_id
.ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_row.view_id)))
})
.collect::<Result<Vec<_>>>()?;

tables.extend(backing_tables);

Ok(tables)
}

/// After replaying all old transactions,
/// inserts and deletes into the system tables
/// might not be reflected in the schemas of the built tables.
/// So we must re-schema every built table.
pub(super) fn reschema_tables(&mut self) -> Result<()> {
// For already built tables, we need to reschema them to account for constraints et al.
let mut schemas = Vec::with_capacity(self.tables.len());
for table_id in self.tables.keys().copied() {
schemas.push(self.schema_for_table_raw(table_id)?);
}
for (table, schema) in self.tables.values_mut().zip(schemas) {
table.with_mut_schema(|s| *s = schema);
}
Ok(())
}

/// After replaying all old transactions, tables which have rows will
/// have been created in memory, but tables with no rows will not have
/// been created. This function ensures that they are created.
pub(super) fn build_missing_tables(&mut self) -> Result<()> {
// Find all ids of tables that are in `st_tables` but haven't been built.
let table_ids = self
.get_table(ST_TABLE_ID)
.unwrap()
.scan_rows(&self.blob_store)
.map(|r| r.read_col(StTableFields::TableId).unwrap())
.filter(|table_id| self.get_table(*table_id).is_none())
.collect::<Vec<_>>();

// Construct their schemas and insert tables for them.
for table_id in table_ids {
let schema = self.schema_for_table(table_id)?;
self.create_table(table_id, schema);
}
Ok(())
}

/// Returns an iterator doing a full table scan on `table_id`.
pub(super) fn table_scan<'a>(&'a self, table_id: TableId) -> Option<TableScanIter<'a>> {
Some(self.get_table(table_id)?.scan_rows(&self.blob_store))
Expand Down Expand Up @@ -1008,7 +820,7 @@ impl CommittedState {
Table::new(schema, SquashedOffset::COMMITTED_STATE)
}

fn create_table(&mut self, table_id: TableId, schema: Arc<TableSchema>) {
pub(super) fn create_table(&mut self, table_id: TableId, schema: Arc<TableSchema>) {
self.tables.insert(table_id, Self::make_table(schema));
}

Expand Down
48 changes: 4 additions & 44 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
tx_state::TxState,
};
use crate::execution_context::{Workload, WorkloadType};
use crate::locking_tx_datastore::replay::{ErrorBehavior, Replay};
use crate::locking_tx_datastore::replay::{build_sequence_state, ErrorBehavior, Replay};
use crate::{
db_metrics::DB_METRICS,
error::{DatastoreError, TableError},
Expand Down Expand Up @@ -68,7 +68,7 @@ pub struct Locking {
// made private again.
pub committed_state: Arc<RwLock<CommittedState>>,
/// The state of sequence generation in this database.
sequence_state: Arc<Mutex<SequencesState>>,
pub(super) sequence_state: Arc<Mutex<SequencesState>>,
/// The identity of this database.
pub(crate) database_identity: Identity,
}
Expand Down Expand Up @@ -117,11 +117,7 @@ impl Locking {
commit_state.bootstrap_system_tables(database_identity)?;
// The database tables are now initialized with the correct data.
// Now we have to build our in memory structures.
{
let sequence_state = commit_state.build_sequence_state()?;
// Reset our sequence state so that they start in the right places.
*datastore.sequence_state.lock() = sequence_state;
}
build_sequence_state(&datastore, &mut commit_state)?;

// We don't want to build indexes here; we'll build those later,
// in `rebuild_state_after_replay`.
Expand All @@ -132,38 +128,6 @@ impl Locking {
Ok(datastore)
}

/// The purpose of this is to rebuild the state of the datastore
/// after having inserted all of rows from the message log.
/// This is necessary because, for example, inserting a row into `st_table`
/// is not equivalent to calling `create_table`.
/// There may eventually be better way to do this, but this will have to do for now.
pub fn rebuild_state_after_replay(&self) -> Result<()> {
let mut committed_state = self.committed_state.write_arc();

// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
// initialized newly-created system sequences to `allocation: 4097`,
// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
// This affected the system table migration which added
// `st_view_view_id_seq` and `st_view_arg_id_seq`.
// As a result, when replaying these databases' commitlogs without a snapshot,
// we will end up with two rows in `st_sequence` for each of these sequences,
// resulting in a unique constraint violation in `CommittedState::build_indexes`.
// We fix this by, for each system sequence, deleting all but the row with the highest allocation.
committed_state.fixup_delete_duplicate_system_sequence_rows();

// `build_missing_tables` must be called before indexes.
// Honestly this should maybe just be one big procedure.
// See John Carmack's philosophy on this.
committed_state.reschema_tables()?;
committed_state.build_missing_tables()?;
committed_state.build_indexes()?;
// Figure out where to pick up for each sequence.
*self.sequence_state.lock() = committed_state.build_sequence_state()?;

committed_state.collect_ephemeral_tables()?;
Ok(())
}

/// Obtain a [`spacetimedb_commitlog::Decoder`] suitable for replaying a
/// [`spacetimedb_durability::History`] onto the currently committed state.
///
Expand Down Expand Up @@ -242,11 +206,7 @@ impl Locking {
// Set the sequence state. In practice we will end up doing this again after replaying
// the commit log, but we do it here too just to avoid having an incorrectly restored
// snapshot.
{
let sequence_state = committed_state.build_sequence_state()?;
// Reset our sequence state so that they start in the right places.
*datastore.sequence_state.lock() = sequence_state;
}
build_sequence_state(&datastore, &mut committed_state)?;

// The next TX offset after restoring from a snapshot is one greater than the snapshotted offset.
committed_state.next_tx_offset = tx_offset + 1;
Expand Down
Loading
Loading