Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins {
}

group 'io.dapr'
version = '1.5.10'
version = '1.5.11-SNAPSHOT'
archivesBaseName = 'durabletask-client'

def grpcVersion = '1.69.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ default Task<Task<?>> anyOf(Task<?>... tasks) {
*/
Task<Void> createTimer(Duration delay);

/**
* Creates a durable timer that expires after the specified delay.
* <p>
* Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
* internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However,
* it may be visible in framework logs and the stored history state.
*
* @param name of the timer
* @param delay the amount of time before the timer should expire
* @return a new {@code Task} that completes after the specified delay
*/
Task<Void> createTimer(String name, Duration delay);

/**
* Creates a durable timer that expires after the specified timestamp with specific zone.
* <p>
Expand All @@ -190,6 +203,19 @@ default Task<Task<?>> anyOf(Task<?>... tasks) {
*/
Task<Void> createTimer(ZonedDateTime zonedDateTime);

/**
* Creates a durable timer that expires after the specified timestamp with specific zone.
* <p>
* Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
* internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However,
* it may be visible in framework logs and the stored history state.
*
* @param name for the timer
* @param zonedDateTime timestamp with specific zone when the timer should expire
* @return a new {@code Task} that completes after the specified delay
*/
Task<Void> createTimer(String name, ZonedDateTime zonedDateTime);

/**
* Transitions the orchestration into the {@link OrchestrationRuntimeStatus#COMPLETED} state with the given output.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ public <V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V>
// If a non-infinite timeout is specified, schedule an internal durable timer.
// If the timer expires and the external event task hasn't yet completed, we'll cancel the task.
if (hasTimeout) {
this.createTimer(timeout).future.thenRun(() -> {
this.createTimer(name, timeout).future.thenRun(() -> {
if (!eventTask.isDone()) {
// Book-keeping - remove the task record for the canceled task
eventQueue.removeIf(t -> t.task == eventTask);
Expand Down Expand Up @@ -632,7 +632,26 @@ public Task<Void> createTimer(Duration duration) {
Helpers.throwIfArgumentNull(duration, "duration");

Instant finalFireAt = this.currentInstant.plus(duration);
return createTimer(finalFireAt);
return createTimer("", finalFireAt);
}

public Task<Void> createTimer(String name, Duration duration) {
Helpers.throwIfOrchestratorComplete(this.isComplete);
Helpers.throwIfArgumentNull(duration, "duration");
Helpers.throwIfArgumentNull(name, "name");

Instant finalFireAt = this.currentInstant.plus(duration);
return createTimer(name, finalFireAt);
}

@Override
public Task<Void> createTimer(String name, ZonedDateTime zonedDateTime) {
Helpers.throwIfOrchestratorComplete(this.isComplete);
Helpers.throwIfArgumentNull(zonedDateTime, "zonedDateTime");
Helpers.throwIfArgumentNull(name, "name");

Instant finalFireAt = zonedDateTime.toInstant();
return createTimer(name, finalFireAt);
}

@Override
Expand All @@ -641,18 +660,19 @@ public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
Helpers.throwIfArgumentNull(zonedDateTime, "zonedDateTime");

Instant finalFireAt = zonedDateTime.toInstant();
return createTimer(finalFireAt);
return createTimer("", finalFireAt);
}

private Task<Void> createTimer(Instant finalFireAt) {
return new TimerTask(finalFireAt);
private Task<Void> createTimer(String name, Instant finalFireAt) {
return new TimerTask(name, finalFireAt);
}

private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
private CompletableTask<Void> createInstantTimer(String name, int id, Instant fireAt) {
Timestamp ts = DataConverter.getTimestampFromInstant(fireAt);
this.pendingActions.put(id, OrchestratorAction.newBuilder()
.setId(id)
.setCreateTimer(CreateTimerAction.newBuilder().setFireAt(ts))
.setCreateTimer(CreateTimerAction.newBuilder()
.setName(name + "-" + id).setFireAt(ts))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you should add the id here, the name is the name, and whatever is provided should be respected

.build());

if (!this.isReplaying) {
Expand Down Expand Up @@ -1022,10 +1042,10 @@ private class TimerTask extends CompletableTask<Void> {
private Instant finalFireAt;
CompletableTask<Void> task;

public TimerTask(Instant finalFireAt) {
public TimerTask(String name, Instant finalFireAt) {
super();
CompletableTask<Void> firstTimer = createTimerTask(finalFireAt);
CompletableFuture<Void> timerChain = createTimerChain(finalFireAt, firstTimer.future);
CompletableTask<Void> firstTimer = createTimerTask(name, finalFireAt);
CompletableFuture<Void> timerChain = createTimerChain(name, finalFireAt, firstTimer.future);
this.task = new CompletableTask<>(timerChain);
this.finalFireAt = finalFireAt;
}
Expand All @@ -1035,26 +1055,26 @@ public TimerTask(Instant finalFireAt) {
// currentFuture completes, we check if we have not yet reached finalFireAt. If that is the case, we create a new sub-timer
// task and make a recursive call on that new sub-timer task so that once it completes, another sub-timer task is created
// if necessary. Otherwise, we return and no more sub-timers are created.
private CompletableFuture<Void> createTimerChain(Instant finalFireAt, CompletableFuture<Void> currentFuture) {
private CompletableFuture<Void> createTimerChain(String name, Instant finalFireAt, CompletableFuture<Void> currentFuture) {
return currentFuture.thenRun(() -> {
Instant currentInstsanceMinusNanos = currentInstant.minusNanos(currentInstant.getNano());
Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano());
if (currentInstsanceMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) {
return;
}
Task<Void> nextTimer = createTimerTask(finalFireAt);
createTimerChain(finalFireAt, nextTimer.future);
Task<Void> nextTimer = createTimerTask(name + "-next", finalFireAt);
createTimerChain(name, finalFireAt, nextTimer.future);
});
}

private CompletableTask<Void> createTimerTask(Instant finalFireAt) {
private CompletableTask<Void> createTimerTask(String name, Instant finalFireAt) {
CompletableTask<Void> nextTimer;
Duration remainingTime = Duration.between(currentInstant, finalFireAt);
if (remainingTime.compareTo(maximumTimerInterval) > 0) {
Instant nextFireAt = currentInstant.plus(maximumTimerInterval);
nextTimer = createInstantTimer(sequenceNumber++, nextFireAt);
nextTimer = createInstantTimer(name, sequenceNumber++, nextFireAt);
} else {
nextTimer = createInstantTimer(sequenceNumber++, finalFireAt);
nextTimer = createInstantTimer(name, sequenceNumber++, finalFireAt);
}
nextTimer.setParentTask(this);
return nextTimer;
Expand Down Expand Up @@ -1185,7 +1205,7 @@ public void tryRetry(TaskFailedException ex) {
Duration delay = this.getNextDelay();
if (!delay.isZero() && !delay.isNegative()) {
// Use a durable timer to create the delay between retries
this.context.createTimer(delay).await();
this.context.createTimer(getName() + "-retry",delay).await();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case I think this is good, its a made up name, but that it shows that it comes from the activity but is a retry

}

this.totalRetryTime = Duration.between(this.startTime, this.context.getCurrentInstant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ void emptyOrchestration() throws TimeoutException {
defaultTimeout,
true);


assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals(input, instance.readInputAs(String.class));
Expand Down Expand Up @@ -174,6 +175,8 @@ void loopWithTimer() throws IOException, TimeoutException {


}


}

@Test
Expand Down
Loading