Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up StreamObserver on failed openStream attempt to solve No Handlers Found exception #250

Merged

Conversation

smcvb
Copy link
Contributor

@smcvb smcvb commented Oct 28, 2022

Whenever the CommandChannelImpl and QueryChannelImpl try to open a stream with their CommandProviderOutbound and QueryProviderOutbound respectively we should catch exceptions.
Exceptions may occur when, for example, the connection to the Axon Server instance is down or faulty at that moment in time.

If we do not catch these exceptions, the channel implementations get stuck in a faulty state.
In this state, they've already created and their StreamObserver instance.
As the StreamObserver instance is used to deduce whether the connection is live, a following reconnect will be ignored.
A consequence of this, is that the command and query handlers never get registered on a reconnect, resulting in the NoHandlerForCommandException and NoHandlerForQueryException

A user of the axonserver-connector-java module may reach this problematic state when all Axon Server connections are down while it dispatches a command or query.
This is particularly easy to replicate when using Axon Server Standard edition, as only a single connection is present.

To resolve the above, this pull request catches the exceptions on the openStream invocations towards gRPC.
If an exception is caught, the constructed StreamObserver is removed, thus resolving the faulty state.
Additionally to this, several debug and trace statements are introduced to ease future debugging.

Lastly, adding a test case to the Integration Test suite was difficult, as ToxiProxy only breaks the connection for a while instead of a harsh connection breakdown.
Due to this, it's not trivial to reach the above described state, and hence, no test cases were introduced.

Whenever the CommandChannelImpl and QueryChannelImpl try to open a
stream with the CommandProviderOutbound and QueryProviderOutbound
respectively, we should catch exceptions. If we do not, the channel
implementations get stuck in a faulty state, where they created their
StreamObservers, but weren't able to connect them. This situation can
occur, if an AxonServerConnector-Java user dispatches a command/query
while all connections are down. By catching the exception, and clearing
the StreamObservers, we resolve this issue.

#bug/stream-cleanup-on-opening-failure
@CodeDrivenMitch
Copy link
Contributor

@smcvb I am testing this locally with the setup that failed before. I'm afraid the problem hasn't been resolved.

I do see the additional log line:

2022-10-31 09:17:52.958  WARN 45477 --- [   scheduling-1] i.a.a.c.command.impl.CommandChannelImpl  : Failed trying to open stream for CommandChannel in context [default]. Cleaning up for following attempt...

io.grpc.StatusRuntimeException: UNKNOWN: Uncaught exception in the SynchronizationContext. Re-thrown.

It keeps looping over this error on every connect attempt. The root cause is a scheduler that is shut down:

org.axonframework.axonserver.connector.command.AxonServerCommandDispatchException: Exception while dispatching a command to AxonServer
	at org.axonframework.axonserver.connector.command.AxonServerCommandBus.doDispatch(AxonServerCommandBus.java:197) ~[axon-server-connector-4.6.1.jar:4.6.1]
	at org.axonframework.axonserver.connector.command.AxonServerCommandBus.dispatch(AxonServerCommandBus.java:161) ~[axon-server-connector-4.6.1.jar:4.6.1]
	at org.axonframework.commandhandling.gateway.AbstractCommandGateway.send(AbstractCommandGateway.java:76) ~[axon-messaging-4.6.1.jar:4.6.1]
	at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:83) ~[axon-messaging-4.6.1.jar:4.6.1]
	at org.axonframework.extensions.tracing.TracingCommandGateway.lambda$doSendAndExtract$5(TracingCommandGateway.java:152) ~[axon-tracing-4.6.1.jar:4.6.1]
	at org.axonframework.extensions.tracing.TracingCommandGateway.sendWithSpan(TracingCommandGateway.java:172) ~[axon-tracing-4.6.1.jar:4.6.1]
	at org.axonframework.extensions.tracing.TracingCommandGateway.doSendAndExtract(TracingCommandGateway.java:151) ~[axon-tracing-4.6.1.jar:4.6.1]
	at org.axonframework.extensions.tracing.TracingCommandGateway.sendAndWait(TracingCommandGateway.java:118) ~[axon-tracing-4.6.1.jar:4.6.1]
	at io.axoniq.demo.hotel.booking.command.demo.AutomaticAccountCommandDispatcher.dispatch(AutomaticAccountCommandDispatcher.kt:18) ~[classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.22.jar:5.3.22]
	at io.opentelemetry.javaagent.instrumentation.spring.scheduling.SpringSchedulingRunnableWrapper.run(SpringSchedulingRunnableWrapper.java:35) ~[na:na]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.22.jar:5.3.22]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.grpc.StatusRuntimeException: UNKNOWN: Uncaught exception in the SynchronizationContext. Re-thrown.
	at io.grpc.Status.asRuntimeException(Status.java:526) ~[grpc-api-1.49.0.jar:1.49.0]
	at io.grpc.internal.RetriableStream$1.uncaughtException(RetriableStream.java:75) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:97) ~[grpc-api-1.49.0.jar:1.49.0]
	at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127) ~[grpc-api-1.49.0.jar:1.49.0]
	at io.grpc.internal.RetriableStream.cancel(RetriableStream.java:493) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.internal.RetriableStream.start(RetriableStream.java:361) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:289) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:191) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) ~[grpc-api-1.49.0.jar:1.49.0]
	at io.opentelemetry.javaagent.shaded.instrumentation.grpc.v1_6.TracingClientInterceptor$TracingClientCall.start(TracingClientInterceptor.java:97) ~[na:na]
	at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) ~[grpc-api-1.49.0.jar:1.49.0]
	at io.axoniq.axonserver.connector.impl.GrpcBufferingInterceptor$AdditionalMessageRequestingCall.start(GrpcBufferingInterceptor.java:69) ~[axonserver-connector-java-4.6.3-SNAPSHOT.jar:4.6.3-SNAPSHOT]
	at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) ~[grpc-api-1.49.0.jar:1.49.0]
	at io.axoniq.axonserver.connector.impl.HeaderAttachingInterceptor$HeaderAttachedCall.start(HeaderAttachingInterceptor.java:68) ~[axonserver-connector-java-4.6.3-SNAPSHOT.jar:4.6.3-SNAPSHOT]
	at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:341) ~[grpc-stub-1.49.0.jar:1.49.0]
	at io.grpc.stub.ClientCalls.asyncStreamingRequestCall(ClientCalls.java:332) ~[grpc-stub-1.49.0.jar:1.49.0]
	at io.grpc.stub.ClientCalls.asyncBidiStreamingCall(ClientCalls.java:119) ~[grpc-stub-1.49.0.jar:1.49.0]
	at io.axoniq.axonserver.grpc.command.CommandServiceGrpc$CommandServiceStub.openStream(CommandServiceGrpc.java:195) ~[axonserver-connector-java-4.6.3-SNAPSHOT.jar:4.6.3-SNAPSHOT]
	at io.axoniq.axonserver.connector.command.impl.CommandChannelImpl.doCreateCommandStream(CommandChannelImpl.java:172) ~[axonserver-connector-java-4.6.3-SNAPSHOT.jar:4.6.3-SNAPSHOT]
	at io.axoniq.axonserver.connector.command.impl.CommandChannelImpl.connect(CommandChannelImpl.java:152) ~[axonserver-connector-java-4.6.3-SNAPSHOT.jar:4.6.3-SNAPSHOT]
	at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.notifyWhenStateChanged(AxonServerManagedChannel.java:260) ~[axonserver-connector-java-4.6.3-SNAPSHOT.jar:4.6.3-SNAPSHOT]
	at io.axoniq.axonserver.connector.impl.ContextConnection.ensureConnected(ContextConnection.java:218) ~[axonserver-connector-java-4.6.3-SNAPSHOT.jar:4.6.3-SNAPSHOT]
	at io.axoniq.axonserver.connector.impl.ContextConnection.commandChannel(ContextConnection.java:171) ~[axonserver-connector-java-4.6.3-SNAPSHOT.jar:4.6.3-SNAPSHOT]
	at org.axonframework.axonserver.connector.command.AxonServerCommandBus.doDispatch(AxonServerCommandBus.java:178) ~[axon-server-connector-4.6.1.jar:4.6.1]
	... 21 common frames omitted
