Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-74: Implemented memory limit in C++ producer #9676

Merged
merged 2 commits into from
Mar 8, 2021

Conversation

merlimat
Copy link
Contributor

Motivation

Similar to what already implemented in Java client, adding a client-wide memory limit for messages produced on all the C++ producers.

Modifications

Refactored the pendingMessagesQueue handling code to be similar to the Java implementation, using a "Deque" and a semaphore, rather that having all the logic collapsed into the queue implementation.

@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Feb 23, 2021
@merlimat merlimat added this to the 2.8.0 milestone Feb 23, 2021
@merlimat merlimat self-assigned this Feb 23, 2021
pulsar-client-cpp/lib/Semaphore.cc Outdated Show resolved Hide resolved
@BewareMyPower
Copy link
Contributor

Did the apache/pulsar-build image change recently? The cpp CI failure was caused by

-- Found the following Boost libraries:
--   system
-- Found PythonLibs: /usr/lib/x86_64-linux-gnu/libpython3.5m.so (found version "3.5.2") 
-- PYTHON: 3.5.2
-- DETECTED Python 3
-- Boost version: 1.58.0

and

CMake Error at python/CMakeLists.txt:84 (MESSAGE):
  Could not find Boost Python library

because the test image apache/pulsar-build doesn't install the Boost.Python for Python3.

I see other tests detect Python2 but not Python3.

@sijie
Copy link
Member

sijie commented Mar 8, 2021

@BewareMyPower Can you review it again?

1 similar comment
@sijie
Copy link
Member

sijie commented Mar 8, 2021

@BewareMyPower Can you review it again?

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sijie sijie merged commit e7ba260 into apache:master Mar 8, 2021
@erobot
Copy link
Contributor

erobot commented Mar 22, 2021

std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallbacksWhenFailed() {
auto callbacks = std::make_shared<PendingCallbacks>();
callbacks->opSendMsgs.reserve(pendingMessagesQueue_.size());
LOG_DEBUG(getName() << "# messages in pending queue : " << pendingMessagesQueue_.size());
// Iterate over a copy of the pending messages queue, to trigger the future completion
// without holding producer mutex.
for (auto& op : pendingMessagesQueue_) {
callbacks->opSendMsgs.push_back(op);
}
if (batchMessageContainer_) {
OpSendMsg opSendMsg;
if (batchMessageContainer_->createOpSendMsg(opSendMsg) == ResultOk) {
callbacks->opSendMsgs.emplace_back(opSendMsg);
}
batchMessageContainer_->clear();
}
pendingMessagesQueue_.clear();
return callbacks;
}

@merlimat getPendingCallbacksWhenFailed is called when message timeouts. This function clears pendingMessagesQueue_ and batchMessageContainer_, so I think releaseSemaphore should be called here as well.

@merlimat
Copy link
Contributor Author

merlimat commented Apr 6, 2021

@erobot That's completely correct. It was a regression introduced here. I've created PR to fix it: #10144

merlimat added a commit to merlimat/pulsar that referenced this pull request Apr 6, 2021
### Motivation

Similar to what already implemented in Java client, adding a client-wide memory limit for messages produced on all the C++ producers.

### Modifications

Refactored the `pendingMessagesQueue` handling code to be similar to the Java implementation, using a "Deque" and a semaphore, rather that having all the logic collapsed into the queue implementation.
@Anonymitaet
Copy link
Member

@merlimat thanks for your contribution!

I see you've added instructions of setMemoryLimit to C++ API doc (ClientConfiguration.h), do we need to add the instructions here? Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants