-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-68: WaitForExclusive producer access mode #8992
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work.
I left a question and a super minor comment t for a typo
// If the producer is queued waiting, we will get an immediate notification | ||
// that we need to pass to client | ||
if (isActive()) { | ||
log.info("[{}] Producer is waiting in qeuue: {}", remoteAddress, producer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo qeuue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
if (requestFuture != null) { | ||
log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(), | ||
success.getProducerName(), requestId); | ||
requestFuture.markAsResponded(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if the PulsarClient gets closed or there is a network error?
Are we guaranteed to fail and do not wait forever?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the marking of the future is only for timeout within a single connection. When a connection fails, everything that was in that connections is marked as failed and will trigger a retry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Outdated
Show resolved
Hide resolved
@merlimat thanks for your great work. Would you like to add docs accordingly? Then I can help review, thanks |
checkArgument(producer.getTopic() == this); | ||
|
||
CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); | ||
|
||
incrementTopicEpochIfNeeded(producer) | ||
.thenAccept(epoch -> { | ||
lock.readLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Motivation
Implemented the 2nd part of the proposal for PIP-68.
With
WaitForExclusive
mode, a producer is pending until there are other producers connected and then it will be created.The change in the client logic is to make sure that we don't time out the producer creation request. The broker will send a 1st response saying that the producer creation is pending. At this point the client will disable the timeout on the original request.