Permalink
Browse files

[ENT-1774] FlowAsyncOperation deduplication ID (#4068)

  • Loading branch information...
thschroeter committed Oct 19, 2018
1 parent e99fa97 commit f685df46b51a2593c238f587ab93ead704ac544c
@@ -134,6 +134,8 @@
<module name="loadtest_test" target="1.8" />
<module name="mock_main" target="1.8" />
<module name="mock_test" target="1.8" />
<module name="net.corda-verifier_main" target="1.8" />
<module name="net.corda-verifier_test" target="1.8" />
<module name="net.corda_buildSrc_main" target="1.8" />
<module name="net.corda_buildSrc_test" target="1.8" />
<module name="net.corda_canonicalizer_main" target="1.8" />
@@ -12,8 +12,14 @@ import net.corda.core.serialization.CordaSerializable
*/
@CordaSerializable
interface FlowAsyncOperation<R : Any> {
/** Performs the operation in a non-blocking fashion. */
fun execute(): CordaFuture<R>
/**
* Performs the operation in a non-blocking fashion.
* @param deduplicationId If the flow restarts from a checkpoint (due to node restart, or via a visit to the flow
* hospital following an error) the execute method might be called more than once by the Corda flow state machine.
* For each duplicate call, the deduplicationId is guaranteed to be the same allowing duplicate requests to be
* de-duplicated if necessary inside the execute method.
*/
fun execute(deduplicationId: String): CordaFuture<R>
}
// DOCEND FlowAsyncOperation
@@ -24,4 +30,4 @@ fun <T, R : Any> FlowLogic<T>.executeAsync(operation: FlowAsyncOperation<R>, may
val request = FlowIORequest.ExecuteAsyncOperation(operation)
return stateMachine.suspend(request, maySkipCheckpoint)
}
// DOCEND executeAsync
// DOCEND executeAsync
@@ -22,7 +22,7 @@ class WaitForStateConsumption(val stateRefs: Set<StateRef>, val services: Servic
val logger = contextLogger()
}
override fun execute(): CordaFuture<Unit> {
override fun execute(deduplicationId: String): CordaFuture<Unit> {
val futures = stateRefs.map { services.vaultService.whenConsumed(it).toCompletableFuture() }
val completedFutures = futures.filter { it.isDone }
@@ -40,4 +40,4 @@ class WaitForStateConsumption(val stateRefs: Set<StateRef>, val services: Servic
return CompletableFuture.allOf(*futures.toTypedArray()).thenApply { Unit }.asCordaFuture()
}
}
}
@@ -12,7 +12,7 @@
@NotNull
@Override
public CordaFuture<Integer> execute() {
public CordaFuture<Integer> execute(String deduplicationId) {
return CordaFutureImplKt.doneFuture(this.a + this.b);
}
@@ -11,7 +11,7 @@
@NotNull
@Override
public CordaFuture<Integer> execute() {
public CordaFuture<Integer> execute(String deduplicationId) {
throw new IllegalStateException("You shouldn't be calling me");
}
@@ -11,15 +11,15 @@ import net.corda.core.internal.executeAsync
// DOCSTART SummingOperation
class SummingOperation(val a: Int, val b: Int) : FlowAsyncOperation<Int> {
override fun execute(): CordaFuture<Int> {
override fun execute(deduplicationId: String): CordaFuture<Int> {
return doneFuture(a + b)
}
}
// DOCEND SummingOperation
// DOCSTART SummingOperationThrowing
class SummingOperationThrowing(val a: Int, val b: Int) : FlowAsyncOperation<Int> {
override fun execute(): CordaFuture<Int> {
override fun execute(deduplicationId: String): CordaFuture<Int> {
throw IllegalStateException("You shouldn't be calling me")
}
}
@@ -3,9 +3,13 @@ package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.CordaRuntimeException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.executeAsync
import net.corda.core.messaging.startFlow
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.ProgressTracker
@@ -56,6 +60,21 @@ class FlowRetryTest {
assertEquals("$numSessions:$numIterations", result)
}
@Test
fun `async operation deduplication id is stable accross retries`() {
val user = User("mark", "dadada", setOf(Permissions.startFlow<AsyncRetryFlow>()))
driver(DriverParameters(
startNodesInProcess = isQuasarAgentSpecified(),
notarySpecs = emptyList()
)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
}
}
}
@Test
fun `flow gives up after number of exceptions, even if this is the first line of the flow`() {
val user = User("mark", "dadada", setOf(Permissions.startFlow<RetryFlow>()))
@@ -218,6 +237,36 @@ class RetryFlow() : FlowLogic<String>(), IdempotentFlow {
}
}
@StartableByRPC
class AsyncRetryFlow() : FlowLogic<String>(), IdempotentFlow {
companion object {
object FIRST_STEP : ProgressTracker.Step("Step one")
fun tracker() = ProgressTracker(FIRST_STEP)
val deduplicationIds = mutableSetOf<String>()
}
class RecordDeduplicationId: FlowAsyncOperation<String> {
override fun execute(deduplicationId: String): CordaFuture<String> {
val dedupeIdIsNew = deduplicationIds.add(deduplicationId)
if (dedupeIdIsNew) {
throw ExceptionToCauseFiniteRetry()
}
return doneFuture(deduplicationId)
}
}
override val progressTracker = tracker()
@Suspendable
override fun call(): String {
progressTracker.currentStep = FIRST_STEP
executeAsync(RecordDeduplicationId())
return "Result"
}
}
@StartableByRPC
class ThrowingFlow() : FlowLogic<String>(), IdempotentFlow {
companion object {
@@ -237,4 +286,4 @@ class ThrowingFlow() : FlowLogic<String>(), IdempotentFlow {
progressTracker.currentStep = FIRST_STEP
return "Result"
}
}
}
@@ -124,7 +124,7 @@ sealed class Action {
/**
* Execute the specified [operation].
*/
data class ExecuteAsyncOperation(val operation: FlowAsyncOperation<*>) : Action()
data class ExecuteAsyncOperation(val deduplicationId: String, val operation: FlowAsyncOperation<*>) : Action()
/**
* Release soft locks associated with given ID (currently the flow ID).
@@ -221,7 +221,7 @@ class ActionExecutorImpl(
@Suspendable
private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
val operationFuture = action.operation.execute()
val operationFuture = action.operation.execute(action.deduplicationId)
operationFuture.thenMatch(
success = { result ->
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
@@ -411,7 +411,10 @@ class StartedFlowTransition(
private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult {
return builder {
actions.add(Action.ExecuteAsyncOperation(flowIORequest.operation))
// The `numberOfSuspends` is added to the deduplication ID in case an async
// operation is executed multiple times within the same flow.
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString()
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation))
FlowContinuation.ProcessEvents
}
}

0 comments on commit f685df4

Please sign in to comment.