163 changes: 89 additions & 74 deletions src/adapter/src/coord.rs
@@ -74,6 +74,7 @@ use mz_adapter_types::dyncfgs::{
};
use mz_compute_client::as_of_selection;
use mz_ore::channel::trigger;
use mz_ore::collections::CollectionExt;
use mz_sql::names::{ResolvedIds, SchemaSpecifier};
use mz_sql::session::user::User;
use mz_storage_types::read_holds::ReadHold;
@@ -496,6 +497,7 @@ pub struct PeekStageOptimize {
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
determination: TimestampDetermination<mz_repr::Timestamp>,
read_holds: ReadHolds<mz_repr::Timestamp>,
optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
/// An optional context set iff the state machine is initiated from
/// sequencing an EXPLAIN for this statement.
@@ -511,6 +513,7 @@ pub struct PeekStageFinish {
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
determination: TimestampDetermination<mz_repr::Timestamp>,
read_holds: ReadHolds<mz_repr::Timestamp>,
cluster_id: ComputeInstanceId,
finishing: RowSetFinishing,
/// When present, an optimizer trace to be used for emitting a plan insights
@@ -528,6 +531,7 @@ pub struct PeekStageCopyTo {
global_lir_plan: optimize::copy_to::GlobalLirPlan,
optimization_finished_at: EpochMillis,
source_ids: BTreeSet<GlobalId>,
read_holds: ReadHolds<mz_repr::Timestamp>,
}

#[derive(Debug)]
@@ -808,6 +812,7 @@ pub struct SubscribeFinish {
replica_id: Option<ReplicaId>,
plan: plan::SubscribePlan,
global_lir_plan: optimize::subscribe::GlobalLirPlan,
read_holds: ReadHolds<Timestamp>,
dependency_ids: BTreeSet<GlobalId>,
}

@@ -1810,13 +1815,15 @@ impl Coordinator {
self.controller
.set_arrangement_exert_proportionality(exert_prop);

let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
Default::default();
// Keep read holds for each storage and compute collection we create during bootstrapping.
// We need these so we can pass copies of them to `create_dataflow` commands, and to ensure
// collections don't get compacted before all their dependants have been created.
let mut read_holds: BTreeMap<GlobalId, ReadHold<_>> = Default::default();

let enable_worker_core_affinity =
self.catalog().system_config().enable_worker_core_affinity();
for instance in self.catalog.clusters() {
self.controller.create_cluster(
let log_holds = self.controller.create_cluster(
instance.id,
ClusterConfig {
arranged_logs: instance.log_indexes.clone(),
@@ -1833,6 +1840,8 @@ impl Coordinator {
enable_worker_core_affinity,
)?;
}

read_holds.extend(log_holds.into_iter().map(|r| (r.id(), r)));
}

info!(
@@ -1842,8 +1851,10 @@

let init_storage_collections_start = Instant::now();
info!("startup: coordinator init: bootstrap: storage collections init beginning");
self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
let storage_holds = self
.bootstrap_storage_collections(&migrated_storage_collections_0dt)
.await;
read_holds.extend(storage_holds.into_iter().map(|r| (r.id(), r)));
info!(
"startup: coordinator init: bootstrap: storage collections init complete in {:?}",
init_storage_collections_start.elapsed()
@@ -1863,7 +1874,7 @@
// `bootstrap_dataflow_plans`.
let bootstrap_as_ofs_start = Instant::now();
info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
self.bootstrap_dataflow_as_ofs(&read_holds).await;
info!(
"startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
bootstrap_as_ofs_start.elapsed()
@@ -1930,40 +1941,28 @@
);
}
}
policies_to_set
.entry(policy.expect("sources have a compaction window"))
.or_insert_with(Default::default)
.storage_ids
.insert(entry.id());
let read_hold = read_holds[&entry.id()].clone();
let cw = policy.expect("sources have a compaction window");
self.initialize_storage_read_policy(read_hold, cw).await;
}
CatalogItem::Table(_) => {
policies_to_set
.entry(policy.expect("tables have a compaction window"))
.or_insert_with(Default::default)
.storage_ids
.insert(entry.id());
let read_hold = read_holds[&entry.id()].clone();
let cw = policy.expect("tables have a compaction window");
self.initialize_storage_read_policy(read_hold, cw).await;
}
CatalogItem::Index(idx) => {
let policy_entry = policies_to_set
.entry(policy.expect("indexes have a compaction window"))
.or_insert_with(Default::default);

if logs.contains(&idx.on) {
policy_entry
.compute_ids
.entry(idx.cluster_id)
.or_insert_with(BTreeSet::new)
.insert(entry.id());
} else {
let id = entry.id();

if !logs.contains(&idx.on) {
let df_desc = self
.catalog()
.try_get_physical_plan(&entry.id())
.try_get_physical_plan(&id)
.expect("added in `bootstrap_dataflow_plans`")
.clone();

let df_meta = self
.catalog()
.try_get_dataflow_metainfo(&entry.id())
.try_get_dataflow_metainfo(&id)
.expect("added in `bootstrap_dataflow_plans`");

if self.catalog().state().system_config().enable_mz_notices() {
@@ -1975,27 +1974,31 @@
);
}

// What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
// but we cannot call that as it will also downgrade the read hold on the index.
policy_entry
.compute_ids
.entry(idx.cluster_id)
.or_insert_with(Default::default)
.extend(df_desc.export_ids());
let import_read_holds = df_desc
.import_ids()
.map(|id| read_holds[&id].clone())
.collect();

self.controller
let read_hold = self
.controller
.compute
.create_dataflow(idx.cluster_id, df_desc, None)
.unwrap_or_terminate("cannot fail to create dataflows");
.create_dataflow(idx.cluster_id, df_desc, import_read_holds, None)
.unwrap_or_terminate("cannot fail to create dataflows")
.into_element();

read_holds.insert(id, read_hold);
}

let read_hold = read_holds[&id].clone();
let cw = policy.expect("indexes have a compaction window");
self.initialize_compute_read_policy(read_hold, idx.cluster_id, cw)
.await;
}
CatalogItem::View(_) => (),
CatalogItem::MaterializedView(mview) => {
policies_to_set
.entry(policy.expect("materialized views have a compaction window"))
.or_insert_with(Default::default)
.storage_ids
.insert(entry.id());
let read_hold = read_holds[&entry.id()].clone();
let cw = policy.expect("materialized views have a compaction window");
self.initialize_storage_read_policy(read_hold, cw).await;

let mut df_desc = self
.catalog()
@@ -2031,7 +2034,13 @@
);
}

self.ship_dataflow(df_desc, mview.cluster_id, None).await;
let import_read_holds = df_desc
.import_ids()
.map(|id| read_holds[&id].clone())
.collect();

self.ship_dataflow(df_desc, mview.cluster_id, import_read_holds, None)
.await;
}
CatalogItem::Sink(sink) => {
let id = entry.id();
@@ -2078,13 +2087,8 @@
}
}

// Having installed all entries, creating all constraints, we can now drop read holds and
// relax read policies.
drop(dataflow_read_holds);
// TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
@teskje (Contributor, Author) commented on Sep 13, 2024:

This TODO suggested that calling `initialize_read_policies` in a loop is somehow a problem, which I'm not sure why it would be. I assume we are worried about start up times, but is there anything that would make an `initialize_read_policies` invocation slow?

for (cw, policies) in policies_to_set {
self.initialize_read_policies(&policies, cw).await;
}
// Having installed all entries, creating all constraints, we can now drop the read holds.
drop(read_holds);

// Expose mapping from T-shirt sizes to actual sizes
builtin_table_updates.extend(
@@ -2382,7 +2386,7 @@ impl Coordinator {
async fn bootstrap_storage_collections(
&mut self,
migrated_storage_collections: &BTreeSet<GlobalId>,
) {
) -> Vec<ReadHold<Timestamp>> {
let catalog = self.catalog();
let source_status_collection_id = catalog
.resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
@@ -2485,7 +2489,8 @@ impl Coordinator {

let storage_metadata = self.catalog.state().storage_metadata();

self.controller
let read_holds = self
.controller
.storage
.create_collections_for_bootstrap(
storage_metadata,
@@ -2499,6 +2504,8 @@ impl Coordinator {
if !self.controller.read_only() {
self.apply_local_write(register_ts).await;
}

read_holds
}

/// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
@@ -2649,14 +2656,13 @@ impl Coordinator {

/// Selects for each compute dataflow an as-of suitable for bootstrapping it.
///
/// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
/// in place and that must not be dropped before all compute dataflows have been created with
/// the compute controller.
///
/// This method expects all storage collections and dataflow plans to be available, so it must
/// run after [`Coordinator::bootstrap_storage_collections`] and
/// [`Coordinator::bootstrap_dataflow_plans`].
async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
async fn bootstrap_dataflow_as_ofs(
&mut self,
read_holds: &BTreeMap<GlobalId, ReadHold<Timestamp>>,
) {
let mut catalog_ids = Vec::new();
let mut dataflows = Vec::new();
let mut read_policies = BTreeMap::new();
@@ -2673,9 +2679,10 @@
}

let read_ts = self.get_local_read_ts().await;
let read_holds = as_of_selection::run(
as_of_selection::run(
&mut dataflows,
&read_policies,
read_holds,
&*self.controller.storage_collections,
read_ts,
);
@@ -2684,8 +2691,6 @@
for (id, plan) in catalog_ids.into_iter().zip(dataflows) {
catalog.set_physical_plan(id, plan);
}

read_holds
}

/// Serves the coordinator, receiving commands from users over `cmd_rx`
@@ -3056,34 +3061,41 @@ impl Coordinator {
&mut self,
dataflow: DataflowDescription<Plan>,
instance: ComputeInstanceId,
import_read_holds: Vec<ReadHold<Timestamp>>,
subscribe_target_replica: Option<ReplicaId>,
) {
// We must only install read policies for indexes, not for sinks.
// Sinks are write-only compute collections that don't have read policies.
let export_ids = dataflow.exported_index_ids().collect();

self.controller
let read_holds = self
.controller
.compute
.create_dataflow(instance, dataflow, subscribe_target_replica)
.create_dataflow(
instance,
dataflow,
import_read_holds,
subscribe_target_replica,
)
.unwrap_or_terminate("dataflow creation cannot fail");

self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
.await;
for hold in read_holds {
self.initialize_compute_read_policy(hold, instance, CompactionWindow::Default)
.await;
}
}

/// Like `ship_dataflow`, but also await on builtin table updates.
pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
&mut self,
dataflow: DataflowDescription<Plan>,
instance: ComputeInstanceId,
import_read_holds: Vec<ReadHold<Timestamp>>,
notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
) {
if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, import_read_holds, None);
let ((), ()) =
futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
} else {
self.ship_dataflow(dataflow, instance, None).await;
self.ship_dataflow(dataflow, instance, import_read_holds, None)
.await;
}
}

@@ -3387,11 +3399,14 @@ impl Coordinator {
// prevent us from incorrectly teaching those functions how to return errors (which has
// happened twice and is the motivation for this test).

// An arbitrary compute instance ID to satisfy the function calls below. Note that
// this only works because this function will never run.
// An arbitrary compute instance ID and empty read holds to satisfy the function calls
// below. Note that this only works because this function will never run.
let compute_instance = ComputeInstanceId::User(1);
let import_read_holds = Default::default();

let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
let _: () = self
.ship_dataflow(dataflow, compute_instance, import_read_holds, None)
.await;
}
}

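The bootstrap changes in this file follow one pattern: keep a `ReadHold` for every collection created during bootstrap in a `BTreeMap` keyed by `GlobalId`, clone the holds a dataflow imports when calling `create_dataflow`/`ship_dataflow`, and drop the whole map only once every dependant has been installed. Below is a minimal, self-contained sketch of that flow, not the PR's actual code; the simplified `ReadHold` type and the `collect_import_holds` helper are assumptions made for illustration.

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the real types; the PR uses `ReadHold<Timestamp>`
// and `GlobalId` from the Materialize crates.
type GlobalId = u64;

#[derive(Clone)]
struct ReadHold; // keeps a collection's read frontier from advancing while held

// Hypothetical helper mirroring the repeated
// `df_desc.import_ids().map(|id| read_holds[&id].clone()).collect()` calls in the diff.
fn collect_import_holds(
    import_ids: &[GlobalId],
    read_holds: &BTreeMap<GlobalId, ReadHold>,
) -> Vec<ReadHold> {
    import_ids.iter().map(|id| read_holds[id].clone()).collect()
}

// `collections` lists (id, imports) pairs in dependency order, as bootstrap
// processes catalog entries after the entries they depend on.
fn bootstrap(collections: Vec<(GlobalId, Vec<GlobalId>)>) {
    let mut read_holds: BTreeMap<GlobalId, ReadHold> = BTreeMap::new();

    for (id, imports) in &collections {
        // Clone holds for the imports and hand them to dataflow creation, so the
        // imported collections cannot be compacted out from under the new dataflow.
        let _import_holds = collect_import_holds(imports, &read_holds);
        // ... create the storage collection or compute dataflow here ...
        read_holds.insert(*id, ReadHold);
    }

    // Only after every dependant exists are the bootstrap-wide holds released.
    drop(read_holds);
}

fn main() {
    // Collection 2 imports collection 1, e.g. an index built on a table.
    bootstrap(vec![(1, vec![]), (2, vec![1])]);
}
```

In the real code, the same map is also used to initialize per-collection read policies (`initialize_storage_read_policy` / `initialize_compute_read_policy`) before the holds are dropped.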
3 changes: 1 addition & 2 deletions src/adapter/src/coord/introspection.rs
@@ -297,7 +297,7 @@ impl Coordinator {
// dataflow for it.
let response = if self.introspection_subscribes.contains_key(&subscribe_id) {
let (df_desc, _df_meta) = global_lir_plan.unapply();
self.ship_dataflow(df_desc, cluster_id, Some(replica_id))
self.ship_dataflow(df_desc, cluster_id, read_holds.into(), Some(replica_id))
.await;

Ok(StageResult::Response(
@@ -310,7 +310,6 @@
))
};

drop(read_holds);
response
}

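The introspection change leans on the same ownership idea: the subscribe's read holds now move into `ship_dataflow` (via `read_holds.into()`), so the explicit `drop(read_holds)` at the end of the function becomes unnecessary. A small, self-contained illustration of that pattern with hypothetical types, not Materialize's API:

```rust
// Hypothetical `ReadHold` whose `Drop` impl releases the hold; the point is that
// passing holds by value lets the callee decide when they are released.
struct ReadHold(u64);

impl Drop for ReadHold {
    fn drop(&mut self) {
        println!("releasing read hold on collection {}", self.0);
    }
}

// Stand-in for dataflow creation: the holds keep the imported collections
// readable until the dataflow exists, then are dropped here in the callee.
fn ship_dataflow(import_read_holds: Vec<ReadHold>) {
    println!("creating dataflow with {} import holds", import_read_holds.len());
    // `import_read_holds` is dropped at the end of this scope.
}

fn main() {
    let holds = vec![ReadHold(1), ReadHold(2)];
    ship_dataflow(holds); // ownership moves; no explicit `drop(holds)` afterwards
}
```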