Skip to content

Commit

Permalink
CORDA-1099: Orchestrated clean shutdown from Shell (#2831)
Browse files Browse the repository at this point in the history
  • Loading branch information
sollecitom committed Mar 19, 2018
1 parent c964e50 commit 7a077e7
Show file tree
Hide file tree
Showing 35 changed files with 680 additions and 304 deletions.
103 changes: 94 additions & 9 deletions .ci/api-current.txt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class NodeMonitorModel {
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
val client = CordaRPCClient(
nodeHostAndPort,
CordaRPCClientConfiguration.DEFAULT.copy(
connectionMaxRetryInterval = 10.seconds
)
object : CordaRPCClientConfiguration {
override val connectionMaxRetryInterval = 10.seconds
}
)
val connection = client.start(username, password)
val proxy = connection.proxy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@ import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.node.services.Permissions.Companion.all
import net.corda.testing.core.*
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.subjects.PublishSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue

class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
private val rpcUser = User("user1", "test", permissions = setOf(
startFlow<CashIssueFlow>(),
startFlow<CashPaymentFlow>(),
invokeRpc("vaultQueryBy"),
invokeRpc(CordaRPCOps::stateMachinesFeed),
invokeRpc("vaultQueryByCriteria"))
private val rpcUser = User("user1", "test", permissions = setOf(all())
)
private lateinit var node: StartedNode<Node>
private lateinit var identity: Party
Expand All @@ -53,7 +53,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
@Before
fun setUp() {
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!)
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration {
override val maxReconnectAttempts = 5
})
identity = node.info.identityFromX500Name(ALICE_NAME)
}

Expand Down Expand Up @@ -81,6 +83,61 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
}
}

@Test
fun `shutdown command stops the node`() {

val nodeIsShut: PublishSubject<Unit> = PublishSubject.create()
val latch = CountDownLatch(1)
var successful = false
val maxCount = 20
var count = 0
CloseableExecutor(Executors.newSingleThreadScheduledExecutor()).use { scheduler ->

val task = scheduler.scheduleAtFixedRate({
try {
println("Checking whether node is still running...")
client.start(rpcUser.username, rpcUser.password).use {
println("... node is still running.")
if (count == maxCount) {
nodeIsShut.onError(AssertionError("Node does not get shutdown by RPC"))
}
count++
}
} catch (e: ActiveMQNotConnectedException) {
println("... node is not running.")
nodeIsShut.onCompleted()
} catch (e: ActiveMQSecurityException) {
// nothing here - this happens if trying to connect before the node is started
} catch (e: Throwable) {
nodeIsShut.onError(e)
}
}, 1, 1, TimeUnit.SECONDS)

nodeIsShut.doOnError { error ->
error.printStackTrace()
successful = false
task.cancel(true)
latch.countDown()
}.doOnCompleted {
successful = (node.internals.started == null)
task.cancel(true)
latch.countDown()
}.subscribe()

client.start(rpcUser.username, rpcUser.password).use { rpc -> rpc.proxy.shutdown() }

latch.await()
assertThat(successful).isTrue()
}
}

private class CloseableExecutor(private val delegate: ScheduledExecutorService) : AutoCloseable, ScheduledExecutorService by delegate {

override fun close() {
delegate.shutdown()
}
}

@Test
fun `close-send deadlock and premature shutdown on empty observable`() {
println("Starting client")
Expand Down Expand Up @@ -141,7 +198,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C

val updates = proxy.stateMachinesFeed().updates

node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0),identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow()
node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow()
proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow()
proxy.startFlowDynamic(CashIssueFlow::class.java, 1000.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.corda.client.rpc

import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.context.Trace
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
Expand Down Expand Up @@ -105,7 +105,7 @@ class RPCStabilityTests {
Try.on {
startRpcClient<RPCOps>(
server.get().broker.hostAndPort!!,
configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1)
configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1)
).get()
}
}
Expand Down Expand Up @@ -240,7 +240,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds)
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
Expand All @@ -266,7 +266,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
Expand Down Expand Up @@ -298,7 +298,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()

