Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions src/adapter/src/optimize/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,25 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
// Set the `as_of` and `until` timestamps for the dataflow.
df_desc.set_as_of(timestamp_ctx.antichain());

// Get the single timestamp representing the `as_of` time.
let as_of = df_desc
.as_of
.clone()
.expect("as_of antichain")
.into_option()
.expect("unique as_of element");

// Resolve all unmaterializable function calls including mz_now().
let style = ExprPrepStyle::OneShot {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it be the case that prep_relation_expr or prep_scalar_expr could care about until? If so, we should move the following block before this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fortunately, that can't happen, because we don't pass the DataflowDescription to prep_relation_expr or prep_scalar_expr, so they have no way of observing what until we have set on the DataflowDescription.

logical_time: EvalTime::Time(as_of),
session,
catalog_state: self.catalog.state(),
};
df_desc.visit_children(
|r| prep_relation_expr(r, style),
|s| prep_scalar_expr(s, style),
)?;

// Use the opportunity to name an `until` frontier that will prevent
// work we needn't perform. By default, `until` will be
// `Antichain::new()`, which prevents no updates and is safe.
Expand Down Expand Up @@ -336,25 +355,6 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
}

// Get the single timestamp representing the `as_of` time.
let as_of = df_desc
.as_of
.clone()
.expect("as_of antichain")
.into_option()
.expect("unique as_of element");

// Resolve all unmaterializable function calls including mz_now().
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::Time(as_of),
session,
catalog_state: self.catalog.state(),
};
df_desc.visit_children(
|r| prep_relation_expr(r, style),
|s| prep_scalar_expr(s, style),
)?;

// Ensure all expressions are normalized before finalizing.
for build in df_desc.objects_to_build.iter_mut() {
normalize_lets(&mut build.plan.0, &self.config.features)?
Expand Down
38 changes: 19 additions & 19 deletions src/adapter/src/optimize/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,25 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
// Set the `as_of` and `until` timestamps for the dataflow.
df_desc.set_as_of(timestamp_ctx.antichain());

// Get the single timestamp representing the `as_of` time.
let as_of = df_desc
.as_of
.clone()
.expect("as_of antichain")
.into_option()
.expect("unique as_of element");

// Resolve all unmaterializable function calls including mz_now().
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::Time(as_of),
session,
catalog_state: self.catalog.state(),
};
df_desc.visit_children(
|r| prep_relation_expr(r, style),
|s| prep_scalar_expr(s, style),
)?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concern about until here.

// Use the opportunity to name an `until` frontier that will prevent
// work we needn't perform. By default, `until` will be
// `Antichain::new()`, which prevents no updates and is safe.
Expand Down Expand Up @@ -351,25 +370,6 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
}

// Get the single timestamp representing the `as_of` time.
let as_of = df_desc
.as_of
.clone()
.expect("as_of antichain")
.into_option()
.expect("unique as_of element");

// Resolve all unmaterializable function calls including mz_now().
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::Time(as_of),
session,
catalog_state: self.catalog.state(),
};
df_desc.visit_children(
|r| prep_relation_expr(r, style),
|s| prep_scalar_expr(s, style),
)?;

// TODO: use the following code once we can be sure that the
// index_exports always exist.
//
Expand Down
116 changes: 76 additions & 40 deletions test/sqllogictest/filter-pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ CREATE TABLE events (
delete_ms numeric
);

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv1 AS
SELECT count(*)
FROM events
WHERE mz_now() >= insert_ms
AND mz_now() < delete_ms;

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv1
----
Explained Query:
materialize.public.mv1:
With
cte l0 =
Reduce aggregates=[count(*)]
Expand All @@ -58,17 +61,20 @@ Target cluster: quickstart

