Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce lock contention in RepartitionExec::execute #10009

Merged
merged 2 commits into from
Apr 10, 2024

Conversation

crepererum
Copy link
Contributor

@crepererum crepererum commented Apr 9, 2024

Which issue does this PR close?

Closes #10014

Rationale for this change

The state is initialized ONCE for all partitions. However this may take a short while (on a very busy system 1ms or more). It is quite likely that multiple threads call execute at the same time, because we have just fanned out to the number "target partitions" which is likely set to the number of CPU cores which now all try to start to execute the plan at the same time.

The solution is to not waste CPU circles in some futex lock but to tell the async runtime (= tokio) that we are performing work and the other threads should rather do something useful.

What changes are included in this PR?

This mostly just moves code around, no functional change intended.

Are these changes tested?

Existing tests still pass.

Are there any user-facing changes?

Faster query exec.

The state is initialized ONCE for all partitions. However this may take
a short while (on a very busy system 1ms or more). It is quite likely
that multiple threads call `execute` at the same time, because we have
just fanned out to the number "target partitions" which is likely set to
the number of CPU cores which now all try to start to execute the plan
at the same time.

The solution is to not waste CPU circles in some futex lock but to tell
the async runtime (= tokio) that we are performing work and the other
threads should rather do something useful.

This mostly just moves code around, no functional change intended.
})
.collect();

// TODO: metric input-output mapping is broken
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was broken before: the first parameter was set to partition, i.e. the first partition that initializes the state. That's clearly wrong. I now initialize it to 0 and will fix the tracking properly in a follow-up PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #10015 to track

Comment on lines +1297 to +1300
let mut background_task = JoinSet::new();
background_task.spawn(async move {
input.wait().await;
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That goes for this and the other tests:

There is NO guarantee that the inputs to RepartitionExec are polled if you never poll any of the outputs. Hence we need to move this barrier call into a background task so it happens at the same time as the output poll below.

@alamb alamb changed the title fix: lock contention in RepartitionExec::execute fix: reduce lock contention in RepartitionExec::execute Apr 9, 2024
@Dandandan
Copy link
Contributor

/benchmark

:)

Copy link

github-actions bot commented Apr 9, 2024

Benchmark results

Benchmarks comparing cb21404 (main) and 01a4468 (PR)
Comparing cb21404 and 01a4468
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃  cb21404 ┃  01a4468 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 288.93ms │ 290.89ms │     no change │
│ QQuery 2     │  38.25ms │  41.82ms │  1.09x slower │
│ QQuery 3     │  61.54ms │  59.31ms │     no change │
│ QQuery 4     │  79.36ms │  81.27ms │     no change │
│ QQuery 5     │  98.31ms │  99.78ms │     no change │
│ QQuery 6     │  16.76ms │  16.86ms │     no change │
│ QQuery 7     │ 229.19ms │ 226.86ms │     no change │
│ QQuery 8     │  54.87ms │  43.09ms │ +1.27x faster │
│ QQuery 9     │ 123.68ms │ 122.10ms │     no change │
│ QQuery 10    │ 110.49ms │ 110.77ms │     no change │
│ QQuery 11    │  44.05ms │  50.13ms │  1.14x slower │
│ QQuery 12    │  60.47ms │  59.46ms │     no change │
│ QQuery 13    │ 104.21ms │ 110.17ms │  1.06x slower │
│ QQuery 14    │  19.95ms │  19.31ms │     no change │
│ QQuery 15    │  32.66ms │  33.27ms │     no change │
│ QQuery 16    │  48.57ms │  47.74ms │     no change │
│ QQuery 17    │ 147.98ms │ 157.37ms │  1.06x slower │
│ QQuery 18    │ 550.89ms │ 579.23ms │  1.05x slower │
│ QQuery 19    │  66.28ms │  63.46ms │     no change │
│ QQuery 20    │ 123.06ms │ 114.96ms │ +1.07x faster │
│ QQuery 21    │ 334.90ms │ 364.57ms │  1.09x slower │
│ QQuery 22    │  39.90ms │  39.74ms │     no change │
└──────────────┴──────────┴──────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary      ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (cb21404)   │ 2674.32ms │
│ Total Time (01a4468)   │ 2732.16ms │
│ Average Time (cb21404) │  121.56ms │
│ Average Time (01a4468) │  124.19ms │
│ Queries Faster         │         2 │
│ Queries Slower         │         6 │
│ Queries with No Change │        14 │
└────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃  cb21404 ┃  01a4468 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │ 428.26ms │ 439.25ms │    no change │
│ QQuery 2     │  55.42ms │  57.68ms │    no change │
│ QQuery 3     │ 142.59ms │ 144.14ms │    no change │
│ QQuery 4     │  88.98ms │  85.14ms │    no change │
│ QQuery 5     │ 197.39ms │ 197.44ms │    no change │
│ QQuery 6     │ 104.73ms │ 109.25ms │    no change │
│ QQuery 7     │ 276.97ms │ 294.83ms │ 1.06x slower │
│ QQuery 8     │ 184.92ms │ 189.25ms │    no change │
│ QQuery 9     │ 287.43ms │ 301.41ms │    no change │
│ QQuery 10    │ 230.19ms │ 242.36ms │ 1.05x slower │
│ QQuery 11    │  60.73ms │  62.24ms │    no change │
│ QQuery 12    │ 125.82ms │ 123.79ms │    no change │
│ QQuery 13    │ 173.22ms │ 177.07ms │    no change │
│ QQuery 14    │ 125.29ms │ 127.83ms │    no change │
│ QQuery 15    │ 183.74ms │ 189.55ms │    no change │
│ QQuery 16    │  50.30ms │  51.76ms │    no change │
│ QQuery 17    │ 307.78ms │ 307.77ms │    no change │
│ QQuery 18    │ 440.73ms │ 465.69ms │ 1.06x slower │
│ QQuery 19    │ 226.71ms │ 232.00ms │    no change │
│ QQuery 20    │ 191.39ms │ 195.59ms │    no change │
│ QQuery 21    │ 320.31ms │ 322.27ms │    no change │
│ QQuery 22    │  52.58ms │  53.67ms │    no change │
└──────────────┴──────────┴──────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary      ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (cb21404)   │ 4255.48ms │
│ Total Time (01a4468)   │ 4370.00ms │
│ Average Time (cb21404) │  193.43ms │
│ Average Time (01a4468) │  198.64ms │
│ Queries Faster         │         0 │
│ Queries Slower         │         3 │
│ Queries with No Change │        19 │
└────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃   cb21404 ┃   01a4468 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │ 4193.11ms │ 4270.80ms │    no change │
│ QQuery 2     │  471.42ms │  498.39ms │ 1.06x slower │
│ QQuery 3     │ 1684.99ms │ 1726.17ms │    no change │
│ QQuery 4     │  814.68ms │  811.16ms │    no change │
│ QQuery 5     │ 2157.72ms │ 2181.34ms │    no change │
│ QQuery 6     │ 1010.61ms │ 1042.31ms │    no change │
│ QQuery 7     │ 3646.77ms │ 3652.32ms │    no change │
│ QQuery 8     │ 2455.95ms │ 2447.77ms │    no change │
│ QQuery 9     │ 4110.48ms │ 4019.59ms │    no change │
│ QQuery 10    │ 2537.27ms │ 2521.15ms │    no change │
│ QQuery 11    │  562.46ms │  555.69ms │    no change │
│ QQuery 12    │ 1196.63ms │ 1197.01ms │    no change │
│ QQuery 13    │ 2342.79ms │ 2382.42ms │    no change │
│ QQuery 14    │ 1267.59ms │ 1286.54ms │    no change │
│ QQuery 15    │ 1919.37ms │ 1951.67ms │    no change │
│ QQuery 16    │  515.55ms │  518.62ms │    no change │
│ QQuery 17    │ 5300.18ms │ 5407.52ms │    no change │
│ QQuery 18    │ 6846.98ms │ 6923.99ms │    no change │
│ QQuery 19    │ 2222.72ms │ 2301.25ms │    no change │
│ QQuery 20    │ 2570.40ms │ 2599.65ms │    no change │
│ QQuery 21    │ 4365.39ms │ 4364.31ms │    no change │
│ QQuery 22    │  571.29ms │  569.52ms │    no change │
└──────────────┴───────────┴───────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary      ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (cb21404)   │ 52764.34ms │
│ Total Time (01a4468)   │ 53229.17ms │
│ Average Time (cb21404) │  2398.38ms │
│ Average Time (01a4468) │  2419.51ms │
│ Queries Faster         │          0 │
│ Queries Slower         │          1 │
│ Queries with No Change │         21 │
└────────────────────────┴────────────┘

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @crepererum -- I reviewed this code carefully and it makes sense to me

I think it is important to document the rationale in comments to reduce the chance of this change being accidentally undone in the future, but otherwise I think it is ready to go 🚀

@@ -1240,7 +1294,10 @@ mod tests {
std::mem::drop(output_stream0);

// Now, start sending input
input.wait().await;
let mut background_task = JoinSet::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this change? I tried this change without the other changes in this PR and the test still passes (I was expecting it would hang or something)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafusion/physical-plan/src/repartition/mod.rs Outdated Show resolved Hide resolved
})
.collect();

// TODO: metric input-output mapping is broken
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #10015 to track

@crepererum
Copy link
Contributor Author

TBH a slightly slower runtime in a "repeat a single query in a hot-loop" benchmark is somewhat expected. If you have enough tokio threads to spare, you can totally futex-wait. But for any kind of server software that's a bad idea.

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@crepererum crepererum merged commit 75c399c into apache:main Apr 10, 2024
24 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Reduce lock contention in RepartitionExec::execute
3 participants