Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{
AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
};
use crate::{AppendWebhookError, ExecuteContext, TimestampProvider, catalog, metrics};
use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};

use super::ExecuteContextExtra;

Expand Down Expand Up @@ -1221,7 +1221,7 @@ impl Coordinator {
// after its creation might see input changes that happened after the CREATE MATERIALIZED
// VIEW statement returned.
let oracle_timestamp = timestamp;
let least_valid_read = self.least_valid_read(&read_holds);
let least_valid_read = read_holds.least_valid_read();
timestamp.advance_by(least_valid_read.borrow());

if oracle_timestamp != timestamp {
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use crate::session::{Session, Transaction, TransactionOps};
use crate::statement_logging::StatementEndedExecutionReason;
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContext, TimestampProvider, catalog, flags};
use crate::{AdapterError, ExecuteContext, catalog, flags};

impl Coordinator {
/// Same as [`Self::catalog_transact_conn`] but takes a [`Session`].
Expand Down Expand Up @@ -1280,7 +1280,7 @@ impl Coordinator {
// TODO: Maybe in the future, pass those holds on to storage, to hold on
// to them and downgrade when possible?
let read_holds = self.acquire_read_holds(&id_bundle);
let as_of = self.least_valid_read(&read_holds);
let as_of = read_holds.least_valid_read();

let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/coord/sequencer/inner/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::dataflows::dataflow_import_id_bundle;
use crate::optimize::{self, Optimize};
use crate::session::Session;
use crate::{AdapterNotice, ExecuteContext, TimestampProvider, catalog};
use crate::{AdapterNotice, ExecuteContext, catalog};

impl Staged for CreateIndexStage {
type Ctx = ExecuteContext;
Expand Down Expand Up @@ -484,7 +484,7 @@ impl Coordinator {
// TODO: Maybe in the future, pass those holds on to compute, to
// hold on to them and downgrade when possible?
let read_holds = coord.acquire_read_holds(&id_bundle);
let since = coord.least_valid_read(&read_holds);
let since = read_holds.least_valid_read();
df_desc.set_as_of(since);

coord
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ impl Coordinator {

// For non-REFRESH MVs both the `dataflow_as_of` and the `storage_as_of` should be simply
// `least_valid_read`.
let least_valid_read = self.least_valid_read(read_holds);
let least_valid_read = read_holds.least_valid_read();
let mut dataflow_as_of = least_valid_read.clone();
let mut storage_as_of = least_valid_read.clone();

Expand Down
17 changes: 4 additions & 13 deletions src/adapter/src/coord/timestamp_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ pub trait TimestampProvider {
}

/// Returns true if-and-only-if the given configuration needs a linearized
/// read timetamp from a timestamp oracle.
/// read timestamp from a timestamp oracle.
///
/// This assumes that the query happens in the context of a timeline. If
/// there is no timeline, we cannot and don't have to get a linearized read
/// timestamp.
fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
// When we're in the context of a timline (assumption) and one of these
// When we're in the context of a timeline (assumption) and one of these
// scenarios hold, we need to use a linearized read timestamp:
// - The isolation level is Strict Serializable and the `when` allows us to use the
// the timestamp oracle (ex: queries with no AS OF).
Expand Down Expand Up @@ -436,7 +436,7 @@ pub trait TimestampProvider {
// TODO: Refine the detail about which identifiers are binding and which are not.
// TODO(dov): It's not entirely clear to me that there ever would be a non
// binding constraint introduced by the `id_bundle`. We should revisit this.
let since = self.least_valid_read(read_holds);
let since = read_holds.least_valid_read();
let storage = id_bundle
.storage_ids
.iter()
Expand Down Expand Up @@ -625,7 +625,7 @@ pub trait TimestampProvider {
let read_holds = self.acquire_read_holds(id_bundle);
let timeline = Self::get_timeline(timeline_context);

let since = self.least_valid_read(&read_holds);
let since = read_holds.least_valid_read();
let upper = self.least_valid_write(id_bundle);
let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);

Expand Down Expand Up @@ -734,15 +734,6 @@ pub trait TimestampProvider {
Ok((determination, read_holds))
}

/// The smallest common valid read frontier among times in the given
/// [ReadHolds].
fn least_valid_read(
&self,
read_holds: &ReadHolds<mz_repr::Timestamp>,
) -> Antichain<mz_repr::Timestamp> {
read_holds.least_valid_read()
}

/// Acquires [ReadHolds], for the given `id_bundle` at the earliest possible
/// times.
fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
Expand Down