diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index f6046af85736b..b9c9b5ddbf8e3 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -72,7 +72,7 @@ use crate::util::{ClientTransmitter, ResultExt}; use crate::webhook::{ AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator, }; -use crate::{AppendWebhookError, ExecuteContext, TimestampProvider, catalog, metrics}; +use crate::{AppendWebhookError, ExecuteContext, catalog, metrics}; use super::ExecuteContextExtra; @@ -1221,7 +1221,7 @@ impl Coordinator { // after its creation might see input changes that happened after the CRATE MATERIALIZED // VIEW statement returned. let oracle_timestamp = timestamp; - let least_valid_read = self.least_valid_read(&read_holds); + let least_valid_read = read_holds.least_valid_read(); timestamp.advance_by(least_valid_read.borrow()); if oracle_timestamp != timestamp { diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index e7701193319c4..01e5e97f1379e 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -64,7 +64,7 @@ use crate::session::{Session, Transaction, TransactionOps}; use crate::statement_logging::StatementEndedExecutionReason; use crate::telemetry::{EventDetails, SegmentClientExt}; use crate::util::ResultExt; -use crate::{AdapterError, ExecuteContext, TimestampProvider, catalog, flags}; +use crate::{AdapterError, ExecuteContext, catalog, flags}; impl Coordinator { /// Same as [`Self::catalog_transact_conn`] but takes a [`Session`]. @@ -1280,7 +1280,7 @@ impl Coordinator { // TODO: Maybe in the future, pass those holds on to storage, to hold on // to them and downgrade when possible? let read_holds = self.acquire_read_holds(&id_bundle); - let as_of = self.least_valid_read(&read_holds); + let as_of = read_holds.least_valid_read(); let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from); let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc { diff --git a/src/adapter/src/coord/sequencer/inner/create_index.rs b/src/adapter/src/coord/sequencer/inner/create_index.rs index 27157de25f990..7abbc8dcaefc4 100644 --- a/src/adapter/src/coord/sequencer/inner/create_index.rs +++ b/src/adapter/src/coord/sequencer/inner/create_index.rs @@ -33,7 +33,7 @@ use crate::explain::optimizer_trace::OptimizerTrace; use crate::optimize::dataflows::dataflow_import_id_bundle; use crate::optimize::{self, Optimize}; use crate::session::Session; -use crate::{AdapterNotice, ExecuteContext, TimestampProvider, catalog}; +use crate::{AdapterNotice, ExecuteContext, catalog}; impl Staged for CreateIndexStage { type Ctx = ExecuteContext; @@ -484,7 +484,7 @@ impl Coordinator { // TODO: Maybe in the future, pass those holds on to compute, to // hold on to them and downgrade when possible? let read_holds = coord.acquire_read_holds(&id_bundle); - let since = coord.least_valid_read(&read_holds); + let since = read_holds.least_valid_read(); df_desc.set_as_of(since); coord diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs index 7c04cfab56050..5b69a3d72a8af 100644 --- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs @@ -762,7 +762,7 @@ impl Coordinator { // For non-REFRESH MVs both the `dataflow_as_of` and the `storage_as_of` should be simply // `least_valid_read`. - let least_valid_read = self.least_valid_read(read_holds); + let least_valid_read = read_holds.least_valid_read(); let mut dataflow_as_of = least_valid_read.clone(); let mut storage_as_of = least_valid_read.clone(); diff --git a/src/adapter/src/coord/timestamp_selection.rs b/src/adapter/src/coord/timestamp_selection.rs index f63cd7bcb7259..fd966dbc573d7 100644 --- a/src/adapter/src/coord/timestamp_selection.rs +++ b/src/adapter/src/coord/timestamp_selection.rs @@ -228,13 +228,13 @@ pub trait TimestampProvider { } /// Returns true if-and-only-if the given configuration needs a linearized - /// read timetamp from a timestamp oracle. + /// read timestamp from a timestamp oracle. /// /// This assumes that the query happens in the context of a timeline. If /// there is no timeline, we cannot and don't have to get a linearized read /// timestamp. fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool { - // When we're in the context of a timline (assumption) and one of these + // When we're in the context of a timeline (assumption) and one of these // scenarios hold, we need to use a linearized read timestamp: // - The isolation level is Strict Serializable and the `when` allows us to use the // the timestamp oracle (ex: queries with no AS OF). @@ -436,7 +436,7 @@ pub trait TimestampProvider { // TODO: Refine the detail about which identifiers are binding and which are not. // TODO(dov): It's not entirely clear to me that there ever would be a non // binding constraint introduced by the `id_bundle`. We should revisit this. - let since = self.least_valid_read(read_holds); + let since = read_holds.least_valid_read(); let storage = id_bundle .storage_ids .iter() @@ -625,7 +625,7 @@ pub trait TimestampProvider { let read_holds = self.acquire_read_holds(id_bundle); let timeline = Self::get_timeline(timeline_context); - let since = self.least_valid_read(&read_holds); + let since = read_holds.least_valid_read(); let upper = self.least_valid_write(id_bundle); let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper); @@ -734,15 +734,6 @@ pub trait TimestampProvider { Ok((determination, read_holds)) } - /// The smallest common valid read frontier among times in the given - /// [ReadHolds]. - fn least_valid_read( - &self, - read_holds: &ReadHolds, - ) -> Antichain { - read_holds.least_valid_read() - } - /// Acquires [ReadHolds], for the given `id_bundle` at the earliest possible /// times. fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds;