Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 117 additions & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,9 +821,18 @@ impl RelationalDB {
Txdata,
};

let is_not_ephemeral_table = |table_id: &TableId| -> bool {
tx_data
.ephemeral_tables()
.map(|etables| !etables.contains(table_id))
.unwrap_or(true)
};

if tx_data.tx_offset().is_some() {
let inserts: Box<_> = tx_data
.inserts()
// Skip ephemeral tables
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
Expand All @@ -834,6 +843,7 @@ impl RelationalDB {

let deletes: Box<_> = tx_data
.deletes()
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
Expand All @@ -842,6 +852,8 @@ impl RelationalDB {
.filter(|ops| !truncates.contains(&ops.table_id))
.collect();

let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();

let inputs = reducer_context.map(|rcx| rcx.into());

let txdata = Txdata {
Expand All @@ -850,7 +862,7 @@ impl RelationalDB {
mutations: Some(Mutations {
inserts,
deletes,
truncates: truncates.into_iter().collect(),
truncates,
}),
};

Expand Down Expand Up @@ -2408,6 +2420,27 @@ mod tests {
TableSchema::from_module_def(&def, table, (), TableId::SENTINEL)
}

fn view_module_def() -> ModuleDef {
let mut builder = RawModuleDefV9Builder::new();

let return_type_ref = builder.add_algebraic_type(
[],
"my_view_return_type",
AlgebraicType::product([("b", AlgebraicType::U8)]),
true,
);
builder.add_view(
"my_view",
0,
true,
false,
ProductType::unit(),
AlgebraicType::array(AlgebraicType::Ref(return_type_ref)),
);
let raw = builder.finish();
raw.try_into().expect("table validation failed")
}

fn table_auto_inc() -> TableSchema {
table(
"MyTable",
Expand Down Expand Up @@ -2492,6 +2525,89 @@ mod tests {
Ok(())
}

fn setup_view(stdb: &TestDB) -> ResultTest<(ViewId, TableId, ModuleDef, ViewDef)> {
let module_def = view_module_def();
let view_def = module_def.view("my_view").unwrap();

let mut tx = begin_mut_tx(stdb);
let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
stdb.commit_tx(tx)?;

Ok((view_id, table_id, module_def.clone(), view_def.clone()))
}

fn insert_view_row(
stdb: &TestDB,
view_id: ViewId,
table_id: TableId,
typespace: &Typespace,
row_type: AlgebraicTypeRef,
sender: Identity,
v: u8,
) -> ResultTest<()> {
let to_bsatn = |pv: &ProductValue| {
Bytes::from(bsatn::to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed"))
};

let row_pv = |v: u8| product![v];

let mut tx = begin_mut_tx(stdb);
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bsatn(&row_pv(v)), typespace)?;
stdb.commit_tx(tx)?;

Ok(())
}

fn project_views(stdb: &TestDB, table_id: TableId, sender: Identity) -> Vec<ProductValue> {
let tx = begin_tx(stdb);

stdb.iter_by_col_eq(&tx, table_id, 0, &sender.into())
.unwrap()
.map(|row| {
let pv = row.to_product_value();
ProductValue {
elements: pv.elements.iter().skip(1).cloned().collect(),
}
})
.collect()
}

#[test]
fn test_view_tables_are_ephemeral() -> ResultTest<()> {
let stdb = TestDB::durable()?;

let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
let row_type = view_def.product_type_ref;
let typespace = module_def.typespace();

// Write some rows (reusing the same helper)
insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ONE, 10)?;
insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ZERO, 20)?;

assert!(
!project_views(&stdb, table_id, Identity::ZERO).is_empty(),
"View table should NOT be empty after insert"
);

// Reopen the database — view tables must not persist
let stdb = stdb.reopen()?;

// Validate that the view's backing table has been removed
assert!(
project_views(&stdb, table_id, Identity::ZERO).is_empty(),
"View table should be empty after reopening the database"
);

let tx = begin_mut_tx(&stdb);
let subs_rows = tx.lookup_st_view_subs(view_id)?;
assert!(
subs_rows.is_empty(),
"st_view_subs should be empty after reopening the database"
);
Ok(())
}

#[test]
fn test_table_name() -> ResultTest<()> {
let stdb = TestDB::durable()?;
Expand Down
4 changes: 3 additions & 1 deletion crates/datastore/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::system_tables::SystemTable;
use enum_as_inner::EnumAsInner;
use spacetimedb_lib::db::raw_def::{v9::RawSql, RawIndexDefV8};
use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId};
use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId, ViewId};
use spacetimedb_sats::buffer::DecodeError;
use spacetimedb_sats::{product_value::InvalidFieldError, satn::Satn};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
Expand Down Expand Up @@ -41,6 +41,8 @@ pub enum DatastoreError {
pub enum ViewError {
#[error("view '{0}' not found")]
NotFound(Box<str>),
#[error("Table backing View '{0}' not found")]
TableNotFound(ViewId),
#[error("failed to deserialize view arguments from row")]
DeserializeArgs,
#[error("failed to deserialize view return value: {0}")]
Expand Down
51 changes: 48 additions & 3 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use super::{
};
use crate::{
db_metrics::DB_METRICS,
error::{DatastoreError, IndexError, TableError},
error::{DatastoreError, IndexError, TableError, ViewError},
execution_context::ExecutionContext,
locking_tx_datastore::{mut_tx::ViewReadSets, state_view::iter_st_column_for_table},
system_tables::{
system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow, StTableFields,
StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME,
StTableRow, StViewRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME,
ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME,
ST_MODULE_ID, ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID,
ST_SCHEDULED_IDX, ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID,
ST_VAR_IDX, ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX,
},
traits::TxData,
traits::{EphemeralTables, TxData},
};
use crate::{
locking_tx_datastore::ViewCallInfo,
Expand Down Expand Up @@ -80,6 +80,12 @@ pub struct CommittedState {
/// Any overlap will trigger a re-evaluation of the affected view,
/// and its read set will be updated accordingly.
read_sets: ViewReadSets,

/// Tables which do not need to be made persistent.
/// These include:
/// - system tables: `st_view_sub`, `st_view_arg`
/// - Tables which back views.
pub(super) ephemeral_tables: EphemeralTables,
}

impl CommittedState {
Expand All @@ -99,6 +105,7 @@ impl MemoryUsage for CommittedState {
page_pool: _,
table_dropped,
read_sets,
ephemeral_tables,
} = self;
// NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
next_tx_offset.heap_usage()
Expand All @@ -107,6 +114,7 @@ impl MemoryUsage for CommittedState {
+ index_id_map.heap_usage()
+ table_dropped.heap_usage()
+ read_sets.heap_usage()
+ ephemeral_tables.heap_usage()
}
}

Expand Down Expand Up @@ -171,6 +179,7 @@ impl CommittedState {
table_dropped: <_>::default(),
read_sets: <_>::default(),
page_pool,
ephemeral_tables: <_>::default(),
}
}

Expand Down Expand Up @@ -518,6 +527,32 @@ impl CommittedState {
Ok(())
}

pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> {
self.ephemeral_tables = self.ephemeral_tables()?.into_iter().collect();
Ok(())
}

fn ephemeral_tables(&self) -> Result<Vec<TableId>> {
let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID];

let Some(st_view) = self.tables.get(&ST_VIEW_ID) else {
return Ok(tables);
};
let backing_tables = st_view
.scan_rows(&self.blob_store)
.map(|row_ref| {
let view_row = StViewRow::try_from(row_ref)?;
view_row
.table_id
.ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_row.view_id)))
})
.collect::<Result<Vec<_>>>()?;

tables.extend(backing_tables);

Ok(tables)
}

/// After replaying all old transactions,
/// inserts and deletes into the system tables
/// might not be reflected in the schemas of the built tables.
Expand Down Expand Up @@ -675,6 +710,8 @@ impl CommittedState {
self.next_tx_offset += 1;
}

tx_data.set_ephemeral_tables(&self.ephemeral_tables);

tx_data
}

