CORDA-2858: Wire-up Corda components with better RPC reconnect logic #4933
After much thought @tudor-malene I think this is the wrong approach. This sample code pretends too much to be a real RPC client API, and it's too complex a sample for people to copy. I'd rather have code that looks and feels like a layer above the RPC, like what your original PR did. Otherwise we'll confuse people into thinking this is a real API that we're providing.
If you look at our samples, they inject the
If we plan to make our samples more resilient, there is really no other way that I can see.
failOneConnection.set(true)
}

fun close() {
Let's implement `AutoCloseable` for the easy win.
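For illustration, a minimal sketch of what implementing `AutoCloseable` gives the caller: Kotlin's `use` closes the resource when the block exits, even if the block throws. `DemoConnection` and `pingAndClose` are hypothetical stand-ins, not the classes in this PR.

```kotlin
// Hypothetical stand-in for the reconnecting wrapper; not the class under review.
class DemoConnection : AutoCloseable {
    var closed = false
        private set

    fun ping(): String = "pong"

    override fun close() {
        closed = true
    }
}

fun pingAndClose(connection: DemoConnection): String =
    // `use` calls close() when the block exits, normally or exceptionally.
    connection.use { it.ping() }
```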
/**
 * Wrapper over [CordaRPCOps] that handles exceptions when the node or the connection to the node fail.
 *
 * All operations are retried on failure except flow start operations that died before receiving a valid [FlowHandle], in which case a [CouldNotStartFlowException] is thrown.
that die
 * All operations are retried on failure except flow start operations that died before receiving a valid [FlowHandle], in which case a [CouldNotStartFlowException] is thrown.
 *
 * When calling methods that return a [DataFeed] like [CordaRPCOps.vaultTrackBy], the returned [DataFeed.updates] object will no longer
 * be a [rx.Observable] but an instance of [ReconnectingObservableImpl].
This is not true as `ReconnectingObservableImpl` is an `Observable`, but more to the point, does it matter? `ReconnectingObservableImpl` is internal so it's not visible to the client code.
What you should be saying is that the caller has to cast to `ReconnectingObservable` and use its subscribe method. Using `rx.Observable.subscribe` will throw an exception (judging by the rx code).
Ah, I meant to say they're instances of `ReconnectingObservable`, which is defined in this file.
 *
 * *This class is not a stable API. Any project that wants to use it, must copy and paste it.*
 */
class ReconnectingCordaRPCOps internal constructor(private val reconnectingRPCConnection: ReconnectingRPCConnection) : CordaRPCOps by proxy(reconnectingRPCConnection) {
Internal c'tors are still public in Java. Let's keep it private and use an internal factory method if need be.
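A minimal sketch of the suggested shape, with hypothetical names (`DemoOps`, `create`, `buildDemoOps`) and a made-up address. An `internal` constructor compiles to a plain public JVM constructor, whereas the private constructor below is private on the JVM too, and the `internal` factory gets a mangled JVM name that is awkward to call from Java.

```kotlin
// Hypothetical names throughout; a sketch of the suggested shape only.
class DemoOps private constructor(val target: String) {
    companion object {
        // Visible only inside this module from Kotlin; the JVM name is mangled,
        // which makes accidental use from Java unlikely.
        internal fun create(target: String): DemoOps = DemoOps(target)
    }
}

fun buildDemoOps(): DemoOps = DemoOps.create("localhost:10006")
```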
}

private val log = contextLogger()
private val observersPool = Executors.newCachedThreadPool()
This needs to be passed in via the c'tor. The client code should control what type of executor they use.
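As a sketch of the idea, the executor can be a constructor parameter so that the caller chooses the pool type and owns its lifecycle. `DemoObserverRunner` and `runOnCallerOwnedPool` are hypothetical and only illustrate the injection, not the PR's observer handling.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical class; shows the executor being supplied by the caller.
class DemoObserverRunner(private val observersPool: ExecutorService) {
    // Runs the task on the injected pool and waits for its result.
    fun <T> runObserver(task: () -> T): T = observersPool.submit(Callable { task() }).get()
}

fun runOnCallerOwnedPool(): Int {
    // The caller decides which executor to use and shuts it down afterwards.
    val pool = Executors.newSingleThreadExecutor()
    try {
        return DemoObserverRunner(pool).runObserver { 21 * 2 }
    } finally {
        pool.shutdown()
        pool.awaitTermination(5, TimeUnit.SECONDS)
    }
}
```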
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
    private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")

    override fun invoke(proxy: Any?, method: Method?, args: Array<out Any>?): Any? = try {
convert to block body for this method
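To illustrate the difference on a small, unrelated function (both `parsePort` variants are hypothetical): the expression-body form hangs the whole `try` off the signature, while the block body keeps the signature short and makes each exit explicit.

```kotlin
// Expression body: compact, but the try/catch is easy to overlook after a long signature.
fun parsePortExpression(text: String): Int? = try {
    text.toInt()
} catch (e: NumberFormatException) {
    null
}

// Block body: same behaviour, with an explicit return at each exit.
fun parsePortBlock(text: String): Int? {
    try {
        return text.toInt()
    } catch (e: NumberFormatException) {
        return null
    }
}
```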
 *
 * *This class is not a stable API. Any project that wants to use it, must copy and paste it.*
 */
class ReconnectingCordaRPCOps internal constructor(private val reconnectingRPCConnection: ReconnectingRPCConnection) : CordaRPCOps by proxy(reconnectingRPCConnection) {
Can you rename the file to `ReconnectingCordaRPCOps.kt`?
 */
internal fun await(): Throwable? = terminated.poll(100, TimeUnit.MINUTES).orElse(null)

internal fun reset(): ObserverHandle = this.also { it.terminated.clear() }
not used
internal class ReconnectingObservableImpl<T> internal constructor(
        private val reconnectingRPCConnection: ReconnectingRPCConnection,
        val initial: DataFeed<*, T>,
Now that your design doesn't block, we can easily give the calling code the snapshot data. Perhaps just include it in the `ObserverHandle` class. And document that this snapshot is always going to be the initial snapshot: if there are any re-connections then subsequent snapshots are not returned.
The calling code gets the initial snapshot now as part of the returned `DataFeed`.
I will document that it is the initial one.
Oh yes of course!
if (subscriptionError == null) return

// Only continue if the subscription failed.
reconnectingRPCConnection.error(subscriptionError)
Not every error from the Observable is a sign that the connection is down. The progress tracker updates will send any exception the flow throws. The shell uses this to print any error from the flow to the console (have a look at `ANSIProgressRenderer#renderInternal`).
Yes, there is a TODO in the `reconnectingRPCConnection.error` method to handle error cases.
36a778a
to
ad9c465
Compare
This is looking good. We're missing the updates to the docs.
 * [runFlow] is a function that starts a flow
 * [hasFlowCompleted] is a function that checks if the flow has actually completed by checking some side-effect, for example the vault.
 */
fun runFlowWithLogicRetry(runFlow: () -> StateMachineRunId, hasFlowCompleted: () -> Boolean, onFlowCompleted: (StateMachineRunId?) -> Unit = {}, timeout: Duration = 4.seconds) {
Let's move this to the top so that it's more obvious.
 * [runFlow] is a function that starts a flow
 * [hasFlowCompleted] is a function that checks if the flow has actually completed by checking some side-effect, for example the vault.
 */
fun runFlowWithLogicRetry(runFlow: () -> StateMachineRunId, hasFlowCompleted: () -> Boolean, onFlowCompleted: (StateMachineRunId?) -> Unit = {}, timeout: Duration = 4.seconds) {
This doesn't return the flow result.
You no longer need the `FlowHandle` if you provide the `onFlowCompleted` and `hasFlowCompleted` methods.
Back to you @shamsasari
089ab8f
to
6edbb58
Compare
docs/source/clientrpc.rst
Outdated
.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt
In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die
when the underlying Node or tcp connection become unavailable.
"when the node"
docs/source/clientrpc.rst
Outdated
.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt
In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die
when the underlying Node or tcp connection become unavailable.
TCP
docs/source/clientrpc.rst
Outdated
In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die
when the underlying Node or tcp connection become unavailable.

It is the clients responsibility to handle these errors and reconnect once the node is running again to keep running commands.
client's
docs/source/clientrpc.rst
Outdated
In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die
when the underlying Node or tcp connection become unavailable.

It is the clients responsibility to handle these errors and reconnect once the node is running again to keep running commands.
"to keeping running commands" - not clear what this means.
docs/source/clientrpc.rst
Outdated
It is the clients responsibility to handle these errors and reconnect once the node is running again to keep running commands.

Note that commands with side effects such as `start Flow` that don't return because the node died might actually be started on the node, but
Perhaps rephrase to
"RPCs which have a side effect, such as starting flows, may have executed on the node even if the return value is not received by the client. The only way to confirm is to perform a business-level query and retry accordingly. The sample `runFlowWithLogicalRetry` helps with this."
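The "business-level query and retry" idea can be sketched without any Corda types. The helper below is hypothetical and is not the `runFlowWithLogicalRetry` sample from this PR: when the call fails, it checks whether the side effect already happened before starting the operation again.

```kotlin
// Hypothetical helper illustrating a business-level retry; it is not the
// runFlowWithLogicalRetry sample from this PR and uses no Corda types.
fun <T> runWithLogicalRetry(
    maxAttempts: Int,
    runOperation: () -> T,
    hasOperationCompleted: () -> T?
): T {
    require(maxAttempts > 0) { "maxAttempts must be positive" }
    var lastError: Exception? = null
    repeat(maxAttempts) {
        try {
            return runOperation()
        } catch (e: Exception) {
            lastError = e
            // The call may still have executed remotely, so ask the
            // business-level query before starting it again.
            hasOperationCompleted()?.let { return it }
        }
    }
    throw IllegalStateException("Operation not confirmed after $maxAttempts attempts", lastError)
}
```

The point of the second lambda is that a lost reply is not treated as a lost operation; only an unconfirmed side effect triggers another start.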
docs/source/clientrpc.rst
Outdated
a thorough test that demonstrates it works as expected.

The code that performs the reconnecting logic is: `ReconnectingCordaRPCOps.kt <https://github.com/corda/samples/blob/release-V|platform_version|/node/src/integration-test/kotlin/net/corda/node/services/rpc/ReconnectingCordaRPCOps.kt>`_.
Note that this code is not exposed as an official Corda API, and must be included directly in the client codebase and adjusted.
Make this a `.. note::`
 * [runFlow] - starts a flow and returns the [FlowHandle].
 * [hasFlowCompleted] - Runs a vault query and is able to recreate the result of the flow.
 */
fun <T> runFlowAndReturnResultWithLogicalRetry(runFlow: (CordaRPCOps) -> FlowHandle<T>, hasFlowCompleted: (CordaRPCOps) -> T?, timeout: Duration = 4.seconds): T = try {
body block
},
onFlowConfirmed = { stateMachineRunId ->
    flowsCountdownLatch.countDown()
    log.info("Flow completed for $amount. Remaining flows: ${flowsCountdownLatch.count}")
How do you know the flow has completed? `runFlowWithLogicalRetry` does not wait on the result future. It's not clear why.
The log message is wrong. Changing it.
docs/source/clientrpc.rst
Outdated
@@ -356,18 +356,21 @@ Reconnecting RPC clients
------------------------

In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die
when the underlying Node or TCP connection become unavailable.
when the Node or TCP connection become unavailable.
lowercase node
if (!hasFlowStarted(this)) {
    runFlowWithLogicalRetry(runFlow, hasFlowStarted, onFlowConfirmed, timeout)
} else {
    onFlowConfirmed(null)
How useful is the `StateMachineRunId` to the caller if it can be null? I'm wondering if we just remove this param.
ok. I don't think it's useful.
Thanks for this @tudor-malene!
Now onto the samples!
/**
 * Disconnects from the Corda node for a clean client shutdown.
 */
override fun close() {
    try {
        rpcConnection?.notifyServerAndClose()
        (rpc as? ReconnectingCordaRPCOps)!!.close()
You don't need the `!!` if you just use `as`
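A small sketch of the difference, using hypothetical types (`DemoRpcOps`, `ClosableRpcOps`): a plain `as` fails with a `ClassCastException` that names the offending type, while `as?` followed by `!!` fails with a less informative `NullPointerException`.

```kotlin
// Hypothetical types standing in for the RPC proxy and its closable wrapper.
interface DemoRpcOps

class ClosableRpcOps : DemoRpcOps, AutoCloseable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

fun closeOps(ops: DemoRpcOps) {
    // `as` throws ClassCastException on a type mismatch;
    // `(ops as? ClosableRpcOps)!!` would throw NullPointerException instead.
    (ops as ClosableRpcOps).close()
}
```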
}
is RPCException -> {
    // Deliberately not logging full stack trace as it will be full of internal stacktraces.
    log.info("Exception upon establishing connection: ${ex.message}")
    log.debug("Exception upon establishing connection: ${ex.message}")
Use `debug { .. }`, and for the two cases below
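The benefit of the lambda form is that the message is only built when debug logging is enabled. The sketch below uses a dependency-free stand-in logger (`DemoLogger` is hypothetical) to show the pattern; it is not the project's logging utility.

```kotlin
// Minimal stand-in logger so the sketch has no dependencies; hypothetical.
class DemoLogger(val isDebugEnabled: Boolean) {
    val lines = mutableListOf<String>()

    fun debug(message: String) {
        if (isDebugEnabled) lines += message
    }
}

// Lazy variant: the message lambda only runs when debug logging is enabled.
inline fun DemoLogger.debug(message: () -> String) {
    if (isDebugEnabled) debug(message())
}
```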
connection.proxy
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) {
    rpcOps = { username: String, password: String ->
        if(standalone){
Let's format this if-else with the relevant whitespace
@@ -438,7 +444,11 @@ object InteractiveShell {
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
val subscriber = FlowWatchPrintingSubscriber(out)
stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber)
if (stateMachineUpdates is ReconnectingObservable<*>) {
    stateMachineUpdates.asReconnecting().startWithValues(currentStateMachines).subscribe { subscriber.onNext(it) }
subscriber::onNext
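For reference, a dependency-free sketch showing that a bound method reference is interchangeable with the wrapping lambda. `PrintingSubscriber`, `subscribeAll` and `collectWithBothStyles` are hypothetical and do not use RxJava.

```kotlin
// Hypothetical subscriber and subscribe function; no RxJava dependency.
class PrintingSubscriber {
    val seen = mutableListOf<String>()

    fun onNext(value: String) {
        seen += value
    }
}

fun subscribeAll(values: List<String>, onNext: (String) -> Unit) = values.forEach(onNext)

fun collectWithBothStyles(): List<String> {
    val subscriber = PrintingSubscriber()
    subscribeAll(listOf("a"), { subscriber.onNext(it) }) // lambda wrapper
    subscribeAll(listOf("b"), subscriber::onNext)        // bound method reference
    return subscriber.seen
}
```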
}.toSet()
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet())
vaultUpdates.asReconnecting().startWithValues(listOf(initialVaultUpdate)).subscribe(vaultUpdatesSubject::onNext)
Let's avoid the extra concept here by passing in the initial values into `asReconnecting`. Perhaps an extra `asReconnectingWithInitialValues` method instead.
So we've missed the cut for CE4 and C4.1. This PR needs to be split into the docs/sample (which will land in CE4/C4.1) and the integration of the code into explorer and shell (which will go into a later release).
…isconnects
CORDA-2743 - utilities and test to show rpc operations that support disconnects
CORDA-2743 - utilities and test to show rpc operations that support disconnects
CORDA-2743 - utilities and test to show rpc operations that support disconnects
CORDA-2743 - Add more comments.
CORDA-2743 - fix typo
CORDA-2743 - Address review comments and add proxy to kill connections.
CORDA-2743 - Add more varied failure scenarios.
CORDA-2743 - Address code review comments and add more failure cases.
CORDA-2743 - Add observer for the state machine.
CORDA-2743 - tweaks
CORDA-2743 - Further cleanup
CORDA-2743 - Further cleanup
CORDA-2743 - Fix comments.
CORDA-2743 - Fix test.
CORDA-2743 - Fix after rebase.
CORDA-2743 - Fix after rebase.
Create reconnecting proxies
Showcase retry based on business logic
Clean up
Address code review comments and rework the logical retry.
Address code review comment.
Clean up logic and make the test non-flaky.
More error conditions.
Add support for RPC over SSL
Add doc references.
Small cleanups
Add support for the custom rpc classloader.
Cleanup and add blocking start flow method
Add support for the custom rpc classloader.
Address code review comment.
Address code review comment.
Implemented the ReconnectingRPC into the WebServer, Standalone Shell, Explorer and BankOfCordaClientApi
Address review comments and replicate existing Explorer behaviour
d4a965b
to
e186794
Compare
@shamsasari, this is now the second part of the PR.
This PR creates a sample wrapper over `CordaRPCOps` that is able to reconnect on any connection exception and that returns `DataFeed` Observables that are also able to reconnect.
This wrapper also provides a method to start flows with a logical retry condition, which guarantees that the flow will be run even when the underlying node is flaky.
This wrapper is thoroughly tested with a test that randomly kills the node, stops it, kills the connection and introduces random latencies.