Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add REFRESH options for MVs -- parsing, purification, planning #23870

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion misc/dbt-materialize/tests/adapter/test_constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_ddl_enforcement(self, project):
fetch="one",
)
assert (
'WITH (ASSERT NOT NULL = "a", ASSERT NOT NULL = "b")'
'ASSERT NOT NULL = "a", ASSERT NOT NULL = "b"'
in nullability_assertions_ddl[1]
)

Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/checks/all_checks/null_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def validate(self) -> Testdrive:
<null> <null> <null>

> SHOW CREATE MATERIALIZED VIEW null_value_view2;
materialize.public.null_value_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"null_value_view2\\" IN CLUSTER \\"{self._default_cluster()}\\" AS SELECT \\"f1\\", \\"f2\\", NULL FROM \\"materialize\\".\\"public\\".\\"null_value_table\\" WHERE \\"f1\\" IS NULL OR \\"f1\\" IS NOT NULL OR \\"f1\\" = NULL"
materialize.public.null_value_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"null_value_view2\\" IN CLUSTER \\"{self._default_cluster()}\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"f2\\", NULL FROM \\"materialize\\".\\"public\\".\\"null_value_table\\" WHERE \\"f1\\" IS NULL OR \\"f1\\" IS NOT NULL OR \\"f1\\" = NULL"

> SELECT * FROM null_value_view2;
<null> <null> <null>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def validate(self) -> Testdrive:
dedent(
f"""
> SHOW CREATE MATERIALIZED VIEW string_bytea_types_view1;
materialize.public.string_bytea_types_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"string_bytea_types_view1\\" IN CLUSTER \\"{self._default_cluster()}\\" AS SELECT \\"text_col\\", \\"bytea_col\\", 'това'::\\"pg_catalog\\".\\"text\\", '\\\\xAAAA'::\\"pg_catalog\\".\\"bytea\\" FROM \\"materialize\\".\\"public\\".\\"text_bytea_types_table\\" WHERE \\"text_col\\" >= ''::\\"pg_catalog\\".\\"text\\" AND \\"bytea_col\\" >= ''::\\"pg_catalog\\".\\"bytea\\""
materialize.public.string_bytea_types_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"string_bytea_types_view1\\" IN CLUSTER \\"{self._default_cluster()}\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"text_col\\", \\"bytea_col\\", 'това'::\\"pg_catalog\\".\\"text\\", '\\\\xAAAA'::\\"pg_catalog\\".\\"bytea\\" FROM \\"materialize\\".\\"public\\".\\"text_bytea_types_table\\" WHERE \\"text_col\\" >= ''::\\"pg_catalog\\".\\"text\\" AND \\"bytea_col\\" >= ''::\\"pg_catalog\\".\\"bytea\\""

> SELECT text_col, text, LENGTH(bytea_col), LENGTH(bytea) FROM string_bytea_types_view1;
aaaa това 2 2
Expand Down
12 changes: 6 additions & 6 deletions misc/python/materialize/checks/all_checks/top_k.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ def validate(self) -> Testdrive:
dedent(
f"""
> SHOW CREATE MATERIALIZED VIEW basic_topk_view1;
materialize.public.basic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view1\\" IN CLUSTER \\"{self._default_cluster()}\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2"
materialize.public.basic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view1\\" IN CLUSTER \\"{self._default_cluster()}\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2"

> SELECT * FROM basic_topk_view1;
2 32
3 48

> SHOW CREATE MATERIALIZED VIEW basic_topk_view2;
materialize.public.basic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view2\\" IN CLUSTER \\"{self._default_cluster()}\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2"
materialize.public.basic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view2\\" IN CLUSTER \\"{self._default_cluster()}\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2"

> SELECT * FROM basic_topk_view2;
1 16
Expand Down Expand Up @@ -123,14 +123,14 @@ def validate(self) -> Testdrive:
dedent(
f"""
> SHOW CREATE MATERIALIZED VIEW monotonic_topk_view1;
materialize.public.monotonic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view1\\" IN CLUSTER \\"{self._default_cluster()}\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2"
materialize.public.monotonic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view1\\" IN CLUSTER \\"{self._default_cluster()}\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2"

> SELECT * FROM monotonic_topk_view1;
E 5
D 4

> SHOW CREATE MATERIALIZED VIEW monotonic_topk_view2;
materialize.public.monotonic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view2\\" IN CLUSTER \\"{self._default_cluster()}\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2"
materialize.public.monotonic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view2\\" IN CLUSTER \\"{self._default_cluster()}\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2"

> SELECT * FROM monotonic_topk_view2;
A 1
Expand Down Expand Up @@ -186,13 +186,13 @@ def validate(self) -> Testdrive:
dedent(
f"""
> SHOW CREATE MATERIALIZED VIEW monotonic_top1_view1;
materialize.public.monotonic_top1_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view1\\" IN CLUSTER \\"{self._default_cluster()}\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 1"
materialize.public.monotonic_top1_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view1\\" IN CLUSTER \\"{self._default_cluster()}\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 1"

> SELECT * FROM monotonic_top1_view1;
D 5

> SHOW CREATE MATERIALIZED VIEW monotonic_top1_view2;
materialize.public.monotonic_top1_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view2\\" IN CLUSTER \\"{self._default_cluster()}\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 1"
materialize.public.monotonic_top1_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view2\\" IN CLUSTER \\"{self._default_cluster()}\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 1"

> SELECT * FROM monotonic_top1_view2;
A 1
Expand Down
4 changes: 4 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4217,6 +4217,10 @@ impl SessionCatalog for ConnCatalog<'_> {
}
}