Expand Down Expand Up @@ -847,17 +884,25 @@ impl CommittedState {
}
// A table was removed. Add it back.
TableRemoved(table_id, table) => {
let is_view_table = table.schema.is_view();
// We don't need to deal with sub-components.
// That is, we don't need to add back indices and such.
// Instead, there will be separate pending schema changes like `IndexRemoved`.
self.tables.insert(table_id, table);

// Incase, the table was ephemeral, add it back to that set as well.
if is_view_table {
self.ephemeral_tables.insert(table_id);
}
}
// A table was added. Remove it.
TableAdded(table_id) => {
// We don't need to deal with sub-components.
// That is, we don't need to remove indices and such.
// Instead, there will be separate pending schema changes like `IndexAdded`.
self.tables.remove(&table_id);
// Incase, the table was ephemeral, remove it from that set as well.
self.ephemeral_tables.remove(&table_id);
}
// A table's access was changed. Change back to the old one.
TableAlterAccess(table_id, access) => {
Expand Down
2 changes: 2 additions & 0 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ impl Locking {
committed_state.build_indexes()?;
// Figure out where to pick up for each sequence.
*self.sequence_state.lock() = committed_state.build_sequence_state()?;

committed_state.collect_ephemeral_tables()?;
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ impl MutTxId {
self.insert_into_st_view_param(view_id, param_columns)?;
self.insert_into_st_view_column(view_id, return_columns)?;

self.committed_state_write_lock.ephemeral_tables.insert(table_id);

Ok((view_id, table_id))
}

Expand Down
26 changes: 26 additions & 0 deletions crates/datastore/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ pub enum IsolationLevel {
Serializable,
}

pub type EphemeralTables = IntSet<TableId>;

/// A record of all the operations within a transaction.
///
/// Some extra information is embedded here
Expand All @@ -191,6 +193,11 @@ pub struct TxData {
/// `None` implies that `inserts` and `deletes` are both empty,
/// but `Some` does not necessarily imply that either is non-empty.
tx_offset: Option<u64>,

/// Set of ephemeral tables modified in this transaction (only populated when a view is executed).
/// These tables do not need to be persisted to disk.
/// Every table listed here must appear in either `inserts` or `deletes`.
ephemeral_tables: Option<EphemeralTables>,
}

impl TxData {
Expand Down Expand Up @@ -226,6 +233,25 @@ impl TxData {
self.truncates.extend(truncated_tables);
}

/// Determines which ephemeral tables were modified in this transaction.
///
/// Iterates over the tables updated in this transaction and records those that
/// also appear in `all_ephemeral_tables`.
/// `self.ephemeral_tables` remains `None` if no ephemeral tables were modified.
pub fn set_ephemeral_tables(&mut self, all_ephemeral_tables: &EphemeralTables) {
for tid in self.tables.keys() {
if all_ephemeral_tables.contains(tid) {
self.ephemeral_tables
.get_or_insert_with(EphemeralTables::default)
.insert(*tid);
}
}
}

pub fn ephemeral_tables(&self) -> Option<&EphemeralTables> {
self.ephemeral_tables.as_ref()
}

/// Obtain an iterator over the inserted rows per table.
pub fn inserts(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
self.inserts.iter()
Expand Down
Loading