From 00b146984c4aa8fb251bf96ae17cdce1e9531a1a Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 11 Sep 2024 15:28:20 +0200 Subject: [PATCH 1/6] adapter: simplify read policy initialization This commit adds `initialize_{storage,compute}_read_policy` methods that work like `initialize_{storage,compute}_read_policies` but operate on a single collection only. It also remove the generic `initialize_read_policies` method. The next commit will change bootstrapping to initialize collection read policies while iterating over the catalog objects, rather than at a separate step, so the bulk APIs are no longer useful there. Most other places that initialize read polcies only do so for a single collection as well. Apart from simplifying things, this also prepares the read policy API for getting read holds passed in by the caller. Previously that was cumbersome because the `CollectionIdBundle` type wouldn't allow passing additional per-collection data. With the new methods, we can simply pass a `ReadHold` instead of the collection's `GlobalId`. --- src/adapter/src/coord.rs | 62 +++----- src/adapter/src/coord/peek.rs | 6 +- src/adapter/src/coord/read_policy.rs | 150 ++++++++---------- src/adapter/src/coord/sequencer/cluster.rs | 10 +- src/adapter/src/coord/sequencer/inner.rs | 16 +- .../inner/create_materialized_view.rs | 5 +- 6 files changed, 102 insertions(+), 147 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 08c7e8c1cad53..49c68f619f206 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1810,7 +1810,8 @@ impl Coordinator { self.controller .set_arrangement_exert_proportionality(exert_prop); - let mut policies_to_set: BTreeMap = + let mut storage_policies: BTreeMap = Default::default(); + let mut compute_policies: BTreeMap = Default::default(); let enable_worker_core_affinity = @@ -1930,40 +1931,29 @@ impl Coordinator { ); } } - policies_to_set - .entry(policy.expect("sources have a compaction window")) - .or_insert_with(Default::default) - .storage_ids - .insert(entry.id()); + let cw = policy.expect("sources have a compaction window"); + storage_policies.insert(entry.id, cw); } CatalogItem::Table(_) => { - policies_to_set - .entry(policy.expect("tables have a compaction window")) - .or_insert_with(Default::default) - .storage_ids - .insert(entry.id()); + let cw = policy.expect("tables have a compaction window"); + storage_policies.insert(entry.id, cw); } CatalogItem::Index(idx) => { - let policy_entry = policies_to_set - .entry(policy.expect("indexes have a compaction window")) - .or_insert_with(Default::default); + let id = entry.id; + let cw = policy.expect("indexes have a compaction window"); if logs.contains(&idx.on) { - policy_entry - .compute_ids - .entry(idx.cluster_id) - .or_insert_with(BTreeSet::new) - .insert(entry.id()); + compute_policies.insert(id, (idx.cluster_id, cw)); } else { let df_desc = self .catalog() - .try_get_physical_plan(&entry.id()) + .try_get_physical_plan(&id) .expect("added in `bootstrap_dataflow_plans`") .clone(); let df_meta = self .catalog() - .try_get_dataflow_metainfo(&entry.id()) + .try_get_dataflow_metainfo(&id) .expect("added in `bootstrap_dataflow_plans`"); if self.catalog().state().system_config().enable_mz_notices() { @@ -1977,11 +1967,7 @@ impl Coordinator { // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`, // but we cannot call that as it will also downgrade the read hold on the index. 
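
To make the commit message's point above concrete (a `CollectionIdBundle` cannot carry per-collection data, while a map keyed by collection ID can), here is a minimal standard-library sketch; every type here is an illustrative stand-in, not the real Materialize definition.

    use std::collections::{BTreeMap, BTreeSet};

    type GlobalId = u64;

    // Illustrative stand-in for a per-collection read hold.
    #[derive(Clone, Debug)]
    struct ReadHold {
        since: u64,
    }

    fn main() {
        // A bundle of IDs can only record *which* collections are involved.
        let bundle: BTreeSet<GlobalId> = BTreeSet::from([1, 2, 3]);

        // A map keyed by collection ID can carry per-collection data, such as
        // a read hold, alongside each ID.
        let holds: BTreeMap<GlobalId, ReadHold> = bundle
            .iter()
            .map(|&id| (id, ReadHold { since: 0 }))
            .collect();

        for (id, hold) in &holds {
            println!("collection {id} is held back at time {}", hold.since);
        }
    }
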
- policy_entry - .compute_ids - .entry(idx.cluster_id) - .or_insert_with(Default::default) - .extend(df_desc.export_ids()); + compute_policies.insert(id, (idx.cluster_id, cw)); self.controller .compute @@ -1991,11 +1977,8 @@ impl Coordinator { } CatalogItem::View(_) => (), CatalogItem::MaterializedView(mview) => { - policies_to_set - .entry(policy.expect("materialized views have a compaction window")) - .or_insert_with(Default::default) - .storage_ids - .insert(entry.id()); + let cw = policy.expect("materialized views have a compaction window"); + storage_policies.insert(entry.id, cw); let mut df_desc = self .catalog() @@ -2081,9 +2064,12 @@ impl Coordinator { // Having installed all entries, creating all constraints, we can now drop read holds and // relax read policies. drop(dataflow_read_holds); - // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop. - for (cw, policies) in policies_to_set { - self.initialize_read_policies(&policies, cw).await; + for (id, cw) in storage_policies { + self.initialize_storage_read_policy(id, cw).await; + } + for (id, (cluster_id, cw)) in compute_policies { + self.initialize_compute_read_policy(id, cluster_id, cw) + .await; } // Expose mapping from T-shirt sizes to actual sizes @@ -3060,15 +3046,17 @@ impl Coordinator { ) { // We must only install read policies for indexes, not for sinks. // Sinks are write-only compute collections that don't have read policies. - let export_ids = dataflow.exported_index_ids().collect(); + let export_ids: Vec<_> = dataflow.exported_index_ids().collect(); self.controller .compute .create_dataflow(instance, dataflow, subscribe_target_replica) .unwrap_or_terminate("dataflow creation cannot fail"); - self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default) - .await; + for id in export_ids { + self.initialize_compute_read_policy(id, instance, CompactionWindow::Default) + .await; + } } /// Like `ship_dataflow`, but also await on builtin table updates. diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 7229d6b129bac..1057d927d0968 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -562,15 +562,13 @@ impl crate::coord::Coordinator { permutation: index_permutation, thinned_arity: index_thinned_arity, }) => { - let output_ids = dataflow.export_ids().collect(); - // Very important: actually create the dataflow (here, so we can destructure). self.controller .compute .create_dataflow(compute_instance, dataflow, None) .unwrap_or_terminate("cannot fail to create dataflows"); - self.initialize_compute_read_policies( - output_ids, + self.initialize_compute_read_policy( + index_id, compute_instance, // Disable compaction so that nothing can compact before the peek occurs below. CompactionWindow::DisableCompaction, diff --git a/src/adapter/src/coord/read_policy.rs b/src/adapter/src/coord/read_policy.rs index 5b2e300256b67..bf89e6ccd11bd 100644 --- a/src/adapter/src/coord/read_policy.rs +++ b/src/adapter/src/coord/read_policy.rs @@ -14,14 +14,13 @@ //! the controller from compacting the associated collections, and ensures that they //! remain "readable" at a specific time, as long as the hold is held. 
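
The module docs above describe a read hold as something that blocks compaction and keeps a collection readable at a specific time for as long as it is held. A toy sketch of that contract, using a stand-in type rather than the real `ReadHold`: a hold can be downgraded forward to release history, but it can never move back behind the frontier it currently guarantees.

    // Illustrative stand-in for a read hold on a single collection.
    struct ReadHold {
        since: u64,
    }

    impl ReadHold {
        /// Move the hold forward to `frontier`, allowing compaction up to it.
        /// Fails if `frontier` is earlier than the current `since`: the hold
        /// cannot reclaim history that may already have been compacted.
        fn try_downgrade(&mut self, frontier: u64) -> Result<(), ()> {
            if frontier < self.since {
                return Err(());
            }
            self.since = frontier;
            Ok(())
        }
    }

    fn main() {
        let mut hold = ReadHold { since: 5 };
        assert!(hold.try_downgrade(10).is_ok()); // relax the hold to a later time
        assert!(hold.try_downgrade(3).is_err()); // cannot move back before the current since
        assert_eq!(hold.since, 10);
    }

This mirrors the shape of the `try_downgrade(Antichain::from_elem(read_ts))` calls the patch adds when installing holds in the timeline state.
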
-use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::fmt::Debug; use differential_dataflow::lattice::Lattice; use itertools::Itertools; use mz_adapter_types::compaction::CompactionWindow; use mz_compute_types::ComputeInstanceId; -use mz_ore::instrument; use mz_repr::{GlobalId, Timestamp}; use mz_sql::session::metadata::SessionMetadata; use mz_storage_types::read_holds::ReadHold; @@ -95,6 +94,16 @@ impl ReadHolds { } } + fn insert_storage_collection(&mut self, id: GlobalId, hold: ReadHold) { + let prev = self.storage_holds.insert(id, hold); + assert!(prev.is_none(), "duplicate storage read hold: {id}"); + } + + fn insert_compute_collection(&mut self, id: (ComputeInstanceId, GlobalId), hold: ReadHold) { + let prev = self.compute_holds.insert(id, hold); + assert!(prev.is_none(), "duplicate compute read hold: {id:?}"); + } + pub fn remove_storage_collection(&mut self, id: GlobalId) { self.storage_holds.remove(&id); } @@ -166,24 +175,6 @@ impl ReadHolds { } } } - - /// Extend the contained read holds with those in `other`. - /// - /// - /// # Panics - /// - /// In contrast to [`ReadHolds::merge`], this method expects the collection - /// IDs in `self` and `other` to be distinct and panics otherwise. - fn extend(&mut self, other: Self) { - for (id, other_hold) in other.storage_holds { - let prev = self.storage_holds.insert(id, other_hold); - assert!(prev.is_none(), "duplicate storage read hold: {id}"); - } - for (id, other_hold) in other.compute_holds { - let prev = self.compute_holds.insert(id, other_hold); - assert!(prev.is_none(), "duplicate compute read hold: {id:?}"); - } - } } impl Default for ReadHolds { @@ -193,94 +184,81 @@ impl Default for ReadHolds { } impl crate::coord::Coordinator { - /// Initialize the storage read policies. + /// Initialize the read policy for a storage collection. /// /// This should be called only after a storage collection is created, and /// ideally very soon afterwards. The collection is otherwise initialized /// with a read policy that allows no compaction. - pub(crate) async fn initialize_storage_read_policies( + pub(crate) async fn initialize_storage_read_policy( &mut self, - ids: BTreeSet, + id: GlobalId, compaction_window: CompactionWindow, ) { - self.initialize_read_policies( - &CollectionIdBundle { - storage_ids: ids, - compute_ids: BTreeMap::new(), - }, - compaction_window, - ) - .await; + // Install read hold in the Coordinator's timeline state. + if let TimelineContext::TimelineDependent(timeline) = self.get_timeline_context(id) { + let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await; + let read_ts = oracle.read_ts().await; + + let mut read_hold = self + .controller + .storage + .acquire_read_hold(id) + .expect("missing storage collection"); + let _ = read_hold.try_downgrade(Antichain::from_elem(read_ts)); + + let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await; + read_holds.insert_storage_collection(id, read_hold); + }; + + // Install read policy. + self.controller + .storage + .set_read_policy(vec![(id, compaction_window.into())]); } - /// Initialize the compute read policies. - /// - /// This should be called only after a compute collection is created, and - /// ideally very soon afterwards. The collection is otherwise initialized - /// with a read policy that allows no compaction. 
- pub(crate) async fn initialize_compute_read_policies( + pub(crate) async fn initialize_storage_read_policies( &mut self, - ids: Vec, - instance: ComputeInstanceId, + ids: impl IntoIterator, compaction_window: CompactionWindow, ) { - let mut compute_ids: BTreeMap<_, BTreeSet<_>> = BTreeMap::new(); - compute_ids.insert(instance, ids.into_iter().collect()); - self.initialize_read_policies( - &CollectionIdBundle { - storage_ids: BTreeSet::new(), - compute_ids, - }, - compaction_window, - ) - .await; + for id in ids { + self.initialize_storage_read_policy(id, compaction_window) + .await; + } } - /// Initialize the storage and compute read policies. + /// Initialize the read policy for a compute collection. /// - /// This should be called only after a collection is created, and + /// This should be called only after a compute collection is created, and /// ideally very soon afterwards. The collection is otherwise initialized /// with a read policy that allows no compaction. - #[instrument(name = "coord::initialize_read_policies")] - pub(crate) async fn initialize_read_policies( + pub(crate) async fn initialize_compute_read_policy( &mut self, - id_bundle: &CollectionIdBundle, + id: GlobalId, + instance: ComputeInstanceId, compaction_window: CompactionWindow, ) { - // Install read holds in the Coordinator's timeline state. - for (timeline_context, id_bundle) in self.partition_ids_by_timeline_context(id_bundle) { - if let TimelineContext::TimelineDependent(timeline) = timeline_context { - let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await; - let read_ts = oracle.read_ts().await; + // Install read hold in the Coordinator's timeline state. + if let TimelineContext::TimelineDependent(timeline) = self.get_timeline_context(id) { + let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await; + let read_ts = oracle.read_ts().await; - let mut new_read_holds = self.acquire_read_holds(&id_bundle); - new_read_holds.downgrade(read_ts); - - let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await; - read_holds.extend(new_read_holds); - } - } - - // Install read policies. - let read_policy = ReadPolicy::from(compaction_window); - - let storage_policies = id_bundle - .storage_ids - .iter() - .map(|id| (*id, read_policy.clone())) - .collect(); - self.controller.storage.set_read_policy(storage_policies); - - for (instance_id, collection_ids) in &id_bundle.compute_ids { - let compute_policies = collection_ids - .iter() - .map(|id| (*id, read_policy.clone())) - .collect(); - self.controller + let mut read_hold = self + .controller .compute - .set_read_policy(*instance_id, compute_policies) - .expect("cannot fail to set read policy"); - } + .acquire_read_hold(instance, id) + .expect("missing compute collection"); + let _ = read_hold.try_downgrade(Antichain::from_elem(read_ts)); + + let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await; + read_holds.insert_compute_collection((instance, id), read_hold); + }; + + // Install read policy. 
+ self.controller + .compute + .set_read_policy(instance, vec![(id, compaction_window.into())]) + .expect("cannot fail to set read policy"); } pub(crate) fn update_storage_read_policies( diff --git a/src/adapter/src/coord/sequencer/cluster.rs b/src/adapter/src/coord/sequencer/cluster.rs index 78d539d07123b..b3cf08fa21792 100644 --- a/src/adapter/src/coord/sequencer/cluster.rs +++ b/src/adapter/src/coord/sequencer/cluster.rs @@ -390,13 +390,9 @@ impl Coordinator { self.create_cluster_replica(cluster_id, replica_id).await; } - if !introspection_source_ids.is_empty() { - self.initialize_compute_read_policies( - introspection_source_ids, - cluster_id, - CompactionWindow::Default, - ) - .await; + for id in introspection_source_ids { + self.initialize_compute_read_policy(id, cluster_id, CompactionWindow::Default) + .await; } } diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 802b2207f509b..5c9e9a1d3a3f8 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -19,7 +19,6 @@ use futures::future::BoxFuture; use futures::stream::FuturesOrdered; use futures::{future, Future, FutureExt}; use itertools::Itertools; -use maplit::btreeset; use mz_adapter_types::compaction::CompactionWindow; use mz_cloud_resources::VpcEndpointConfig; use mz_controller_types::ReplicaId; @@ -620,9 +619,9 @@ impl Coordinator { .catalog() .state() .source_compaction_windows(source_ids); - for (compaction_window, storage_policies) in read_policies { + for (compaction_window, ids) in read_policies { coord - .initialize_storage_read_policies(storage_policies, compaction_window) + .initialize_storage_read_policies(ids, compaction_window) .await; } }) @@ -970,8 +969,8 @@ impl Coordinator { coord.apply_local_write(register_ts).await; coord - .initialize_storage_read_policies( - btreeset![table_id], + .initialize_storage_read_policy( + table_id, table .custom_logical_compaction_window .unwrap_or(CompactionWindow::Default), @@ -1020,12 +1019,9 @@ impl Coordinator { .catalog() .state() .source_compaction_windows(vec![table_id]); - for (compaction_window, storage_policies) in read_policies { + for (compaction_window, ids) in read_policies { coord - .initialize_storage_read_policies( - storage_policies, - compaction_window, - ) + .initialize_storage_read_policies(ids, compaction_window) .await; } } diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs index 12c1d63efaca5..cc22e48a3b934 100644 --- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs @@ -10,7 +10,6 @@ use anyhow::anyhow; use differential_dataflow::lattice::Lattice; use maplit::btreemap; -use maplit::btreeset; use mz_adapter_types::compaction::CompactionWindow; use mz_catalog::memory::objects::{CatalogItem, MaterializedView}; use mz_expr::{CollectionPlan, ResultSpec}; @@ -686,8 +685,8 @@ impl Coordinator { .unwrap_or_terminate("cannot fail to append"); coord - .initialize_storage_read_policies( - btreeset![sink_id], + .initialize_storage_read_policy( + sink_id, compaction_window.unwrap_or(CompactionWindow::Default), ) .await; From 2c119b051a1f672f86ac2f566447dcc2fbdfcd17 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Thu, 12 Sep 2024 16:59:37 +0200 Subject: [PATCH 2/6] coord: collect read holds during bootstrap This commit makes the coordinator collect read holds during bootstrapping. 
This is necessary so we can pass them to `create_dataflow` calls, but it also has the nice side effect of letting us initialize read policies immediately, simplifying that logic a bit. --- src/adapter/src/coord.rs | 83 +++++++++++++---------- src/compute-client/src/as_of_selection.rs | 37 ++++------ 2 files changed, 62 insertions(+), 58 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 49c68f619f206..04904c7de7c3e 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1810,9 +1810,10 @@ impl Coordinator { self.controller .set_arrangement_exert_proportionality(exert_prop); - let mut storage_policies: BTreeMap = Default::default(); - let mut compute_policies: BTreeMap = - Default::default(); + // Keep read holds for each storage and compute collection we create during bootstrapping. + // We need these to pass copies of them to `create_dataflow` commands, and to ensure + // collections don't get compacted before we have created all their dependants. + let mut read_holds: BTreeMap> = Default::default(); let enable_worker_core_affinity = self.catalog().system_config().enable_worker_core_affinity(); @@ -1834,6 +1835,15 @@ impl Coordinator { enable_worker_core_affinity, )?; } + + for &log_id in instance.log_indexes.values() { + let read_hold = self + .controller + .compute + .acquire_read_hold(instance.id, log_id) + .expect("log index was created with the cluster"); + read_holds.insert(log_id, read_hold); + } } info!( @@ -1843,8 +1853,10 @@ impl Coordinator { let init_storage_collections_start = Instant::now(); info!("startup: coordinator init: bootstrap: storage collections init beginning"); - self.bootstrap_storage_collections(&migrated_storage_collections_0dt) + let storage_holds = self + .bootstrap_storage_collections(&migrated_storage_collections_0dt) .await; + read_holds.extend(storage_holds.into_iter().map(|r| (r.id(), r))); info!( "startup: coordinator init: bootstrap: storage collections init complete in {:?}", init_storage_collections_start.elapsed() @@ -1864,7 +1876,7 @@ impl Coordinator { // `bootstrap_dataflow_plans`. let bootstrap_as_ofs_start = Instant::now(); info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning"); - let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await; + self.bootstrap_dataflow_as_ofs(&read_holds).await; info!( "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}", bootstrap_as_ofs_start.elapsed() @@ -1932,19 +1944,16 @@ impl Coordinator { } } let cw = policy.expect("sources have a compaction window"); - storage_policies.insert(entry.id, cw); + self.initialize_storage_read_policy(entry.id, cw).await; } CatalogItem::Table(_) => { let cw = policy.expect("tables have a compaction window"); - storage_policies.insert(entry.id, cw); + self.initialize_storage_read_policy(entry.id, cw).await; } CatalogItem::Index(idx) => { let id = entry.id; - let cw = policy.expect("indexes have a compaction window"); - if logs.contains(&idx.on) { - compute_policies.insert(id, (idx.cluster_id, cw)); - } else { + if !logs.contains(&idx.on) { let df_desc = self .catalog() .try_get_physical_plan(&id) @@ -1965,20 +1974,27 @@ impl Coordinator { ); } - // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`, - // but we cannot call that as it will also downgrade the read hold on the index. 
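
A rough standard-library sketch of the bootstrap bookkeeping introduced above; the names (`read_holds`, `create_dataflow`) mirror the patch, but everything below is an illustrative stand-in rather than the coordinator's real API.

    use std::collections::BTreeMap;

    type GlobalId = u64;

    #[derive(Clone, Debug)]
    struct ReadHold {
        since: u64,
    }

    // Placeholder for handing import holds to the controller; not the real API.
    fn create_dataflow(import_read_holds: Vec<ReadHold>) {
        let min_since = import_read_holds.iter().map(|h| h.since).min();
        println!(
            "creating dataflow with {} import holds (min since: {min_since:?})",
            import_read_holds.len()
        );
    }

    fn main() {
        // Collect one read hold per collection created during bootstrap.
        let mut read_holds: BTreeMap<GlobalId, ReadHold> = BTreeMap::new();
        for id in [1, 2, 3] {
            read_holds.insert(id, ReadHold { since: 0 });
        }

        // Pass copies of the relevant holds to each dataflow that imports them.
        let imports: Vec<ReadHold> = [1, 2].iter().map(|id| read_holds[id].clone()).collect();
        create_dataflow(imports);

        // Once everything is installed, dropping the map releases all the holds.
        drop(read_holds);
    }
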
- compute_policies.insert(id, (idx.cluster_id, cw)); - self.controller .compute .create_dataflow(idx.cluster_id, df_desc, None) .unwrap_or_terminate("cannot fail to create dataflows"); + + let read_hold = self + .controller + .compute + .acquire_read_hold(idx.cluster_id, id) + .expect("collection just created"); + read_holds.insert(id, read_hold); } + + let cw = policy.expect("indexes have a compaction window"); + self.initialize_compute_read_policy(id, idx.cluster_id, cw) + .await; } CatalogItem::View(_) => (), CatalogItem::MaterializedView(mview) => { let cw = policy.expect("materialized views have a compaction window"); - storage_policies.insert(entry.id, cw); + self.initialize_storage_read_policy(entry.id, cw).await; let mut df_desc = self .catalog() @@ -2061,16 +2077,8 @@ impl Coordinator { } } - // Having installed all entries, creating all constraints, we can now drop read holds and - // relax read policies. - drop(dataflow_read_holds); - for (id, cw) in storage_policies { - self.initialize_storage_read_policy(id, cw).await; - } - for (id, (cluster_id, cw)) in compute_policies { - self.initialize_compute_read_policy(id, cluster_id, cw) - .await; - } + // Having installed all entries, creating all constraints, we can now drop the read holds. + drop(read_holds); // Expose mapping from T-shirt sizes to actual sizes builtin_table_updates.extend( @@ -2368,7 +2376,7 @@ impl Coordinator { async fn bootstrap_storage_collections( &mut self, migrated_storage_collections: &BTreeSet, - ) { + ) -> Vec> { let catalog = self.catalog(); let source_status_collection_id = catalog .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY); @@ -2461,6 +2469,8 @@ impl Coordinator { }) .collect(); + let collection_ids: Vec<_> = collections.iter().map(|(id, _)| *id).collect(); + let register_ts = if self.controller.read_only() { self.get_local_read_ts().await } else { @@ -2485,6 +2495,11 @@ impl Coordinator { if !self.controller.read_only() { self.apply_local_write(register_ts).await; } + + self.controller + .storage_collections + .acquire_read_holds(collection_ids) + .expect("collections created above") } /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the @@ -2635,14 +2650,13 @@ impl Coordinator { /// Selects for each compute dataflow an as-of suitable for bootstrapping it. /// - /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay - /// in place and that must not be dropped before all compute dataflows have been created with - /// the compute controller. - /// /// This method expects all storage collections and dataflow plans to be available, so it must /// run after [`Coordinator::bootstrap_storage_collections`] and /// [`Coordinator::bootstrap_dataflow_plans`]. 
- async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap> { + async fn bootstrap_dataflow_as_ofs( + &mut self, + read_holds: &BTreeMap>, + ) { let mut catalog_ids = Vec::new(); let mut dataflows = Vec::new(); let mut read_policies = BTreeMap::new(); @@ -2659,9 +2673,10 @@ impl Coordinator { } let read_ts = self.get_local_read_ts().await; - let read_holds = as_of_selection::run( + as_of_selection::run( &mut dataflows, &read_policies, + read_holds, &*self.controller.storage_collections, read_ts, ); @@ -2670,8 +2685,6 @@ impl Coordinator { for (id, plan) in catalog_ids.into_iter().zip(dataflows) { catalog.set_physical_plan(id, plan); } - - read_holds } /// Serves the coordinator, receiving commands from users over `cmd_rx` diff --git a/src/compute-client/src/as_of_selection.rs b/src/compute-client/src/as_of_selection.rs index fe1b84dc55663..a79dc26cd7405 100644 --- a/src/compute-client/src/as_of_selection.rs +++ b/src/compute-client/src/as_of_selection.rs @@ -84,7 +84,6 @@ use std::rc::Rc; use differential_dataflow::lattice::Lattice; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::Plan; -use mz_ore::collections::CollectionExt; use mz_ore::soft_panic_or_log; use mz_repr::{GlobalId, TimestampManipulation}; use mz_storage_client::storage_collections::StorageCollections; @@ -96,30 +95,18 @@ use tracing::{info, warn}; /// Runs as-of selection for the given dataflows. /// -/// Assigns the selected as-of to the provided dataflow descriptions and returns a set of -/// `ReadHold`s that must not be dropped nor downgraded until the dataflows have been installed -/// with the compute controller. +/// Assigns the selected as-of to the provided dataflow descriptions. +/// +/// The provided `storage_read_holds` must contain an entry for each storage collection imported by +/// any of the provided `dataflows`. After this function returns, the read holds must not be +/// dropped nor downgraded until the dataflows have been installed with the compute controller. pub fn run( dataflows: &mut [DataflowDescription, (), T>], read_policies: &BTreeMap>, + storage_read_holds: &BTreeMap>, storage_collections: &dyn StorageCollections, current_time: T, -) -> BTreeMap> { - // Get read holds for the storage inputs of the dataflows. - // This ensures that storage frontiers don't advance past the selected as-ofs. - let mut storage_read_holds = BTreeMap::new(); - for dataflow in &*dataflows { - for id in dataflow.source_imports.keys() { - if !storage_read_holds.contains_key(id) { - let read_hold = storage_collections - .acquire_read_holds(vec![*id]) - .expect("storage collection exists") - .into_element(); - storage_read_holds.insert(*id, read_hold); - } - } - } - +) { let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time); // Dataflows that sink into a storage collection that has advanced to the empty frontier don't @@ -128,7 +115,7 @@ pub fn run( ctx.prune_sealed_persist_sinks(); // Apply hard constraints from upstream and downstream storage collections. - ctx.apply_upstream_storage_constraints(&storage_read_holds); + ctx.apply_upstream_storage_constraints(storage_read_holds); ctx.apply_downstream_storage_constraints(); // At this point all collections have as-of bounds that reflect what is required for @@ -157,8 +144,6 @@ pub fn run( let as_of = first_export.map_or(Antichain::new(), |id| ctx.best_as_of(id)); dataflow.as_of = Some(as_of); } - - storage_read_holds } /// Bounds for possible as-of values of a dataflow. 
@@ -1030,9 +1015,15 @@ mod tests { $($( ($policy_id.parse().unwrap(), $policy), )*)? ]); + + let ids = storage_ids.iter().map(|s| s.parse().unwrap()).collect(); + let holds = storage_frontiers.acquire_read_holds(ids).unwrap(); + let storage_read_holds = holds.into_iter().map(|r| (r.id(), r)).collect(); + super::run( &mut dataflows, &read_policies, + &storage_read_holds, &storage_frontiers, $current_time.into(), ); From f6be3059d0e4668e7591274d5942fcf959e5178b Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Thu, 12 Sep 2024 13:43:25 +0200 Subject: [PATCH 3/6] compute: expect read holds in controller API This commit changes the compute controller API to expect any required read holds to be passed in by the caller, rather than attempting to acquire them ourselves. This commit does not yet adjust callers to pass in read holds as well. The next commit will deal with that. --- src/compute-client/src/controller.rs | 23 ++- src/compute-client/src/controller/error.rs | 35 ++-- src/compute-client/src/controller/instance.rs | 175 +++++++----------- src/compute-client/src/protocol/command.rs | 10 + 4 files changed, 108 insertions(+), 135 deletions(-) diff --git a/src/compute-client/src/controller.rs b/src/compute-client/src/controller.rs index 0782fa3bf25d5..914badf45bebe 100644 --- a/src/compute-client/src/controller.rs +++ b/src/compute-client/src/controller.rs @@ -709,21 +709,26 @@ where Ok(()) } - /// Create and maintain the described dataflows, and initialize state for their output. + /// Creates the described dataflow and initializes state for its output. /// - /// This method creates dataflows whose inputs are still readable at the dataflow `as_of` - /// frontier, and initializes the outputs as readable from that frontier onward. - /// It installs read dependencies from the outputs to the inputs, so that the input read - /// capabilities will be held back to the output read capabilities, ensuring that we are - /// always able to return to a state that can serve the output read capabilities. + /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as + /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`. + /// + /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are + /// configured to target that replica, i.e., only subscribe responses sent by that replica are + /// considered. pub fn create_dataflow( &mut self, instance_id: ComputeInstanceId, dataflow: DataflowDescription, (), T>, + import_read_holds: Vec>, subscribe_target_replica: Option, ) -> Result<(), DataflowCreationError> { - self.instance_mut(instance_id)? - .create_dataflow(dataflow, subscribe_target_replica)?; + self.instance_mut(instance_id)?.create_dataflow( + dataflow, + import_read_holds, + subscribe_target_replica, + )?; Ok(()) } @@ -749,6 +754,7 @@ where timestamp: T, finishing: RowSetFinishing, map_filter_project: mz_expr::SafeMfpPlan, + read_hold: ReadHold, target_replica: Option, ) -> Result<(), PeekError> { self.instance_mut(instance_id)?.peek( @@ -758,6 +764,7 @@ where timestamp, finishing, map_filter_project, + read_hold, target_replica, )?; Ok(()) diff --git a/src/compute-client/src/controller/error.rs b/src/compute-client/src/controller/error.rs index 2b19c4fad6153..4712cd522e700 100644 --- a/src/compute-client/src/controller/error.rs +++ b/src/compute-client/src/controller/error.rs @@ -131,7 +131,7 @@ pub enum DataflowCreationError { /// The given instance does not exist. 
#[error("instance does not exist: {0}")] InstanceMissing(ComputeInstanceId), - /// One of the imported collections does not exist. + /// One of the referenced collections does not exist. #[error("collection does not exist: {0}")] CollectionMissing(GlobalId), /// The targeted replica does not exist. @@ -140,9 +140,6 @@ pub enum DataflowCreationError { /// The dataflow definition has doesn't have an `as_of` set. #[error("dataflow definition lacks an as_of value")] MissingAsOf, - /// One of the imported collections has a read frontier greater than the dataflow `as_of`. - #[error("dataflow has an as_of not beyond the since of collection: {0}")] - SinceViolation(GlobalId), /// We skip dataflow creation for empty `as_of`s, which would be a problem for a SUBSCRIBE, /// because an initial response is expected. #[error("subscribe dataflow has an empty as_of")] @@ -151,6 +148,13 @@ pub enum DataflowCreationError { /// because it should always have an external side effect. #[error("copy to dataflow has an empty as_of")] EmptyAsOfForCopyTo, + /// Caller did not provide a read hold for one of the imported collections. + #[error("no read hold provided for dataflow import: {0}")] + ReadHoldMissing(GlobalId), + /// Caller provided a read hold with a `since` > the dataflow `as_of` for one of the imported + /// collections. + #[error("insufficient read hold provided for dataflow import: {0}")] + ReadHoldInsufficient(GlobalId), } impl From for DataflowCreationError { @@ -166,9 +170,10 @@ impl From for DataflowCreationError { CollectionMissing(id) => Self::CollectionMissing(id), ReplicaMissing(id) => Self::ReplicaMissing(id), MissingAsOf => Self::MissingAsOf, - SinceViolation(id) => Self::SinceViolation(id), EmptyAsOfForSubscribe => Self::EmptyAsOfForSubscribe, EmptyAsOfForCopyTo => Self::EmptyAsOfForCopyTo, + ReadHoldMissing(id) => Self::ReadHoldMissing(id), + ReadHoldInsufficient(id) => Self::ReadHoldInsufficient(id), } } } @@ -176,18 +181,18 @@ impl From for DataflowCreationError { /// Errors arising during peek processing. #[derive(Error, Debug)] pub enum PeekError { - /// TODO(#25239): Add documentation. + /// The given instance does not exist. #[error("instance does not exist: {0}")] InstanceMissing(ComputeInstanceId), - /// TODO(#25239): Add documentation. - #[error("collection does not exist: {0}")] - CollectionMissing(GlobalId), - /// TODO(#25239): Add documentation. + /// The targeted replica does not exist. #[error("replica does not exist: {0}")] ReplicaMissing(ReplicaId), - /// TODO(#25239): Add documentation. - #[error("peek timestamp is not beyond the since of collection: {0}")] - SinceViolation(GlobalId), + /// Caller provided a read hold for a collection that's not the peeked collect. + #[error("read hold ID does not match peeked collection: {0}")] + ReadHoldIdMismatch(GlobalId), + /// Caller provided a read hold with a `since` > the peek timestamp. 
+ #[error("insufficient read hold provided: {0}")] + ReadHoldInsufficient(GlobalId), } impl From for PeekError { @@ -200,9 +205,9 @@ impl From for PeekError { fn from(error: instance::PeekError) -> Self { use instance::PeekError::*; match error { - CollectionMissing(id) => Self::CollectionMissing(id), ReplicaMissing(id) => Self::ReplicaMissing(id), - SinceViolation(id) => Self::SinceViolation(id), + ReadHoldIdMismatch(id) => Self::ReadHoldIdMismatch(id), + ReadHoldInsufficient(id) => Self::ReadHoldInsufficient(id), } } } diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 7b8a25d354b95..e6ef64a2e31c8 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -37,7 +37,7 @@ use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::{Datum, Diff, GlobalId, Row}; use mz_storage_client::controller::IntrospectionType; use mz_storage_client::storage_collections::StorageCollections; -use mz_storage_types::read_holds::{ReadHold, ReadHoldError}; +use mz_storage_types::read_holds::ReadHold; use mz_storage_types::read_policy::ReadPolicy; use serde::Serialize; use thiserror::Error; @@ -83,58 +83,24 @@ pub(super) enum DataflowCreationError { ReplicaMissing(ReplicaId), #[error("dataflow definition lacks an as_of value")] MissingAsOf, - #[error("dataflow has an as_of not beyond the since of collection: {0}")] - SinceViolation(GlobalId), #[error("subscribe dataflow has an empty as_of")] EmptyAsOfForSubscribe, #[error("copy to dataflow has an empty as_of")] EmptyAsOfForCopyTo, -} - -impl From for DataflowCreationError { - fn from(error: CollectionMissing) -> Self { - Self::CollectionMissing(error.0) - } -} - -impl From for DataflowCreationError { - fn from(error: ReadHoldError) -> Self { - match error { - ReadHoldError::CollectionMissing(id) => DataflowCreationError::CollectionMissing(id), - ReadHoldError::SinceViolation(id) => DataflowCreationError::SinceViolation(id), - } - } + #[error("no read hold provided for dataflow import: {0}")] + ReadHoldMissing(GlobalId), + #[error("insufficient read hold provided for dataflow import: {0}")] + ReadHoldInsufficient(GlobalId), } #[derive(Error, Debug)] pub(super) enum PeekError { - #[error("collection does not exist: {0}")] - CollectionMissing(GlobalId), #[error("replica does not exist: {0}")] ReplicaMissing(ReplicaId), - #[error("peek timestamp is not beyond the since of collection: {0}")] - SinceViolation(GlobalId), -} - -impl From for PeekError { - fn from(error: CollectionMissing) -> Self { - Self::CollectionMissing(error.0) - } -} - -impl From for PeekError { - fn from(error: ReadHoldError) -> Self { - match error { - ReadHoldError::CollectionMissing(id) => PeekError::CollectionMissing(id), - ReadHoldError::SinceViolation(id) => PeekError::SinceViolation(id), - } - } -} - -impl From for ReadHoldError { - fn from(error: CollectionMissing) -> Self { - ReadHoldError::CollectionMissing(error.0) - } + #[error("read hold ID does not match peeked collection: {0}")] + ReadHoldIdMismatch(GlobalId), + #[error("insufficient read hold provided: {0}")] + ReadHoldInsufficient(GlobalId), } #[derive(Error, Debug)] @@ -371,20 +337,6 @@ impl Instance { self.replicas.insert(id, replica); } - fn acquire_storage_read_hold_at( - &self, - id: GlobalId, - frontier: Antichain, - ) -> Result, ReadHoldError> { - let mut hold = self - .storage_collections - .acquire_read_holds(vec![id])? 
- .into_element(); - hold.try_downgrade(frontier) - .map_err(|_| ReadHoldError::SinceViolation(id))?; - Ok(hold) - } - /// Enqueue the given response for delivery to the controller clients. fn deliver_response(&mut self, response: ComputeControllerResponse) { self.response_tx @@ -1143,7 +1095,10 @@ where } } - /// Create the described dataflows and initializes state for their output. + /// Creates the described dataflow and initializes state for its output. + /// + /// This method expects a `DataflowDescription` with an `as_of` frontier specified, as well as + /// for each imported collection a read hold in `import_read_holds` at at least the `as_of`. /// /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are /// configured to target that replica, i.e., only subscribe responses sent by that replica are @@ -1151,11 +1106,14 @@ where pub fn create_dataflow( &mut self, dataflow: DataflowDescription, (), T>, + import_read_holds: Vec>, subscribe_target_replica: Option, ) -> Result<(), DataflowCreationError> { + use DataflowCreationError::*; + if let Some(replica_id) = subscribe_target_replica { if !self.replica_exists(replica_id) { - return Err(DataflowCreationError::ReplicaMissing(replica_id)); + return Err(ReplicaMissing(replica_id)); } } @@ -1165,48 +1123,51 @@ where .as_ref() .ok_or(DataflowCreationError::MissingAsOf)?; if as_of.is_empty() && dataflow.subscribe_ids().next().is_some() { - return Err(DataflowCreationError::EmptyAsOfForSubscribe); + return Err(EmptyAsOfForSubscribe); } if as_of.is_empty() && dataflow.copy_to_ids().next().is_some() { - return Err(DataflowCreationError::EmptyAsOfForCopyTo); + return Err(EmptyAsOfForCopyTo); } - // Validate the dataflow as having inputs whose `since` is less or equal to the dataflow's `as_of`. - // Start tracking frontiers for each dataflow, using its `as_of` for each index and sink. + // Collect all dependencies of the dataflow, and read holds on them at the `as_of`. + let mut import_read_holds: BTreeMap<_, _> = + import_read_holds.into_iter().map(|r| (r.id(), r)).collect(); - // When we install per-replica input read holds, we cannot use the `as_of` because of - // reconciliation: Existing slow replicas might be reading from the inputs at times before - // the `as_of` and we would rather not crash them by allowing their inputs to compact too - // far. So instead we take read holds at the least time available. - let mut replica_input_read_holds = Vec::new(); + let mut prepare_read_hold = |id| { + let mut read_hold = import_read_holds.remove(&id).ok_or(ReadHoldMissing(id))?; + read_hold + .try_downgrade(as_of.clone()) + .map_err(|_| ReadHoldInsufficient(id))?; + Ok(read_hold) + }; - // Collect all dependencies of the dataflow, and read holds on them. let mut storage_dependencies = BTreeMap::new(); - let mut compute_dependencies = BTreeMap::new(); - for &id in dataflow.source_imports.keys() { - let mut read_hold = self - .storage_collections - .acquire_read_holds(vec![id])? 
- .into_element(); - replica_input_read_holds.push(read_hold.clone()); - - read_hold - .try_downgrade(as_of.clone()) - .map_err(|_| DataflowCreationError::SinceViolation(id))?; - storage_dependencies.insert(id, read_hold); + storage_dependencies.insert(id, prepare_read_hold(id)?); } - + let mut compute_dependencies = BTreeMap::new(); for &id in dataflow.index_imports.keys() { - let as_of_read_hold = self.acquire_read_hold_at(id, as_of.clone())?; - compute_dependencies.insert(id, as_of_read_hold); + compute_dependencies.insert(id, prepare_read_hold(id)?); } - // If the `as_of` is empty, we are not going to create a dataflow, so replicas won't read - // from the inputs. - if as_of.is_empty() { - replica_input_read_holds = Default::default(); - } + // When we install per-replica input read holds, we cannot use the `as_of` because of + // reconciliation: Existing slow replicas might be reading from the inputs at times before + // the `as_of` and we would rather not crash them by allowing their inputs to compact too + // far. So instead we take read holds at the least time available. + let replica_input_read_holds: Vec<_> = if as_of.is_empty() { + // We are not going to create a dataflow, so replicas won't read from the inputs. + Default::default() + } else { + let storage_ids = dataflow.source_imports.keys().copied(); + storage_ids + .map(|id| { + self.storage_collections + .acquire_read_holds(vec![id]) + .expect("we already have a read hold on this collection") + .into_element() + }) + .collect() + }; // Install collection state for each of the exports. for export_id in dataflow.export_ids() { @@ -1243,7 +1204,7 @@ where let collection_metadata = self .storage_collections .collection_metadata(id) - .map_err(|_| DataflowCreationError::CollectionMissing(id))?; + .expect("we have a read hold on this collection"); let desc = SourceInstanceDesc { storage_metadata: collection_metadata.clone(), @@ -1449,21 +1410,23 @@ where timestamp: T, finishing: RowSetFinishing, map_filter_project: mz_expr::SafeMfpPlan, + mut read_hold: ReadHold, target_replica: Option, ) -> Result<(), PeekError> { - // Install a compaction hold on `id` at `timestamp`. - let read_hold = match &peek_target { - PeekTarget::Index { id } => { - self.acquire_read_hold_at(*id, Antichain::from_elem(timestamp.clone()))? - } - PeekTarget::Persist { id, .. } => { - self.acquire_storage_read_hold_at(*id, Antichain::from_elem(timestamp.clone()))? - } - }; + use PeekError::*; + + // Downgrade the provided read hold to the peek time. + let target_id = peek_target.id(); + if read_hold.id() != target_id { + return Err(ReadHoldIdMismatch(read_hold.id())); + } + read_hold + .try_downgrade(Antichain::from_elem(timestamp.clone())) + .map_err(|_| ReadHoldInsufficient(target_id))?; if let Some(target) = target_replica { if !self.replica_exists(target) { - return Err(PeekError::ReplicaMissing(target)); + return Err(ReplicaMissing(target)); } } @@ -1573,18 +1536,6 @@ where Ok(hold) } - /// Acquires a [`ReadHold`] for the identified compute collection at the desired frontier. - fn acquire_read_hold_at( - &mut self, - id: GlobalId, - frontier: Antichain, - ) -> Result, ReadHoldError> { - let mut hold = self.acquire_read_hold(id)?; - hold.try_downgrade(frontier) - .map_err(|_| ReadHoldError::SinceViolation(id))?; - Ok(hold) - } - /// Advance the global write frontier of the given collection. /// /// Frontier regressions are gracefully ignored. 
diff --git a/src/compute-client/src/protocol/command.rs b/src/compute-client/src/protocol/command.rs
index 542785476cbc4..92274b121c663 100644
--- a/src/compute-client/src/protocol/command.rs
+++ b/src/compute-client/src/protocol/command.rs
@@ -508,6 +508,16 @@ pub enum PeekTarget {
     },
 }
 
+impl PeekTarget {
+    /// Returns the ID of the peeked collection.
+    pub fn id(&self) -> GlobalId {
+        match self {
+            Self::Index { id } => *id,
+            Self::Persist { id, .. } => *id,
+        }
+    }
+}
+
 /// Peek a collection, either in an arrangement or Persist.
 ///
 /// This request elicits data from the worker, by naming the

From 8cc5c390421f6eab8f58bbe89812e4aa0393438b Mon Sep 17 00:00:00 2001
From: Jan Teske
Date: Thu, 12 Sep 2024 17:41:04 +0200
Subject: [PATCH 4/6] coord: pass read holds to compute controller

This commit makes the coordinator pass read holds to the compute
controller's `create_dataflow` and `peek` APIs.
---
 src/adapter/src/coord.rs                       |  42 +++-
 src/adapter/src/coord/introspection.rs         |   3 +-
 src/adapter/src/coord/peek.rs                  | 184 ++++++++++--------
 src/adapter/src/coord/read_policy.rs           |  30 +++
 .../src/coord/sequencer/inner/create_index.rs  |  13 +-
 .../inner/create_materialized_view.rs          |  10 +-
 .../sequencer/inner/explain_timestamp.rs       |   2 +-
 src/adapter/src/coord/sequencer/inner/peek.rs  |  40 +++-
 .../src/coord/sequencer/inner/subscribe.rs     |  17 +-
 src/adapter/src/util.rs                        |  15 +-
 10 files changed, 225 insertions(+), 131 deletions(-)

diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index 04904c7de7c3e..3fff24c4fc514 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -496,6 +496,7 @@ pub struct PeekStageOptimize {
     id_bundle: CollectionIdBundle,
     target_replica: Option,
     determination: TimestampDetermination,
+    read_holds: ReadHolds,
     optimizer: Either,
     /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
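
The `PeekTarget::id` accessor added above exists so the peek path can check a caller-provided read hold against the peeked collection, and the peek changes later in this patch thread such a hold into the call alongside the timestamp. A toy version of those two checks follows; the error variant names follow the ones introduced in patch 3, but the types here are simplified stand-ins rather than the real controller API.

    type GlobalId = u64;

    #[derive(Debug, PartialEq)]
    enum PeekError {
        ReadHoldIdMismatch(GlobalId),
        ReadHoldInsufficient(GlobalId),
    }

    struct ReadHold {
        id: GlobalId,
        since: u64,
    }

    /// The hold must be for the peeked collection, and its frontier must not
    /// be beyond the peek timestamp.
    fn check_peek_hold(target_id: GlobalId, timestamp: u64, hold: &ReadHold) -> Result<(), PeekError> {
        if hold.id != target_id {
            return Err(PeekError::ReadHoldIdMismatch(hold.id));
        }
        if hold.since > timestamp {
            return Err(PeekError::ReadHoldInsufficient(target_id));
        }
        Ok(())
    }

    fn main() {
        let hold = ReadHold { id: 7, since: 10 };
        assert_eq!(check_peek_hold(7, 15, &hold), Ok(()));
        assert_eq!(check_peek_hold(7, 5, &hold), Err(PeekError::ReadHoldInsufficient(7)));
        assert_eq!(check_peek_hold(8, 15, &hold), Err(PeekError::ReadHoldIdMismatch(7)));
    }
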
@@ -511,6 +512,7 @@ pub struct PeekStageFinish { target_replica: Option, source_ids: BTreeSet, determination: TimestampDetermination, + read_holds: ReadHolds, cluster_id: ComputeInstanceId, finishing: RowSetFinishing, /// When present, an optimizer trace to be used for emitting a plan insights @@ -528,6 +530,7 @@ pub struct PeekStageCopyTo { global_lir_plan: optimize::copy_to::GlobalLirPlan, optimization_finished_at: EpochMillis, source_ids: BTreeSet, + read_holds: ReadHolds, } #[derive(Debug)] @@ -808,6 +811,7 @@ pub struct SubscribeFinish { replica_id: Option, plan: plan::SubscribePlan, global_lir_plan: optimize::subscribe::GlobalLirPlan, + read_holds: ReadHolds, dependency_ids: BTreeSet, } @@ -1974,9 +1978,14 @@ impl Coordinator { ); } + let import_read_holds = df_desc + .import_ids() + .map(|id| read_holds[&id].clone()) + .collect(); + self.controller .compute - .create_dataflow(idx.cluster_id, df_desc, None) + .create_dataflow(idx.cluster_id, df_desc, import_read_holds, None) .unwrap_or_terminate("cannot fail to create dataflows"); let read_hold = self @@ -2030,7 +2039,13 @@ impl Coordinator { ); } - self.ship_dataflow(df_desc, mview.cluster_id, None).await; + let import_read_holds = df_desc + .import_ids() + .map(|id| read_holds[&id].clone()) + .collect(); + + self.ship_dataflow(df_desc, mview.cluster_id, import_read_holds, None) + .await; } CatalogItem::Sink(sink) => { let id = entry.id(); @@ -3055,6 +3070,7 @@ impl Coordinator { &mut self, dataflow: DataflowDescription, instance: ComputeInstanceId, + import_read_holds: Vec>, subscribe_target_replica: Option, ) { // We must only install read policies for indexes, not for sinks. @@ -3063,7 +3079,12 @@ impl Coordinator { self.controller .compute - .create_dataflow(instance, dataflow, subscribe_target_replica) + .create_dataflow( + instance, + dataflow, + import_read_holds, + subscribe_target_replica, + ) .unwrap_or_terminate("dataflow creation cannot fail"); for id in export_ids { @@ -3077,14 +3098,16 @@ impl Coordinator { &mut self, dataflow: DataflowDescription, instance: ComputeInstanceId, + import_read_holds: Vec>, notice_builtin_updates_fut: Option, ) { if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut { - let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None); + let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, import_read_holds, None); let ((), ()) = futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await; } else { - self.ship_dataflow(dataflow, instance, None).await; + self.ship_dataflow(dataflow, instance, import_read_holds, None) + .await; } } @@ -3388,11 +3411,14 @@ impl Coordinator { // prevent us from incorrectly teaching those functions how to return errors (which has // happened twice and is the motivation for this test). - // An arbitrary compute instance ID to satisfy the function calls below. Note that - // this only works because this function will never run. + // An arbitrary compute instance ID and empty read holds to satisfy the function calls + // below. Note that this only works because this function will never run. 
let compute_instance = ComputeInstanceId::User(1); + let import_read_holds = Default::default(); - let _: () = self.ship_dataflow(dataflow, compute_instance, None).await; + let _: () = self + .ship_dataflow(dataflow, compute_instance, import_read_holds, None) + .await; } } diff --git a/src/adapter/src/coord/introspection.rs b/src/adapter/src/coord/introspection.rs index e425a74df4d76..0b3b5a0374926 100644 --- a/src/adapter/src/coord/introspection.rs +++ b/src/adapter/src/coord/introspection.rs @@ -297,7 +297,7 @@ impl Coordinator { // dataflow for it. let response = if self.introspection_subscribes.contains_key(&subscribe_id) { let (df_desc, _df_meta) = global_lir_plan.unapply(); - self.ship_dataflow(df_desc, cluster_id, Some(replica_id)) + self.ship_dataflow(df_desc, cluster_id, read_holds.into(), Some(replica_id)) .await; Ok(StageResult::Response( @@ -310,7 +310,6 @@ impl Coordinator { )) }; - drop(read_holds); response } diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 1057d927d0968..71e7a7b9eb6bb 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -43,10 +43,11 @@ use tokio::sync::oneshot; use uuid::Uuid; use crate::coord::timestamp_selection::TimestampDetermination; +use crate::optimize::dataflows::dataflow_import_id_bundle; use crate::optimize::OptimizerError; use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy}; use crate::util::ResultExt; -use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse}; +use crate::{AdapterError, ExecuteContext, ExecuteContextExtra, ExecuteResponse, ReadHolds}; #[derive(Debug)] pub(crate) struct PendingPeek { @@ -438,11 +439,12 @@ impl crate::coord::Coordinator { #[mz_ore::instrument(level = "debug")] pub async fn implement_peek_plan( &mut self, - ctx_extra: &mut ExecuteContextExtra, + ctx: &mut ExecuteContext, plan: PlannedPeek, finishing: RowSetFinishing, compute_instance: ComputeInstanceId, target_replica: Option, + mut read_holds: ReadHolds, max_result_size: u64, max_returned_query_size: Option, ) -> Result { @@ -505,12 +507,12 @@ impl crate::coord::Coordinator { StatementEndedExecutionReason::Errored { error }, ), }; - self.retire_execution(reason, std::mem::take(ctx_extra)); + self.retire_execution(reason, std::mem::take(ctx.extra_mut())); return ret; } let timestamp = determination.timestamp_context.timestamp_or_default(); - if let Some(id) = ctx_extra.contents() { + if let Some(id) = ctx.extra().contents() { self.set_statement_execution_timestamp(id, timestamp) } @@ -520,78 +522,105 @@ impl crate::coord::Coordinator { // differently. // If we must build the view, ship the dataflow. - let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path { - PeekPlan::FastPath(FastPathPlan::PeekExisting( - _coll_id, - idx_id, - literal_constraints, - map_filter_project, - )) => ( - (literal_constraints, timestamp, map_filter_project), - None, - true, - PeekTarget::Index { id: idx_id }, - StatementExecutionStrategy::FastPath, - ), - PeekPlan::FastPath(FastPathPlan::PeekPersist(coll_id, map_filter_project)) => { - let peek_command = (None, timestamp, map_filter_project); - let metadata = self - .controller - .storage - .collection_metadata(coll_id) - .expect("storage collection for fast-path peek") - .clone(); - ( - peek_command, - None, - true, - PeekTarget::Persist { - id: coll_id, - metadata, - }, - StatementExecutionStrategy::PersistFastPath, - ) - } - PeekPlan::SlowPath(PeekDataflowPlan { - desc: dataflow, - // n.b. 
this index_id identifies a transient index the - // caller created, so it is guaranteed to be on - // `compute_instance`. - id: index_id, - key: index_key, - permutation: index_permutation, - thinned_arity: index_thinned_arity, - }) => { - // Very important: actually create the dataflow (here, so we can destructure). - self.controller - .compute - .create_dataflow(compute_instance, dataflow, None) - .unwrap_or_terminate("cannot fail to create dataflows"); - self.initialize_compute_read_policy( - index_id, - compute_instance, - // Disable compaction so that nothing can compact before the peek occurs below. - CompactionWindow::DisableCompaction, - ) - .await; - - // Create an identity MFP operator. - let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity); - map_filter_project - .permute(index_permutation, index_key.len() + index_thinned_arity); - let map_filter_project = mfp_to_safe_plan(map_filter_project)?; - ( - (None, timestamp, map_filter_project), - Some(index_id), - false, - PeekTarget::Index { id: index_id }, - StatementExecutionStrategy::Standard, - ) - } - _ => { - unreachable!() - } - }; + let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy, read_hold) = + match fast_path { + PeekPlan::FastPath(FastPathPlan::PeekExisting( + _coll_id, + idx_id, + literal_constraints, + map_filter_project, + )) => { + let read_hold = read_holds + .compute_holds + .remove(&(compute_instance, idx_id)) + .expect("read hold must exist"); + + ( + (literal_constraints, timestamp, map_filter_project), + None, + true, + PeekTarget::Index { id: idx_id }, + StatementExecutionStrategy::FastPath, + read_hold, + ) + } + PeekPlan::FastPath(FastPathPlan::PeekPersist(coll_id, map_filter_project)) => { + let peek_command = (None, timestamp, map_filter_project); + let metadata = self + .controller + .storage + .collection_metadata(coll_id) + .expect("storage collection for fast-path peek") + .clone(); + + let read_hold = read_holds + .storage_holds + .remove(&coll_id) + .expect("read hold must exist"); + + ( + peek_command, + None, + true, + PeekTarget::Persist { + id: coll_id, + metadata, + }, + StatementExecutionStrategy::PersistFastPath, + read_hold, + ) + } + PeekPlan::SlowPath(PeekDataflowPlan { + desc: dataflow, + // n.b. this index_id identifies a transient index the + // caller created, so it is guaranteed to be on + // `compute_instance`. + id: index_id, + key: index_key, + permutation: index_permutation, + thinned_arity: index_thinned_arity, + }) => { + let id_bundle = dataflow_import_id_bundle(&dataflow, compute_instance); + let import_read_holds = read_holds.clone_for(&id_bundle).into(); + + // Very important: actually create the dataflow (here, so we can destructure). + self.controller + .compute + .create_dataflow(compute_instance, dataflow, import_read_holds, None) + .unwrap_or_terminate("cannot fail to create dataflows"); + + let read_hold = self + .controller + .compute + .acquire_read_hold(compute_instance, index_id) + .expect("collection just created"); + + self.initialize_compute_read_policy( + index_id, + compute_instance, + // Disable compaction so that nothing can compact before the peek occurs below. + CompactionWindow::DisableCompaction, + ) + .await; + + // Create an identity MFP operator. 
+ let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity); + map_filter_project + .permute(index_permutation, index_key.len() + index_thinned_arity); + let map_filter_project = mfp_to_safe_plan(map_filter_project)?; + ( + (None, timestamp, map_filter_project), + Some(index_id), + false, + PeekTarget::Index { id: index_id }, + StatementExecutionStrategy::Standard, + read_hold, + ) + } + _ => { + unreachable!() + } + }; // Endpoints for sending and receiving peek responses. let (rows_tx, rows_rx) = tokio::sync::oneshot::channel(); @@ -612,7 +641,7 @@ impl crate::coord::Coordinator { conn_id: conn_id.clone(), cluster_id: compute_instance, depends_on: source_ids, - ctx_extra: std::mem::take(ctx_extra), + ctx_extra: std::mem::take(ctx.extra_mut()), is_fast_path, limit: finishing.limit.map(|x| usize::cast_from(u64::from(x))), offset: finishing.offset, @@ -634,6 +663,7 @@ impl crate::coord::Coordinator { timestamp, finishing.clone(), map_filter_project, + read_hold, target_replica, ) .unwrap_or_terminate("cannot fail to peek"); diff --git a/src/adapter/src/coord/read_policy.rs b/src/adapter/src/coord/read_policy.rs index bf89e6ccd11bd..4a564152fe39d 100644 --- a/src/adapter/src/coord/read_policy.rs +++ b/src/adapter/src/coord/read_policy.rs @@ -111,6 +111,28 @@ impl ReadHolds { pub fn remove_compute_collection(&mut self, instance_id: ComputeInstanceId, id: GlobalId) { self.compute_holds.remove(&(instance_id, id)); } + + /// Return copies of a subset of the contained read holds, as specified by the given `ids`. + pub fn clone_for(&self, ids: &CollectionIdBundle) -> Self { + let mut storage_holds = BTreeMap::new(); + let mut compute_holds = BTreeMap::new(); + + for &id in &ids.storage_ids { + let hold = self.storage_holds[&id].clone(); + storage_holds.insert(id, hold); + } + for (&instance_id, compute_ids) in &ids.compute_ids { + for &id in compute_ids { + let hold = self.compute_holds[&(instance_id, id)].clone(); + compute_holds.insert((instance_id, id), hold); + } + } + + Self { + storage_holds, + compute_holds, + } + } } impl ReadHolds { @@ -183,6 +205,14 @@ impl Default for ReadHolds { } } +impl From> for Vec> { + fn from(holds: ReadHolds) -> Self { + let storage = holds.storage_holds.into_values(); + let compute = holds.compute_holds.into_values(); + storage.chain(compute).collect() + } +} + impl crate::coord::Coordinator { /// Initialize the read policy for a storage collection. /// diff --git a/src/adapter/src/coord/sequencer/inner/create_index.rs b/src/adapter/src/coord/sequencer/inner/create_index.rs index 9ef721be60b66..f912cb58bb47e 100644 --- a/src/adapter/src/coord/sequencer/inner/create_index.rs +++ b/src/adapter/src/coord/sequencer/inner/create_index.rs @@ -460,14 +460,6 @@ impl Coordinator { // Timestamp selection let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id); - - // We're putting in place read holds, such that ship_dataflow, - // below, which calls update_read_capabilities, can successfully - // do so. Otherwise, the since of dependencies might move along - // concurrently, pulling the rug from under us! - // - // TODO: Maybe in the future, pass those holds on to compute, to - // hold on to them and downgrade when possible? 
let read_holds = coord.acquire_read_holds(&id_bundle); let since = coord.least_valid_read(&read_holds); df_desc.set_as_of(since); @@ -476,14 +468,11 @@ impl Coordinator { .ship_dataflow_and_notice_builtin_table_updates( df_desc, cluster_id, + read_holds.into(), notice_builtin_updates_fut, ) .await; - // Drop read holds after the dataflow has been shipped, at which - // point compute will have put in its own read holds. - drop(read_holds); - coord.update_compute_read_policy( cluster_id, exported_index_id, diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs index cc22e48a3b934..48ecc3607a9c8 100644 --- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs @@ -569,21 +569,18 @@ impl Coordinator { // Timestamp selection let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id); - let read_holds_owned; let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) { // In some cases, for example when REFRESH is used, the preparatory // stages will already have acquired ReadHolds, we can re-use those. - - txn_reads + txn_reads.clone_for(&id_bundle) } else { // No one has acquired holds, make sure we can determine an as_of // and render our dataflow below. - read_holds_owned = self.acquire_read_holds(&id_bundle); - &read_holds_owned + self.acquire_read_holds(&id_bundle) }; let (dataflow_as_of, storage_as_of, until) = - self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?; + self.select_timestamps(id_bundle, refresh_schedule.as_ref(), &read_holds)?; tracing::info!( dataflow_as_of = ?dataflow_as_of, @@ -695,6 +692,7 @@ impl Coordinator { .ship_dataflow_and_notice_builtin_table_updates( df_desc, cluster_id, + read_holds.into(), notice_builtin_updates_fut, ) .await; diff --git a/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs b/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs index cc60f450e4471..d1a4999f1af91 100644 --- a/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs +++ b/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs @@ -313,7 +313,7 @@ impl Coordinator { let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await; - let determination = self.sequence_peek_timestamp( + let (determination, _read_holds) = self.sequence_peek_timestamp( session, &when, cluster_id, diff --git a/src/adapter/src/coord/sequencer/inner/peek.rs b/src/adapter/src/coord/sequencer/inner/peek.rs index 343c23d47b14a..771422cafa34d 100644 --- a/src/adapter/src/coord/sequencer/inner/peek.rs +++ b/src/adapter/src/coord/sequencer/inner/peek.rs @@ -52,10 +52,13 @@ use crate::error::AdapterError; use crate::explain::insights::PlanInsightsContext; use crate::explain::optimizer_trace::OptimizerTrace; use crate::notice::AdapterNotice; -use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle}; +use crate::optimize::dataflows::{ + dataflow_import_id_bundle, prep_scalar_expr, EvalTime, ExprPrepStyle, +}; use crate::optimize::{self, Optimize}; use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus}; use crate::statement_logging::StatementLifecycleEvent; +use crate::ReadHolds; impl Staged for PeekStage { type Ctx = ExecuteContext; @@ -482,7 +485,7 @@ impl Coordinator { // sufficient collections for safety. 
validity.extend_dependencies(id_bundle.iter()); - let determination = self.sequence_peek_timestamp( + let (determination, read_holds) = self.sequence_peek_timestamp( session, &plan.when, cluster_id, @@ -502,6 +505,7 @@ impl Coordinator { id_bundle, target_replica, determination, + read_holds, optimizer, explain_ctx, }); @@ -520,6 +524,7 @@ impl Coordinator { id_bundle, target_replica, determination, + read_holds, mut optimizer, explain_ctx, }: PeekStageOptimize, @@ -634,6 +639,7 @@ impl Coordinator { target_replica, source_ids, determination, + read_holds, cluster_id: optimizer.cluster_id(), finishing: optimizer.finishing().clone(), plan_insights_optimizer_trace: Some(optimizer_trace), @@ -650,6 +656,7 @@ impl Coordinator { target_replica, source_ids, determination, + read_holds, cluster_id: optimizer.cluster_id(), finishing: optimizer.finishing().clone(), plan_insights_optimizer_trace: None, @@ -686,6 +693,7 @@ impl Coordinator { global_lir_plan, optimization_finished_at, source_ids, + read_holds, }) } // Internal optimizer errors are handled differently @@ -800,6 +808,7 @@ impl Coordinator { target_replica, source_ids, determination, + read_holds, cluster_id, finishing, plan_insights_optimizer_trace, @@ -903,11 +912,12 @@ impl Coordinator { // Implement the peek, and capture the response. let resp = self .implement_peek_plan( - ctx.extra_mut(), + ctx, planned_peek, finishing, cluster_id, target_replica, + read_holds, max_result_size, max_query_result_size, ) @@ -940,6 +950,7 @@ impl Coordinator { global_lir_plan, optimization_finished_at, source_ids, + read_holds, }: PeekStageCopyTo, ) -> Result>, AdapterError> { if let Some(id) = ctx.extra.contents() { @@ -971,8 +982,12 @@ impl Coordinator { .await, ); + let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id); + let import_read_holds = read_holds.clone_for(&id_bundle).into(); + // Ship dataflow. - self.ship_dataflow(df_desc, cluster_id, None).await; + self.ship_dataflow(df_desc, cluster_id, import_read_holds, None) + .await; let span = Span::current(); Ok(StageResult::HandleRetire(mz_ore::task::spawn( @@ -1085,7 +1100,7 @@ impl Coordinator { source_ids: &BTreeSet, real_time_recency_ts: Option, requires_linearization: RequireLinearization, - ) -> Result, AdapterError> { + ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> { let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when); let timedomain_bundle; @@ -1200,6 +1215,19 @@ impl Coordinator { })?; }; - Ok(determination) + // Collect the read holds we'll pass to the compute controller when executing the peek. + // Usually those will be available in `txn_read_holds` (see above), except if we read from + // indexes over constant collections. 
+ let read_holds = if determination.timestamp_context.timestamp().is_some() { + let txn_holds = self + .txn_read_holds + .get(session.conn_id()) + .expect("set above"); + txn_holds.clone_for(source_bundle) + } else { + self.acquire_read_holds(source_bundle) + }; + + Ok((determination, read_holds)) } } diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs index cf8fd1289dba8..3778247eb1e7a 100644 --- a/src/adapter/src/coord/sequencer/inner/subscribe.rs +++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs @@ -281,8 +281,6 @@ impl Coordinator { } } - self.store_transaction_read_holds(ctx.session(), read_holds); - let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of)); // Optimize LIR @@ -300,6 +298,7 @@ impl Coordinator { cluster_id: optimizer.cluster_id(), plan, global_lir_plan, + read_holds, dependency_ids, replica_id, }); @@ -324,6 +323,7 @@ impl Coordinator { .. }, global_lir_plan, + read_holds, dependency_ids, replica_id, }: SubscribeFinish, @@ -355,22 +355,15 @@ impl Coordinator { let write_notify_fut = self .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe)) .await; + // Ship dataflow. - let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id); + let ship_dataflow_fut = + self.ship_dataflow(df_desc, cluster_id, read_holds.into(), replica_id); // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send // requests to external services, which can take time, so we run them concurrently. let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await; - // Release the pre-optimization read holds because the controller is now handling those. - let txn_read_holds = self - .txn_read_holds - .remove(ctx.session().conn_id()) - .expect("must have previously installed read holds"); - - // Explicitly drop read holds, just to make it obvious what's happening. 
-        drop(txn_read_holds);
-
         let resp = ExecuteResponse::Subscribing {
             rx,
             ctx_extra: std::mem::take(ctx.extra_mut()),
diff --git a/src/adapter/src/util.rs b/src/adapter/src/util.rs
index f850ef019846c..805e3f100d60e 100644
--- a/src/adapter/src/util.rs
+++ b/src/adapter/src/util.rs
@@ -404,13 +404,14 @@ impl ShouldTerminateGracefully for StorageError {
 impl ShouldTerminateGracefully for DataflowCreationError {
     fn should_terminate_gracefully(&self) -> Terminate {
         match self {
-            DataflowCreationError::SinceViolation(_)
-            | DataflowCreationError::InstanceMissing(_)
+            DataflowCreationError::InstanceMissing(_)
             | DataflowCreationError::CollectionMissing(_)
             | DataflowCreationError::ReplicaMissing(_)
             | DataflowCreationError::MissingAsOf
             | DataflowCreationError::EmptyAsOfForSubscribe
-            | DataflowCreationError::EmptyAsOfForCopyTo => Terminate::Panic,
+            | DataflowCreationError::EmptyAsOfForCopyTo
+            | DataflowCreationError::ReadHoldMissing(_)
+            | DataflowCreationError::ReadHoldInsufficient(_) => Terminate::Panic,
         }
     }
 }
@@ -427,10 +428,10 @@ impl ShouldTerminateGracefully for CollectionUpdateError {
 impl ShouldTerminateGracefully for PeekError {
     fn should_terminate_gracefully(&self) -> Terminate {
         match self {
-            PeekError::SinceViolation(_)
-            | PeekError::InstanceMissing(_)
-            | PeekError::CollectionMissing(_)
-            | PeekError::ReplicaMissing(_) => Terminate::Panic,
+            PeekError::InstanceMissing(_)
+            | PeekError::ReplicaMissing(_)
+            | PeekError::ReadHoldIdMismatch(_)
+            | PeekError::ReadHoldInsufficient(_) => Terminate::Panic,
         }
     }
 }

From 062658e2842ceef2ae3f504f8a3d4e229e311fee Mon Sep 17 00:00:00 2001
From: Jan Teske
Date: Thu, 12 Sep 2024 18:02:48 +0200
Subject: [PATCH 5/6] controller: return read holds when creating collections

This commit makes the storage and compute controller APIs that create
storage/compute collections return read holds for any readable
collections created, simplifying their usage in the coordinator a bit.
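As a rough illustration of the calling pattern this enables, here is a
minimal, self-contained sketch. ToyController, GlobalId, and the ReadHold
struct below are simplified stand-ins, not the real controller types or
signatures: creation hands back one hold per readable collection, so
callers no longer need a separate acquire step that could race with
compaction.

    use std::collections::BTreeMap;

    type GlobalId = u64;
    type Timestamp = u64;

    /// Toy read hold: keeps the collection's since from advancing past
    /// `since` for as long as the hold is kept around.
    #[derive(Debug, Clone)]
    struct ReadHold {
        id: GlobalId,
        since: Timestamp,
    }

    #[derive(Default)]
    struct ToyController {
        collections: BTreeMap<GlobalId, Timestamp>,
    }

    impl ToyController {
        /// Create collections and return a read hold for each one, so the
        /// caller never races a separate acquire call against compaction.
        fn create_collections(&mut self, ids: Vec<GlobalId>) -> Vec<ReadHold> {
            ids.into_iter()
                .map(|id| {
                    let since = 0;
                    self.collections.insert(id, since);
                    ReadHold { id, since }
                })
                .collect()
        }
    }

    fn main() {
        let mut controller = ToyController::default();
        // Key the returned holds by collection ID, mirroring how the
        // coordinator indexes them in the changes below.
        let holds: BTreeMap<GlobalId, ReadHold> = controller
            .create_collections(vec![1, 2, 3])
            .into_iter()
            .map(|hold| (hold.id, hold))
            .collect();
        assert_eq!(holds.len(), 3);
        assert!(holds.values().all(|hold| hold.since == 0));
    }

The real create_collections* and create_dataflow methods return their
holds in the same spirit; the coordinator changes below key them by
collection ID before initializing read policies.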
--- src/adapter/src/coord.rs | 43 ++++++------------- src/adapter/src/coord/peek.rs | 11 ++--- src/compute-client/src/as_of_selection.rs | 2 +- src/compute-client/src/controller.rs | 34 +++++++++++---- src/controller/src/clusters.rs | 12 ++++-- src/storage-client/src/controller.rs | 6 ++- src/storage-client/src/storage_collections.rs | 15 +++++-- src/storage-controller/src/lib.rs | 7 +-- 8 files changed, 71 insertions(+), 59 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 3fff24c4fc514..7865765057353 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -74,6 +74,7 @@ use mz_adapter_types::dyncfgs::{ }; use mz_compute_client::as_of_selection; use mz_ore::channel::trigger; +use mz_ore::collections::CollectionExt; use mz_sql::names::{ResolvedIds, SchemaSpecifier}; use mz_sql::session::user::User; use mz_storage_types::read_holds::ReadHold; @@ -1822,7 +1823,7 @@ impl Coordinator { let enable_worker_core_affinity = self.catalog().system_config().enable_worker_core_affinity(); for instance in self.catalog.clusters() { - self.controller.create_cluster( + let log_holds = self.controller.create_cluster( instance.id, ClusterConfig { arranged_logs: instance.log_indexes.clone(), @@ -1840,14 +1841,7 @@ impl Coordinator { )?; } - for &log_id in instance.log_indexes.values() { - let read_hold = self - .controller - .compute - .acquire_read_hold(instance.id, log_id) - .expect("log index was created with the cluster"); - read_holds.insert(log_id, read_hold); - } + read_holds.extend(log_holds.into_iter().map(|r| (r.id(), r))); } info!( @@ -1983,16 +1977,13 @@ impl Coordinator { .map(|id| read_holds[&id].clone()) .collect(); - self.controller - .compute - .create_dataflow(idx.cluster_id, df_desc, import_read_holds, None) - .unwrap_or_terminate("cannot fail to create dataflows"); - let read_hold = self .controller .compute - .acquire_read_hold(idx.cluster_id, id) - .expect("collection just created"); + .create_dataflow(idx.cluster_id, df_desc, import_read_holds, None) + .unwrap_or_terminate("cannot fail to create dataflows") + .into_element(); + read_holds.insert(id, read_hold); } @@ -2484,8 +2475,6 @@ impl Coordinator { }) .collect(); - let collection_ids: Vec<_> = collections.iter().map(|(id, _)| *id).collect(); - let register_ts = if self.controller.read_only() { self.get_local_read_ts().await } else { @@ -2496,7 +2485,8 @@ impl Coordinator { let storage_metadata = self.catalog.state().storage_metadata(); - self.controller + let read_holds = self + .controller .storage .create_collections_for_bootstrap( storage_metadata, @@ -2511,10 +2501,7 @@ impl Coordinator { self.apply_local_write(register_ts).await; } - self.controller - .storage_collections - .acquire_read_holds(collection_ids) - .expect("collections created above") + read_holds } /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the @@ -3073,11 +3060,8 @@ impl Coordinator { import_read_holds: Vec>, subscribe_target_replica: Option, ) { - // We must only install read policies for indexes, not for sinks. - // Sinks are write-only compute collections that don't have read policies. 
- let export_ids: Vec<_> = dataflow.exported_index_ids().collect(); - - self.controller + let read_holds = self + .controller .compute .create_dataflow( instance, @@ -3087,7 +3071,8 @@ impl Coordinator { ) .unwrap_or_terminate("dataflow creation cannot fail"); - for id in export_ids { + for hold in read_holds { + let id = hold.id(); self.initialize_compute_read_policy(id, instance, CompactionWindow::Default) .await; } diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 71e7a7b9eb6bb..63c36fc601967 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -32,6 +32,7 @@ use mz_expr::{ OptimizedMirRelationExpr, RowSetFinishing, }; use mz_ore::cast::CastFrom; +use mz_ore::collections::CollectionExt; use mz_ore::str::{separated, StrExt}; use mz_ore::tracing::OpenTelemetryContext; use mz_repr::explain::text::DisplayText; @@ -584,16 +585,12 @@ impl crate::coord::Coordinator { let import_read_holds = read_holds.clone_for(&id_bundle).into(); // Very important: actually create the dataflow (here, so we can destructure). - self.controller - .compute - .create_dataflow(compute_instance, dataflow, import_read_holds, None) - .unwrap_or_terminate("cannot fail to create dataflows"); - let read_hold = self .controller .compute - .acquire_read_hold(compute_instance, index_id) - .expect("collection just created"); + .create_dataflow(compute_instance, dataflow, import_read_holds, None) + .unwrap_or_terminate("cannot fail to create dataflows") + .into_element(); self.initialize_compute_read_policy( index_id, diff --git a/src/compute-client/src/as_of_selection.rs b/src/compute-client/src/as_of_selection.rs index a79dc26cd7405..5c824575bdbfa 100644 --- a/src/compute-client/src/as_of_selection.rs +++ b/src/compute-client/src/as_of_selection.rs @@ -847,7 +847,7 @@ mod tests { _register_ts: Option, _collections: Vec<(GlobalId, CollectionDescription)>, _migrated_storage_collections: &BTreeSet, - ) -> Result<(), StorageError> { + ) -> Result>, StorageError> { unimplemented!() } diff --git a/src/compute-client/src/controller.rs b/src/compute-client/src/controller.rs index 914badf45bebe..1f4458df19689 100644 --- a/src/compute-client/src/controller.rs +++ b/src/compute-client/src/controller.rs @@ -494,16 +494,20 @@ where ComputeGrpcClient: ComputeClient, { /// Create a compute instance. + /// + /// Returns a [`ReadHold`] for each log index installed on the instance. pub fn create_instance( &mut self, id: ComputeInstanceId, arranged_logs: BTreeMap, workload_class: Option, - ) -> Result<(), InstanceExists> { + ) -> Result>, InstanceExists> { if self.instances.contains_key(&id) { return Err(InstanceExists(id)); } + let log_ids: Vec<_> = arranged_logs.values().copied().collect(); + self.instances.insert( id, Instance::new( @@ -538,7 +542,12 @@ where config_params.workload_class = Some(workload_class); instance.update_configuration(config_params); - Ok(()) + let read_holds = log_ids + .into_iter() + .map(|id| instance.acquire_read_hold(id).expect("log index exists")) + .collect(); + + Ok(read_holds) } /// Updates a compute instance's workload class. @@ -717,19 +726,26 @@ where /// If a `subscribe_target_replica` is given, any subscribes exported by the dataflow are /// configured to target that replica, i.e., only subscribe responses sent by that replica are /// considered. + /// + /// Returns a [`ReadHold`] at the `as_of` for each index exported by the create dataflow. 
pub fn create_dataflow( &mut self, instance_id: ComputeInstanceId, dataflow: DataflowDescription, (), T>, import_read_holds: Vec>, subscribe_target_replica: Option, - ) -> Result<(), DataflowCreationError> { - self.instance_mut(instance_id)?.create_dataflow( - dataflow, - import_read_holds, - subscribe_target_replica, - )?; - Ok(()) + ) -> Result>, DataflowCreationError> { + let instance = self.instance_mut(instance_id)?; + + let index_ids: Vec<_> = dataflow.exported_index_ids().collect(); + instance.create_dataflow(dataflow, import_read_holds, subscribe_target_replica)?; + + let read_holds = index_ids + .into_iter() + .map(|id| instance.acquire_read_hold(id).expect("index just created")) + .collect(); + + Ok(read_holds) } /// Drop the read capability for the given collections and allow their resources to be diff --git a/src/controller/src/clusters.rs b/src/controller/src/clusters.rs index 8efe433582f25..e1c24ee90d87b 100644 --- a/src/controller/src/clusters.rs +++ b/src/controller/src/clusters.rs @@ -37,6 +37,7 @@ use mz_ore::instrument; use mz_ore::task::{self, AbortOnDropHandle}; use mz_repr::adt::numeric::Numeric; use mz_repr::GlobalId; +use mz_storage_types::read_holds::ReadHold; use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::time; @@ -338,15 +339,18 @@ where /// A cluster is a combination of a storage instance and a compute instance. /// A cluster has zero or more replicas; each replica colocates the storage /// and compute layers on the same physical resources. + /// + /// Returns a [`ReadHold`] for each log index installed on the cluster. pub fn create_cluster( &mut self, id: ClusterId, config: ClusterConfig, - ) -> Result<(), anyhow::Error> { + ) -> Result>, anyhow::Error> { self.storage.create_instance(id); - self.compute - .create_instance(id, config.arranged_logs, config.workload_class)?; - Ok(()) + let read_holds = + self.compute + .create_instance(id, config.arranged_logs, config.workload_class)?; + Ok(read_holds) } /// Updates the workload class for a cluster. diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index e2583ff1a9b89..74690ca8761df 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -415,12 +415,14 @@ pub trait StorageController: Debug { /// might later give non-tables the same treatment, but hold off on that initially.) Callers /// must provide a Some if any of the collections is a table. A None may be given if none of the /// collections are a table (i.e. all materialized views, sources, etc). + /// + /// Returns a [`ReadHold`] for each created collection. async fn create_collections( &mut self, storage_metadata: &StorageMetadata, register_ts: Option, collections: Vec<(GlobalId, CollectionDescription)>, - ) -> Result<(), StorageError> { + ) -> Result>, StorageError> { self.create_collections_for_bootstrap( storage_metadata, register_ts, @@ -440,7 +442,7 @@ pub trait StorageController: Debug { register_ts: Option, collections: Vec<(GlobalId, CollectionDescription)>, migrated_storage_collections: &BTreeSet, - ) -> Result<(), StorageError>; + ) -> Result>, StorageError>; /// Check that the ingestion associated with `id` can use the provided /// [`SourceDesc`]. 
diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index 9b170b95b57b1..d0f67b2c669f6 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -201,12 +201,14 @@ pub trait StorageCollections: Debug { /// but hold off on that initially.) Callers must provide a Some if any of /// the collections is a table. A None may be given if none of the /// collections are a table (i.e. all materialized views, sources, etc). + /// + /// Returns a [`ReadHold`] for each created collection. async fn create_collections( &self, storage_metadata: &StorageMetadata, register_ts: Option, collections: Vec<(GlobalId, CollectionDescription)>, - ) -> Result<(), StorageError> { + ) -> Result>, StorageError> { self.create_collections_for_bootstrap( storage_metadata, register_ts, @@ -226,7 +228,7 @@ pub trait StorageCollections: Debug { register_ts: Option, collections: Vec<(GlobalId, CollectionDescription)>, migrated_storage_collections: &BTreeSet, - ) -> Result<(), StorageError>; + ) -> Result>, StorageError>; /// Alters the identified ingestion to use the provided [`SourceDesc`]. /// @@ -1338,7 +1340,7 @@ where register_ts: Option, mut collections: Vec<(GlobalId, CollectionDescription)>, migrated_storage_collections: &BTreeSet, - ) -> Result<(), StorageError> { + ) -> Result>, StorageError> { let is_in_txns = |id, metadata: &CollectionMetadata| { metadata.txns_shard.is_some() && !(self.read_only && migrated_storage_collections.contains(&id)) @@ -1507,6 +1509,8 @@ where // Reorder in dependency order. to_register.sort_by_key(|(id, ..)| *id); + let collection_ids: Vec<_> = to_register.iter().map(|(id, ..)| *id).collect(); + // We hold this lock for a very short amount of time, just doing some // hashmap inserts and unbounded channel sends. let mut self_collections = self.collections.lock().expect("lock poisoned"); @@ -1670,7 +1674,10 @@ where self.synchronize_finalized_shards(storage_metadata); - Ok(()) + let read_holds = self + .acquire_read_holds(collection_ids) + .expect("collections just created"); + Ok(read_holds) } async fn alter_ingestion_source_desc( diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 9dd7b60195794..1601d215b6787 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -519,11 +519,12 @@ where register_ts: Option, mut collections: Vec<(GlobalId, CollectionDescription)>, migrated_storage_collections: &BTreeSet, - ) -> Result<(), StorageError> { + ) -> Result>, StorageError> { self.migrated_storage_collections .clone_from(migrated_storage_collections); - self.storage_collections + let read_holds = self + .storage_collections .create_collections_for_bootstrap( storage_metadata, register_ts.clone(), @@ -914,7 +915,7 @@ where }; } - Ok(()) + Ok(read_holds) } fn check_alter_ingestion_source_desc( From 28409b63a717f5f5e66949d8dd3ac585ee7eed05 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Thu, 12 Sep 2024 19:14:51 +0200 Subject: [PATCH 6/6] coord: give read holds to `initialize_*_read_policy` This commit makes the `initialize_{storage,compute}_read_policy` methods take read holds in their arguments, instead of acquiring new read holds themselves. Read holds are readily available at all their call sites, so we can just use those instead of acquiring new ones. 
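A minimal sketch of the resulting call-site shape, using toy stand-in
types (Coordinator, ReadHold, and initialize_storage_read_policy below
are illustrative, not the actual signatures): the hold handed back by
collection creation is passed straight into policy initialization, which
downgrades it to the current read timestamp instead of acquiring a fresh
hold.

    type GlobalId = u64;
    type Timestamp = u64;

    struct ReadHold {
        id: GlobalId,
        since: Timestamp,
    }

    impl ReadHold {
        /// Try to advance the hold's frontier; like a real hold, it only
        /// ever moves forward.
        fn try_downgrade(&mut self, to: Timestamp) {
            if to > self.since {
                self.since = to;
            }
        }
    }

    struct Coordinator {
        read_ts: Timestamp,
        installed_holds: Vec<ReadHold>,
    }

    impl Coordinator {
        /// Toy version of `initialize_storage_read_policy`: accept the hold
        /// the caller already owns, downgrade it to the oracle read
        /// timestamp, and keep it in the coordinator's state.
        fn initialize_storage_read_policy(&mut self, mut read_hold: ReadHold) {
            read_hold.try_downgrade(self.read_ts);
            self.installed_holds.push(read_hold);
        }
    }

    fn main() {
        let mut coord = Coordinator { read_ts: 42, installed_holds: Vec::new() };
        // Pretend this hold came back from collection creation.
        let hold = ReadHold { id: 7, since: 0 };
        coord.initialize_storage_read_policy(hold);
        assert_eq!(coord.installed_holds[0].id, 7);
        assert_eq!(coord.installed_holds[0].since, 42);
    }

This mirrors the read_policy.rs change below, where the passed-in hold is
downgraded to the oracle's read_ts and installed in the timeline state.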
--- src/adapter/src/coord.rs | 15 +++--- src/adapter/src/coord/peek.rs | 2 +- src/adapter/src/coord/read_policy.rs | 34 ++++-------- src/adapter/src/coord/sequencer/cluster.rs | 9 ++-- src/adapter/src/coord/sequencer/inner.rs | 52 ++++++++++++------- .../inner/create_materialized_view.rs | 7 +-- 6 files changed, 60 insertions(+), 59 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 7865765057353..7070c2b307d7f 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1941,12 +1941,14 @@ impl Coordinator { ); } } + let read_hold = read_holds[&entry.id].clone(); let cw = policy.expect("sources have a compaction window"); - self.initialize_storage_read_policy(entry.id, cw).await; + self.initialize_storage_read_policy(read_hold, cw).await; } CatalogItem::Table(_) => { + let read_hold = read_holds[&entry.id].clone(); let cw = policy.expect("tables have a compaction window"); - self.initialize_storage_read_policy(entry.id, cw).await; + self.initialize_storage_read_policy(read_hold, cw).await; } CatalogItem::Index(idx) => { let id = entry.id; @@ -1987,14 +1989,16 @@ impl Coordinator { read_holds.insert(id, read_hold); } + let read_hold = read_holds[&id].clone(); let cw = policy.expect("indexes have a compaction window"); - self.initialize_compute_read_policy(id, idx.cluster_id, cw) + self.initialize_compute_read_policy(read_hold, idx.cluster_id, cw) .await; } CatalogItem::View(_) => (), CatalogItem::MaterializedView(mview) => { + let read_hold = read_holds[&entry.id].clone(); let cw = policy.expect("materialized views have a compaction window"); - self.initialize_storage_read_policy(entry.id, cw).await; + self.initialize_storage_read_policy(read_hold, cw).await; let mut df_desc = self .catalog() @@ -3072,8 +3076,7 @@ impl Coordinator { .unwrap_or_terminate("dataflow creation cannot fail"); for hold in read_holds { - let id = hold.id(); - self.initialize_compute_read_policy(id, instance, CompactionWindow::Default) + self.initialize_compute_read_policy(hold, instance, CompactionWindow::Default) .await; } } diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 63c36fc601967..3fb32e3b9e694 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -593,7 +593,7 @@ impl crate::coord::Coordinator { .into_element(); self.initialize_compute_read_policy( - index_id, + read_hold.clone(), compute_instance, // Disable compaction so that nothing can compact before the peek occurs below. CompactionWindow::DisableCompaction, diff --git a/src/adapter/src/coord/read_policy.rs b/src/adapter/src/coord/read_policy.rs index 4a564152fe39d..27a8a0f46b9dc 100644 --- a/src/adapter/src/coord/read_policy.rs +++ b/src/adapter/src/coord/read_policy.rs @@ -221,22 +221,16 @@ impl crate::coord::Coordinator { /// with a read policy that allows no compaction. pub(crate) async fn initialize_storage_read_policy( &mut self, - id: GlobalId, + mut read_hold: ReadHold, compaction_window: CompactionWindow, ) { + let id = read_hold.id(); + // Install read hold in the Coordinator's timeline state. if let TimelineContext::TimelineDependent(timeline) = self.get_timeline_context(id) { - let TimelineState { oracle, .. 
} = self.ensure_timeline_state(&timeline).await; + let TimelineState { oracle, read_holds } = self.ensure_timeline_state(&timeline).await; let read_ts = oracle.read_ts().await; - - let mut read_hold = self - .controller - .storage - .acquire_read_hold(id) - .expect("missing storage collection"); let _ = read_hold.try_downgrade(Antichain::from_elem(read_ts)); - - let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await; read_holds.insert_storage_collection(id, read_hold); }; @@ -248,11 +242,11 @@ impl crate::coord::Coordinator { pub(crate) async fn initialize_storage_read_policies( &mut self, - ids: impl IntoIterator, + read_holds: impl IntoIterator>, compaction_window: CompactionWindow, ) { - for id in ids { - self.initialize_storage_read_policy(id, compaction_window) + for hold in read_holds { + self.initialize_storage_read_policy(hold, compaction_window) .await; } } @@ -264,23 +258,17 @@ impl crate::coord::Coordinator { /// with a read policy that allows no compaction. pub(crate) async fn initialize_compute_read_policy( &mut self, - id: GlobalId, + mut read_hold: ReadHold, instance: ComputeInstanceId, compaction_window: CompactionWindow, ) { + let id = read_hold.id(); + // Install read hold in the Coordinator's timeline state. if let TimelineContext::TimelineDependent(timeline) = self.get_timeline_context(id) { - let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await; + let TimelineState { oracle, read_holds } = self.ensure_timeline_state(&timeline).await; let read_ts = oracle.read_ts().await; - - let mut read_hold = self - .controller - .compute - .acquire_read_hold(instance, id) - .expect("missing compute collection"); let _ = read_hold.try_downgrade(Antichain::from_elem(read_ts)); - - let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await; read_holds.insert_compute_collection((instance, id), read_hold); }; diff --git a/src/adapter/src/coord/sequencer/cluster.rs b/src/adapter/src/coord/sequencer/cluster.rs index b3cf08fa21792..f41eb5e1db39d 100644 --- a/src/adapter/src/coord/sequencer/cluster.rs +++ b/src/adapter/src/coord/sequencer/cluster.rs @@ -371,11 +371,8 @@ impl Coordinator { .. 
} = self; let cluster = catalog.get_cluster(cluster_id); - let cluster_id = cluster.id; - let introspection_source_ids: Vec<_> = - cluster.log_indexes.iter().map(|(_, id)| *id).collect(); - controller + let log_read_holds = controller .create_cluster( cluster_id, mz_controller::clusters::ClusterConfig { @@ -390,8 +387,8 @@ impl Coordinator { self.create_cluster_replica(cluster_id, replica_id).await; } - for id in introspection_source_ids { - self.initialize_compute_read_policy(id, cluster_id, CompactionWindow::Default) + for hold in log_read_holds { + self.initialize_compute_read_policy(hold, cluster_id, CompactionWindow::Default) .await; } } diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 5c9e9a1d3a3f8..b300f91570174 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -593,13 +593,16 @@ impl Coordinator { let storage_metadata = coord.catalog.state().storage_metadata(); - coord + let read_holds = coord .controller .storage .create_collections(storage_metadata, None, collections) .await .unwrap_or_terminate("cannot fail to create collections"); + let mut read_holds: BTreeMap<_, _> = + read_holds.into_iter().map(|r| (r.id(), r)).collect(); + // It is _very_ important that we only initialize read policies // after we have created all the sources/collections. Some of // the sources created in this collection might have @@ -620,9 +623,12 @@ impl Coordinator { .state() .source_compaction_windows(source_ids); for (compaction_window, ids) in read_policies { - coord - .initialize_storage_read_policies(ids, compaction_window) - .await; + for id in ids { + let hold = read_holds.remove(&id).expect("collection just created"); + coord + .initialize_storage_read_policy(hold, compaction_window) + .await; + } } }) .await; @@ -956,7 +962,7 @@ impl Coordinator { DataSourceOther::TableWrites, ); let storage_metadata = coord.catalog.state().storage_metadata(); - coord + let read_hold = coord .controller .storage .create_collections( @@ -965,17 +971,14 @@ impl Coordinator { vec![(table_id, collection_desc)], ) .await - .unwrap_or_terminate("cannot fail to create collections"); + .unwrap_or_terminate("cannot fail to create collections") + .into_element(); coord.apply_local_write(register_ts).await; - coord - .initialize_storage_read_policy( - table_id, - table - .custom_logical_compaction_window - .unwrap_or(CompactionWindow::Default), - ) - .await; + let cw = table + .custom_logical_compaction_window + .unwrap_or(CompactionWindow::Default); + coord.initialize_storage_read_policy(read_hold, cw).await; } TableDataSource::DataSource(data_source) => { match data_source { @@ -1004,7 +1007,7 @@ impl Coordinator { status_collection_id, }; let storage_metadata = coord.catalog.state().storage_metadata(); - coord + let read_holds = coord .controller .storage .create_collections( @@ -1015,14 +1018,22 @@ impl Coordinator { .await .unwrap_or_terminate("cannot fail to create collections"); + let mut read_holds: BTreeMap<_, _> = + read_holds.into_iter().map(|r| (r.id(), r)).collect(); + let read_policies = coord .catalog() .state() .source_compaction_windows(vec![table_id]); for (compaction_window, ids) in read_policies { - coord - .initialize_storage_read_policies(ids, compaction_window) - .await; + for id in ids { + let hold = read_holds + .remove(&id) + .expect("collection just created"); + coord + .initialize_storage_read_policy(hold, compaction_window) + .await; + } } } _ => unreachable!("CREATE TABLE data source 
got {:?}", data_source), @@ -3895,14 +3906,15 @@ impl Coordinator { let storage_metadata = self.catalog.state().storage_metadata(); - self.controller + let read_holds = self + .controller .storage .create_collections(storage_metadata, None, collections) .await .unwrap_or_terminate("cannot fail to create collections"); self.initialize_storage_read_policies( - source_ids, + read_holds, source_compaction_window.unwrap_or(CompactionWindow::Default), ) .await; diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs index 48ecc3607a9c8..3a2489770a759 100644 --- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs @@ -662,7 +662,7 @@ impl Coordinator { let storage_metadata = coord.catalog.state().storage_metadata(); // Announce the creation of the materialized view source. - coord + let read_hold = coord .controller .storage .create_collections( @@ -679,11 +679,12 @@ impl Coordinator { )], ) .await - .unwrap_or_terminate("cannot fail to append"); + .unwrap_or_terminate("cannot fail to append") + .into_element(); coord .initialize_storage_read_policy( - sink_id, + read_hold, compaction_window.unwrap_or(CompactionWindow::Default), ) .await;
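As a closing illustration of how these pieces fit together when shipping
a dataflow, here is a simplified sketch; IdBundle, ReadHolds, and
clone_for mirror the adapter types touched above but are toy stand-ins.
The caller subsets its existing holds to the dataflow's imports and
flattens them into the plain list of holds that dataflow creation now
expects.

    use std::collections::{BTreeMap, BTreeSet};

    type GlobalId = u64;
    type InstanceId = u64;
    type Timestamp = u64;

    #[derive(Clone)]
    struct ReadHold {
        id: GlobalId,
        since: Timestamp,
    }

    /// Toy equivalents of `CollectionIdBundle` and `ReadHolds`.
    #[derive(Default)]
    struct IdBundle {
        storage_ids: BTreeSet<GlobalId>,
        compute_ids: BTreeMap<InstanceId, BTreeSet<GlobalId>>,
    }

    #[derive(Default)]
    struct ReadHolds {
        storage_holds: BTreeMap<GlobalId, ReadHold>,
        compute_holds: BTreeMap<(InstanceId, GlobalId), ReadHold>,
    }

    impl ReadHolds {
        /// Copy out only the holds named by `ids`.
        fn clone_for(&self, ids: &IdBundle) -> Self {
            let storage_holds = ids
                .storage_ids
                .iter()
                .map(|id| (*id, self.storage_holds[id].clone()))
                .collect();
            let compute_holds = ids
                .compute_ids
                .iter()
                .flat_map(|(inst, set)| set.iter().map(move |id| (*inst, *id)))
                .map(|key| (key, self.compute_holds[&key].clone()))
                .collect();
            Self { storage_holds, compute_holds }
        }
    }

    /// Flatten into the list of holds that dataflow creation takes.
    impl From<ReadHolds> for Vec<ReadHold> {
        fn from(holds: ReadHolds) -> Self {
            let storage = holds.storage_holds.into_values();
            let compute = holds.compute_holds.into_values();
            storage.chain(compute).collect()
        }
    }

    fn main() {
        let mut all = ReadHolds::default();
        all.storage_holds.insert(1, ReadHold { id: 1, since: 10 });
        all.compute_holds.insert((0, 2), ReadHold { id: 2, since: 10 });

        let mut bundle = IdBundle::default();
        bundle.storage_ids.insert(1);
        bundle.compute_ids.entry(0).or_default().insert(2);

        // What a call site would pass as `import_read_holds`.
        let import_read_holds: Vec<ReadHold> = all.clone_for(&bundle).into();
        assert_eq!(import_read_holds.iter().map(|h| h.id).collect::<Vec<_>>(), vec![1, 2]);
        assert!(import_read_holds.iter().all(|h| h.since == 10));
    }

Cloning the subset, rather than moving holds out, lets the caller keep
its own holds alive until the controller has installed equivalents of
its own.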