-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP 68: Exclusive Producer #8685
Conversation
0ceb670
to
e6f7c8e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.. few minor comments.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) { | ||
lock.writeLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we avoid locking for normal producer usecase?
if (producers.isEmpty() && producer.getAccessMode() == Shared) {
return CompletableFuture.completedFuture(topicEpoch);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to @rdhabalia suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure that we can avoid the locking, and I'm not sure that it's something to worry about in the context of adding a producer.
Actually, in the specific case there's still a race condition between updating the producers
map, since the update to hasExclusiveProducer
and the insertion into producers
map should be atomic as well.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
Show resolved
Hide resolved
return future; | ||
} | ||
|
||
// case WaitForExclusive: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have one suggestion. I am sure you must have thought about it but can't we rename WaitForExclusive
with FailOver
as it will be consistent to subscription type name and it will need no explanation and easy to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem I see is that it's not exactly the same thing and that can confuse people:
- The semantic of creating a
WaitForExclusive
is different because thenewProducer()....create()
call is hanging until that particular producer is selected, unlike in consumers where it's created immediately but it will not receive messages. - The use case is mostly different from "failover" for reliability purpose, since you can use
WaitForExclusive
to do leader election
checkArgument(producer.getTopic() == this); | ||
|
||
if (producers.remove(producer.getProducerName(), producer)) { | ||
handleProducerRemoved(producer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we need lock here else it may create a race condition and producer with WaitForExclusive
may wait forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No lock needed there so far. Other changes are needed for WaitForExclusive
, it's not implemented in this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work!
I left a few little suggestions
} | ||
|
||
protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) { | ||
lock.writeLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to @rdhabalia suggestion
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Outdated
Show resolved
Hide resolved
6ef1fc1
to
c2d57bc
Compare
Rebased and addressed comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Great work
One question, with an Exclusive Producer, what happens if I delete and recreate a topic?
- create a topic
- start ex producer and write
- delete/create the same topic
- is the producer still valid?
I expect the ex producer to be fenced out.
Is this the case?
@@ -86,5 +88,11 @@ void runMessageExpiryCheck() { | |||
} | |||
} | |||
|
|||
private static final Random random = new Random(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about setting a fixed seed? In order to have reproducible tests execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just to get random topic names. We don't really need it to be reproducible.
Deleting the topic will delete all the state associated with the topic. If the topic is re-created, it's effectively a different topic. All the state associated with the topic is gone: subscriptions, dedup state, fencing. The topic epoch will be reset after the deletion, so it should allow the producer. I will add specific test for this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great. Thank you
f6aecc3
to
eba7bbe
Compare
* PIP 68: Exclusive Producer * Added missing enums cases in C++ * Addressed comments * Moved constant to top of file * Fix mistake in previous update * Added handling for topic deletion
Motivation
Implementation of https://github.com/apache/pulsar/wiki/PIP-68%3A-Exclusive-Producer
This is the first part of the implementation of exclusive producer. It adds support for
ProducerAccessMode.Exclusive
.The 2nd mode,
ProducerAccessMode.WaitForExclusive
, will be added in separate PR.