val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
Expand Down
63 changes: 43 additions & 20 deletions client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package net.corda.client.rpc

import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.messaging.CordaRPCOps
Expand All @@ -23,23 +23,46 @@ class CordaRPCConnection internal constructor(connection: RPCConnection<CordaRPC

/**
* Can be used to configure the RPC client connection.
*
* @property connectionMaxRetryInterval How much time to wait between connection retries if the server goes down. This
* time will be reached via exponential backoff.
*/
data class CordaRPCClientConfiguration(val connectionMaxRetryInterval: Duration) {
internal fun toRpcClientConfiguration(): RPCClientConfiguration {
return RPCClientConfiguration.default.copy(
connectionMaxRetryInterval = connectionMaxRetryInterval
)
}
interface CordaRPCClientConfiguration {

/** The minimum protocol version required from the server */
val minimumServerProtocolVersion: Int get() = default().minimumServerProtocolVersion
/**
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
* constructing call stacks is a moderately expensive operation.
*/
val trackRpcCallSites: Boolean get() = default().trackRpcCallSites
/**
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
* duration. If set too low it wastes client side cycles.
*/
val reapInterval: Duration get() = default().reapInterval
/** The number of threads to use for observations (for executing [Observable.onNext]) */
val observationExecutorPoolSize: Int get() = default().observationExecutorPoolSize
/**
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
* See the implementation of [com.google.common.cache.LocalCache] for details.
*/
val cacheConcurrencyLevel: Int get() = default().cacheConcurrencyLevel
/** The retry interval of artemis connections in milliseconds */
val connectionRetryInterval: Duration get() = default().connectionRetryInterval
/** The retry interval multiplier for exponential backoff */
val connectionRetryIntervalMultiplier: Double get() = default().connectionRetryIntervalMultiplier
/** Maximum retry interval */
val connectionMaxRetryInterval: Duration get() = default().connectionMaxRetryInterval
/** Maximum reconnect attempts on failover */
val maxReconnectAttempts: Int get() = default().maxReconnectAttempts
/** Maximum file size */
val maxFileSize: Int get() = default().maxFileSize
/** The cache expiry of a deduplication watermark per client. */
val deduplicationCacheExpiry: Duration get() = default().deduplicationCacheExpiry

companion object {
/**
* Returns the default configuration we recommend you use.
*/
@JvmField
val DEFAULT = CordaRPCClientConfiguration(connectionMaxRetryInterval = RPCClientConfiguration.default.connectionMaxRetryInterval)
fun default(): CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default
}
}

Expand Down Expand Up @@ -72,25 +95,25 @@ data class CordaRPCClientConfiguration(val connectionMaxRetryInterval: Duration)
*/
class CordaRPCClient private constructor(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null
) {
@JvmOverloads
constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(hostAndPort, configuration, null)
constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(hostAndPort, configuration, null)

companion object {
internal fun createWithSsl(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null
): CordaRPCClient {
return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
}

internal fun createWithSslAndClassLoader(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null
): CordaRPCClient {
Expand All @@ -112,7 +135,7 @@ class CordaRPCClient private constructor(

private val rpcClient = RPCClient<CordaRPCOps>(
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration),
configuration.toRpcClientConfiguration(),
configuration,
if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,32 @@ package net.corda.client.rpc.internal

import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.pendingFlowsCount
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.config.SSLConfiguration
import rx.Observable

/** Utility which exposes the internal Corda RPC constructor to other internal Corda components */
fun createCordaRPCClientWithSsl(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null
) = CordaRPCClient.createWithSsl(hostAndPort, configuration, sslConfiguration)

fun createCordaRPCClientWithSslAndClassLoader(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)

fun CordaRPCOps.drainAndShutdown(): Observable<Unit> {

setFlowsDrainingModeEnabled(true)
return pendingFlowsCount().updates
.doOnError { error ->
throw error
}
.doOnCompleted { shutdown() }.map { }
}
Loading

0 comments on commit 7a077e7

Please sign in to comment.