/// Looks up the item for the system type named `name`.
///
/// Delegates to the catalog state's lookup, which searches the system
/// schemas; per that method's contract, it panics if `name` is not found
/// in exactly one system schema — TODO confirm against
/// `CatalogState::get_system_type`.
fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
    self.state.get_system_type(name)
}

fn try_get_item(&self, id: &GlobalId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
Some(self.state.try_get_entry(id)?)
}
Expand Down
21 changes: 18 additions & 3 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,12 @@ impl CatalogState {
diff: Diff,
) -> Vec<BuiltinTableUpdate> {
let create_stmt = mz_sql::parse::parse(&view.create_sql)
.unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", view.create_sql))
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
view.create_sql, e
)
})
.into_element()
.ast;
let query = match &create_stmt {
Expand Down Expand Up @@ -875,7 +880,12 @@ impl CatalogState {
diff: Diff,
) -> Vec<BuiltinTableUpdate> {
let create_stmt = mz_sql::parse::parse(&mview.create_sql)
.unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", mview.create_sql))
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
mview.create_sql, e
)
})
.into_element()
.ast;
let query = match &create_stmt {
Expand Down Expand Up @@ -975,7 +985,12 @@ impl CatalogState {
let mut updates = vec![];

let create_stmt = mz_sql::parse::parse(&index.create_sql)
.unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", index.create_sql))
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
index.create_sql, e
)
})
.into_element()
.ast;

Expand Down
40 changes: 39 additions & 1 deletion src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use mz_ore::now::{EpochMillis, NowFn};
use mz_repr::namespaces::PG_CATALOG_SCHEMA;
use mz_sql::ast::display::AstDisplay;
use mz_sql::catalog::NameReference;
use mz_sql_parser::ast::{visit_mut, Ident, Raw, RawDataType, Statement};
use mz_sql_parser::ast::{
visit_mut, CreateMaterializedViewStatement, Ident, MaterializedViewOption,
MaterializedViewOptionName, Raw, RawDataType, RefreshOptionValue, Statement,
};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::sources::GenericSourceConnection;
Expand Down Expand Up @@ -95,6 +98,10 @@ pub(crate) async fn migrate(
}
ast_rewrite_rewrite_type_schemas_0_81_0(stmt);

if catalog_version <= Version::new(0, 81, u64::MAX) {
ast_rewrite_create_materialized_view_refresh_options_0_82_0(stmt)?;
}

Ok(())
})
})
Expand Down Expand Up @@ -348,6 +355,37 @@ fn persist_default_cluster_0_82_0(
Ok(())
}

/// AST migration for v0.82.0: make the previously implicit refresh behavior
/// of materialized views explicit. Every `CREATE MATERIALIZED VIEW` statement
/// that carries no `REFRESH` option gets `REFRESH = ON COMMIT` appended to its
/// `WITH` options; statements that already specify a `REFRESH` option are left
/// untouched.
fn ast_rewrite_create_materialized_view_refresh_options_0_82_0(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::visit_mut::VisitMut;
    use mz_sql::ast::WithOptionValue;

    struct AddDefaultRefresh;

    impl<'ast> VisitMut<'ast, Raw> for AddDefaultRefresh {
        fn visit_create_materialized_view_statement_mut(
            &mut self,
            node: &'ast mut CreateMaterializedViewStatement<Raw>,
        ) {
            // Only insert the default when no REFRESH option is present yet.
            let has_refresh = node
                .with_options
                .iter()
                .any(|opt| matches!(opt.name, MaterializedViewOptionName::Refresh));
            if !has_refresh {
                node.with_options.push(MaterializedViewOption {
                    name: MaterializedViewOptionName::Refresh,
                    value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
                });
            }
        }
    }

    AddDefaultRefresh.visit_statement_mut(stmt);

    Ok(())
}

