Protocol Handlers: close() should be an async operation #19579

@eolivelli

Description

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

When the broker shuts down, it has to shut down many subcomponents, including the Protocol Handlers.
Most of the broker's shutdown (close) procedure is asynchronous, but closing the Protocol Handlers is still a blocking operation.
Protocol Handlers such as KoP/Starlight for Kafka often start thread pools and Pulsar clients. To shut down gracefully, a Protocol Handler may wait for some of these resources to be disposed, but the disposal can be delayed for a long time: the broker itself is shutting down, so some resources are no longer available, which leads to errors and backoffs.

The broker's shutdown procedure should be as quick as possible, to avoid latency spikes and other unwanted consequences of a partially working broker.

This is an example of a Starlight for Kafka test that failed because of a timeout:


Error:  testConnectListenerNotExist(io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest)  Time elapsed: 20.037 s  <<< FAILURE!
org.testng.internal.thread.ThreadTimeoutException: Method io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest.testConnectListenerNotExist() didn't finish within the time-out 20000
	at java.base@11.0.18/java.lang.StackStreamFactory$AbstractStackWalker.callStackWalk(Native Method)
	at java.base@11.0.18/java.lang.StackStreamFactory$AbstractStackWalker.beginStackWalk(StackStreamFactory.java:370)
	at java.base@11.0.18/java.lang.StackStreamFactory$AbstractStackWalker.walk(StackStreamFactory.java:243)
	at java.base@11.0.18/java.lang.StackWalker.walk(StackWalker.java:498)
	at app//org.apache.logging.log4j.util.StackLocator.calcLocation(StackLocator.java:96)
	at app//org.apache.logging.log4j.util.StackLocatorUtil.calcLocation(StackLocatorUtil.java:99)
	at app//org.apache.logging.log4j.spi.AbstractLogger.getLocation(AbstractLogger.java:2216)
	at app//org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2159)
	at app//org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2142)
	at app//org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2040)
	at app//org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1907)
	at app//org.apache.logging.slf4j.Log4jLogger.warn(Log4jLogger.java:249)
	at app//io.streamnative.pulsar.handlers.kop.AbstractPulsarClient.close(AbstractPulsarClient.java:49)
	at app//io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.close(KafkaProtocolHandler.java:578)
	at app//org.apache.pulsar.broker.protocol.ProtocolHandlerWithClassLoader.close(ProtocolHandlerWithClassLoader.java:90)
	at app//org.apache.pulsar.broker.protocol.ProtocolHandlers$$Lambda$1438/0x0000000100b38040.accept(Unknown Source)
	at java.base@11.0.18/java.lang.Iterable.forEach(Iterable.java:75)
	at app//org.apache.pulsar.broker.protocol.ProtocolHandlers.close(ProtocolHandlers.java:154)
	at app//org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:458)
	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.closeAsync$accessor$pCM5WTps(Unknown Source)
	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588$auxiliary$4dJQHnOb.call(Unknown Source)
	at app//org.mockito.internal.invocation.RealMethod$FromCallable$1.call(RealMethod.java:40)
	at app//org.mockito.internal.invocation.RealMethod$FromBehavior.invoke(RealMethod.java:62)
	at app//org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:127)
	at app//org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:43)
	at app//org.mockito.Answers.answer(Answers.java:100)
	at app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
	at app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	at app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:35)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:63)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:49)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptSuperCallable(MockMethodInterceptor.java:110)
	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.closeAsync(Unknown Source)
	at app//org.apache.pulsar.broker.PulsarService.close(PulsarService.java:380)
	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.close$accessor$pCM5WTps(Unknown Source)
	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588$auxiliary$Lvw9o3WA.call(Unknown Source)
	at app//org.mockito.internal.invocation.RealMethod$FromCallable$1.call(RealMethod.java:40)
	at app//org.mockito.internal.invocation.RealMethod$FromBehavior.invoke(RealMethod.java:62)
	at app//org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:127)
	at app//org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:43)
	at app//org.mockito.Answers.answer(Answers.java:100)
	at app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
	at app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	at app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:35)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:63)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:49)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptSuperCallable(MockMethodInterceptor.java:110)
	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.close(Unknown Source)
	at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.stopBroker(KopProtocolHandlerTestBase.java:411)
	at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.stopBroker(KopProtocolHandlerTestBase.java:415)
	at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.internalCleanup(KopProtocolHandlerTestBase.java:379)
	at app//io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest.testConnectListenerNotExist(KafkaListenerNameTest.java:211)
	at java.base@11.0.18/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base@11.0.18/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base@11.0.18/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base@11.0.18/java.lang.reflect.Method.invoke(Method.java:566)
	at app//org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
	at app//org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
	at app//org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
	at java.base@11.0.18/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base@11.0.18/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base@11.0.18/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base@11.0.18/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base@11.0.18/java.lang.Thread.run(Thread.java:829)

Solution

The ProtocolHandler API should provide a closeAsync() method instead of a blocking close().
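As the stack trace shows, ProtocolHandlers.close() walks the handlers with forEach and waits on each close() in turn, so a single slow handler stalls PulsarService.closeAsync(). The following is a minimal sketch (not Pulsar code) of how a caller could instead start closing every handler at once and combine the results with CompletableFuture.allOf. AsyncClosableHandler and HandlersCloser are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down stand-in for ProtocolHandler: only the asynchronous close.
interface AsyncClosableHandler {
    // Returns a future that completes once the handler has released its resources.
    CompletableFuture<?> closeAsync();
}

// Hypothetical broker-side helper (not an existing Pulsar class).
final class HandlersCloser {
    private HandlersCloser() {
    }

    // Starts closing every handler immediately instead of waiting for each one in turn.
    static CompletableFuture<Void> closeAll(List<? extends AsyncClosableHandler> handlers) {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        for (AsyncClosableHandler handler : handlers) {
            try {
                futures.add(handler.closeAsync());
            } catch (Throwable t) {
                // A handler that fails synchronously must not prevent the others from closing.
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(t);
                futures.add(failed);
            }
        }
        // Completes when every future has completed; completes exceptionally if any of them failed.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }
}
```

The broker could then chain the rest of its shutdown on the returned future and, on JDK 9+, bound the wait with orTimeout(...) so that a misbehaving handler cannot hold the shutdown indefinitely.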

We can keep the API backward compatible by leveraging a Java default method, so that existing Protocol Handlers that only implement close() keep working:

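public default CompletableFuture<?> closeAsync() {
    // Backward-compatible default: wraps the blocking close() of handlers
    // that do not override closeAsync() into an already-completed future.
    CompletableFuture<?> result = new CompletableFuture<>();
    try {
        this.close();
        result.complete(null);
    } catch (Throwable t) {
        // TODO handle InterruptedException here
        result.completeExceptionally(t);
    }
    return result;
}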
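With such a default in place, a handler that owns a thread pool could override closeAsync() so that the waiting no longer happens on the broker's shutdown thread. This is a minimal sketch under stated assumptions: CloseableHandler is a hypothetical interface that mirrors only close()/closeAsync() of ProtocolHandler, and ExecutorBackedHandler is a hypothetical handler, not KoP code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ProtocolHandler, reduced to the two close methods.
interface CloseableHandler extends AutoCloseable {
    @Override
    void close();

    // Same backward-compatible default as proposed above: wraps the blocking close().
    default CompletableFuture<?> closeAsync() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            close();
            result.complete(null);
        } catch (Throwable t) {
            result.completeExceptionally(t);
        }
        return result;
    }
}

// Hypothetical handler that owns a worker pool, as Kafka-protocol handlers often do.
class ExecutorBackedHandler implements CloseableHandler {
    final ExecutorService workers = Executors.newFixedThreadPool(2);

    // Legacy blocking close: the caller's thread waits for the pool to drain.
    @Override
    public void close() {
        workers.shutdown();
        awaitTerminationQuietly();
    }

    // Non-blocking close: stop accepting tasks now, wait for termination on another thread
    // (here the common ForkJoinPool), and return immediately to the caller.
    @Override
    public CompletableFuture<?> closeAsync() {
        workers.shutdown();
        return CompletableFuture.runAsync(this::awaitTerminationQuietly);
    }

    // Bounded wait for the running tasks to finish.
    private void awaitTerminationQuietly() {
        try {
            workers.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A real handler would more likely chain on futures that its own resources already expose (for example PulsarClient.closeAsync()) rather than park a common-pool thread, but the effect on the broker is the same: the call returns at once and completion is signalled through the future.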

Alternatives

Remove the close() API entirely and break compatibility.

Rejected: several Protocol Handlers already exist, and breaking compatibility would harm the community.

Anything else?

We should port this small API change to the stable branches, especially 2.10.x, which is the latest release line that supports JDK 8.

This change is needed to improve the shutdown procedure, which can currently lead to huge latency spikes.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Assignees

Labels

Stale, type/enhancement (The enhancements for the existing features or docs, e.g. reduce memory usage of the delayed messages)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