Caused by: java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@6c452cfd rejected from java.util.concurrent.ThreadPoolExecutor@58cdeb8e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 217]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355) ~[na:na]
	at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:752) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:688) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.internal.RetriableStream$4.run(RetriableStream.java:498) ~[grpc-core-1.49.0.jar:1.49.0]
	at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95) ~[grpc-api-1.49.0.jar:1.49.0]
	... 42 common frames omitted

We will have to look into that root cause to fix the re-registering option

Base automatically changed from bug/junit5-and-testng-together to connector-4.6.x October 31, 2022 09:16
Whether exceptions are thrown or reported through a callback, they should result in the same handling logic.
@smcvb
Copy link
Contributor Author

smcvb commented Nov 2, 2022

@abuijze and I had a look at the changes made, diving a bit deeper in it all.
We're confident the predicament occurs because the gRPC does not propagate the exception on the response channel at all, thus not invoking onError.
As such, we've adjusted the changes I made earlier by invoking onError ourselves.
Doing so, we ensure the clean-up occurs, as well as a disconnect.

@MORLACK, if you could give this new format another try, that would be very helpful!

@abuijze
Copy link
Contributor

abuijze commented Nov 3, 2022

I found a discrepancy between grpc's channel.getState(false) == SHUTDOWN and channel.isShutdown(). Whereas the latter checks a flag that is immediately set on calling channel.shutdown(), the former checks on state that is part of a scheduled activity on a channel. Since we check using getState(), it is possible that the reconnection process temporarily keeps a reference to a channel in shutdown state, causing the stack trace @MORLACK is seeing.

I am working on a fix to improve the internal connection tracking.

While isShutdown() returns true immediate upon a shutdown() call, getState() doesn't return SHUTDOWN immediately. The latter state is modified on a scheduled task.

Additional calls to isShutdown() were added to prevent starting calls on a channel that has been shut down already.
Each attempt to retrieve a channel would schedule an attempt to connect if it wasn't in the ready state yet. However, when disconnected, this means that each attempt to perform a call will result in a scheduled task to verify connections.

This commit changes that to only connect a channel when it is created for the first time. If it fails to connect, the channel will itself schedule a task to verify the connection status in a certain timeframe.
@sonarcloud
Copy link

sonarcloud bot commented Nov 3, 2022

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

72.6% 72.6% Coverage
0.0% 0.0% Duplication

@CodeDrivenMitch
Copy link
Contributor

@abuijze I just tested this locally in my hotel demo version that broke it. It fully recovers now!

@CodeDrivenMitch CodeDrivenMitch merged commit 25c857e into connector-4.6.x Nov 4, 2022
@smcvb smcvb deleted the bug/stream-cleanup-on-opening-failure branch November 4, 2022 14:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants