
[Issue 11496][C++] Allow partitioned producers to start lazily #11570

Merged: 6 commits merged into apache:master on Aug 16, 2021

Conversation

Vanlightly
Contributor

Fixes #11496 and also matches part of PIP 79.

C++ implementation that closely matches the proposed Java client changes for reducing partitioned producer connections and lookups: PR 10279

Motivation

Producers that send messages to partitioned topics start a producer per partition, even when using single partition routing. For topics that combine a large number of producers with a large number of partitions, this can put strain on the brokers. With, say, 1000 partitions and single partition routing of non-keyed messages, 999 topic owner lookups and producer registrations are performed that could be avoided.

PIP 79 also describes this; I wrote this change before realising that PIP 79 covers it. This implementation can be reviewed and contrasted with the Java client implementation in #10279.

Modifications

Allows partitioned producers to start producers for individual partitions lazily. Starting a producer involves a topic owner lookup to find out which broker owns the partition, then registering the producer for that partition with the owner broker. For topics with many partitions, when using SinglePartition routing without keyed messages, all of these lookups and producer registrations are wasted except for the single chosen partition.

This change allows the user to control whether a producer on a partitioned topic uses this lazy start, via a new config in ProducerConfiguration. When ProducerConfiguration.setLazyStartPartitionedProducers(true) is set, PartitionedProducerImpl.start() becomes a synchronous operation that only does housekeeping (no network operations). The producer for any given partition is started (which includes a topic owner lookup and registration) upon sending the first message to that partition. While the producer starts, messages are buffered.

The sendTimeout timer is only activated once a producer has been fully started, which should give enough time for any buffered messages to be sent. For very short send timeouts, this setting could cause send timeouts during the start phase; the default of 30s, however, should not cause this issue.
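
For illustration, here is a minimal C++ sketch of enabling the new option together with single-partition routing (the service URL and topic name are placeholders):

```cpp
#include <pulsar/Client.h>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");  // placeholder service URL

    ProducerConfiguration conf;
    // Route non-keyed messages to a single partition, so only one internal
    // producer is ever needed.
    conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
    // New option added by this PR: defer per-partition lookups and
    // registrations until a message is first routed to that partition.
    conf.setLazyStartPartitionedProducers(true);

    Producer producer;
    Result res = client.createProducer("persistent://public/default/my-partitioned-topic", conf, producer);
    if (res != ResultOk) {
        return 1;
    }

    // With lazy start, the internal producer for a partition is started when a
    // message is first routed to it; messages are buffered while it starts.
    producer.send(MessageBuilder().setContent("hello").build());

    producer.close();
    client.close();
    return 0;
}
```

With the option left at its default (false), behaviour is unchanged and all partition producers are started during createProducer.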

Verifying this change

This change adds tests and can be verified as follows:

  • BasicEndToEndTest, testPartitionedProducerConsumer
  • BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopicLazyProducers
  • BasicEndToEndTest, testFlushInLazyPartitionedProducer

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes) - client configuration
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

For contributor

For this PR, do we need to update docs?

Yes, the new client config needs documenting. I can contribute that documentation if this PR is accepted.

For committer

For this PR, do we need to update docs?

  • If yes,

    • if you update docs in this PR, label this PR with the doc label.

    • if you plan to update docs later, label this PR with the doc-required label.

    • if you need help on updating docs, create a follow-up issue with the doc-required label.

  • If no, label this PR with the no-need-doc label and explain why.

@@ -379,8 +418,8 @@ bool PartitionedProducerImpl::isConnected() const {
    Lock producersLock(producersMutex_);
    const auto producers = producers_;
    producersLock.unlock();
-   for (const auto& producer : producers_) {
+   for (const auto& producer : producers) {
        if (!producer->isConnected()) {
Contributor

If the producer has been created but no messages have been sent, this will return true, even though there have been no connections.

Contributor Author

Yes. The question is what people do with this method. Is it an indicator of a problem? Without lazy producers, false means we were once connected but now aren't and so can be considered a transient problem. But with lazy producers, false may mean we've not even tried, so false does not mean anything. So I decided to return true to signal absence of a problem.

Definitely interested to hear opinions on that.
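
To make those semantics concrete, here is a hypothetical illustration (not the merged implementation; the type and function names below are made up for the example): only producers that have actually been started can make the check report a problem, so an idle lazy partitioned producer reports true.

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-in for an internal per-partition producer that has
// already been started (eagerly or lazily).
struct StartedProducer {
    bool connected = false;                        // true once the broker connection is up
    bool isConnected() const { return connected; }
};

// Report "connected" unless a producer that actually attempted a connection
// has lost it. An empty list (no partition used yet) reports true, i.e. the
// absence of a problem.
bool partitionedProducerIsConnected(const std::vector<std::shared_ptr<StartedProducer>>& started) {
    for (const auto& producer : started) {
        if (!producer->isConnected()) {
            return false;  // a started producer is disconnected: a (possibly transient) problem
        }
    }
    return true;
}
```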

Contributor

Ya, I have no strong opinion here. "connected" is a weird concept in a distributed system like this.

@Anonymitaet added the doc-required label on Aug 5, 2021
@Anonymitaet
Member

@Vanlightly Thanks for providing doc-related info! Looking forward to your docs.

@BewareMyPower
Contributor

Could you rebase onto master since the C++/Python tests have been fixed?

@Vanlightly
Contributor Author

Vanlightly commented Aug 9, 2021

I realised that I had not included the extra config in __init__.py, so I will add that to this PR.

@Anonymitaet Beyond the doxygen- and pdoc-generated docs, is it worth documenting this change further? If so, where do you think it should go?

@Anonymitaet
Member

@Vanlightly how about adding your changes here?

Avoids performing lookups and producer registration on all partitions of a partitioned topic when using SinglePartition routing without keyed messages.
Lazy producers connect on demand on their first message. The send timeout timer must be started on the first message as there is no guarantee the connection will complete.
@Vanlightly
Contributor Author

@Anonymitaet I have added a section about lazy producers to the documentation and also added examples of both synchronous and asynchronous producers and consumers.

Member

@Anonymitaet Anonymitaet left a comment

Many thanks for your great contribution! I’ve left some comments, PTAL.

@Anonymitaet added the doc label and removed the doc-required label on Aug 11, 2021
Ensure that the state is always checked in PartitionedProducer::sendAsync to avoid buffering messages after closeAsync is called.

Removed the sequential locking in ProducerImpl::handleCreateProducer that allowed the state to go back to Ready after closeAsync was called.
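
A simplified sketch of the kind of check these commits describe (the free function and state enum are hypothetical; only the pulsar types come from the client API): the partitioned producer's state is consulted before a message is routed or buffered, so sends issued after closeAsync fail fast with ResultAlreadyClosed.

```cpp
#include <pulsar/Client.h>

using namespace pulsar;

// Hypothetical state enum standing in for the partitioned producer's state.
enum class ProducerState { Ready, Closing, Closed };

// Simplified sketch, not the merged PartitionedProducerImpl::sendAsync code.
void sendAsyncSketch(ProducerState state, const Message& msg, const SendCallback& callback) {
    if (state != ProducerState::Ready) {
        // Fail fast instead of buffering the message on a lazily started
        // producer after closeAsync() has been called.
        callback(ResultAlreadyClosed, msg.getMessageId());
        return;
    }
    // ...otherwise route the message to its partition, lazily starting that
    // partition's internal producer if needed, and buffer while it starts...
}
```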
@equanz
Contributor

equanz commented Aug 12, 2021

Thank you for your comments about PIP-79.
I have a few comments; I don't think they block merging.

For authn/authz backward-compatibility reasons, I think the internal producer should be connected to the broker before the creation of the partitioned producer completes ( https://mail-archives.apache.org/mod_mbox/pulsar-dev/202102.mbox/%3CCAO2yDybn3sqPJV32YqvYndk%3D8mxNKodcGB4GE3QmUs8F9m8YUw%40mail.gmail.com%3E ). However, it isn't critical because the change only takes effect when ProducerConfiguration.setLazyStartPartitionedProducers(true) is set.

As mentioned here, partitioned producer stats will be incorrect if each partition has a different number of producers. I tried to fix this issue in PIP-79 (it will be fixed in #10534 for the Java client).

@ivankelly
Contributor

> For authn/authz backward compatibility reason, I think the internal producer should be connected to broker before completing the creation of partitioned producer( https://mail-archives.apache.org/mod_mbox/pulsar-dev/202102.mbox/%3CCAO2yDybn3sqPJV32YqvYndk%3D8mxNKodcGB4GE3QmUs8F9m8YUw%40mail.gmail.com%3E ). However, it isn't critical because the change affects when ProducerConfiguration.setLazyStartPartitionedProducers(true) is set.

This can be worked around by making the producer connection to any one random partition. If we could make this the first partition selected in round-robin, even better.

In order to ensure that authz errors are returned during producer creation, start one producer eagerly when using lazy partitioned producers. When UseSinglePartition mode is used, the eagerly created producer will be the only producer created (when non-keyed messages are sent). In the case that an internal producer is created lazily and it fails, fail any pending requests immediately.
@Vanlightly
Contributor Author

@equanz @ivankelly I have changed it so that one producer is started eagerly, chosen by the routing policy, and any other internal producers are started lazily. This ensures that authz errors occur during createProducer while we still get the desired behaviour of fewer lookups and connections when using single partition routing.
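
A hedged sketch of that behaviour (all names below are hypothetical; the real routing policy and producer start live inside the client): the router picks one partition up front, that partition's producer is started eagerly so authorization failures surface during createProducer, and every other partition is left for lazy start on first use.

```cpp
#include <functional>

// Hypothetical sketch, not the merged code. 'startProducerForPartition' stands
// in for the per-partition lookup + registration performed by the client.
void startPartitionedProducerSketch(int numPartitions,
                                    const std::function<int(int)>& chooseFirstPartition,
                                    const std::function<void(int)>& startProducerForPartition) {
    // Start exactly one partition eagerly: the one the routing policy would
    // pick for the first non-keyed message. With UseSinglePartition routing,
    // this is the only producer ever created for non-keyed sends.
    const int eagerPartition = chooseFirstPartition(numPartitions);
    startProducerForPartition(eagerPartition);
    // All remaining partitions are started lazily when a message is first
    // routed to them.
}
```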

@sijie added this to the 2.9.0 milestone on Aug 13, 2021
@codelipenghui merged commit 9577b84 into apache:master on Aug 16, 2021
@BewareMyPower added the component/c++, type/PIP, type/enhancement, type/feature, and release/2.8.2 labels on Sep 10, 2021
@BewareMyPower
Contributor

@codelipenghui I added the release/2.8.2 label first.

Without this PR, the test introduced in #11955 would block forever, because for a partitioned producer, calling send() after shutdown() or close() would block forever. I haven't found exactly why this PR fixes it, but it should fix it, and it has been tested in my local env.

The root cause is that when a partitioned producer's shutdown() is called during client shutdown, it only sets the state; the internal producers are not affected. We could also solve this problem by shutting down all internal producers when the partitioned producer's shutdown() is called.
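
A hedged sketch of that alternative (the names below are hypothetical): propagate shutdown from the partitioned producer to every internal producer instead of only flipping the parent's state, so later send() calls fail instead of blocking.

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-in for an internal per-partition producer.
struct InternalProducer {
    bool closed = false;
    void shutdown() { closed = true; }  // would release the connection and fail pending sends
};

// Sketch: when the partitioned producer is shut down during client shutdown,
// also shut down every internal producer rather than only setting the
// partitioned producer's own state.
void shutdownPartitionedProducerSketch(std::vector<std::shared_ptr<InternalProducer>>& internalProducers) {
    for (auto& producer : internalProducers) {
        producer->shutdown();
    }
}
```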

codelipenghui pushed a commit that referenced this pull request Sep 10, 2021
(cherry picked from commit 9577b84)
@codelipenghui added the cherry-picked/branch-2.8 label on Sep 18, 2021
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Apr 22, 2022
Fixes apache#13849
Fixes apache#14848

### Motivation

apache#11570 adds a `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers` test for the case where `sendAsync` calls invoked after `closeAsync` has been called in another thread must complete with `ResultAlreadyClosed`.
It's flaky because the synchronization between the two threads is not strict. The test uses `sendStartLatch` to order `sendAsync` and `closeAsync`:

```
sendAsync 0,1,...,9 -> sendStartLatch is done -> closeAsync
```

However, it cannot guarantee that the remaining `sendAsync` calls happen after `closeAsync` is called. If they run before it, all `sendAsync` calls will complete with `ResultOk`.

On the other hand, this test is meaningless because it requires strict synchronization between the two threads, in which case there is no point running `sendAsync` and `closeAsync` in separate threads.

The verification in this test is also wrong, see apache#13849 (comment).
When `closeAsync` is called, the earlier `sendAsync` calls might not have completed yet, so all `sendAsync` calls will complete with `ResultAlreadyClosed`, not only those made after `closeAsync`.

In addition, this PR also tries to fix the flaky `testReferenceCount`, whose assumptions are too strict.

### Modifications

- Remove `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers`
- Only check that the reference count is greater than 0 instead of equal to 1
BewareMyPower added a commit that referenced this pull request Apr 22, 2022
codelipenghui pushed a commit that referenced this pull request Apr 28, 2022
(cherry picked from commit eeea9ca)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request May 9, 2022
(cherry picked from commit eeea9ca)
(cherry picked from commit 83b6833)
BewareMyPower added a commit that referenced this pull request May 24, 2022
(cherry picked from commit eeea9ca)
BewareMyPower added a commit that referenced this pull request Jul 27, 2022
(cherry picked from commit eeea9ca)
Labels
cherry-picked/branch-2.8, doc, release/2.8.2, type/enhancement, type/feature, type/PIP
Development

Successfully merging this pull request may close these issues.

Reduce producer lookups and connections in partitioned producers
7 participants