-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enforce queuing memory limit on exchange clients in merge exchange #7410
Enforce queuing memory limit on exchange clients in merge exchange #7410
Conversation
✅ Deploy Preview for meta-velox canceled.
|
Adding a unit test is proving to be tricky since we only use localExchangeClients within velox tests that do not allocate any memory. We can potentially implement a new Exchange client only for testing that emulates remote clients by redundantly copying the vectors it receives from local partitioned output. Will explore this option but open to suggestions. |
@bikramSingh91 has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
velox/docs/configs.rst
Outdated
* - merge_exchange.max_buffer_size | ||
- integer | ||
- 128MB | ||
- The aggregate buffer size (in Bytes) across all exchange clients generated by the merge exchange operator, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bytes -> bytes
typo in PR description:
join -> exchange |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bikramSingh91 nice change. Thanks!
velox/exec/Merge.cpp
Outdated
} else { | ||
noMoreSplits_ = true; | ||
auto maxMergeExchangeBufferSize = operatorCtx_->driverCtx() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const auto maxMergeExchangeBufferSize
velox/exec/Merge.cpp
Outdated
auto maxMergeExchangeBufferSize = operatorCtx_->driverCtx() | ||
->queryConfig() | ||
.maxMergeExchangeBufferSize(); | ||
auto maxQueuedBytesPerSource = std::min<int64_t>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
velox/exec/Merge.cpp
Outdated
maxMergeExchangeBufferSize / remoteSourceTaskIds_.size(), | ||
MergeSource::kMaxQueuedBytesLowerLimit), | ||
MergeSource::kMaxQueuedBytesUpperLimit); | ||
uint32_t currentSourceId = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
for (uint32_t remoteSourceIndex = 0; remoteSourceIndex < remoteSourceTaskIds_.size(); ++remoteSourceIndex) {
auto* pool = operatorCtx_->task()->addMergeSourcePool(..., emoteSourceIndex);
sources_.emplace_back(..., remoteSourceTaskIds_[remoteSourceIndex], ...
}
velox/exec/MergeSource.cpp
Outdated
@@ -120,13 +120,14 @@ class MergeExchangeSource : public MergeSource { | |||
MergeExchange* mergeExchange, | |||
const std::string& taskId, | |||
int destination, | |||
memory::MemoryPool* FOLLY_NONNULL pool) | |||
memory::MemoryPool* FOLLY_NONNULL pool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NYC: can just remove FOLLY_NONNULL? Thanks!
velox/exec/MergeSource.cpp
Outdated
@@ -207,9 +208,10 @@ std::shared_ptr<MergeSource> MergeSource::createMergeExchangeSource( | |||
MergeExchange* mergeExchange, | |||
const std::string& taskId, | |||
int destination, | |||
memory::MemoryPool* pool) { | |||
memory::MemoryPool* pool, | |||
int64_t maxQueuedBytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe keep pool the last parameter? thanks!
@@ -23,6 +23,9 @@ class MergeExchange; | |||
|
|||
class MergeSource { | |||
public: | |||
static constexpr int32_t kMaxQueuedBytesUpperLimit = 32 << 20; // 32 MB. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does these need to be public? If not, let's move to private section? thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are used by MergeExchange::addMergeSources() so need to be public. The other alternative is that we keep them private and handle the enforcement of upper and lower limit within MergeSource::createMergeExchangeSource(). This would abstract away the enforcement logic from the caller (MergeExchange) but this means the client loses explicit control. Either way is fine with me since we dont have other generic use cases yet for either alternative.
Please let me know if you want me to change the current implementation either way. Thanks
velox/exec/MergeSource.h
Outdated
@@ -40,7 +43,8 @@ class MergeSource { | |||
MergeExchange* mergeExchange, | |||
const std::string& taskId, | |||
int destination, | |||
memory::MemoryPool* FOLLY_NONNULL pool); | |||
memory::MemoryPool* FOLLY_NONNULL pool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
…acebookincubator#7410) Summary: Currently, when the merge exchange creates ExchangeClients for each source, they are subject to a 32MB queuing limit. However, this can lead to high memory usage if all clients queue around 32MB. For instance, we observed 300 clients consuming over 6GB of memory in one case. To address this issue, we have introduced a new query config "merge_exchange.max_buffer_size" that sets an upper bound on the total memory that can be queued by these clients. This max limit is divided equally among all clients with an upper and lower limit of 32MB and 1MB, respectively, per client. The default for this config is set to 128MB. It's important to note that this limit is enforced approximately, not strictly. Reviewed By: xiaoxmeng, mbasmanova Differential Revision: D50996298 Pulled By: bikramSingh91
cb0a584
to
1658217
Compare
This pull request was exported from Phabricator. Differential Revision: D50996298 |
…acebookincubator#7410) Summary: Currently, when the merge exchange creates ExchangeClients for each source, they are subject to a 32MB queuing limit. However, this can lead to high memory usage if all clients queue around 32MB. For instance, we observed 300 clients consuming over 6GB of memory in one case. To address this issue, we have introduced a new query config "merge_exchange.max_buffer_size" that sets an upper bound on the total memory that can be queued by these clients. This max limit is divided equally among all clients with an upper and lower limit of 32MB and 1MB, respectively, per client. The default for this config is set to 128MB. It's important to note that this limit is enforced approximately, not strictly. Reviewed By: xiaoxmeng, mbasmanova Differential Revision: D50996298 Pulled By: bikramSingh91
1658217
to
310d26b
Compare
This pull request was exported from Phabricator. Differential Revision: D50996298 |
…acebookincubator#7410) Summary: Currently, when the merge exchange creates ExchangeClients for each source, they are subject to a 32MB queuing limit. However, this can lead to high memory usage if all clients queue around 32MB. For instance, we observed 300 clients consuming over 6GB of memory in one case. To address this issue, we have introduced a new query config "merge_exchange.max_buffer_size" that sets an upper bound on the total memory that can be queued by these clients. This max limit is divided equally among all clients with an upper and lower limit of 32MB and 1MB, respectively, per client. The default for this config is set to 128MB. It's important to note that this limit is enforced approximately, not strictly. Reviewed By: xiaoxmeng, mbasmanova Differential Revision: D50996298 Pulled By: bikramSingh91
310d26b
to
7a8622b
Compare
This pull request was exported from Phabricator. Differential Revision: D50996298 |
@bikramSingh91 merged this pull request in 9bb4de4. |
Conbench analyzed the 1 benchmark run on commit There were no benchmark performance regressions. 🎉 The full Conbench report has more details. |
Currently, when the merge join creates ExchangeClients for each
source, they are subject to a 32MB queuing limit. However, this can
lead to high memory usage if all clients queue around 32MB. For
instance, we observed 300 clients consuming over 6GB of memory in one
case. To address this issue, we have introduced a new query config
"merge_exchange.max_buffer_size" that sets an upper bound on the total
memory that can be queued by these clients. This max limit is divided
equally among all clients with an upper and lower limit of 32MB and
1MB, respectively, per client. The default for this config is set to
128MB. It's important to note that this limit is enforced
approximately, not strictly.