compute: tokenize delta join operator #18927

Merged
merged 2 commits into MaterializeInc:main from faster-join-shutdown on Apr 27, 2023

Conversation

@teskje (Contributor) commented Apr 24, 2023

This PR adds shutdown token checking to the closures we pass to join operators as described in the Faster Dataflow Shutdown design. When the dataflow is shutting down, this makes the join closures drain all input data, rather than processing it. As a result, join operators shut down faster and emit less data, which in turn speeds up shutdown of downstream operators.

The PR is split into three commits:

  • Commit 1 adds a ShutdownToken type that conveniently wraps Option<Weak<()>>. In particular, it provides a probe method that makes the check inside the join closures a single line, rather than a multi-line if-return statement; a sketch of such a wrapper follows this list.
  • Commit 2 adds a shutdown token check to the delta join operator.
  • Commit 3 adds a shutdown token check to the linear join operator. I decided to remove this again due to the issues with interactivity (see below). We should come up with a better way to control yielding in DD before we attempt tokenizing the linear join. You can still view the commit here.
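
For illustration, here is a minimal sketch of what such a wrapper could look like. This is not the exact code in this PR; in particular, the probe signature is an assumption chosen so the check can use the ? operator.

use std::sync::Weak;

/// Sketch of a shutdown token wrapping `Option<Weak<()>>`, as described above.
#[derive(Clone, Default)]
pub struct ShutdownToken(Option<Weak<()>>);

impl ShutdownToken {
    /// Create a token observing the given weak handle.
    pub fn new(handle: Weak<()>) -> Self {
        Self(Some(handle))
    }

    /// Return `Some(())` while the dataflow is alive and `None` once the
    /// strong reference has been dropped, i.e. once the dataflow is
    /// shutting down.
    pub fn probe(&self) -> Option<()> {
        match &self.0 {
            Some(handle) => handle.upgrade().map(|_| ()),
            None => Some(()),
        }
    }
}

Assuming the join closures return an Option, the check then collapses to a single shutdown_token.probe()?; at the top of the closure: once the token reports shutdown, the closure returns None and the operator drains its input without emitting output.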

While tokenizing the delta join improves both shutdown time and interactivity during shutdown, tokenizing the linear join improves shutdown time but is detrimental to interactivity (see performance measurements). As a result, this PR only adds shutdown checking to the former.

Tokenization improves shutdown time for both delta and linear joins because a) the joins need to evaluate fewer expressions and b) downstream operators receive fewer inputs (although the performance tests don't try to test this).

Tokenization improves interactivity with delta joins because this operator yields after having processed 1 million updates, and processing an update is faster if it is discarded, compared to evaluating an expression on it. Crucially, the delta join operator (more precisely: the half join operator) applies its fuel against updates that go into the join closure. In contrast, the differential join operator applies its fuel against updates returned by the join closure. As a result, the shutdown check (which makes the join closure stop emitting updates on shutdown) breaks the operator's fueling. On shutdown, the tokenized differential join operator will crunch through all updates on its inputs without yielding. This is good for throughput but bad for interactivity.
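
To make the difference concrete, here is a rough, simplified sketch (ordinary Rust, not the actual Timely/DD operator code) of where the two operators charge their fuel. Here closure stands in for the join closure, which emits nothing once the dataflow is shutting down.

// Simplified sketch of the fueling behavior described above; not the actual operator code.

// Half join (delta joins): fuel is charged per *input* update, before the
// closure runs, so updates discarded during shutdown still move the
// operator toward yielding.
fn half_join_style(inputs: &[i64], mut fuel: i64, closure: impl Fn(i64) -> Option<i64>) -> Vec<i64> {
    let mut out = Vec::new();
    for &update in inputs {
        fuel -= 1;
        out.extend(closure(update));
        if fuel <= 0 {
            break; // yield back to the scheduler
        }
    }
    out
}

// Differential (linear) join: fuel is charged per *produced* update, so a
// closure that emits nothing during shutdown never consumes fuel and the
// operator never yields.
fn linear_join_style(inputs: &[i64], mut fuel: i64, closure: impl Fn(i64) -> Option<i64>) -> Vec<i64> {
    let mut out = Vec::new();
    for &update in inputs {
        for produced in closure(update) {
            out.push(produced);
            fuel -= 1;
        }
        if fuel <= 0 {
            break; // yield back to the scheduler
        }
    }
    out
}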

To solve the interactivity issues with linear joins, we need to adjust the way the DD join operator decides whether it should yield. One approach could be to introduce a yield_function like the half join operator has, and apply time-based yielding during dataflow shutdown.
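
As an illustration only, and not part of this PR: using the ShutdownToken sketched earlier, such a yield_function could look roughly like the following, mirroring the (timer, count) closure that the delta join rendering already passes to the half join operator. As the review discussion below notes, changing the yield function is risky, since a bad choice can prevent the half join operator from making durable progress.

use std::time::{Duration, Instant};

// Hypothetical sketch: keep count-based yielding during normal operation,
// but switch to time-based yielding once the shutdown token is dropped.
fn shutdown_aware_yield_fn(token: ShutdownToken) -> impl Fn(Instant, usize) -> bool {
    move |timer: Instant, count: usize| {
        if token.probe().is_none() {
            // Shutting down: yield based on elapsed time so the worker
            // stays responsive while the operator drains its inputs.
            timer.elapsed() > Duration::from_millis(10)
        } else {
            // Normal operation: yield after a fixed number of updates,
            // as the existing `count > 1_000_000` closure does.
            count > 1_000_000
        }
    }
}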

Motivation

  • This PR fixes a previously unreported bug.

Joins can consume resources and impact interactivity even after their dataflows have been dropped.

Improves #7577.
Workaround for #2392.

Tips for reviewer

Check out the performance measurements!

I'm unsure about how to write a test for this. Given that shutdowns of tokenized joins still take some time proportional to the number of queued updates, it seems hard to come up with a test that is not prone to flakiness. We'd essentially need to test whether a join dataflow goes away "relatively fast", but there is no good way to determine what that means without considering the environment the test is running in.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • This PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • This PR includes the following user-facing behavior changes:
    • N/A

@teskje (Contributor Author) commented Apr 25, 2023

Performance Measurements

For both delta and linear joins, we are interested in testing these performance aspects of tokenization:

  • Impact on normal processing: whether the additional token check in the join closure has a non-negligible impact on join processing performance when the dataflow is not shutting down.
  • Time to shutdown: how much time passes from cancellation of the dataflow until it is completely shut down.
  • Impact on interactivity: how the token check impacts interactivity for users during dataflow shutdown.

Delta Join

As a test case for delta joins I used a simple cross-join:

materialize=> create table t (a int);
CREATE TABLE
materialize=> explain select t1.a + t2.a + t3.a from t t1, t t2, t t3;
          Optimized Plan
----------------------------------
 Explained Query:                +
   Return                        +
     Project (#3)                +
       Map (((#0 + #1) + #2))    +
         CrossJoin type=delta    +
           Get l0                +
           Get l0                +
           Get l0                +
   With                          +
     cte l0 =                    +
       ArrangeBy keys=[[]]       +
         Get materialize.public.t+

Impact on Normal Processing

Run the join query to completion and measure the time it takes.
We add a LIMIT 1 to avoid an error due to the result being too large.

-- setup
create table t (a int);
insert into t select * from generate_series(1, 1000);

-- experiment
\timing
select t1.a + t2.a + t3.a from t t1, t t2, t t3 limit 1;

-- result on main
Time: 315092.618 ms (05:15.093)

-- result on this branch
Time: 316690.848 ms (05:16.691)

The token check's impact on delta join processing is negligible.

Time to Shutdown

Cancel the join query after 1s, then use a subscribe to measure the time until the dataflow stops showing up in the introspection sources.

-- setup
create table t (a int);
insert into t select * from generate_series(1, 1000);

-- first SQL session
copy (subscribe mz_internal.mz_dataflows with (progress)) to stdout;

-- second SQL session
set statement_timeout = '1s';
insert into t select t1.a + t2.a + t3.a from t t1, t t2, t t3;

-- result on main
1682337080000	f	1	240	7	Dataflow: oneshot-select-t2
1682337082000	t	\N	\N	\N	\N
1682337083000	t	\N	\N	\N	\N
1682337085000	t	\N	\N	\N	\N
1682337086000	t	\N	\N	\N	\N
1682337087000	t	\N	\N	\N	\N
[...]
1682337393000	t	\N	\N	\N	\N
1682337394000	t	\N	\N	\N	\N
1682337394000	f	-1	240	7	Dataflow: oneshot-select-t2
-- => 314s

-- result on this branch
1682336590000	f	1	240	7	Dataflow: oneshot-select-t4
1682336592000	t	\N	\N	\N	\N
1682336593000	t	\N	\N	\N	\N
1682336594000	t	\N	\N	\N	\N
1682336595000	t	\N	\N	\N	\N
1682336596000	t	\N	\N	\N	\N
[...]
1682336632000	t	\N	\N	\N	\N
1682336633000	t	\N	\N	\N	\N
1682336633000	f	-1	240	7	Dataflow: oneshot-select-t4
-- => 43s

The token check significantly reduces the time to dataflow shutdown.

Impact on Interactivity

Cancel the join query after 1s, then measure the time of subsequent queries.

-- setup
create table t (a int);
insert into t select * from generate_series(1, 10000);

-- experiment
\timing
set statement_timeout = '1s';
insert into t select t1.a + t2.a + t3.a from t t1, t t2, t t3;
select * from mz_internal.mz_dataflows;
select * from mz_internal.mz_dataflows;
select * from mz_internal.mz_dataflows;
select * from mz_internal.mz_dataflows;
select * from mz_internal.mz_dataflows;

-- result on main
Time: 4281.017 ms (00:04.281)
Time: 4028.588 ms (00:04.029)
Time: 4753.921 ms (00:04.754)
Time: 5498.200 ms (00:05.498)
Time: 6200.048 ms (00:06.200)

-- result on this branch
Time: 2254.506 ms (00:02.255)
Time: 794.095 ms
Time: 792.803 ms
Time: 786.285 ms
Time: 1102.680 ms (00:01.103)

The token check significantly improves interactivity during the dataflow shutdown.

Linear Join

When selecting a query for testing linear joins, we need to be more careful. The differential join operator checks its fuel only between keys, so a cross-join is essentially unfueled and therefore never notices that the token has been dropped. We need to make sure that our test case includes multiple join keys. Further, fueling is based on the number of distinct DD updates (i.e. (data, time, diff) tuples). A single DD update can, through its diff field, represent a large number of SQL-level records while consuming close to no fuel. So we also need to make sure that the test join produces a large number of distinct records, to ensure the join operator yields regularly.

materialize=> create table t (a int, b int);
CREATE TABLE
materialize=> explain select t1.b * 10000 + t2.b from t t1 join t t2 using (a);
               Optimized Plan
---------------------------------------------
 Explained Query:                           +
   Return                                   +
     Project (#4)                           +
       Map (((#1 * 10000) + #3))            +
         Join on=(#0 = #2) type=differential+
           Get l0                           +
           Get l0                           +
   With                                     +
     cte l0 =                               +
       ArrangeBy keys=[[#0]]                +
         Filter (#0) IS NOT NULL            +
           Get materialize.public.t         +
                                            +
 Source materialize.public.t                +
   filter=((#0) IS NOT NULL)                +

Impact on Normal Processing

Run the join query to completion and measure the time it takes.
We add a LIMIT 1 to avoid an error due to the result being too large.

-- setup
create table t (a int, b int);
insert into t select a, b from generate_series(1, 1000) a, generate_series(1, 1000) b;

-- experiment
\timing
select t1.b * 10000 + t2.b from t t1 join t t2 using (a) limit 1;

-- result on main
Time: 404629.115 ms (06:44.629)

-- result on this branch
Time: 410193.309 ms (06:50.193)

-- result with TimelyDataflow/differential-dataflow#389
Time: 416953.989 ms (06:56.954)

The token check's impact on linear join processing is noticeable (~1%) but presumably still justifiable.
The change to effort checking in the differential join operator adds another ~1% of slowdown.

Time to Shutdown

Cancel the join query after 1s, then use a subscribe to measure the time until the dataflow stops showing up in the introspection sources.

-- setup
create table t (a int, b int);
insert into t select a, b from generate_series(1, 1000) a, generate_series(1, 1000) b;

-- first SQL session
copy (subscribe mz_internal.mz_dataflows with (progress)) to stdout;

-- second SQL session
set statement_timeout = '1s';
insert into t select t1.b * 10000 + t2.b from t t1 join t t2 using (a);

-- result on main
1682342622000	f	1	240	7	Dataflow: oneshot-select-t4
1682342625000	t	\N	\N	\N	\N
1682342629000	t	\N	\N	\N	\N
1682342630000	t	\N	\N	\N	\N
1682342631000	t	\N	\N	\N	\N
1682342632000	t	\N	\N	\N	\N
[...]
1682343096000	t	\N	\N	\N	\N
1682343097000	t	\N	\N	\N	\N
1682343098000	t	\N	\N	\N	\N
1682343098000	f	-1	240	7	Dataflow: oneshot-select-t4
-- => 476s

-- result on this branch
1682342126000	f	1	3174	51	Dataflow: oneshot-select-t87
1682342127000	t	\N	\N	\N	\N
1682342128000	t	\N	\N	\N	\N
1682342145000	t	\N	\N	\N	\N
1682342145000	f	-1	3174	51	Dataflow: oneshot-select-t87
1682342146000	t	\N	\N	\N	\N
-- => 19s

-- result with TimelyDataflow/differential-dataflow#389
1682420207000	f	1	365	9	Dataflow: oneshot-select-t6
1682420209000	t	\N	\N	\N	\N
1682420210000	t	\N	\N	\N	\N
1682420211000	t	\N	\N	\N	\N
1682420212000	t	\N	\N	\N	\N
1682420213000	t	\N	\N	\N	\N
[...]
1682420223000	t	\N	\N	\N	\N
1682420224000	t	\N	\N	\N	\N
1682420225000	t	\N	\N	\N	\N
1682420225000	f	-1	365	9	Dataflow: oneshot-select-t6
-- => 18s

The token check significantly reduces the time to dataflow shutdown.

Impact on Interactivity

Cancel the join query after 1s, then measure the time of subsequent queries.

-- setup
create table t (a int, b int);
insert into t select a, b from generate_series(1, 1000) a, generate_series(1, 1000) b;

-- experiment
\timing
set statement_timeout = '1s';
insert into t select t1.b * 10000 + t2.b from t t1 join t t2 using (a);
select * from mz_internal.mz_dataflows;
select * from mz_internal.mz_dataflows;
select * from mz_internal.mz_dataflows;
select * from mz_internal.mz_dataflows;
select * from mz_internal.mz_dataflows;

-- result on main
Time: 2161.498 ms (00:02.161)
Time: 2351.707 ms (00:02.352)
Time: 2341.483 ms (00:02.341)
Time: 2336.083 ms (00:02.336)
Time: 2330.590 ms (00:02.331)

-- result on this branch
Time: 17063.713 ms (00:17.064)
Time: 59.697 ms
Time: 6.032 ms
Time: 5.783 ms
Time: 6.287 ms

-- result with TimelyDataflow/differential-dataflow#389
Time: 1257.665 ms (00:01.258)
Time: 81.699 ms
Time: 81.119 ms
Time: 81.497 ms
Time: 79.772 ms

The token check significantly degrades interactivity during the dataflow shutdown. As mentioned in the PR description, this is because it essentially disables fueling after the dataflow was cancelled, so queries can only return once the join dataflow has shut down.

This issue is mitigated with the effort counting adjustment to the differential join operator, which restores fueling as intended and as a result provides significantly better interactivity than the token-less implementation.

This commit adds a shutdown token check to the closure we pass to the
delta join operator. When the dataflow is shutting down, this makes the
join closure drain all input data, rather than processing it. As a
result, delta join operators shut down faster and emit less data,
which in turn speeds up shutdown of downstream operators.
@teskje teskje changed the title from "compute: tokenize join operators" to "compute: tokenize delta join operator" on Apr 26, 2023
@teskje teskje marked this pull request as ready for review April 26, 2023 09:05
@teskje teskje requested review from a team, tokenrove and vmarcos April 26, 2023 09:05
@@ -364,6 +370,10 @@ where
        |_timer, count| count > 1_000_000,
        // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
        move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
            // Check the shutdown token to avoid doing unnecessary work when the dataflow is
            // shutting down.
Contributor

Is there any benefit to also checking the token in the yield function? Might save a bunch of sorting and buffer manipulation.

Contributor Author

Hm, how would that allow us to reduce sorting and buffer manipulation? Do you mean we could yield less often when the token was dropped and thereby avoid duplicate work?

I have been thinking about adding a token check to the yield function, to switch to time-based yielding when the token was dropped. But Frank's comment above makes me somewhat scared of messing with the yield function at all :D #8818 provides context on that comment. The TLDR is that specifying a bad yield function introduces the risk that the half join operator gets stuck because it yields before it was able to make any durable progress.

Contributor

Oh yeah, for some reason I was thinking about that from the point of view of whether that dataflow would eventually get dropped, which will not happen, so yielding on shutdown would be bad.

Contributor Author

Right, unfortunately with this approach we still need to grind through all the join matches produced by input data that has made it to the operator so far. There is really no way around this, short of modifying the implementation in DD to do a token check as well. Or drop_dataflow of course :)

@tokenrove (Contributor) commented:

> I'm unsure about how to write a test for this. Given that shutdowns of tokenized joins still take some time proportional to the number of queued updates, it seems hard to come up with a test that is not prone to flakiness. We'd essentially need to test whether a join dataflow goes away "relatively fast", but there is no good way to determine what that means without considering the environment the test is running in.

Yeah, this is the kind of thing that's usually easier to track with a benchmark suite and a long-term history of measurements in a consistent environment. Might be good to keep track of the idea that we'd like to do benchmarks that measure shutdown efficacy though.

@vmarcos (Contributor) left a comment:

Thanks for the measurements and the changes; these LGTM! I agree that it is wise to only merge the two commits in the PR at present, and take a bit more time to evaluate strategies for linear joins. The evaluation was particularly helpful to increase confidence here, so much appreciated!

@teskje (Contributor Author) commented Apr 27, 2023

TFTRs!

@teskje teskje merged commit 8acd600 into MaterializeInc:main Apr 27, 2023
@teskje teskje deleted the faster-join-shutdown branch April 27, 2023 11:18
@teskje teskje mentioned this pull request Jul 17, 2023