From bdf9573960cece48e7811db7ef777b973d355fe6 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Fri, 15 Aug 2025 15:49:21 +0200 Subject: [PATCH 1/2] Move some timeline fns from Coordinator to Catalog --- src/adapter/src/catalog.rs | 236 +++++++++++++++++- src/adapter/src/coord/command_handler.rs | 5 +- src/adapter/src/coord/ddl.rs | 1 + src/adapter/src/coord/read_policy.rs | 4 +- .../inner/create_materialized_view.rs | 3 +- .../src/coord/sequencer/inner/create_view.rs | 3 +- .../sequencer/inner/explain_timestamp.rs | 4 +- src/adapter/src/coord/sequencer/inner/peek.rs | 4 +- .../src/coord/sequencer/inner/subscribe.rs | 4 +- src/adapter/src/coord/timeline.rs | 233 +---------------- 10 files changed, 254 insertions(+), 243 deletions(-) diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index d6eba3bebcc3a..3e7eeeb2cf5ae 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -40,15 +40,15 @@ use mz_catalog::durable::{ use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions}; use mz_catalog::memory::error::{Error, ErrorKind}; use mz_catalog::memory::objects::{ - CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, - NetworkPolicy, Role, RoleAuth, Schema, + CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, ContinualTask, + Database, MaterializedView, NetworkPolicy, Role, RoleAuth, Schema, View, }; use mz_compute_types::dataflows::DataflowDescription; use mz_controller::clusters::ReplicaLocation; use mz_controller_types::{ClusterId, ReplicaId}; -use mz_expr::OptimizedMirRelationExpr; +use mz_expr::{CollectionPlan, OptimizedMirRelationExpr}; use mz_license_keys::ValidatedLicenseKey; -use mz_ore::collections::HashSet; +use mz_ore::collections::{CollectionExt, HashSet}; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME}; use mz_ore::result::ResultExt as _; @@ -80,6 +80,7 @@ use mz_sql::session::vars::SystemVars; use mz_sql_parser::ast::QualifiedReplica; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection}; +use mz_storage_types::sources::Timeline; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::notice::OptimizerNotice; use smallvec::SmallVec; @@ -99,7 +100,7 @@ use crate::command::CatalogDump; use crate::coord::TargetCluster; use crate::session::{PreparedStatement, Session}; use crate::util::ResultExt; -use crate::{AdapterError, AdapterNotice, ExecuteResponse}; +use crate::{AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, TimelineContext}; mod builtin_table_updates; pub(crate) mod consistency; @@ -1511,6 +1512,231 @@ impl Catalog { let builtin_table_updates = self.state.apply_updates(updates)?; Ok(builtin_table_updates) } + + /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists. + pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext { + let entry = self.get_entry(&id); + self.validate_timeline_context(entry.global_ids()) + .expect("impossible for a single object to belong to incompatible timeline contexts") + } + + /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists. + pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext { + self.validate_timeline_context(vec![id]) + .expect("impossible for a single object to belong to incompatible timeline contexts") + } + + /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id + /// belongs to. + pub fn partition_ids_by_timeline_context( + &self, + id_bundle: &CollectionIdBundle, + ) -> impl Iterator + use<> { + let mut res: BTreeMap = BTreeMap::new(); + + for gid in &id_bundle.storage_ids { + let timeline_context = self.get_timeline_context_for_global_id(*gid); + res.entry(timeline_context) + .or_default() + .storage_ids + .insert(*gid); + } + + for (compute_instance, ids) in &id_bundle.compute_ids { + for gid in ids { + let timeline_context = self.get_timeline_context_for_global_id(*gid); + res.entry(timeline_context) + .or_default() + .compute_ids + .entry(*compute_instance) + .or_default() + .insert(*gid); + } + } + + res.into_iter() + } + + /// Returns an id bundle containing all the ids in the give timeline. + pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle { + let mut id_bundle = CollectionIdBundle::default(); + for entry in self.entries() { + if let TimelineContext::TimelineDependent(entry_timeline) = + self.get_timeline_context(entry.id()) + { + if timeline == &entry_timeline { + match entry.item() { + CatalogItem::Table(table) => { + id_bundle.storage_ids.extend(table.global_ids()); + } + CatalogItem::Source(source) => { + id_bundle.storage_ids.insert(source.global_id()); + } + CatalogItem::MaterializedView(mv) => { + id_bundle.storage_ids.insert(mv.global_id()); + } + CatalogItem::ContinualTask(ct) => { + id_bundle.storage_ids.insert(ct.global_id()); + } + CatalogItem::Index(index) => { + id_bundle + .compute_ids + .entry(index.cluster_id) + .or_default() + .insert(index.global_id()); + } + CatalogItem::View(_) + | CatalogItem::Sink(_) + | CatalogItem::Type(_) + | CatalogItem::Func(_) + | CatalogItem::Secret(_) + | CatalogItem::Connection(_) + | CatalogItem::Log(_) => {} + } + } + } + } + id_bundle + } + + /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should + /// be used to prevent users from doing things that are either meaningless + /// (joining data from timelines that have similar numbers with different + /// meanings like two separate debezium topics) or will never complete (joining + /// cdcv2 and realtime data). + pub(crate) fn validate_timeline_context( + &self, + ids: I, + ) -> Result + where + I: IntoIterator, + { + let items_ids = ids + .into_iter() + .filter_map(|gid| self.try_resolve_item_id(&gid)); + let mut timeline_contexts: Vec<_> = + self.get_timeline_contexts(items_ids).into_iter().collect(); + // If there's more than one timeline, we will not produce meaningful + // data to a user. Take, for example, some realtime source and a debezium + // consistency topic source. The realtime source uses something close to now + // for its timestamps. The debezium source starts at 1 and increments per + // transaction. We don't want to choose some timestamp that is valid for both + // of these because the debezium source will never get to the same value as the + // realtime source's "milliseconds since Unix epoch" value. And even if it did, + // it's not meaningful to join just because those two numbers happen to be the + // same now. + // + // Another example: assume two separate debezium consistency topics. Both + // start counting at 1 and thus have similarish numbers that probably overlap + // a lot. However it's still not meaningful to join those two at a specific + // transaction counter number because those counters are unrelated to the + // other. + let timelines: Vec<_> = timeline_contexts + .extract_if(.., |timeline_context| timeline_context.contains_timeline()) + .collect(); + + // A single or group of objects may contain multiple compatible timeline + // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all + // types of contexts. We choose the strongest context level to return back. + if timelines.len() > 1 { + Err(AdapterError::Unsupported( + "multiple timelines within one dataflow", + )) + } else if timelines.len() == 1 { + Ok(timelines.into_element()) + } else if timeline_contexts + .iter() + .contains(&TimelineContext::TimestampDependent) + { + Ok(TimelineContext::TimestampDependent) + } else { + Ok(TimelineContext::TimestampIndependent) + } + } + + /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist. + fn get_timeline_contexts(&self, ids: I) -> BTreeSet + where + I: IntoIterator, + { + let mut seen: BTreeSet = BTreeSet::new(); + let mut timelines: BTreeSet = BTreeSet::new(); + + // Recurse through IDs to find all sources and tables, adding new ones to + // the set until we reach the bottom. + let mut ids: Vec<_> = ids.into_iter().collect(); + while let Some(id) = ids.pop() { + // Protect against possible infinite recursion. Not sure if it's possible, but + // a cheap prevention for the future. + if !seen.insert(id) { + continue; + } + if let Some(entry) = self.try_get_entry(&id) { + match entry.item() { + CatalogItem::Source(source) => { + timelines + .insert(TimelineContext::TimelineDependent(source.timeline.clone())); + } + CatalogItem::Index(index) => { + let on_id = self.resolve_item_id(&index.on); + ids.push(on_id); + } + CatalogItem::View(View { optimized_expr, .. }) => { + // If the definition contains a temporal function, the timeline must + // be timestamp dependent. + if optimized_expr.contains_temporal() { + timelines.insert(TimelineContext::TimestampDependent); + } else { + timelines.insert(TimelineContext::TimestampIndependent); + } + let item_ids = optimized_expr + .depends_on() + .into_iter() + .map(|gid| self.resolve_item_id(&gid)); + ids.extend(item_ids); + } + CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => { + // In some cases the timestamp selected may not affect the answer to a + // query, but it may affect our ability to query the materialized view. + // Materialized views must durably materialize the result of a query, even + // for constant queries. If we choose a timestamp larger than the upper, + // which represents the current progress of the view, then the query will + // need to block and wait for the materialized view to advance. + timelines.insert(TimelineContext::TimestampDependent); + let item_ids = optimized_expr + .depends_on() + .into_iter() + .map(|gid| self.resolve_item_id(&gid)); + ids.extend(item_ids); + } + CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => { + // See comment in MaterializedView + timelines.insert(TimelineContext::TimestampDependent); + let item_ids = raw_expr + .depends_on() + .into_iter() + .map(|gid| self.resolve_item_id(&gid)); + ids.extend(item_ids); + } + CatalogItem::Table(table) => { + timelines.insert(TimelineContext::TimelineDependent(table.timeline())); + } + CatalogItem::Log(_) => { + timelines.insert(TimelineContext::TimelineDependent( + Timeline::EpochMilliseconds, + )); + } + CatalogItem::Sink(_) + | CatalogItem::Type(_) + | CatalogItem::Func(_) + | CatalogItem::Secret(_) + | CatalogItem::Connection(_) => {} + } + } + } + + timelines + } } pub fn is_reserved_name(name: &str) -> bool { diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index b9c9b5ddbf8e3..ca1845faaea98 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -1191,8 +1191,9 @@ impl Coordinator { .iter() .any(materialized_view_option_contains_temporal) { - let timeline_context = - self.validate_timeline_context(resolved_ids.collections().copied())?; + let timeline_context = self + .catalog() + .validate_timeline_context(resolved_ids.collections().copied())?; // We default to EpochMilliseconds, similarly to `determine_timestamp_for`, // but even in the TimestampIndependent case. diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 01e5e97f1379e..43c6ef61c4feb 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -522,6 +522,7 @@ impl Coordinator { clusters_to_drop.clone(), ); let timeline_associations: BTreeMap<_, _> = self + .catalog() .partition_ids_by_timeline_context(&collection_id_bundle) .filter_map(|(context, bundle)| { let TimelineContext::TimelineDependent(timeline) = context else { diff --git a/src/adapter/src/coord/read_policy.rs b/src/adapter/src/coord/read_policy.rs index e5ca202169c56..d17ba5f211378 100644 --- a/src/adapter/src/coord/read_policy.rs +++ b/src/adapter/src/coord/read_policy.rs @@ -253,7 +253,9 @@ impl crate::coord::Coordinator { compaction_window: CompactionWindow, ) { // Install read holds in the Coordinator's timeline state. - for (timeline_context, id_bundle) in self.partition_ids_by_timeline_context(id_bundle) { + for (timeline_context, id_bundle) in + self.catalog().partition_ids_by_timeline_context(id_bundle) + { if let TimelineContext::TimelineDependent(timeline) = timeline_context { let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await; let read_ts = oracle.read_ts().await; diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs index 5b69a3d72a8af..ebe4a7b6efaa2 100644 --- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs @@ -340,7 +340,8 @@ impl Coordinator { // We want to reject queries that depend on log sources, for example, // even if we can *technically* optimize that reference away. let expr_depends_on = expr.depends_on(); - self.validate_timeline_context(expr_depends_on.iter().copied())?; + self.catalog() + .validate_timeline_context(expr_depends_on.iter().copied())?; self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?; // Materialized views are not allowed to depend on log sources, as replicas // are not producing the same definite collection for these. diff --git a/src/adapter/src/coord/sequencer/inner/create_view.rs b/src/adapter/src/coord/sequencer/inner/create_view.rs index 343c6d930c28a..ecbe73c5b42ef 100644 --- a/src/adapter/src/coord/sequencer/inner/create_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_view.rs @@ -260,7 +260,8 @@ impl Coordinator { // reject queries that depend on a relation in the wrong timeline, for // example, even if we can *technically* optimize that reference away. let expr_depends_on = expr.depends_on(); - self.validate_timeline_context(expr_depends_on.iter().copied())?; + self.catalog() + .validate_timeline_context(expr_depends_on.iter().copied())?; self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?; let validity = diff --git a/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs b/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs index 9b704d514c8fd..40357650f5134 100644 --- a/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs +++ b/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs @@ -310,7 +310,9 @@ impl Coordinator { return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT")); } }; - let mut timeline_context = self.validate_timeline_context(source_ids.iter().copied())?; + let mut timeline_context = self + .catalog() + .validate_timeline_context(source_ids.iter().copied())?; if matches!(timeline_context, TimelineContext::TimestampIndependent) && optimized_plan.contains_temporal() { diff --git a/src/adapter/src/coord/sequencer/inner/peek.rs b/src/adapter/src/coord/sequencer/inner/peek.rs index 775555e94a261..f8a608832d7ec 100644 --- a/src/adapter/src/coord/sequencer/inner/peek.rs +++ b/src/adapter/src/coord/sequencer/inner/peek.rs @@ -354,7 +354,9 @@ impl Coordinator { .transpose()?; let source_ids = plan.source.depends_on(); - let mut timeline_context = self.validate_timeline_context(source_ids.iter().copied())?; + let mut timeline_context = self + .catalog() + .validate_timeline_context(source_ids.iter().copied())?; if matches!(timeline_context, TimelineContext::TimestampIndependent) && plan.source.contains_temporal()? { diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs index dec0efad77f07..11edafea9e204 100644 --- a/src/adapter/src/coord/sequencer/inner/subscribe.rs +++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs @@ -137,7 +137,9 @@ impl Coordinator { session.add_notices(notices); // Determine timeline. - let mut timeline = self.validate_timeline_context(depends_on.iter().copied())?; + let mut timeline = self + .catalog() + .validate_timeline_context(depends_on.iter().copied())?; if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() { // If the from IDs are timestamp independent but the query contains temporal functions // then the timeline context needs to be upgraded to timestamp dependent. diff --git a/src/adapter/src/coord/timeline.rs b/src/adapter/src/coord/timeline.rs index cf50b66a12cbb..28f417f2bebc0 100644 --- a/src/adapter/src/coord/timeline.rs +++ b/src/adapter/src/coord/timeline.rs @@ -15,15 +15,11 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use futures::Future; -use itertools::Itertools; use mz_adapter_types::connection::ConnectionId; -use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View}; use mz_compute_types::ComputeInstanceId; -use mz_expr::CollectionPlan; -use mz_ore::collections::CollectionExt; use mz_ore::instrument; use mz_ore::now::{EpochMillis, NowFn, to_datetime}; -use mz_repr::{CatalogItemId, GlobalId, Timestamp}; +use mz_repr::{GlobalId, Timestamp}; use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier}; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle; @@ -342,230 +338,6 @@ impl Coordinator { empty_timelines.into_iter().collect() } - pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle { - let mut id_bundle = CollectionIdBundle::default(); - for entry in self.catalog().entries() { - if let TimelineContext::TimelineDependent(entry_timeline) = - self.get_timeline_context(entry.id()) - { - if timeline == &entry_timeline { - match entry.item() { - CatalogItem::Table(table) => { - id_bundle.storage_ids.extend(table.global_ids()); - } - CatalogItem::Source(source) => { - id_bundle.storage_ids.insert(source.global_id()); - } - CatalogItem::MaterializedView(mv) => { - id_bundle.storage_ids.insert(mv.global_id()); - } - CatalogItem::ContinualTask(ct) => { - id_bundle.storage_ids.insert(ct.global_id()); - } - CatalogItem::Index(index) => { - id_bundle - .compute_ids - .entry(index.cluster_id) - .or_default() - .insert(index.global_id()); - } - CatalogItem::View(_) - | CatalogItem::Sink(_) - | CatalogItem::Type(_) - | CatalogItem::Func(_) - | CatalogItem::Secret(_) - | CatalogItem::Connection(_) - | CatalogItem::Log(_) => {} - } - } - } - } - id_bundle - } - - /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should - /// be used to prevent users from doing things that are either meaningless - /// (joining data from timelines that have similar numbers with different - /// meanings like two separate debezium topics) or will never complete (joining - /// cdcv2 and realtime data). - pub(crate) fn validate_timeline_context( - &self, - ids: I, - ) -> Result - where - I: IntoIterator, - { - let items_ids = ids - .into_iter() - .filter_map(|gid| self.catalog().try_resolve_item_id(&gid)); - let mut timeline_contexts: Vec<_> = - self.get_timeline_contexts(items_ids).into_iter().collect(); - // If there's more than one timeline, we will not produce meaningful - // data to a user. Take, for example, some realtime source and a debezium - // consistency topic source. The realtime source uses something close to now - // for its timestamps. The debezium source starts at 1 and increments per - // transaction. We don't want to choose some timestamp that is valid for both - // of these because the debezium source will never get to the same value as the - // realtime source's "milliseconds since Unix epoch" value. And even if it did, - // it's not meaningful to join just because those two numbers happen to be the - // same now. - // - // Another example: assume two separate debezium consistency topics. Both - // start counting at 1 and thus have similarish numbers that probably overlap - // a lot. However it's still not meaningful to join those two at a specific - // transaction counter number because those counters are unrelated to the - // other. - let timelines: Vec<_> = timeline_contexts - .extract_if(.., |timeline_context| timeline_context.contains_timeline()) - .collect(); - - // A single or group of objects may contain multiple compatible timeline - // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all - // types of contexts. We choose the strongest context level to return back. - if timelines.len() > 1 { - Err(AdapterError::Unsupported( - "multiple timelines within one dataflow", - )) - } else if timelines.len() == 1 { - Ok(timelines.into_element()) - } else if timeline_contexts - .iter() - .contains(&TimelineContext::TimestampDependent) - { - Ok(TimelineContext::TimestampDependent) - } else { - Ok(TimelineContext::TimestampIndependent) - } - } - - /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists. - pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext { - let entry = self.catalog().get_entry(&id); - self.validate_timeline_context(entry.global_ids()) - .expect("impossible for a single object to belong to incompatible timeline contexts") - } - - /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists. - pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext { - self.validate_timeline_context(vec![id]) - .expect("impossible for a single object to belong to incompatible timeline contexts") - } - - /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist. - fn get_timeline_contexts(&self, ids: I) -> BTreeSet - where - I: IntoIterator, - { - let mut seen: BTreeSet = BTreeSet::new(); - let mut timelines: BTreeSet = BTreeSet::new(); - - // Recurse through IDs to find all sources and tables, adding new ones to - // the set until we reach the bottom. - let mut ids: Vec<_> = ids.into_iter().collect(); - while let Some(id) = ids.pop() { - // Protect against possible infinite recursion. Not sure if it's possible, but - // a cheap prevention for the future. - if !seen.insert(id) { - continue; - } - if let Some(entry) = self.catalog().try_get_entry(&id) { - match entry.item() { - CatalogItem::Source(source) => { - timelines - .insert(TimelineContext::TimelineDependent(source.timeline.clone())); - } - CatalogItem::Index(index) => { - let on_id = self.catalog().resolve_item_id(&index.on); - ids.push(on_id); - } - CatalogItem::View(View { optimized_expr, .. }) => { - // If the definition contains a temporal function, the timeline must - // be timestamp dependent. - if optimized_expr.contains_temporal() { - timelines.insert(TimelineContext::TimestampDependent); - } else { - timelines.insert(TimelineContext::TimestampIndependent); - } - let item_ids = optimized_expr - .depends_on() - .into_iter() - .map(|gid| self.catalog().resolve_item_id(&gid)); - ids.extend(item_ids); - } - CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => { - // In some cases the timestamp selected may not affect the answer to a - // query, but it may affect our ability to query the materialized view. - // Materialized views must durably materialize the result of a query, even - // for constant queries. If we choose a timestamp larger than the upper, - // which represents the current progress of the view, then the query will - // need to block and wait for the materialized view to advance. - timelines.insert(TimelineContext::TimestampDependent); - let item_ids = optimized_expr - .depends_on() - .into_iter() - .map(|gid| self.catalog().resolve_item_id(&gid)); - ids.extend(item_ids); - } - CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => { - // See comment in MaterializedView - timelines.insert(TimelineContext::TimestampDependent); - let item_ids = raw_expr - .depends_on() - .into_iter() - .map(|gid| self.catalog().resolve_item_id(&gid)); - ids.extend(item_ids); - } - CatalogItem::Table(table) => { - timelines.insert(TimelineContext::TimelineDependent(table.timeline())); - } - CatalogItem::Log(_) => { - timelines.insert(TimelineContext::TimelineDependent( - Timeline::EpochMilliseconds, - )); - } - CatalogItem::Sink(_) - | CatalogItem::Type(_) - | CatalogItem::Func(_) - | CatalogItem::Secret(_) - | CatalogItem::Connection(_) => {} - } - } - } - - timelines - } - - /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id - /// belongs to. - pub fn partition_ids_by_timeline_context( - &self, - id_bundle: &CollectionIdBundle, - ) -> impl Iterator + use<> { - let mut res: BTreeMap = BTreeMap::new(); - - for gid in &id_bundle.storage_ids { - let timeline_context = self.get_timeline_context_for_global_id(*gid); - res.entry(timeline_context) - .or_default() - .storage_ids - .insert(*gid); - } - - for (compute_instance, ids) in &id_bundle.compute_ids { - for gid in ids { - let timeline_context = self.get_timeline_context_for_global_id(*gid); - res.entry(timeline_context) - .or_default() - .compute_ids - .entry(*compute_instance) - .or_default() - .insert(*gid); - } - } - - res.into_iter() - } - /// Return the set of ids in a timedomain and verify timeline correctness. /// /// When a user starts a transaction, we need to prevent compaction of anything @@ -635,6 +407,7 @@ impl Coordinator { ] { ids.retain(|gid| { let id_timeline_context = self + .catalog() .validate_timeline_context(vec![*gid]) .expect("single id should never fail"); match (&id_timeline_context, &timeline_context) { @@ -679,7 +452,7 @@ impl Coordinator { // For non realtime sources, we define now as the largest timestamp, not in // advance of any object's upper. This is the largest timestamp that is closed // to writes. - let id_bundle = self.ids_in_timeline(&timeline); + let id_bundle = self.catalog().ids_in_timeline(&timeline); // Advance the timeline if-and-only-if there are objects in it. // Otherwise we'd advance to the empty frontier, meaning we From 55f756a042dd3971a81575dcbcc458db0a8fc6dc Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Wed, 20 Aug 2025 12:56:15 +0200 Subject: [PATCH 2/2] Move timeline things from catalog.rs to a new file --- src/adapter/src/catalog.rs | 237 +------------------------- src/adapter/src/catalog/timeline.rs | 249 ++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+), 231 deletions(-) create mode 100644 src/adapter/src/catalog/timeline.rs diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 3e7eeeb2cf5ae..b93096bda22b5 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -40,15 +40,15 @@ use mz_catalog::durable::{ use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions}; use mz_catalog::memory::error::{Error, ErrorKind}; use mz_catalog::memory::objects::{ - CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, ContinualTask, - Database, MaterializedView, NetworkPolicy, Role, RoleAuth, Schema, View, + CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, + NetworkPolicy, Role, RoleAuth, Schema, }; use mz_compute_types::dataflows::DataflowDescription; use mz_controller::clusters::ReplicaLocation; use mz_controller_types::{ClusterId, ReplicaId}; -use mz_expr::{CollectionPlan, OptimizedMirRelationExpr}; +use mz_expr::OptimizedMirRelationExpr; use mz_license_keys::ValidatedLicenseKey; -use mz_ore::collections::{CollectionExt, HashSet}; +use mz_ore::collections::HashSet; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME}; use mz_ore::result::ResultExt as _; @@ -80,7 +80,6 @@ use mz_sql::session::vars::SystemVars; use mz_sql_parser::ast::QualifiedReplica; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection}; -use mz_storage_types::sources::Timeline; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::notice::OptimizerNotice; use smallvec::SmallVec; @@ -100,7 +99,7 @@ use crate::command::CatalogDump; use crate::coord::TargetCluster; use crate::session::{PreparedStatement, Session}; use crate::util::ResultExt; -use crate::{AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, TimelineContext}; +use crate::{AdapterError, AdapterNotice, ExecuteResponse}; mod builtin_table_updates; pub(crate) mod consistency; @@ -109,6 +108,7 @@ mod migrate; mod apply; mod open; mod state; +mod timeline; mod transact; /// A `Catalog` keeps track of the SQL objects known to the planner. @@ -1512,231 +1512,6 @@ impl Catalog { let builtin_table_updates = self.state.apply_updates(updates)?; Ok(builtin_table_updates) } - - /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists. - pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext { - let entry = self.get_entry(&id); - self.validate_timeline_context(entry.global_ids()) - .expect("impossible for a single object to belong to incompatible timeline contexts") - } - - /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists. - pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext { - self.validate_timeline_context(vec![id]) - .expect("impossible for a single object to belong to incompatible timeline contexts") - } - - /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id - /// belongs to. - pub fn partition_ids_by_timeline_context( - &self, - id_bundle: &CollectionIdBundle, - ) -> impl Iterator + use<> { - let mut res: BTreeMap = BTreeMap::new(); - - for gid in &id_bundle.storage_ids { - let timeline_context = self.get_timeline_context_for_global_id(*gid); - res.entry(timeline_context) - .or_default() - .storage_ids - .insert(*gid); - } - - for (compute_instance, ids) in &id_bundle.compute_ids { - for gid in ids { - let timeline_context = self.get_timeline_context_for_global_id(*gid); - res.entry(timeline_context) - .or_default() - .compute_ids - .entry(*compute_instance) - .or_default() - .insert(*gid); - } - } - - res.into_iter() - } - - /// Returns an id bundle containing all the ids in the give timeline. - pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle { - let mut id_bundle = CollectionIdBundle::default(); - for entry in self.entries() { - if let TimelineContext::TimelineDependent(entry_timeline) = - self.get_timeline_context(entry.id()) - { - if timeline == &entry_timeline { - match entry.item() { - CatalogItem::Table(table) => { - id_bundle.storage_ids.extend(table.global_ids()); - } - CatalogItem::Source(source) => { - id_bundle.storage_ids.insert(source.global_id()); - } - CatalogItem::MaterializedView(mv) => { - id_bundle.storage_ids.insert(mv.global_id()); - } - CatalogItem::ContinualTask(ct) => { - id_bundle.storage_ids.insert(ct.global_id()); - } - CatalogItem::Index(index) => { - id_bundle - .compute_ids - .entry(index.cluster_id) - .or_default() - .insert(index.global_id()); - } - CatalogItem::View(_) - | CatalogItem::Sink(_) - | CatalogItem::Type(_) - | CatalogItem::Func(_) - | CatalogItem::Secret(_) - | CatalogItem::Connection(_) - | CatalogItem::Log(_) => {} - } - } - } - } - id_bundle - } - - /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should - /// be used to prevent users from doing things that are either meaningless - /// (joining data from timelines that have similar numbers with different - /// meanings like two separate debezium topics) or will never complete (joining - /// cdcv2 and realtime data). - pub(crate) fn validate_timeline_context( - &self, - ids: I, - ) -> Result - where - I: IntoIterator, - { - let items_ids = ids - .into_iter() - .filter_map(|gid| self.try_resolve_item_id(&gid)); - let mut timeline_contexts: Vec<_> = - self.get_timeline_contexts(items_ids).into_iter().collect(); - // If there's more than one timeline, we will not produce meaningful - // data to a user. Take, for example, some realtime source and a debezium - // consistency topic source. The realtime source uses something close to now - // for its timestamps. The debezium source starts at 1 and increments per - // transaction. We don't want to choose some timestamp that is valid for both - // of these because the debezium source will never get to the same value as the - // realtime source's "milliseconds since Unix epoch" value. And even if it did, - // it's not meaningful to join just because those two numbers happen to be the - // same now. - // - // Another example: assume two separate debezium consistency topics. Both - // start counting at 1 and thus have similarish numbers that probably overlap - // a lot. However it's still not meaningful to join those two at a specific - // transaction counter number because those counters are unrelated to the - // other. - let timelines: Vec<_> = timeline_contexts - .extract_if(.., |timeline_context| timeline_context.contains_timeline()) - .collect(); - - // A single or group of objects may contain multiple compatible timeline - // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all - // types of contexts. We choose the strongest context level to return back. - if timelines.len() > 1 { - Err(AdapterError::Unsupported( - "multiple timelines within one dataflow", - )) - } else if timelines.len() == 1 { - Ok(timelines.into_element()) - } else if timeline_contexts - .iter() - .contains(&TimelineContext::TimestampDependent) - { - Ok(TimelineContext::TimestampDependent) - } else { - Ok(TimelineContext::TimestampIndependent) - } - } - - /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist. - fn get_timeline_contexts(&self, ids: I) -> BTreeSet - where - I: IntoIterator, - { - let mut seen: BTreeSet = BTreeSet::new(); - let mut timelines: BTreeSet = BTreeSet::new(); - - // Recurse through IDs to find all sources and tables, adding new ones to - // the set until we reach the bottom. - let mut ids: Vec<_> = ids.into_iter().collect(); - while let Some(id) = ids.pop() { - // Protect against possible infinite recursion. Not sure if it's possible, but - // a cheap prevention for the future. - if !seen.insert(id) { - continue; - } - if let Some(entry) = self.try_get_entry(&id) { - match entry.item() { - CatalogItem::Source(source) => { - timelines - .insert(TimelineContext::TimelineDependent(source.timeline.clone())); - } - CatalogItem::Index(index) => { - let on_id = self.resolve_item_id(&index.on); - ids.push(on_id); - } - CatalogItem::View(View { optimized_expr, .. }) => { - // If the definition contains a temporal function, the timeline must - // be timestamp dependent. - if optimized_expr.contains_temporal() { - timelines.insert(TimelineContext::TimestampDependent); - } else { - timelines.insert(TimelineContext::TimestampIndependent); - } - let item_ids = optimized_expr - .depends_on() - .into_iter() - .map(|gid| self.resolve_item_id(&gid)); - ids.extend(item_ids); - } - CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => { - // In some cases the timestamp selected may not affect the answer to a - // query, but it may affect our ability to query the materialized view. - // Materialized views must durably materialize the result of a query, even - // for constant queries. If we choose a timestamp larger than the upper, - // which represents the current progress of the view, then the query will - // need to block and wait for the materialized view to advance. - timelines.insert(TimelineContext::TimestampDependent); - let item_ids = optimized_expr - .depends_on() - .into_iter() - .map(|gid| self.resolve_item_id(&gid)); - ids.extend(item_ids); - } - CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => { - // See comment in MaterializedView - timelines.insert(TimelineContext::TimestampDependent); - let item_ids = raw_expr - .depends_on() - .into_iter() - .map(|gid| self.resolve_item_id(&gid)); - ids.extend(item_ids); - } - CatalogItem::Table(table) => { - timelines.insert(TimelineContext::TimelineDependent(table.timeline())); - } - CatalogItem::Log(_) => { - timelines.insert(TimelineContext::TimelineDependent( - Timeline::EpochMilliseconds, - )); - } - CatalogItem::Sink(_) - | CatalogItem::Type(_) - | CatalogItem::Func(_) - | CatalogItem::Secret(_) - | CatalogItem::Connection(_) => {} - } - } - } - - timelines - } } pub fn is_reserved_name(name: &str) -> bool { diff --git a/src/adapter/src/catalog/timeline.rs b/src/adapter/src/catalog/timeline.rs new file mode 100644 index 0000000000000..a38536e517bbd --- /dev/null +++ b/src/adapter/src/catalog/timeline.rs @@ -0,0 +1,249 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Logic related to timelines. + +use std::collections::{BTreeMap, BTreeSet}; + +use itertools::Itertools; +use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View}; +use mz_expr::CollectionPlan; +use mz_ore::collections::CollectionExt; +use mz_repr::{CatalogItemId, GlobalId}; +use mz_storage_types::sources::Timeline; + +use crate::catalog::Catalog; +use crate::{AdapterError, CollectionIdBundle, TimelineContext}; + +impl Catalog { + /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists. + pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext { + let entry = self.get_entry(&id); + self.validate_timeline_context(entry.global_ids()) + .expect("impossible for a single object to belong to incompatible timeline contexts") + } + + /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists. + pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext { + self.validate_timeline_context(vec![id]) + .expect("impossible for a single object to belong to incompatible timeline contexts") + } + + /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id + /// belongs to. + pub fn partition_ids_by_timeline_context( + &self, + id_bundle: &CollectionIdBundle, + ) -> impl Iterator + use<> { + let mut res: BTreeMap = BTreeMap::new(); + + for gid in &id_bundle.storage_ids { + let timeline_context = self.get_timeline_context_for_global_id(*gid); + res.entry(timeline_context) + .or_default() + .storage_ids + .insert(*gid); + } + + for (compute_instance, ids) in &id_bundle.compute_ids { + for gid in ids { + let timeline_context = self.get_timeline_context_for_global_id(*gid); + res.entry(timeline_context) + .or_default() + .compute_ids + .entry(*compute_instance) + .or_default() + .insert(*gid); + } + } + + res.into_iter() + } + + /// Returns an id bundle containing all the ids in the give timeline. + pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle { + let mut id_bundle = CollectionIdBundle::default(); + for entry in self.entries() { + if let TimelineContext::TimelineDependent(entry_timeline) = + self.get_timeline_context(entry.id()) + { + if timeline == &entry_timeline { + match entry.item() { + CatalogItem::Table(table) => { + id_bundle.storage_ids.extend(table.global_ids()); + } + CatalogItem::Source(source) => { + id_bundle.storage_ids.insert(source.global_id()); + } + CatalogItem::MaterializedView(mv) => { + id_bundle.storage_ids.insert(mv.global_id()); + } + CatalogItem::ContinualTask(ct) => { + id_bundle.storage_ids.insert(ct.global_id()); + } + CatalogItem::Index(index) => { + id_bundle + .compute_ids + .entry(index.cluster_id) + .or_default() + .insert(index.global_id()); + } + CatalogItem::View(_) + | CatalogItem::Sink(_) + | CatalogItem::Type(_) + | CatalogItem::Func(_) + | CatalogItem::Secret(_) + | CatalogItem::Connection(_) + | CatalogItem::Log(_) => {} + } + } + } + } + id_bundle + } + + /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should + /// be used to prevent users from doing things that are either meaningless + /// (joining data from timelines that have similar numbers with different + /// meanings like two separate debezium topics) or will never complete (joining + /// cdcv2 and realtime data). + pub(crate) fn validate_timeline_context( + &self, + ids: I, + ) -> Result + where + I: IntoIterator, + { + let items_ids = ids + .into_iter() + .filter_map(|gid| self.try_resolve_item_id(&gid)); + let mut timeline_contexts: Vec<_> = + self.get_timeline_contexts(items_ids).into_iter().collect(); + // If there's more than one timeline, we will not produce meaningful + // data to a user. Take, for example, some realtime source and a debezium + // consistency topic source. The realtime source uses something close to now + // for its timestamps. The debezium source starts at 1 and increments per + // transaction. We don't want to choose some timestamp that is valid for both + // of these because the debezium source will never get to the same value as the + // realtime source's "milliseconds since Unix epoch" value. And even if it did, + // it's not meaningful to join just because those two numbers happen to be the + // same now. + // + // Another example: assume two separate debezium consistency topics. Both + // start counting at 1 and thus have similarish numbers that probably overlap + // a lot. However it's still not meaningful to join those two at a specific + // transaction counter number because those counters are unrelated to the + // other. + let timelines: Vec<_> = timeline_contexts + .extract_if(.., |timeline_context| timeline_context.contains_timeline()) + .collect(); + + // A single or group of objects may contain multiple compatible timeline + // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all + // types of contexts. We choose the strongest context level to return back. + if timelines.len() > 1 { + Err(AdapterError::Unsupported( + "multiple timelines within one dataflow", + )) + } else if timelines.len() == 1 { + Ok(timelines.into_element()) + } else if timeline_contexts + .iter() + .contains(&TimelineContext::TimestampDependent) + { + Ok(TimelineContext::TimestampDependent) + } else { + Ok(TimelineContext::TimestampIndependent) + } + } + + /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist. + fn get_timeline_contexts(&self, ids: I) -> BTreeSet + where + I: IntoIterator, + { + let mut seen: BTreeSet = BTreeSet::new(); + let mut timelines: BTreeSet = BTreeSet::new(); + + // Recurse through IDs to find all sources and tables, adding new ones to + // the set until we reach the bottom. + let mut ids: Vec<_> = ids.into_iter().collect(); + while let Some(id) = ids.pop() { + // Protect against possible infinite recursion. Not sure if it's possible, but + // a cheap prevention for the future. + if !seen.insert(id) { + continue; + } + if let Some(entry) = self.try_get_entry(&id) { + match entry.item() { + CatalogItem::Source(source) => { + timelines + .insert(TimelineContext::TimelineDependent(source.timeline.clone())); + } + CatalogItem::Index(index) => { + let on_id = self.resolve_item_id(&index.on); + ids.push(on_id); + } + CatalogItem::View(View { optimized_expr, .. }) => { + // If the definition contains a temporal function, the timeline must + // be timestamp dependent. + if optimized_expr.contains_temporal() { + timelines.insert(TimelineContext::TimestampDependent); + } else { + timelines.insert(TimelineContext::TimestampIndependent); + } + let item_ids = optimized_expr + .depends_on() + .into_iter() + .map(|gid| self.resolve_item_id(&gid)); + ids.extend(item_ids); + } + CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => { + // In some cases the timestamp selected may not affect the answer to a + // query, but it may affect our ability to query the materialized view. + // Materialized views must durably materialize the result of a query, even + // for constant queries. If we choose a timestamp larger than the upper, + // which represents the current progress of the view, then the query will + // need to block and wait for the materialized view to advance. + timelines.insert(TimelineContext::TimestampDependent); + let item_ids = optimized_expr + .depends_on() + .into_iter() + .map(|gid| self.resolve_item_id(&gid)); + ids.extend(item_ids); + } + CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => { + // See comment in MaterializedView + timelines.insert(TimelineContext::TimestampDependent); + let item_ids = raw_expr + .depends_on() + .into_iter() + .map(|gid| self.resolve_item_id(&gid)); + ids.extend(item_ids); + } + CatalogItem::Table(table) => { + timelines.insert(TimelineContext::TimelineDependent(table.timeline())); + } + CatalogItem::Log(_) => { + timelines.insert(TimelineContext::TimelineDependent( + Timeline::EpochMilliseconds, + )); + } + CatalogItem::Sink(_) + | CatalogItem::Type(_) + | CatalogItem::Func(_) + | CatalogItem::Secret(_) + | CatalogItem::Connection(_) => {} + } + } + } + + timelines + } +}