1 change: 1 addition & 0 deletions src/adapter/src/catalog.rs
@@ -108,6 +108,7 @@ mod migrate;
mod apply;
mod open;
mod state;
mod timeline;
mod transact;

/// A `Catalog` keeps track of the SQL objects known to the planner.
249 changes: 249 additions & 0 deletions src/adapter/src/catalog/timeline.rs
@@ -0,0 +1,249 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to timelines.

use std::collections::{BTreeMap, BTreeSet};

use itertools::Itertools;
use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View};
use mz_expr::CollectionPlan;
use mz_ore::collections::CollectionExt;
use mz_repr::{CatalogItemId, GlobalId};
use mz_storage_types::sources::Timeline;

use crate::catalog::Catalog;
use crate::{AdapterError, CollectionIdBundle, TimelineContext};

impl Catalog {
/// Return the [`TimelineContext`] belonging to a [`CatalogItemId`].
pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext {
let entry = self.get_entry(&id);
self.validate_timeline_context(entry.global_ids())
.expect("impossible for a single object to belong to incompatible timeline contexts")
}

/// Return the [`TimelineContext`] belonging to a [`GlobalId`].
pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext {
self.validate_timeline_context(vec![id])
.expect("impossible for a single object to belong to incompatible timeline contexts")
}

/// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id
/// belongs to.
pub fn partition_ids_by_timeline_context(
&self,
id_bundle: &CollectionIdBundle,
) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)> + use<> {
let mut res: BTreeMap<TimelineContext, CollectionIdBundle> = BTreeMap::new();

for gid in &id_bundle.storage_ids {
let timeline_context = self.get_timeline_context_for_global_id(*gid);
res.entry(timeline_context)
.or_default()
.storage_ids
.insert(*gid);
}

for (compute_instance, ids) in &id_bundle.compute_ids {
for gid in ids {
let timeline_context = self.get_timeline_context_for_global_id(*gid);
res.entry(timeline_context)
.or_default()
.compute_ids
.entry(*compute_instance)
.or_default()
.insert(*gid);
}
}

res.into_iter()
}

/// Returns an id bundle containing all the ids in the given timeline.
pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle {
let mut id_bundle = CollectionIdBundle::default();
for entry in self.entries() {
if let TimelineContext::TimelineDependent(entry_timeline) =
self.get_timeline_context(entry.id())
{
if timeline == &entry_timeline {
match entry.item() {
CatalogItem::Table(table) => {
id_bundle.storage_ids.extend(table.global_ids());
}
CatalogItem::Source(source) => {
id_bundle.storage_ids.insert(source.global_id());
}
CatalogItem::MaterializedView(mv) => {
id_bundle.storage_ids.insert(mv.global_id());
}
CatalogItem::ContinualTask(ct) => {
id_bundle.storage_ids.insert(ct.global_id());
}
CatalogItem::Index(index) => {
id_bundle
.compute_ids
.entry(index.cluster_id)
.or_default()
.insert(index.global_id());
}
CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_)
| CatalogItem::Log(_) => {}
}
}
}
}
id_bundle
}

/// Return an error if the ids are from incompatible [`TimelineContext`]s. This should
/// be used to prevent users from doing things that are either meaningless
/// (joining data from timelines that have similar numbers with different
/// meanings like two separate debezium topics) or will never complete (joining
/// cdcv2 and realtime data).
pub(crate) fn validate_timeline_context<I>(
&self,
ids: I,
) -> Result<TimelineContext, AdapterError>
where
I: IntoIterator<Item = GlobalId>,
{
let items_ids = ids
.into_iter()
.filter_map(|gid| self.try_resolve_item_id(&gid));
let mut timeline_contexts: Vec<_> =
self.get_timeline_contexts(items_ids).into_iter().collect();
// If there's more than one timeline, we will not produce meaningful
// data to a user. Take, for example, some realtime source and a debezium
// consistency topic source. The realtime source uses something close to now
// for its timestamps. The debezium source starts at 1 and increments per
// transaction. We don't want to choose some timestamp that is valid for both
// of these because the debezium source will never get to the same value as the
// realtime source's "milliseconds since Unix epoch" value. And even if it did,
// it's not meaningful to join just because those two numbers happen to be the
// same now.
//
// Another example: assume two separate debezium consistency topics. Both
// start counting at 1 and thus have similarish numbers that probably overlap
// a lot. However it's still not meaningful to join those two at a specific
// transaction counter number because those counters are unrelated to each
// other.
let timelines: Vec<_> = timeline_contexts
.extract_if(.., |timeline_context| timeline_context.contains_timeline())
.collect();

// A single object or group of objects may contain multiple compatible timeline
// contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all
// types of contexts. We choose the strongest context level to return back.
if timelines.len() > 1 {
Err(AdapterError::Unsupported(
"multiple timelines within one dataflow",
))
} else if timelines.len() == 1 {
Ok(timelines.into_element())
} else if timeline_contexts
.iter()
.contains(&TimelineContext::TimestampDependent)
{
Ok(TimelineContext::TimestampDependent)
} else {
Ok(TimelineContext::TimestampIndependent)
}
}

/// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist.
fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
where
I: IntoIterator<Item = CatalogItemId>,
{
let mut seen: BTreeSet<CatalogItemId> = BTreeSet::new();
let mut timelines: BTreeSet<TimelineContext> = BTreeSet::new();

// Recurse through IDs to find all sources and tables, adding new ones to
// the set until we reach the bottom.
let mut ids: Vec<_> = ids.into_iter().collect();
while let Some(id) = ids.pop() {
// Protect against possible infinite recursion. Not sure if it's possible, but
// a cheap prevention for the future.
if !seen.insert(id) {
continue;
}
if let Some(entry) = self.try_get_entry(&id) {
match entry.item() {
CatalogItem::Source(source) => {
timelines
.insert(TimelineContext::TimelineDependent(source.timeline.clone()));
}
CatalogItem::Index(index) => {
let on_id = self.resolve_item_id(&index.on);
ids.push(on_id);
}
CatalogItem::View(View { optimized_expr, .. }) => {
// If the definition contains a temporal function, the timeline must
// be timestamp dependent.
if optimized_expr.contains_temporal() {
timelines.insert(TimelineContext::TimestampDependent);
} else {
timelines.insert(TimelineContext::TimestampIndependent);
}
let item_ids = optimized_expr
.depends_on()
.into_iter()
.map(|gid| self.resolve_item_id(&gid));
ids.extend(item_ids);
}
CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => {
// In some cases the timestamp selected may not affect the answer to a
// query, but it may affect our ability to query the materialized view.
// Materialized views must durably materialize the result of a query, even
// for constant queries. If we choose a timestamp larger than the upper,
// which represents the current progress of the view, then the query will
// need to block and wait for the materialized view to advance.
timelines.insert(TimelineContext::TimestampDependent);
let item_ids = optimized_expr
.depends_on()
.into_iter()
.map(|gid| self.resolve_item_id(&gid));
ids.extend(item_ids);
}
CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => {
// See comment in MaterializedView
timelines.insert(TimelineContext::TimestampDependent);
let item_ids = raw_expr
.depends_on()
.into_iter()
.map(|gid| self.resolve_item_id(&gid));
ids.extend(item_ids);
}
CatalogItem::Table(table) => {
timelines.insert(TimelineContext::TimelineDependent(table.timeline()));
}
CatalogItem::Log(_) => {
timelines.insert(TimelineContext::TimelineDependent(
Timeline::EpochMilliseconds,
));
}
CatalogItem::Sink(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => {}
}
}
}

timelines
}
}
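For reference, the precedence rule at the end of `validate_timeline_context` (more than one distinct timeline is an error; a single timeline wins; otherwise timestamp dependence outranks timestamp independence) can be illustrated with a small, self-contained sketch. The `Ctx` enum is only a simplified stand-in for `TimelineContext`, whose timeline variant carries a real `Timeline`, and the `resolve` helper is not part of this PR:

use std::collections::BTreeSet;

// Simplified stand-in for `TimelineContext`; the `String` plays the role of `Timeline`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Ctx {
    TimestampIndependent,
    TimestampDependent,
    TimelineDependent(String),
}

// "Strongest context wins": more than one distinct timeline is an error, a single
// timeline wins outright, and timestamp dependence outranks timestamp independence.
fn resolve(contexts: BTreeSet<Ctx>) -> Result<Ctx, &'static str> {
    let timelines: Vec<Ctx> = contexts
        .iter()
        .filter(|c| matches!(c, Ctx::TimelineDependent(_)))
        .cloned()
        .collect();
    if timelines.len() > 1 {
        Err("multiple timelines within one dataflow")
    } else if let Some(t) = timelines.into_iter().next() {
        Ok(t)
    } else if contexts.contains(&Ctx::TimestampDependent) {
        Ok(Ctx::TimestampDependent)
    } else {
        Ok(Ctx::TimestampIndependent)
    }
}

fn main() {
    // A realtime collection and a Debezium-style source live on unrelated timelines: rejected.
    let mixed = BTreeSet::from([
        Ctx::TimelineDependent("EpochMilliseconds".into()),
        Ctx::TimelineDependent("Debezium topic A".into()),
    ]);
    assert!(resolve(mixed).is_err());

    // `SELECT 1` is timestamp independent; adding `mz_now()` upgrades the context.
    let upgraded = BTreeSet::from([Ctx::TimestampIndependent, Ctx::TimestampDependent]);
    assert_eq!(resolve(upgraded), Ok(Ctx::TimestampDependent));
}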
5 changes: 3 additions & 2 deletions src/adapter/src/coord/command_handler.rs
@@ -1191,8 +1191,9 @@ impl Coordinator {
.iter()
.any(materialized_view_option_contains_temporal)
{
let timeline_context =
self.validate_timeline_context(resolved_ids.collections().copied())?;
let timeline_context = self
.catalog()
.validate_timeline_context(resolved_ids.collections().copied())?;

// We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
// but even in the TimestampIndependent case.
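The comment above mentions defaulting to `EpochMilliseconds` even when the validated context is `TimestampIndependent`. A minimal sketch of that fallback, assuming it lives inside the adapter crate (so `crate::TimelineContext` resolves); the helper name `timeline_or_default` is hypothetical and not part of this PR:

use mz_storage_types::sources::Timeline;

use crate::TimelineContext;

// Hypothetical helper (not in this PR): use the timeline named by the context,
// falling back to `EpochMilliseconds` when the context names none.
fn timeline_or_default(ctx: TimelineContext) -> Timeline {
    match ctx {
        TimelineContext::TimelineDependent(timeline) => timeline,
        TimelineContext::TimestampDependent | TimelineContext::TimestampIndependent => {
            Timeline::EpochMilliseconds
        }
    }
}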
1 change: 1 addition & 0 deletions src/adapter/src/coord/ddl.rs
@@ -522,6 +522,7 @@ impl Coordinator {
clusters_to_drop.clone(),
);
let timeline_associations: BTreeMap<_, _> = self
.catalog()
.partition_ids_by_timeline_context(&collection_id_bundle)
.filter_map(|(context, bundle)| {
let TimelineContext::TimelineDependent(timeline) = context else {
4 changes: 3 additions & 1 deletion src/adapter/src/coord/read_policy.rs
@@ -253,7 +253,9 @@ impl crate::coord::Coordinator {
compaction_window: CompactionWindow,
) {
// Install read holds in the Coordinator's timeline state.
for (timeline_context, id_bundle) in self.partition_ids_by_timeline_context(id_bundle) {
for (timeline_context, id_bundle) in
self.catalog().partition_ids_by_timeline_context(id_bundle)
{
if let TimelineContext::TimelineDependent(timeline) = timeline_context {
let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await;
let read_ts = oracle.read_ts().await;
@@ -340,7 +340,8 @@ impl Coordinator {
// We want to reject queries that depend on log sources, for example,
// even if we can *technically* optimize that reference away.
let expr_depends_on = expr.depends_on();
self.validate_timeline_context(expr_depends_on.iter().copied())?;
self.catalog()
.validate_timeline_context(expr_depends_on.iter().copied())?;
self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
// Materialized views are not allowed to depend on log sources, as replicas
// are not producing the same definite collection for these.
3 changes: 2 additions & 1 deletion src/adapter/src/coord/sequencer/inner/create_view.rs
@@ -260,7 +260,8 @@ impl Coordinator {
// reject queries that depend on a relation in the wrong timeline, for
// example, even if we can *technically* optimize that reference away.
let expr_depends_on = expr.depends_on();
self.validate_timeline_context(expr_depends_on.iter().copied())?;
self.catalog()
.validate_timeline_context(expr_depends_on.iter().copied())?;
self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;

let validity =
@@ -310,7 +310,9 @@ impl Coordinator {
return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
}
};
let mut timeline_context = self.validate_timeline_context(source_ids.iter().copied())?;
let mut timeline_context = self
.catalog()
.validate_timeline_context(source_ids.iter().copied())?;
if matches!(timeline_context, TimelineContext::TimestampIndependent)
&& optimized_plan.contains_temporal()
{
4 changes: 3 additions & 1 deletion src/adapter/src/coord/sequencer/inner/peek.rs
@@ -354,7 +354,9 @@ impl Coordinator {
.transpose()?;

let source_ids = plan.source.depends_on();
let mut timeline_context = self.validate_timeline_context(source_ids.iter().copied())?;
let mut timeline_context = self
.catalog()
.validate_timeline_context(source_ids.iter().copied())?;
if matches!(timeline_context, TimelineContext::TimestampIndependent)
&& plan.source.contains_temporal()?
{
4 changes: 3 additions & 1 deletion src/adapter/src/coord/sequencer/inner/subscribe.rs
@@ -137,7 +137,9 @@ impl Coordinator {
session.add_notices(notices);

// Determine timeline.
let mut timeline = self.validate_timeline_context(depends_on.iter().copied())?;
let mut timeline = self
.catalog()
.validate_timeline_context(depends_on.iter().copied())?;
if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
// If the from IDs are timestamp independent but the query contains temporal functions
// then the timeline context needs to be upgraded to timestamp dependent.