
[dataflow] Fuel the arrangement to stream operator #7171

Merged
merged 4 commits on Aug 5, 2021

Conversation

@frankmcsherry (Contributor) commented Jun 25, 2021

This PR introduces fuel to the primary operator that converts arrangements to streams of updates. This should increase the responsiveness of the system, as queries deriving from arrangements needn't lock up the dataflow with the operators that turn entire arrangements into a stream of updates. At the same time, it may also improve performance, as it lets the rest of the system respond as outputs are produced, potentially accumulating or otherwise retiring them.

This hasn't been tested other than with sqllogictest, and in particular isn't certain to fix the issues it is pointed at. The default number of records is set at 1M, which may be too large, but it should at least be distinguishable from not yielding at all (on sufficiently large inputs).
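
For intuition, the pattern is roughly the following (an illustrative sketch with stand-in types, not the operator's actual code): do a bounded amount of work per activation, then yield and ask to be rescheduled while work remains.

// Illustrative sketch of the fueled pattern (stand-in types, not the real operator).
struct FueledExport {
    pending: Vec<(String, i64)>, // records still to emit
    fuel_per_activation: usize,  // e.g. the 1M default mentioned above
}

impl FueledExport {
    // Emit at most `fuel_per_activation` records, leaving the rest for later.
    fn run_once(&mut self) -> Vec<(String, i64)> {
        let take = self.fuel_per_activation.min(self.pending.len());
        self.pending.drain(..take).collect()
    }

    // The operator should reschedule itself while records remain.
    fn needs_reactivation(&self) -> bool {
        !self.pending.is_empty()
    }
}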

The next step is to test this out and determine whether it alleviates previously observed responsiveness problems (it will not fix them all, but it should be a start).

Fixes #7031

@benesch (Member) commented Jun 25, 2021

(I fixed the issue reference from #7301 to #7031.)

@frankmcsherry (Contributor Author)

@philip-stoev when next we meet, it would be great to discuss how to test this. Specifically, it has behavior that should only be different on collections that have more than 1M records. That number can be dialed around, but it seems likely to be baked in somewhere (roughly: after how many records should the operator yield control). I noticed at least one bug (pre-fixed) that would have resulted in repeated data being output, but I could also imagine that I have others.

@frankmcsherry (Contributor Author)

@aljoscha this could potentially reduce some of the sink build-up you were seeing. I suspect it was at least in part due to the source of data (an arrangement) spilling out all of the data at once, rather than as downstream operators handle it. It might be a bit before this lands (it wants testing), but I wanted to point it out in case you wanted to drive it around and see what it does.

@frankmcsherry marked this pull request as ready for review July 6, 2021 20:44
@frankmcsherry (Contributor Author)

I've tested this against sqllogictest with fuel set to both 10 and 1, to try to see whether that shakes out any bugs. It did not.

@antiguru (Member) left a comment

LGTM, added a nit.

.unwrap()
.do_work(&key, &mut logic, &mut fuel, output);
if fuel > 0 {
todo.pop_front();

@antiguru (Member):

I'd consider asserting here that the PendingWork removed is indeed completed. It looks like it is, from the do_work implementation, but it's always good to avoid surprises.

@frankmcsherry (Contributor Author):

Like, some sort of assert that if you try to read from or step the cursor you get nothing back?
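
For illustration, such a check might look roughly like this (a sketch only: the cursor interface is mocked with a stand-in trait, and the real PendingWork fields may differ):

use std::collections::VecDeque;

// Stand-in for the piece of the cursor interface the assert would need
// (hypothetical; the real PendingWork holds a differential dataflow cursor).
trait DrainableCursor {
    // True while the cursor still points at an unread key.
    fn key_valid(&self) -> bool;
}

// Sketch: only retire the front work item after checking it is truly drained.
fn retire_front<C: DrainableCursor>(todo: &mut VecDeque<C>, fuel: isize) {
    if fuel > 0 {
        if let Some(finished) = todo.pop_front() {
            debug_assert!(
                !finished.key_valid(),
                "popped PendingWork still had unread keys"
            );
        }
    }
}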

@aljoscha (Contributor)

@frankmcsherry Because you suggested this as a potential fix for #7043 a while back: it doesn't fix it. The problem there is that the sink machinery that sorts by timestamp and stashes messages until the frontier passes their timestamp still needs to wait for the frontier to advance. In the extreme case of all data having one and the same timestamp, that means waiting for all data and sorting all data in one go. Fueling the import doesn't change that problem because the imported data still all have the same timestamp.
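
For readers following along, the sink behaviour described above is roughly this pattern (a simplified sketch with plain u64 timestamps, not the actual sink code): records are stashed by timestamp and only released once the frontier has passed that timestamp, so a single shared timestamp means nothing is released until everything has arrived.

use std::collections::BTreeMap;

// Simplified sketch of "stash until the frontier passes the timestamp".
struct SinkStash {
    stash: BTreeMap<u64, Vec<String>>, // records grouped by timestamp
}

impl SinkStash {
    // Buffer a record under its timestamp.
    fn push(&mut self, time: u64, record: String) {
        self.stash.entry(time).or_default().push(record);
    }

    // Release, in timestamp order, everything strictly behind the frontier.
    fn drain_complete(&mut self, frontier: u64) -> Vec<(u64, String)> {
        let ready: Vec<u64> = self.stash.range(..frontier).map(|(t, _)| *t).collect();
        let mut out = Vec::new();
        for t in ready {
            for record in self.stash.remove(&t).unwrap() {
                out.push((t, record));
            }
        }
        out
    }
}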

What would help is if the import could compact the timestamp forward as the import is happening, thus yielding a smoother distribution of timestamps. Is that at all possible with how an arrangement import currently works?

@frankmcsherry (Contributor Author)

Afaict, you are asking if the import could "lie" about the timestamp so that the progress tracking machinery could report "complete" times? I think in any situation that this is safe, there is no consolidation happening, and the code can instead just rely on the fact that arrangements can produce compacted data that can just be emitted once seen.

I feel like there is some random mapping and such that lives between the arrangement and the sink. I have no clue what it is, but if we can nail that down (and constrain it appropriately) then the data could just be streamed out.

Sinks could also read directly out of arrangements, per some issue from like a year ago. Them wanting to sort by time is neat, but it comes with this performance penalty.

@aljoscha (Contributor)

> Afaict, you are asking if the import could "lie" about the timestamp so that the progress tracking machinery could report "complete" times? I think in any situation that this is safe, there is no consolidation happening, and the code can instead just rely on the fact that arrangements can produce compacted data that can just be emitted once seen.

I'm not 100% sure. Also, I think what I am asking for is not possible with how arrangements work. Imagine you're sinking an arrangement that has 100m entries and the compaction window is set to 1ms. Importing it takes 20 seconds, but all records have the timestamp that they had when importing started. What I was asking for is that (timestamp) compaction is reflected in the records as they are emitted over those 20 seconds. But I think that's not possible, because at the start of the import you basically get a cursor over the records as of the start time, and holding that cursor keeps them from being compacted, right?

> I feel like there is some random mapping and such that lives between the arrangement and the sink. I have no clue what it is, but if we can nail that down (and constrain it appropriately) then the data could just be streamed out.

Unfortunately, we can't just stream them out because of the exactly-once business that the sink does. Or at least not with the current implementation. I had some thoughts written down about that here: https://gist.github.com/aljoscha/9d134bbec1b5fceacf8a2da791a44aef

> Sinks could also read directly out of arrangements, per some issue from like a year ago. Them wanting to sort by time is neat, but it comes with this performance penalty.

Yes! I'm aware of #2808 and would love to do it, but I think that also wouldn't easily work to provide the guarantees that the sink currently provides. There's a short comment on that in https://gist.github.com/aljoscha/9d134bbec1b5fceacf8a2da791a44aef as well.

@philip-stoev self-requested a review August 3, 2021 13:05
@philip-stoev (Contributor) commented Aug 4, 2021

@frankmcsherry many apologies, I did not see this PR previously. Will work on it today.

EDIT: Re the 1M limit, I am going to patch this locally to be 10 records or some other very small value, which should bring more of the existing tests to bear. I will also use the opportunity to see how we can start creating a "1M rows" body of tests that would exercise the product across the board with more than a few records per source/table.

@philip-stoev (Contributor)

Item No 1. The following query never completes with refuel=1 and refuel=10 (I gave it 20 minutes on 4 cores), but it is instantaneous with refuel=1000 and higher values. CPU is pegged at 400%, and mz_message_counts reports hundreds of millions of messages being exchanged. The record counters on the SVG graph show no progress.

As the participating tables have hundreds of records at most, it should be possible to evaluate the query even at refuel=10. Therefore, I think some endless-loop situation has developed.

Perf reports the following:

+    4.95%     1.94%  timely:work-0    materialized        [.] <differential_dataflow::trace::implementations::spine_fueled::Spine<K,V,T,R,B> as differential_dataflow::trace::TraceReader>::map_batches     ◆
-    4.90%     0.00%  timely:work-3    [unknown]           [.] 0000000000000000                                                                                                                              ▒
   - 0                                                                                                                                                                                                       ▒
        1.05% <differential_dataflow::trace::implementations::spine_fueled::Spine<K,V,T,R,B> as differential_dataflow::trace::TraceReader>::map_batches                                                      ▒
        0.64% hashbrown::map::HashMap<K,V,S>::get_mut                                                                                                                                                        ▒
+    4.85%     1.99%  timely:work-1    materialized        [.] <differential_dataflow::trace::implementations::spine_fueled::Spine<K,V,T,R,B> as differential_dataflow::trace::TraceReader>::map_batches     ▒
+    4.84%     1.93%  timely:work-3    materialized        [.] <differential_dataflow::trace::implementations::spine_fueled::Spine<K,V,T,R,B> as differential_dataflow::trace::TraceReader>::map_batches     ▒
+    4.81%     0.00%  timely:work-0    [unknown]           [.] 0000000000000000                                                                                                                              ▒
+    4.56%     0.00%  timely:work-1    [unknown]           [.] 0000000000000000                                                                                                                              ▒
+    4.24%     0.00%  timely:work-2    [unknown]           [.] 0000000000000000                                                                                                                              ▒
+    3.60%     1.59%  timely:work-2    materialized        [.] <differential_dataflow::trace::implementations::spine_fueled::Spine<K,V,T,R,B> as differential_dataflow::trace::TraceReader>::map_batches     ▒
+    3.38%     3.37%  timely:work-0    materialized        [.] sdallocx                                                                                                                                      ▒
+    3.28%     3.27%  timely:work-3    materialized        [.] sdallocx                                                                                                                                      ▒
+    3.26%     3.25%  timely:work-1    materialized        [.] sdallocx                                                                                                                                      ▒
     2.43%     2.43%  timely:work-2    materialized        [.] sdallocx                                                                                                                                      ▒
+    2.27%     1.48%  timely:work-2    materialized        [.] mallocx                                                                                                                                       ▒
     1.90%     1.84%  timely:work-0    materialized        [.] mallocx                                                                                                                                       ▒
     1.89%     1.83%  timely:work-3    materialized        [.] mallocx                                                                                                                                       ▒
     1.86%     1.81%  timely:work-1    materialized        [.] mallocx                                                                                                                                       ▒
+    1.73%     0.99%  timely:work-1    materialized        [.] differential_dataflow::trace::wrappers::rc::TraceBox<Tr>::adjust_physical_compaction                                                          ▒
+    1.72%     1.72%  timely:work-3    materialized        [.] timely::progress::frontier::MutableAntichain<T>::rebuild                                                                                      ▒
+    1.68%     0.97%  timely:work-0    materialized        [.] differential_dataflow::trace::wrappers::rc::TraceBox<Tr>::adjust_physical_compaction                                                          ▒
+    1.66%     0.13%  timely:work-3    libc-2.31.so        [.] clock_gettime@GLIBC_2.2.5                                                                                                                     ▒
+    1.65%     0.12%  timely:work-0    libc-2.31.so        [.] clock_gettime@GLIBC_2.2.5                                                                                                                     ▒
+    1.65%     0.89%  timely:work-3    materialized        [.] differential_dataflow::trace::wrappers::rc::TraceBox<Tr>::adjust_physical_compaction                                                          ▒
+    1.65%     1.65%  timely:work-1    materialized        [.] timely::progress::frontier::MutableAntichain<T>::rebuild                                                                                      ▒
+    1.63%     0.12%  timely:work-1    libc-2.31.so        [.] clock_gettime@GLIBC_2.2.5                                                                                                                     ▒
+    1.62%     1.62%  timely:work-0    materialized        [.] timely::progress::frontier::MutableAntichain<T>::rebuild                                                                                      ▒
+    1.53%     0.06%  timely:work-0    [vdso]              [.] __vdso_clock_gettime                                                                                                                          ▒
+    1.52%     0.07%  timely:work-3    [vdso]              [.] __vdso_clock_gettime                                                                                                                          ▒
+    1.50%     0.07%  timely:work-1    [vdso]              [.] __vdso_clock_gettime                                                                                                                          ▒
+    1.43%     0.00%  timely:work-3    [unknown]           [.] 0x0000000000000001                                                                                                                            ▒
+    1.40%     0.85%  timely:work-0    materialized        [.] differential_dataflow::trace::wrappers::rc::TraceBox<Tr>::adjust_physical_compaction                                                          ▒
+    1.38%     0.80%  timely:work-2    materialized        [.] differential_dataflow::trace::wrappers::rc::TraceBox<Tr>::adjust_physical_compaction                                                          ▒
+    1.38%     0.11%  timely:work-2    libc-2.31.so        [.] clock_gettime@GLIBC_2.2.5                                                                                                                     ▒
+    1.33%     0.78%  timely:work-3    materialized        [.] differential_dataflow::trace::wrappers::rc::TraceBox<Tr>::adjust_physical_compaction                                                          ▒
+    1.31%     0.79%  timely:work-1    materialized        [.] differential_dataflow::trace::wrappers::rc::TraceBox<Tr>::adjust_physical_compaction                                                          ▒
+    1.27%     0.00%  timely:work-0    [vdso]              [.] 0x00007fff3471b6cb                                                                                                                            ▒
+    1.27%     1.27%  timely:work-0    [vdso]              [.] 0x00000000000006cb                                                                                                                            ▒
+    1.26%     0.00%  timely:work-3    [vdso]              [.] 0x00007fff3471b6cb                                                                                                                            ▒
+    1.26%     1.26%  timely:work-3    [vdso]              [.] 0x00000000000006cb                                                                                                                            ▒
+    1.26%     0.06%  timely:work-2    [vdso]              [.] __vdso_clock_gettime                                                                                                                          ▒
+    1.25%     1.25%  timely:work-2    materialized        [.] timely::progress::frontier::MutableAntichain<T>::rebuild                                                                                      ▒
+    1.25%     0.00%  timely:work-1    [vdso]              [.] 0x00007fff3471b6cb                                                                                                                            ▒
+    1.25%     1.25%  timely:work-1    [vdso]              [.] 0x00000000000006cb                                                                                                                            ▒
+    1.20%     0.00%  timely:work-0    [unknown]           [.] 0x0000000000000001                                                                                                                            ▒
+    1.12%     0.00%  timely:work-2    [unknown]           [.] 0x0000000000000001                                                                                                                            ▒
+    1.05%     0.00%  timely:work-2    [vdso]              [.] 0x00007fff3471b6cb                                                                                                                            ▒
+    1.05%     1.05%  timely:work-2    [vdso]              [.] 0x00000000000006cb                                                                                                                            ▒
+    1.04%     0.00%  timely:work-2    [unknown]           [.] 0x1056394808568b48                                                                                                                            ▒
+    1.04%     0.00%  timely:work-1    [unknown]           [.] 0x0000000000000001                                                                                                                            ▒
+    1.03%     0.62%  timely:work-2    materialized        [.] differential_dataflow::trace::wrappers::rc::TraceBox<Tr>::adjust_physical_compaction                                                          ▒
+    1.01%     1.01%  timely:work-3    materialized        [.] timely::progress::change_batch::ChangeBatch<T>::compact                                                                                       ▒
     0.99%     0.59%  timely:work-0    materialized        [.] <differential_dataflow::trace::implementations::spine_fueled::Spine<K,V,T,R,B> as differential_dataflow::trace::TraceReader>::map_batches 

The query is:

SELECT COUNT(l_shipDATE) AS col7309,
       COUNT(l_receiptDATE) AS col7310,
       COUNT(l_commitDATE) AS col7311,
       COUNT(l_receiptDATE) AS col7312
FROM lineitem
JOIN orders ON (l_orderkey = o_orderkey)
RIGHT JOIN customer ON (o_custkey = c_custkey)
WHERE o_orderdate = l_commitDATE - INTERVAL ' 3 DAY '
  AND l_extendedprice = o_totalprice
  AND l_shipDATE = o_orderdate + INTERVAL ' 6 DAY '
  AND EXISTS
    (SELECT o_custkey
     FROM lineitem
     JOIN orders ON (l_orderkey = o_orderkey)
     WHERE l_commitDATE = '1993-11-23'
       AND l_shipDATE = o_orderdate - INTERVAL ' 0 MONTH '
       AND l_receiptDATE = o_orderdate - INTERVAL ' 7 DAY ' )
  AND EXISTS
    (SELECT o_custkey
     FROM orders
     WHERE l_shipDATE = o_orderdate + INTERVAL ' 2 DAY '
       AND l_receiptDATE = o_orderdate
       AND o_custkey = 7 );

To reproduce, modify refuel to be 10 and then run this SQL file through the psql client: gh7171-item1.sql.zip

@frankmcsherry (Contributor Author)

Looking into this, it appears that the differential dataflow cursor implementation resets the value position when you seek a key, rather than leaving it intact. That's not something that has been spec'd out (a bug on my part), but it means that in this PR, with indexed access, if you don't completely enumerate the values for a key you'll return and start over on them.

In particular, I think the problematic part of the query above is the last constraint, o_custkey = 7, which can use indexed access. I can't get the issue to trigger in a minimized plan like select count(*) from orders where o_custkey = 7, but this PR also predates the work on logical planning that would have made it clearer which implementation was being used.

I'll work on a fix for DD, or for here, once I sort out what the right cursor behavior should be.
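
One defensive shape such a fix could take (illustrative only, and not necessarily what the eventual commit does) is to remember how many values were already emitted for the current key and skip that prefix after re-seeking:

// Illustrative defence against a cursor that resets its value position on seek:
// skip the values that an earlier, fuel-limited activation already emitted.
fn resume_values<V>(values: &[V], already_emitted: usize) -> &[V] {
    &values[already_emitted.min(values.len())..]
}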

@frankmcsherry (Contributor Author)

The above commit defends against the uncertain cursor semantics, and corrects the issue for fuel = 10 on my desktop (it was reproducing). Independent of the DD decision about cursors, this check should prevent the bad behavior.

@frankmcsherry (Contributor Author)

The failures seem... idk, maybe spurious, or maybe just wrong:

pg-cdc.td:85:1: error: expected error containing 'error connecting to server: failed to lookup address information: Name or service not known', but got 'error connecting to server: failed to lookup address information: Temporary failure in name resolution: failed to lookup address information: Temporary failure in name resolution'

@philip-stoev (Contributor) commented Aug 4, 2021 via email

@philip-stoev (Contributor) left a comment

Ok so:

  • buildkite is green with refuel=1, refuel=10
  • chbench runs without crashes / loops with refuel=10
  • RQG reports no wrong results with refuel=1, refuel=10
  • I have pushed the endless-loop test case from this PR into the branch; please squash it and take it along for the ride to main.

There are multiple hurdles to overcome before a dedicated test for this can be pushed, so it cannot be done right now:

  • the "refuel" constant should be made configurable for testing purposes, using a general mechanism for such constants going forward (a sketch of one possible approach follows this list)
  • once implemented, ideally we should not need a dedicated test subdirectory with a mzcompose.yml file in it just to run such a test -- that would be too much boilerplate to test one small aspect of the system. Maybe the mzworkflows.py approach that is currently under discussion will allow such tests to be specified in a more concise manner (and hopefully somehow run faster as well).
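
One lightweight way to make the constant configurable (a sketch only: the environment variable name is made up, and a general settings mechanism may be preferable):

use std::env;

// Sketch: read the per-activation fuel from an environment variable so tests
// can shrink it, falling back to the current 1M default. The variable name
// here is hypothetical.
fn arrangement_export_fuel() -> usize {
    env::var("MZ_ARRANGEMENT_EXPORT_FUEL")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(1_000_000)
}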

@frankmcsherry (Contributor Author)

Thanks very much!

@frankmcsherry deleted the fuel_arrangement branch March 8, 2022 13:29