Fix a couple of bugs in exclusive producer #9554

Merged 4 commits on Feb 11, 2021

@jerrypeng jerrypeng commented Feb 10, 2021

Motivation

When multiple producers contend to become the exclusive producer, a race condition can occur. The following exception is thrown but silently swallowed, which eventually leads to a client timeout.

21:44:46.038 [ForkJoinPool.commonPool-worker-2] ERROR org.apache.pulsar.broker.service.AbstractTopic - got error: java.util.NoSuchElementException
java.util.NoSuchElementException: null
	at java.util.concurrent.ConcurrentHashMap$KeyIterator.next(ConcurrentHashMap.java:3416) ~[?:1.8.0_231]
	at java.util.concurrent.ConcurrentHashMap$KeyIterator.nextElement(ConcurrentHashMap.java:3423) ~[?:1.8.0_231]
	at org.apache.pulsar.broker.service.AbstractTopic.incrementTopicEpochIfNeeded(AbstractTopic.java:399) ~[pulsar-broker.jar:2.8.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.AbstractTopic.addProducer(AbstractTopic.java:346) ~[pulsar-broker.jar:2.8.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.addProducer(PersistentTopic.java:509) ~[pulsar-broker.jar:2.8.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$null$22(ServerCnx.java:1183) ~[pulsar-broker.jar:2.8.0-SNAPSHOT]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_231]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_231]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_231]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_231]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$6(BookkeeperSchemaStorage.java:230) ~[pulsar-broker.jar:2.8.0-SNAPSHOT]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) [?:1.8.0_231]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) [?:1.8.0_231]
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443) [?:1.8.0_231]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_231]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_231]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_231]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_231]

Also, USAGE_COUNT_UPDATER is used incorrectly in handleProducerRemoved().

@jerrypeng jerrypeng added the type/bug and area/broker labels Feb 10, 2021
@jerrypeng jerrypeng added this to the 2.8.0 milestone Feb 10, 2021
@jerrypeng jerrypeng self-assigned this Feb 10, 2021
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("Topic has an existing exclusive producer");
if (producers.keys().hasMoreElements()) {
errorMsg.append(": " + producers.keys().nextElement());
Contributor

I'm not sure this avoids the race altogether since there's no atomicity between checking hasMoreElements() and nextElement(). I think we could get an iterator and use that to check, since it will be guaranteed to be consistent.
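
The reviewer's point can be illustrated with a minimal sketch. It assumes `producers` is a `ConcurrentHashMap` keyed by producer name; the value type `Object` and the class and method names are hypothetical and not taken from the PR. Each call to `keys()` starts a new traversal, so a key seen by `hasMoreElements()` on one enumeration may already be gone when `nextElement()` runs on another. With a single iterator, the OpenJDK implementation prefetches the next entry, so `next()` after a true `hasNext()` on that same iterator does not throw.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper class, for illustration only.
class ExclusiveProducerMessage {

    // Builds the error message from ONE iterator, so the emptiness check
    // and the read belong to the same traversal of the map.
    static String describe(ConcurrentHashMap<String, Object> producers) {
        StringBuilder errorMsg = new StringBuilder("Topic has an existing exclusive producer");
        Iterator<String> it = producers.keySet().iterator();
        if (it.hasNext()) {
            errorMsg.append(": ").append(it.next());
        }
        return errorMsg.toString();
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Object> producers = new ConcurrentHashMap<>();
        System.out.println(describe(producers)); // no producer name appended
        producers.put("producer-a", new Object());
        System.out.println(describe(producers)); // name of the only producer appended
    }
}
```

If the map is emptied concurrently, the message simply omits the producer name instead of failing with `NoSuchElementException`.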

Comment on lines 387 to 389
if (producers.keys().hasMoreElements()) {
errorMsg.append(": " + producers.keys().nextElement());
}
Contributor

Suggested change
- if (producers.keys().hasMoreElements()) {
-     errorMsg.append(": " + producers.keys().nextElement());
- }
+ producers.keySet().stream().findFirst().map(p -> errorMsg.append(": " ).append(p));

Comment on lines 400 to 402
if (producers.keys().hasMoreElements()) {
errorMsg.append(": " + producers.keys().nextElement());
}
Contributor

Suggested change
- if (producers.keys().hasMoreElements()) {
-     errorMsg.append(": " + producers.keys().nextElement());
- }
+ producers.keySet().stream().findFirst().map(p -> errorMsg.append(": " ).append(p));

1. Fix uncaught exception in exclusive producer
2. Incorrect logic when producer is removed
@jerrypeng jerrypeng changed the title Fix uncaught exception in exclusive producer creation Fix a couple of bugs in exclusive producer Feb 11, 2021
@@ -109,6 +109,9 @@

protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
// pointer to the exclusive producer
private volatile Producer exclusiveProducer;
Contributor

Should we just keep a reference to the producer name instead? Otherwise it will produce a much longer toString() representation.

Contributor Author

We could. Just wanted to preserve the existing logging behavior.

Contributor

The existing one was printing the key of the map, hence just the producer name.

Contributor Author

changed

- return FutureUtil.failedFuture(new ProducerBusyException(
-         "Topic has an existing exclusive producer: " + producers.keys().nextElement()));
+ return FutureUtil.failedFuture(
+         new ProducerFencedException("Topic has an existing exclusive producer: " + exclusiveProducer));
Contributor

A shared producer will not get fenced; instead, it will get a busy error.

Contributor Author

good catch
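
The distinction raised in this thread could be sketched roughly as follows. The enum, the exception classes, and the `rejectionFor` method are hypothetical stand-ins defined locally, not the broker's actual types. That a shared producer receives a busy error comes from the review comment; that an exclusive request is the one that gets fenced is an assumption of this sketch.

```java
// Hypothetical, self-contained stand-ins for illustration only.
class ProducerAccessSketch {

    enum AccessMode { SHARED, EXCLUSIVE }

    static class ProducerBusyException extends Exception {
        ProducerBusyException(String msg) { super(msg); }
    }

    static class ProducerFencedException extends Exception {
        ProducerFencedException(String msg) { super(msg); }
    }

    // Picks the error for a producer rejected because the topic already
    // has an exclusive producer with the given name.
    static Exception rejectionFor(AccessMode requested, String exclusiveProducerName) {
        String msg = "Topic has an existing exclusive producer: " + exclusiveProducerName;
        if (requested == AccessMode.SHARED) {
            // Per the review comment: shared producers get a busy error.
            return new ProducerBusyException(msg);
        }
        // Assumption of this sketch: only exclusive requests are fenced.
        return new ProducerFencedException(msg);
    }
}
```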

@@ -619,17 +629,21 @@ public void removeProducer(Producer producer) {

protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
- long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);
- if (newCount == 0) {
+ USAGE_COUNT_UPDATER.decrementAndGet(this);
Contributor

Since we had an issue with consumers here, can you add a test with a consumer to make sure we don’t break it in the future?

Contributor Author

sure
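
A minimal sketch of the scenario such a test might cover, under the assumption (suggested by the comment above, not shown in this diff) that the usage counter is shared by producers and consumers. The class and its methods are hypothetical; only the names `USAGE_COUNT_UPDATER`, `hasExclusiveProducer`, and `handleProducerRemoved` mirror the PR. With a consumer still attached, the counter does not reach zero when the exclusive producer leaves, so gating the cleanup on `newCount == 0` would leave the exclusive flag set.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical simplified topic, for illustration only.
class TopicUsageSketch {
    private static final AtomicLongFieldUpdater<TopicUsageSketch> USAGE_COUNT_UPDATER =
            AtomicLongFieldUpdater.newUpdater(TopicUsageSketch.class, "usageCount");

    // Assumption: one counter tracks both producers and consumers.
    private volatile long usageCount = 0;
    private volatile boolean hasExclusiveProducer;

    void addExclusiveProducer() {
        USAGE_COUNT_UPDATER.incrementAndGet(this);
        hasExclusiveProducer = true;
    }

    void addConsumer() {
        USAGE_COUNT_UPDATER.incrementAndGet(this);
    }

    // Decrement once, then decide based on the exclusive flag rather than
    // on the counter reaching zero.
    void handleProducerRemoved() {
        USAGE_COUNT_UPDATER.decrementAndGet(this);
        if (hasExclusiveProducer) {
            hasExclusiveProducer = false;
        }
    }

    boolean hasExclusiveProducer() { return hasExclusiveProducer; }

    long usageCount() { return usageCount; }
}
```

For example, after `addExclusiveProducer()`, `addConsumer()`, and `handleProducerRemoved()`, the sketch reports no exclusive producer while the usage count stays at 1 for the remaining consumer.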

@jerrypeng jerrypeng merged commit d9c5b9b into apache:master Feb 11, 2021
merlimat pushed a commit to merlimat/pulsar that referenced this pull request Apr 6, 2021
* Fix a couple of bugs in exclusive producer
1. Fix uncaught exception in exclusive producer
2. Incorrect logic when producer is removed

Co-authored-by: Jerry Peng <jerryp@splunk.com>