diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 3e10c6e920c..f01b8e34909 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -10,6 +10,7 @@ use futures::channel::mpsc; use futures::StreamExt; use parking_lot::RwLock; use spacetimedb_commitlog as commitlog; +use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError}; use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType}; @@ -23,7 +24,7 @@ use spacetimedb_datastore::system_tables::{system_tables, StModuleRow}; use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use spacetimedb_datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, - TxTableTruncated, UpdateFlags, + UpdateFlags, }; use spacetimedb_datastore::{ locking_tx_datastore::{ @@ -889,18 +890,17 @@ impl RelationalDB { rowdata: rowdata.clone(), }) .collect(); + + let truncates: IntSet = tx_data.truncates().collect(); + let deletes: Box<_> = tx_data .deletes() - .filter(|(_, truncated, _)| *truncated == TxTableTruncated::No) - .map(|(table_id, _, rowdata)| Ops { + .map(|(table_id, rowdata)| Ops { table_id: *table_id, rowdata: rowdata.clone(), }) - .collect(); - let truncates: Box<_> = tx_data - .deletes() - .filter(|(_, truncated, _)| *truncated == TxTableTruncated::Yes) - .map(|(table_id, ..)| *table_id) + // filter out deletes for tables that are truncated in the same transaction. + .filter(|ops| !truncates.contains(&ops.table_id)) .collect(); let inputs = reducer_context.map(|rcx| rcx.into()); @@ -911,7 +911,7 @@ impl RelationalDB { mutations: Some(Mutations { inserts, deletes, - truncates, + truncates: truncates.into_iter().collect(), }), }; diff --git a/crates/core/src/subscription/tx.rs b/crates/core/src/subscription/tx.rs index 817140e5c06..43f99c4cf9f 100644 --- a/crates/core/src/subscription/tx.rs +++ b/crates/core/src/subscription/tx.rs @@ -76,10 +76,9 @@ impl DeltaTableIndexes { indexes } - let deletes = data.deletes().map(|(table_id, _, rows)| (table_id, rows)); Self { inserts: build_indexes_for_rows(tx, meta, data.inserts()), - deletes: build_indexes_for_rows(tx, meta, deletes), + deletes: build_indexes_for_rows(tx, meta, data.deletes()), } } } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 8f0e98bdde3..681e82b177a 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -610,13 +610,27 @@ impl CommittedState { pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData { let mut tx_data = TxData::default(); + let mut truncates = IntSet::default(); // First, apply deletes. This will free up space in the committed tables. - self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes); + self.merge_apply_deletes( + &mut tx_data, + tx_state.delete_tables, + tx_state.pending_schema_changes, + &mut truncates, + ); // Then, apply inserts. This will re-fill the holes freed by deletions // before allocating new pages. - self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store); + self.merge_apply_inserts( + &mut tx_data, + tx_state.insert_tables, + tx_state.blob_store, + &mut truncates, + ); + + // Record any truncated tables in the `TxData`. + tx_data.add_truncates(truncates); // If the TX will be logged, record its projected tx offset, // then increment the counter. @@ -633,6 +647,7 @@ impl CommittedState { tx_data: &mut TxData, delete_tables: BTreeMap, pending_schema_changes: ThinVec, + truncates: &mut IntSet, ) { fn delete_rows( tx_data: &mut TxData, @@ -641,6 +656,7 @@ impl CommittedState { blob_store: &mut dyn BlobStore, row_ptrs_len: usize, row_ptrs: impl Iterator, + truncates: &mut IntSet, ) { let mut deletes = Vec::with_capacity(row_ptrs_len); @@ -660,16 +676,25 @@ impl CommittedState { if !deletes.is_empty() { let table_name = &*table.get_schema().table_name; + tx_data.set_deletes_for_table(table_id, table_name, deletes.into()); let truncated = table.row_count == 0; - tx_data.set_deletes_for_table(table_id, table_name, deletes.into(), truncated); + if truncated { + truncates.insert(table_id); + } } } for (table_id, row_ptrs) in delete_tables { match self.get_table_and_blob_store_mut(table_id) { - Ok((table, blob_store, ..)) => { - delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter()) - } + Ok((table, blob_store, ..)) => delete_rows( + tx_data, + table_id, + table, + blob_store, + row_ptrs.len(), + row_ptrs.iter(), + truncates, + ), Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"), Err(_) => {} } @@ -688,6 +713,7 @@ impl CommittedState { &mut self.blob_store, row_ptrs.len(), row_ptrs.into_iter(), + truncates, ); } } @@ -698,6 +724,7 @@ impl CommittedState { tx_data: &mut TxData, insert_tables: BTreeMap, tx_blob_store: impl BlobStore, + truncates: &mut IntSet, ) { // TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state, // rather than copying individual rows out of them. @@ -726,6 +753,11 @@ impl CommittedState { if !inserts.is_empty() { let table_name = &*commit_table.get_schema().table_name; tx_data.set_inserts_for_table(table_id, table_name, inserts.into()); + + // if table has inserted rows, it cannot be truncated + if truncates.contains(&table_id) { + truncates.remove(&table_id); + } } let (schema, _indexes, pages) = tx_table.consume_for_merge(); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 4781ef2ea43..39f67696fd0 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -6,7 +6,6 @@ use super::{ tx::TxId, tx_state::TxState, }; -use crate::execution_context::{Workload, WorkloadType}; use crate::{ db_metrics::DB_METRICS, error::{DatastoreError, TableError}, @@ -23,18 +22,24 @@ use crate::{ DataRow, IsolationLevel, Metadata, MutTx, MutTxDatastore, Program, RowTypeForTable, Tx, TxData, TxDatastore, }, }; +use crate::{ + execution_context::{Workload, WorkloadType}, + system_tables::StTableRow, +}; use anyhow::{anyhow, Context}; use core::{cell::RefCell, ops::RangeBounds}; use parking_lot::{Mutex, RwLock}; use spacetimedb_commitlog::payload::{txdata, Txdata}; -use spacetimedb_data_structures::map::{HashCollectionExt, HashMap}; +use spacetimedb_data_structures::map::{HashCollectionExt, HashMap, IntMap}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics}; use spacetimedb_lib::{ConnectionId, Identity}; use spacetimedb_paths::server::SnapshotDirPath; use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId}; -use spacetimedb_sats::memory_usage::MemoryUsage; -use spacetimedb_sats::{bsatn, buffer::BufReader, AlgebraicValue, ProductValue}; +use spacetimedb_sats::{ + algebraic_value::de::ValueDeserializer, bsatn, buffer::BufReader, AlgebraicValue, ProductValue, +}; +use spacetimedb_sats::{memory_usage::MemoryUsage, Deserialize}; use spacetimedb_schema::schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema}; use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository}; use spacetimedb_table::{indexes::RowPointer, page_pool::PagePool, table::RowRef}; @@ -978,6 +983,7 @@ impl Replay { database_identity: &self.database_identity, committed_state: &mut committed_state, progress: &mut *self.progress.borrow_mut(), + dropped_table_names: IntMap::default(), }; f(&mut visitor) } @@ -1083,6 +1089,10 @@ struct ReplayVisitor<'a, F> { database_identity: &'a Identity, committed_state: &'a mut CommittedState, progress: &'a mut F, + // Since deletes are handled before truncation / drop, sometimes the schema + // info is gone. We save the name on the first delete of that table so metrics + // can still show a name. + dropped_table_names: IntMap>, } impl spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> { @@ -1139,6 +1149,14 @@ impl spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi let table_name = schema.table_name.clone(); let row = ProductValue::decode(schema.get_row_type(), reader)?; + // If this is a delete from the `st_table` system table, save the name + if table_id == ST_TABLE_ID { + let ab = AlgebraicValue::Product(row.clone()); + let st_table_row = StTableRow::deserialize(ValueDeserializer::from_ref(&ab)).unwrap(); + self.dropped_table_names + .insert(st_table_row.table_id, st_table_row.table_name); + } + self.committed_state .replay_delete_by_rel(table_id, &row) .with_context(|| { @@ -1158,9 +1176,18 @@ impl spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi } fn visit_truncate(&mut self, table_id: TableId) -> std::result::Result<(), Self::Error> { - let schema = self.committed_state.schema_for_table(table_id)?; - // TODO: avoid clone - let table_name = schema.table_name.clone(); + let table_name = match self.committed_state.schema_for_table(table_id) { + // TODO: avoid clone + Ok(schema) => schema.table_name.clone(), + + Err(_) => { + if let Some(name) = self.dropped_table_names.remove(&table_id) { + name + } else { + return Err(anyhow!("Error looking up name for truncated table {:?}", table_id).into()); + } + } + }; self.committed_state.replay_truncate(table_id).with_context(|| { format!( @@ -1231,7 +1258,7 @@ mod tests { ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, }; - use crate::traits::{IsolationLevel, MutTx, TxTableTruncated}; + use crate::traits::{IsolationLevel, MutTx}; use crate::Result; use bsatn::to_vec; use core::{fmt, mem}; @@ -3201,12 +3228,12 @@ mod tests { // Now drop the table again and commit. assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok()); let tx_data = commit(&datastore, tx)?; - let (_, truncated, deleted_rows) = tx_data + let (_, deleted_rows) = tx_data .deletes() .find(|(id, ..)| **id == table_id) .expect("should have deleted rows for `table_id`"); assert_eq!(&**deleted_rows, [row]); - assert_eq!(truncated, TxTableTruncated::Yes); + assert!(tx_data.truncates().contains(&table_id), "table should be truncated"); // In the next transaction, the table doesn't exist. assert!( @@ -3410,9 +3437,12 @@ mod tests { let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap(); let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap(); assert_eq!(&**inserts, [to_product(&columns[1])].as_slice()); - let (_, truncated, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap(); + let (_, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap(); assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice()); - assert_eq!(truncated, TxTableTruncated::No); + assert!( + !tx_data.truncates().contains(&ST_COLUMN_ID), + "table should not be truncated" + ); // Check that we can successfully scan using the new schema type post commit. let tx = begin_tx(&datastore); diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index 2dfadc77900..d6fc69d57fb 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -8,7 +8,7 @@ use super::system_tables::ModuleKind; use super::Result; use crate::execution_context::{ReducerContext, Workload}; use crate::system_tables::ST_TABLE_ID; -use spacetimedb_data_structures::map::IntMap; +use spacetimedb_data_structures::map::{IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{hash_bytes, Identity}; use spacetimedb_primitives::*; @@ -175,9 +175,13 @@ pub struct TxData { /// The inserted rows per table. inserts: BTreeMap>, /// The deleted rows per table. + deletes: BTreeMap>, + /// *Truncating* means that all rows in the table have been deleted. + /// In other words, a truncated table is a cleared table. /// - /// Also stores per table whether it has been truncated. - deletes: BTreeMap, + /// Note that when a table has an entry in `truncates`, + /// it will also have an entry in `deletes`. + truncates: IntSet, /// Map of all `TableId`s in both `inserts` and `deletes` to their /// corresponding table name. // TODO: Store table name as ref counted string. @@ -189,24 +193,6 @@ pub struct TxData { tx_offset: Option, } -/// A record of a list of deletes for and potential truncation of a table, -/// within a transaction. -pub struct TxDeleteEntry { - /// Were all rows previously in the table deleted within this transaction? - truncated: TxTableTruncated, - /// The deleted rows of the table. - rows: Arc<[ProductValue]>, -} - -/// Whether a table was truncated in a transaction. -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] -pub enum TxTableTruncated { - /// It was truncated. - Yes, - /// It was not truncated. - No, -} - impl TxData { /// Set `tx_offset` as the expected on-disk transaction offset of this transaction. pub fn set_tx_offset(&mut self, tx_offset: u64) { @@ -231,23 +217,15 @@ impl TxData { /// Set `rows` as the deleted rows for `(table_id, table_name)`. /// /// When `truncated` is set, the table has been emptied in this transaction. - pub fn set_deletes_for_table( - &mut self, - table_id: TableId, - table_name: &str, - rows: Arc<[ProductValue]>, - truncated: bool, - ) { - let truncated = if truncated { - TxTableTruncated::Yes - } else { - TxTableTruncated::No - }; - let entry = TxDeleteEntry { truncated, rows }; - self.deletes.insert(table_id, entry); + pub fn set_deletes_for_table(&mut self, table_id: TableId, table_name: &str, rows: Arc<[ProductValue]>) { + self.deletes.insert(table_id, rows); self.tables.entry(table_id).or_insert_with(|| table_name.to_owned()); } + pub fn add_truncates(&mut self, truncated_tables: impl IntoIterator) { + self.truncates.extend(truncated_tables); + } + /// Obtain an iterator over the inserted rows per table. pub fn inserts(&self) -> impl Iterator)> + '_ { self.inserts.iter() @@ -273,15 +251,13 @@ impl TxData { } /// Obtain an iterator over the deleted rows per table. - pub fn deletes(&self) -> impl Iterator)> + '_ { - self.deletes - .iter() - .map(|(table_id, entry)| (table_id, entry.truncated, &entry.rows)) + pub fn deletes(&self) -> impl Iterator)> + '_ { + self.deletes.iter() } /// Get the `i`th deleted row for `table_id` if it exists pub fn get_ith_delete(&self, table_id: TableId, i: usize) -> Option<&ProductValue> { - self.deletes.get(&table_id).and_then(|entry| entry.rows.get(i)) + self.deletes.get(&table_id).and_then(|rows| rows.get(i)) } /// Obtain an iterator over the inserted rows per table. @@ -289,15 +265,19 @@ impl TxData { /// If you don't need access to the table name, [`Self::deletes`] is /// slightly more efficient. pub fn deletes_with_table_name(&self) -> impl Iterator)> + '_ { - self.deletes.iter().map(|(table_id, entry)| { + self.deletes.iter().map(|(table_id, rows)| { let table_name = self .tables .get(table_id) .expect("invalid `TxData`: partial table name mapping"); - (table_id, table_name.as_str(), &entry.rows) + (table_id, table_name.as_str(), rows) }) } + pub fn truncates(&self) -> impl Iterator + '_ { + self.truncates.iter().copied() + } + /// Check if this [`TxData`] contains any `inserted | deleted` rows or `connect/disconnect` operations. /// /// This is used to determine if a transaction should be written to disk.