fn _add_to_audit_log(
tx: &mut Transaction,
event_type: mz_audit_log::EventType,
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1921,6 +1921,7 @@ mod builtin_migration_tests {
cluster_id: ClusterId::User(1),
non_null_assertions: vec![],
custom_logical_compaction_window: None,
refresh_schedule: None,
})
}
SimplifiedItem::Index { on } => {
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ impl CatalogState {
self.entry_by_id.get_mut(id).expect("catalog out of sync")
}

/// Gets an type named `name` from exactly one of the system schemas.
/// Gets a type named `name` from exactly one of the system schemas.
///
/// # Panics
/// - If `name` is not an entry in any system schema
Expand Down Expand Up @@ -942,6 +942,7 @@ impl CatalogState {
cluster_id: materialized_view.cluster_id,
non_null_assertions: materialized_view.non_null_assertions,
custom_logical_compaction_window: materialized_view.compaction_window,
refresh_schedule: materialized_view.refresh_schedule,
})
}
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
Expand Down
88 changes: 86 additions & 2 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@ use mz_sql::ast::{
CopyRelation, CopyStatement, InsertSource, Query, Raw, SetExpr, Statement, SubscribeStatement,
};
use mz_sql::catalog::RoleAttributes;
use mz_sql::names::{PartialItemName, ResolvedIds};
use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
use mz_sql::plan::{
AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan, TransactionType,
};
use mz_sql::pure::{
materialized_view_option_contains_temporal, purify_create_materialized_view_options,
};
use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
EndTransactionAction, OwnedVarInput, Value, Var, STATEMENT_LOGGING_SAMPLE_RATE,
};
use mz_sql_parser::ast::CreateMaterializedViewStatement;
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{debug_span, Instrument};
Expand Down Expand Up @@ -616,7 +621,7 @@ impl Coordinator {
let original_stmt = Arc::clone(&stmt);
// `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
// IDs, then `resolved_ids` should be updated to also remove/add those IDs.
let (stmt, resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
Ok(resolved) => resolved,
Err(e) => return ctx.retire(Err(e.into())),
};
Expand Down Expand Up @@ -685,6 +690,85 @@ impl Coordinator {
"CREATE SUBSOURCE statements",
))),

Statement::CreateMaterializedView(mut cmvs) => {
// (This won't be the same timestamp as the system table inserts, unfortunately.)
let mz_now = if cmvs
.with_options
.iter()
.any(materialized_view_option_contains_temporal)
{
let timeline_context =
match self.validate_timeline_context(resolved_ids.0.clone()) {
Ok(tc) => tc,
Err(e) => return ctx.retire(Err(e)),
};

// We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
// but even in the TimestampIndependent case.
// Note that we didn't accurately decide whether we are TimestampDependent
// or TimestampIndependent, because for this we'd need to also check whether
// `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
// However, this doesn't matter here, as we are just going to default to
// EpochMilliseconds in both cases.
let timeline = timeline_context
.timeline()
.unwrap_or(&Timeline::EpochMilliseconds);
Some(self.get_timestamp_oracle(timeline).read_ts().await)
// TODO: It might be good to take into account `least_valid_read` in addition to
// the oracle's `read_ts`, but there are two problems:
// 1. At this point, we don't know which indexes would be used. We could do an
// overestimation here by grabbing the ids of all indexes that are on ids
// involved in the query. (We'd have to recursively follow view definitions,
// similarly to `validate_timeline_context`.)
// 2. For a peek, when the `least_valid_read` is later than the oracle's
// `read_ts`, then the peek doesn't return before it completes at the chosen
// timestamp. However, for a CREATE MATERIALIZED VIEW statement, it's not clear
// whether we want to make it block until the chosen time. If it doesn't block,
// then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED
// VIEW statement.
//
// Note: The Adapter is usually keeping a read hold of all objects at the oracle
// read timestamp, so `least_valid_read` usually won't actually be later than
// the oracle's `read_ts`. (see `Coordinator::advance_timelines`)
//
// Note 2: If we choose a timestamp here that is earlier than
// `least_valid_read`, that is somewhat bad, but not catastrophic: The only
// bad thing that happens is that we won't perform that refresh that was
// specified to be at `mz_now()` (which is usually the initial refresh)
// (similarly to how we don't perform refreshes that were specified to be in the
// past).
} else {
None
};

let owned_catalog = self.owned_catalog();
let catalog = owned_catalog.for_session(ctx.session());

purify_create_materialized_view_options(
catalog,
mz_now,
&mut cmvs,
&mut resolved_ids,
);

let purified_stmt =
Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
if_exists: cmvs.if_exists,
name: cmvs.name,
columns: cmvs.columns,
in_cluster: cmvs.in_cluster,
query: cmvs.query,
with_options: cmvs.with_options,
});

// (Purifying CreateMaterializedView doesn't happen async, so no need to send
// `Message::PurifiedStatementReady` here.)
ggevay marked this conversation as resolved.
Show resolved Hide resolved
match self.plan_statement(ctx.session(), purified_stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
Err(e) => ctx.retire(Err(e)),
}
}
ggevay marked this conversation as resolved.
Show resolved Hide resolved

// All other statements are handled immediately.
_ => match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
Expand Down