Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix potential deadlock in pulsar client close #5731

Merged
merged 1 commit into from Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -282,8 +282,13 @@ private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int p

@Override
public void close() throws IOException {
try {
eventLoopGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).await();
} catch (InterruptedException e) {
log.warn("EventLoopGroup shutdown was interrupted", e);
}

dnsResolver.close();
eventLoopGroup.shutdown();
}

private void cleanupConnection(InetSocketAddress address, int connectionKey,
Expand Down
Expand Up @@ -554,7 +554,11 @@ public CompletableFuture<Void> closeAsync() {
consumersToClose.forEach(c -> futures.add(c.closeAsync()));
}

FutureUtil.waitForAll(futures).thenRun(() -> {
// Need to run the shutdown sequence in a separate thread to prevent deadlocks
// If there are consumers or producers that need to be shutdown we cannot use the same thread
// to shutdown the EventLoopGroup as well as that would be trying to shutdown itself thus a deadlock
// would happen
FutureUtil.waitForAll(futures).thenRun(() -> new Thread(() -> {
// All producers & consumers are now closed, we can stop the client safely
try {
shutdown();
Expand All @@ -563,7 +567,7 @@ public CompletableFuture<Void> closeAsync() {
} catch (PulsarClientException e) {
closeFuture.completeExceptionally(e);
}
}).exceptionally(exception -> {
}, "pulsar-client-shutdown-thread").start()).exceptionally(exception -> {
closeFuture.completeExceptionally(exception);
return null;
});
Expand Down