Hash probe side spilling support #8894

Closed

@xiaoxmeng xiaoxmeng commented Feb 28, 2024

Add spilling support on the hash probe side to handle memory arbitration requests that
arrive after the build operators have built the hash table and while the probe side is
processing it. We leverage the existing spilling facility built into the hash join bridge and
make the following extensions to the probe side (build side support and the join bridge
extension have already landed):
(1) make hash probe operators wait for their peers when they finish processing the
current probe inputs (either from the source or from previously spilled input), whether or not
the join has more spilled data to process. This handles the edge case in which spilling is
triggered while some slow probe operators are still running: we need all the probe operators
to be present to handle the spilled hash table and the remaining steps. This is due to a
limitation of the current allPeersFinished implementation, which expects all the drivers to be
present in the pipeline to function;
(2) add a reclaim() method to interface with memory arbitration. It checks whether a probe
operator is spillable: the table has been set and has data, and the input spiller has not been
set, since we don't support recursive input spill (for now this can never happen: if the build
side has triggered spill, it spills all the partitions, so the probe side always has an empty
table if it needs to spill the input);
(3) add an output spiller to spill the output produced by the current pending input. We
parallelize the output spill with one thread per probe operator;
(4) if any probe operator still has input to process (it hasn't received the no-more-input
signal), we have to spill the built hash table, and we parallelize this with one thread per
sub-hash table;
(5) free the memory held by the spilled hash table;
(6) set up the input spiller for the rest of the probe inputs.
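
For readers who want the reclaim flow in one place, here is a minimal, self-contained sketch of the spillability check and the order of the reclaim steps described above. It is a toy model, not the Velox code: `ProbeState`, `canReclaim`, and `planReclaim` are hypothetical names, and two details are assumptions rather than statements from this PR: that reclaim proceeds only when every peer is spillable, and that the input spiller is set up only when the table is spilled.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, simplified model of one probe operator's state. These fields
// are illustrative only and are not the actual Velox HashProbe members.
struct ProbeState {
  bool tableSet{false};          // the built hash table has been handed over
  long tableRows{0};             // number of rows in the built table
  bool inputSpillerSet{false};   // probe input is already being spilled
  bool hasPendingOutput{false};  // output remains for the current input
  bool noMoreInput{false};       // the no-more-input signal was received
};

// Spillable only if there is a non-empty table and the input is not already
// being spilled (recursive input spill is not supported).
bool canReclaim(const ProbeState& op) {
  return op.tableSet && op.tableRows > 0 && !op.inputSpillerSet;
}

// Returns the ordered reclaim steps for a group of peer probe operators, or
// an empty plan if nothing can be reclaimed.
std::vector<std::string> planReclaim(const std::vector<ProbeState>& ops) {
  assert(!ops.empty());
  bool anyMoreInput = false;
  for (const auto& op : ops) {
    // Assumption: reclaim proceeds only if every peer is spillable.
    if (!canReclaim(op)) {
      return {};
    }
    anyMoreInput = anyMoreInput || !op.noMoreInput;
  }

  std::vector<std::string> steps;
  // Spill pending output, conceptually one thread per probe operator.
  for (std::size_t i = 0; i < ops.size(); ++i) {
    if (ops[i].hasPendingOutput) {
      steps.push_back("spill output of operator " + std::to_string(i));
    }
  }
  // The table is spilled only if some operator may still receive input.
  if (anyMoreInput) {
    steps.push_back("spill hash table");
  }
  steps.push_back("free hash table memory");
  // Assumption: the input spiller is only needed when the table was spilled.
  if (anyMoreInput) {
    steps.push_back("set up input spiller");
  }
  return steps;
}
```

Calling `planReclaim` on two operators that share a non-empty table, where one has pending output and the other has already received the no-more-input signal, yields the output spill for the first operator, then the table spill, the memory release, and the input spiller setup, in that order.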

Unit tests are added to cover different spilling scenarios. We will also run the join fuzzer
with spilling, OOM injection, and query abort injection.

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Feb 28, 2024

netlify bot commented Feb 28, 2024

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit a614026
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/66066f46d5cec70008b8ab00

@xiaoxmeng xiaoxmeng force-pushed the probe-spill branch 14 times, most recently from 96612d5 to 6813bc5 Compare March 4, 2024 06:38
@xiaoxmeng xiaoxmeng force-pushed the probe-spill branch 4 times, most recently from 50d19e6 to b6f6cd7 Compare March 5, 2024 18:37
@xiaoxmeng xiaoxmeng force-pushed the probe-spill branch 9 times, most recently from 0bd12f8 to 940b2cf Compare March 16, 2024 06:53
xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Mar 28, 2024
Reviewed By: bikramSingh91, oerling

Differential Revision: D55054964

Pulled By: xiaoxmeng
@facebook-github-bot
Contributor

This pull request was exported from Phabricator. Differential Revision: D55054964

@facebook-github-bot
Contributor

@xiaoxmeng merged this pull request in 2ea66c6.

Conbench analyzed the 1 benchmark run on commit 2ea66c62.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

@xiaoxmeng xiaoxmeng deleted the probe-spill branch March 29, 2024 19:38
Real-Chen-Happy pushed a commit to Real-Chen-Happy/velox that referenced this pull request Apr 2, 2024
Pull Request resolved: facebookincubator#8894

Reviewed By: bikramSingh91, oerling

Differential Revision: D55054964

Pulled By: xiaoxmeng

fbshipit-source-id: 8ad361c2e0e5bf3e88b5b719bcc323e8e7d4f276
Joe-Abraham pushed a commit to Joe-Abraham/velox that referenced this pull request Jun 7, 2024
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. fb-exported Merged
5 participants