diff --git a/client/build.gradle b/client/build.gradle index aeddf4f..043f006 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -11,7 +11,7 @@ plugins { } group 'io.dapr' -version = '1.5.10' +version = '1.5.11-SNAPSHOT' archivesBaseName = 'durabletask-client' def grpcVersion = '1.69.0' diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index 94bc18a..1e56577 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -178,6 +178,19 @@ default Task> anyOf(Task... tasks) { */ Task createTimer(Duration delay); + /** + * Creates a durable timer that expires after the specified delay. + *

+ * Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple, + * internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However, + * it may be visible in framework logs and the stored history state. + * + * @param name of the timer + * @param delay the amount of time before the timer should expire + * @return a new {@code Task} that completes after the specified delay + */ + Task createTimer(String name, Duration delay); + /** * Creates a durable timer that expires after the specified timestamp with specific zone. *

@@ -190,6 +203,19 @@ default Task> anyOf(Task... tasks) { */ Task createTimer(ZonedDateTime zonedDateTime); + /** + * Creates a durable timer that expires after the specified timestamp with specific zone. + *

+ * Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple, + * internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However, + * it may be visible in framework logs and the stored history state. + * + * @param name for the timer + * @param zonedDateTime timestamp with specific zone when the timer should expire + * @return a new {@code Task} that completes after the specified delay + */ + Task createTimer(String name, ZonedDateTime zonedDateTime); + /** * Transitions the orchestration into the {@link OrchestrationRuntimeStatus#COMPLETED} state with the given output. * diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 8ef9ffb..74bbe5f 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -492,7 +492,7 @@ public Task waitForExternalEvent(String name, Duration timeout, Class // If a non-infinite timeout is specified, schedule an internal durable timer. // If the timer expires and the external event task hasn't yet completed, we'll cancel the task. if (hasTimeout) { - this.createTimer(timeout).future.thenRun(() -> { + this.createTimer(name, timeout).future.thenRun(() -> { if (!eventTask.isDone()) { // Book-keeping - remove the task record for the canceled task eventQueue.removeIf(t -> t.task == eventTask); @@ -632,7 +632,26 @@ public Task createTimer(Duration duration) { Helpers.throwIfArgumentNull(duration, "duration"); Instant finalFireAt = this.currentInstant.plus(duration); - return createTimer(finalFireAt); + return createTimer("", finalFireAt); + } + + public Task createTimer(String name, Duration duration) { + Helpers.throwIfOrchestratorComplete(this.isComplete); + Helpers.throwIfArgumentNull(duration, "duration"); + Helpers.throwIfArgumentNull(name, "name"); + + Instant finalFireAt = this.currentInstant.plus(duration); + return createTimer(name, finalFireAt); + } + + @Override + public Task createTimer(String name, ZonedDateTime zonedDateTime) { + Helpers.throwIfOrchestratorComplete(this.isComplete); + Helpers.throwIfArgumentNull(zonedDateTime, "zonedDateTime"); + Helpers.throwIfArgumentNull(name, "name"); + + Instant finalFireAt = zonedDateTime.toInstant(); + return createTimer(name, finalFireAt); } @Override @@ -641,18 +660,19 @@ public Task createTimer(ZonedDateTime zonedDateTime) { Helpers.throwIfArgumentNull(zonedDateTime, "zonedDateTime"); Instant finalFireAt = zonedDateTime.toInstant(); - return createTimer(finalFireAt); + return createTimer("", finalFireAt); } - private Task createTimer(Instant finalFireAt) { - return new TimerTask(finalFireAt); + private Task createTimer(String name, Instant finalFireAt) { + return new TimerTask(name, finalFireAt); } - private CompletableTask createInstantTimer(int id, Instant fireAt) { + private CompletableTask createInstantTimer(String name, int id, Instant fireAt) { Timestamp ts = DataConverter.getTimestampFromInstant(fireAt); this.pendingActions.put(id, OrchestratorAction.newBuilder() .setId(id) - .setCreateTimer(CreateTimerAction.newBuilder().setFireAt(ts)) + .setCreateTimer(CreateTimerAction.newBuilder() + .setName(name + "-" + id).setFireAt(ts)) .build()); if (!this.isReplaying) { @@ -1022,10 +1042,10 @@ private class TimerTask extends CompletableTask { private Instant finalFireAt; CompletableTask task; - public TimerTask(Instant finalFireAt) { + public TimerTask(String name, Instant finalFireAt) { super(); - CompletableTask firstTimer = createTimerTask(finalFireAt); - CompletableFuture timerChain = createTimerChain(finalFireAt, firstTimer.future); + CompletableTask firstTimer = createTimerTask(name, finalFireAt); + CompletableFuture timerChain = createTimerChain(name, finalFireAt, firstTimer.future); this.task = new CompletableTask<>(timerChain); this.finalFireAt = finalFireAt; } @@ -1035,26 +1055,26 @@ public TimerTask(Instant finalFireAt) { // currentFuture completes, we check if we have not yet reached finalFireAt. If that is the case, we create a new sub-timer // task and make a recursive call on that new sub-timer task so that once it completes, another sub-timer task is created // if necessary. Otherwise, we return and no more sub-timers are created. - private CompletableFuture createTimerChain(Instant finalFireAt, CompletableFuture currentFuture) { + private CompletableFuture createTimerChain(String name, Instant finalFireAt, CompletableFuture currentFuture) { return currentFuture.thenRun(() -> { Instant currentInstsanceMinusNanos = currentInstant.minusNanos(currentInstant.getNano()); Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano()); if (currentInstsanceMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) { return; } - Task nextTimer = createTimerTask(finalFireAt); - createTimerChain(finalFireAt, nextTimer.future); + Task nextTimer = createTimerTask(name + "-next", finalFireAt); + createTimerChain(name, finalFireAt, nextTimer.future); }); } - private CompletableTask createTimerTask(Instant finalFireAt) { + private CompletableTask createTimerTask(String name, Instant finalFireAt) { CompletableTask nextTimer; Duration remainingTime = Duration.between(currentInstant, finalFireAt); if (remainingTime.compareTo(maximumTimerInterval) > 0) { Instant nextFireAt = currentInstant.plus(maximumTimerInterval); - nextTimer = createInstantTimer(sequenceNumber++, nextFireAt); + nextTimer = createInstantTimer(name, sequenceNumber++, nextFireAt); } else { - nextTimer = createInstantTimer(sequenceNumber++, finalFireAt); + nextTimer = createInstantTimer(name, sequenceNumber++, finalFireAt); } nextTimer.setParentTask(this); return nextTimer; @@ -1185,7 +1205,7 @@ public void tryRetry(TaskFailedException ex) { Duration delay = this.getNextDelay(); if (!delay.isZero() && !delay.isNegative()) { // Use a durable timer to create the delay between retries - this.context.createTimer(delay).await(); + this.context.createTimer(getName() + "-retry",delay).await(); } this.totalRetryTime = Duration.between(this.startTime, this.context.getCurrentInstant()); diff --git a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java index 5d64153..a78325e 100644 --- a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java @@ -76,6 +76,7 @@ void emptyOrchestration() throws TimeoutException { defaultTimeout, true); + assertNotNull(instance); assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); assertEquals(input, instance.readInputAs(String.class)); @@ -174,6 +175,8 @@ void loopWithTimer() throws IOException, TimeoutException { } + + } @Test