EOF

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv2 AS
SELECT content, insert_ms
FROM events
-- The event should appear in only one interval of duration `10000`.
-- The interval begins here ...
WHERE mz_now() >= 10000 * (insert_ms / 10000)
-- ... and ends here.
AND mz_now() < 10000 * (1 + insert_ms / 10000)
AND mz_now() < 10000 * (1 + insert_ms / 10000);

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv2
----
Explained Query:
materialize.public.mv2:
Project (#0{content}, #1{insert_ms})
Filter (mz_now() >= numeric_to_mz_timestamp((10000 * #3))) AND (mz_now() < numeric_to_mz_timestamp((10000 * (1 + #3))))
Map ((#1{insert_ms} / 10000))
Expand All @@ -83,17 +89,20 @@ Target cluster: quickstart

EOF

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv3 AS
SELECT content, insert_ms
FROM events
-- The event should appear in `6` intervals each of width `10000`.
-- The interval begins here ...
WHERE mz_now() >= 10000 * (insert_ms / 10000)
-- ... and ends here.
AND mz_now() < 6 * (10000 + insert_ms / 10000)
AND mz_now() < 6 * (10000 + insert_ms / 10000);

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv3
----
Explained Query:
materialize.public.mv3:
Project (#0{content}, #1{insert_ms})
Filter (mz_now() >= numeric_to_mz_timestamp((10000 * #3))) AND (mz_now() < numeric_to_mz_timestamp((6 * (10000 + #3))))
Map ((#1{insert_ms} / 10000))
Expand All @@ -108,18 +117,21 @@ Target cluster: quickstart

EOF

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv4 AS
SELECT content, insert_ms
FROM events
-- The event should appear inside the interval that begins at
-- `insert_ms` and ends at `insert_ms + 30000`.
-- The interval begins here ..
WHERE mz_now() >= insert_ms
-- ... and ends here.
AND mz_now() < insert_ms + 30000
AND mz_now() < insert_ms + 30000;

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv4
----
Explained Query:
materialize.public.mv4:
Project (#0{content}, #1{insert_ms})
Filter (mz_now() >= numeric_to_mz_timestamp(#1{insert_ms})) AND (mz_now() < numeric_to_mz_timestamp((#1{insert_ms} + 30000)))
ReadStorage materialize.public.events
Expand All @@ -132,14 +144,17 @@ Target cluster: quickstart

EOF

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv5 AS
SELECT content, insert_ms, delete_ms
FROM events
WHERE mz_now() >= insert_ms + 60000
AND mz_now() < delete_ms + 60000;

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv5
----
Explained Query:
materialize.public.mv5:
Filter (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 60000))) AND (mz_now() >= numeric_to_mz_timestamp((#1{insert_ms} + 60000)))
ReadStorage materialize.public.events

Expand All @@ -156,13 +171,16 @@ EOF
# can push down are also associative, so this is moot. Let's at least check that an associative
# function _does_ report pushdown even when the argument list is long.

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv6 AS
SELECT content, insert_ms, delete_ms
FROM events
WHERE COALESCE(delete_ms, insert_ms) < mz_now();

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv6
----
Explained Query:
materialize.public.mv6:
Filter (mz_now() > numeric_to_mz_timestamp(coalesce(#2{delete_ms}, #1{insert_ms})))
ReadStorage materialize.public.events

Expand All @@ -174,8 +192,8 @@ Target cluster: quickstart

EOF

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv7 AS
SELECT content, insert_ms, delete_ms
FROM events
WHERE mz_now() < delete_ms + 10000
Expand All @@ -187,8 +205,11 @@ WHERE mz_now() < delete_ms + 10000
AND mz_now() < delete_ms + 70000
AND mz_now() < delete_ms + 80000
AND mz_now() < delete_ms + 90000;

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv7
----
Explained Query:
materialize.public.mv7:
Filter (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 10000))) AND (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 20000))) AND (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 30000))) AND (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 40000))) AND (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 50000))) AND (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 60000))) AND (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 70000))) AND (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 80000))) AND (mz_now() < numeric_to_mz_timestamp((#2{delete_ms} + 90000)))
ReadStorage materialize.public.events

Expand Down Expand Up @@ -229,13 +250,16 @@ EOF
# Verify that try_parse_monotonic_iso8601_timestamp gets pushdown (the whole
# point of that func)

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv8 AS
SELECT content, inserted_at
FROM events_timestamped
WHERE mz_now() < try_parse_monotonic_iso8601_timestamp(content);

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, filter pushdown) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv8
----
Explained Query:
materialize.public.mv8:
Project (#0{content}, #1{inserted_at})
Filter (mz_now() < timestamp_to_mz_timestamp(try_parse_monotonic_iso8601_timestamp(#0{content})))
ReadStorage materialize.public.events_timestamped
Expand Down Expand Up @@ -308,15 +332,18 @@ Target cluster: quickstart

EOF

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(filter pushdown, humanized expressions) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv9 AS
with cte as (
select x, t, case when x=0 then t - INTERVAL '1' day else t - INTERVAL '2' day end as case_statement from t
)
select x, t from cte
where case_statement < mz_now();

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(filter pushdown, humanized expressions) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv9
----
Explained Query:
materialize.public.mv9:
Filter (mz_now() > timestamp_to_mz_timestamp((#1{t} - case when (#0{x} = 0) then 1 day else 2 days end)))
ReadStorage materialize.public.t

Expand All @@ -334,8 +361,8 @@ EOF
statement ok
CREATE TABLE items(id int, ship_time timestamp);

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(filter pushdown, humanized expressions) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv10 AS
SELECT * from items
WHERE mz_now() <= date_trunc(
'month',
Expand All @@ -345,9 +372,12 @@ WHERE mz_now() <= date_trunc(
+ CASE WHEN EXTRACT(MONTH FROM ship_time) >= 6 THEN EXTRACT(MONTH FROM ship_time) - 6 ELSE 0 END
)
* INTERVAL '1 months'
)
);

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(filter pushdown, humanized expressions) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv10
----
Explained Query:
materialize.public.mv10:
Project (#0{id}, #1{ship_time})
Filter (mz_now() <= timestamp_to_mz_timestamp(date_trunc_month_ts((#1{ship_time} - (1 month * numeric_to_double((case when (#2 < 6) then (extract_month_ts(#1{ship_time}) + 6) else 0 end + case when (#2 >= 6) then (extract_month_ts(#1{ship_time}) - 6) else 0 end)))))))
Map (extract_month_ts(#1{ship_time}))
Expand All @@ -361,12 +391,15 @@ Target cluster: quickstart

EOF

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(filter pushdown, humanized expressions) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv11 AS
SELECT * from items
WHERE CASE WHEN id = 10 THEN EXTRACT(MONTH FROM ship_time) ELSE 0 END < mz_now();

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(filter pushdown, humanized expressions) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv11
----
Explained Query:
materialize.public.mv11:
Filter (mz_now() > numeric_to_mz_timestamp(case when (#0{id} = 10) then extract_month_ts(#1{ship_time}) else 0 end))
ReadStorage materialize.public.items

Expand All @@ -377,12 +410,15 @@ Target cluster: quickstart

EOF

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(filter pushdown, humanized expressions) AS VERBOSE TEXT FOR
statement ok
CREATE MATERIALIZED VIEW mv12 AS
SELECT * from items
WHERE CASE WHEN EXTRACT(MONTH FROM ship_time) >= 6 THEN 12 ELSE 0 END < mz_now();

query T multiline
EXPLAIN OPTIMIZED PLAN WITH(filter pushdown, humanized expressions) AS VERBOSE TEXT FOR MATERIALIZED VIEW mv12
----
Explained Query:
materialize.public.mv12:
Filter (mz_now() > integer_to_mz_timestamp(case when (extract_month_ts(#1{ship_time}) >= 6) then 12 else 0 end))
ReadStorage materialize.public.items

Expand Down
7 changes: 3 additions & 4 deletions test/sqllogictest/mztimestamp.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ true
query T multiline
EXPLAIN OPTIMIZED PLAN WITH(humanized expressions, arity, join implementations) AS VERBOSE TEXT FOR SELECT 1::mz_timestamp = mz_now()
----
Explained Query:
Map ((1 = mz_now())) // { arity: 1 }
Constant // { arity: 0 }
- ()
Explained Query (fast path):
Constant
- (false)

Target cluster: quickstart

Expand Down
Loading