Skip to content

Commit

Permalink
refactor: rename runAt to runAfter
Browse files Browse the repository at this point in the history
This name is more fitting as we don't guarantee that the scheduled task
is ran at the provided timestamp. The only guarantee we provide is that
it is ran at some point at or after the timestamp.
  • Loading branch information
korthout committed May 24, 2024
1 parent 5a0e143 commit 176af26
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void schedule(final long dueDate) {
final long scheduleFor = now + Math.max(dueDate - now, timerResolution);
if (!(currentlyPlanned instanceof final Scheduled currentlyScheduled)
|| (currentlyScheduled.scheduledFor() - scheduleFor > timerResolution)) {
final var task = scheduleService.runAt(scheduleFor, this::execute);
final var task = scheduleService.runAfter(scheduleFor, this::execute);
return Optional.of(new Scheduled(scheduleFor, task));
}
return Optional.empty();
Expand All @@ -131,9 +131,9 @@ public void schedule(final long dueDate) {
public void onRecovered(final ReadonlyStreamProcessorContext processingContext) {
final var scheduleService = processingContext.getScheduleService();
if (scheduleAsync) {
this.scheduleService = scheduleService::runAtAsync;
this.scheduleService = scheduleService::runAfterAsync;
} else {
this.scheduleService = scheduleService::runAt;
this.scheduleService = scheduleService::runAfter;
}

shouldRescheduleChecker = true;
Expand Down Expand Up @@ -169,10 +169,11 @@ public void onResumed() {
interface ScheduleDelayed {
/**
* Implemented by either {@link
* io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService#runAt(long, Task)} or {@link
* io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService#runAtAsync(long, Task)}
* io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService#runAfter(long, Task)} or
* {@link io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService#runAfterAsync(long,
* Task)}
*/
ScheduledTask runAt(long timestamp, final Task task);
ScheduledTask runAfter(long timestamp, final Task task);
}

interface NextExecution {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void shouldNotScheduleTwoTasks() {
final var mockScheduleService = mock(ProcessingScheduleService.class);
when(mockContext.getScheduleService()).thenReturn(mockScheduleService);
dueDateChecker.onRecovered(mockContext);
verify(mockScheduleService).runAt(anyLong(), any(Task.class));
verify(mockScheduleService).runAfter(anyLong(), any(Task.class));
dueDateChecker.execute(mock(TaskResultBuilder.class));
Mockito.clearInvocations(mockScheduleService);

Expand All @@ -45,7 +45,7 @@ public void shouldNotScheduleTwoTasks() {
dueDateChecker.schedule(currentTimeMillis + 1000); // in one second

// then
verify(mockScheduleService).runAt(anyLong(), any(Task.class));
verify(mockScheduleService).runAfter(anyLong(), any(Task.class));
}

@Test
Expand All @@ -56,11 +56,11 @@ public void shouldScheduleForAnEarlierTasks() {
final var mockContext = mock(ReadonlyStreamProcessorContext.class);
final var mockScheduleService = mock(ProcessingScheduleService.class);
final var mockScheduledTask = mock(ScheduledTask.class);
when(mockScheduleService.runAt(anyLong(), any(Task.class))).thenReturn(mockScheduledTask);
when(mockScheduleService.runAfter(anyLong(), any(Task.class))).thenReturn(mockScheduledTask);

when(mockContext.getScheduleService()).thenReturn(mockScheduleService);
dueDateChecker.onRecovered(mockContext);
verify(mockScheduleService).runAt(anyLong(), any(Task.class));
verify(mockScheduleService).runAfter(anyLong(), any(Task.class));
dueDateChecker.execute(mock(TaskResultBuilder.class));
Mockito.clearInvocations(mockScheduleService);

Expand All @@ -70,7 +70,7 @@ public void shouldScheduleForAnEarlierTasks() {
dueDateChecker.schedule(currentTimeMillis + 100); // in 100 millis

// then
verify(mockScheduleService, times(2)).runAt(anyLong(), any(Task.class));
verify(mockScheduleService, times(2)).runAfter(anyLong(), any(Task.class));
verify(mockScheduledTask).cancel();
}

Expand All @@ -85,18 +85,18 @@ public void shouldRescheduleAutomatically() {
final var mockContext = mock(ReadonlyStreamProcessorContext.class);
final var mockScheduleService = mock(ProcessingScheduleService.class);
final var mockScheduledTask = mock(ScheduledTask.class);
when(mockScheduleService.runAt(anyLong(), any(Task.class))).thenReturn(mockScheduledTask);
when(mockScheduleService.runAfter(anyLong(), any(Task.class))).thenReturn(mockScheduledTask);

when(mockContext.getScheduleService()).thenReturn(mockScheduleService);
dueDateChecker.onRecovered(mockContext);
verify(mockScheduleService).runAt(anyLong(), any(Task.class));
verify(mockScheduleService).runAfter(anyLong(), any(Task.class));
Mockito.clearInvocations(mockScheduleService);

// when
dueDateChecker.execute(mock(TaskResultBuilder.class));

// then
verify(mockScheduleService).runAt(anyLong(), any(Task.class));
verify(mockScheduleService).runAfter(anyLong(), any(Task.class));
}

@Test
Expand All @@ -110,23 +110,23 @@ public void shouldScheduleEarlierIfRescheduledAutomatically() {
final var mockContext = mock(ReadonlyStreamProcessorContext.class);
final var mockScheduleService = mock(ProcessingScheduleService.class);
final var mockScheduledTask = mock(ScheduledTask.class);
when(mockScheduleService.runAt(anyLong(), any(Task.class))).thenReturn(mockScheduledTask);
when(mockScheduleService.runAfter(anyLong(), any(Task.class))).thenReturn(mockScheduledTask);

when(mockContext.getScheduleService()).thenReturn(mockScheduleService);
dueDateChecker.onRecovered(mockContext);
verify(mockScheduleService).runAt(anyLong(), any(Task.class));
verify(mockScheduleService).runAfter(anyLong(), any(Task.class));
Mockito.clearInvocations(mockScheduleService);

dueDateChecker.execute(mock(TaskResultBuilder.class));
// expect that there is a next execution scheduled after execution
verify(mockScheduleService).runAt(anyLong(), any(Task.class));
verify(mockScheduleService).runAfter(anyLong(), any(Task.class));
Mockito.clearInvocations(mockScheduleService);

// when
dueDateChecker.schedule(ActorClock.currentTimeMillis() + 100); // in one second

// then
verify(mockScheduleService).runAt(anyLong(), any(Task.class));
verify(mockScheduleService).runAfter(anyLong(), any(Task.class));
verify(mockScheduledTask).cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public ScheduledTimer runAtFixedRate(final Duration delay, final Runnable runnab
* @param runnable The runnable to run at (or after) the timestamp
* @return A handle to the scheduled timer task
*/
public ScheduledTimer runAt(final long timestamp, final Runnable runnable) {
ensureCalledFromWithinActor("runAt(...)");
public ScheduledTimer runAfter(final long timestamp, final Runnable runnable) {
ensureCalledFromWithinActor("runAfter(...)");
return scheduleTimerSubscription(runnable, job -> new StampedTimerSubscription(job, timestamp));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void shouldNotRunActionIfDeadlineNotReached() throws InterruptedException
new Actor() {
@Override
protected void onActorStarted() {
actor.runAt(1000, action);
actor.runAfter(1000, action);
}
};

Expand All @@ -221,7 +221,7 @@ public void shouldRunActionWhenDeadlineReached() throws InterruptedException {
new Actor() {
@Override
protected void onActorStarted() {
actor.runAt(1000, action);
actor.runAfter(1000, action);
}
};

Expand All @@ -240,7 +240,7 @@ protected void onActorStarted() {
public void shouldCancelRunAt() {
// given
final Runnable action = mock(Runnable.class);
final TimerActor actor = new TimerActor(actorControl -> actorControl.runAt(1000, action));
final TimerActor actor = new TimerActor(actorControl -> actorControl.runAfter(1000, action));

// when
actorScheduler.setClockTime(100);
Expand All @@ -259,7 +259,7 @@ public void shouldCancelRunAt() {
public void shouldCancelRunDelayedAfterExecution() {
// given
final Runnable action = mock(Runnable.class);
final var actor = new TimerActor(actorControl -> actorControl.runAt(1000, action));
final var actor = new TimerActor(actorControl -> actorControl.runAfter(1000, action));

// when
actorScheduler.setClockTime(100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ public interface ProcessingScheduleService extends SimpleProcessingScheduleServi
* @implNote If the delay is short, cancellation via {@link ScheduledTask} may happen after
* execution and have no effect.
*/
ScheduledTask runAtAsync(final long timestamp, final Task task);
ScheduledTask runAfterAsync(final long timestamp, final Task task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface SimpleProcessingScheduleService {
* @implNote Can be silently ignored if the scheduling service is not ready.
* @return A representation of the scheduled task.
*/
ScheduledTask runAt(long timestamp, Task task);
ScheduledTask runAfter(long timestamp, Task task);

/**
* Schedule a task to execute at a fixed rate. After an initial delay, the task is executed. Once
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public ScheduledTask runDelayedAsync(final Duration delay, final Task task) {
}

@Override
public ScheduledTask runAtAsync(final long timestamp, final Task task) {
public ScheduledTask runAfterAsync(final long timestamp, final Task task) {
final var futureScheduledTask = concurrencyControl.<ScheduledTask>createFuture();
concurrencyControl.run(
() -> {
// we must run in different actor in order to schedule task
final var scheduledTask = asyncActorService.runAt(timestamp, task);
final var scheduledTask = asyncActorService.runAfter(timestamp, task);
futureScheduledTask.complete(scheduledTask);
});
return new AsyncScheduledTask(futureScheduledTask);
Expand Down Expand Up @@ -91,11 +91,11 @@ public ScheduledTask runDelayed(final Duration delay, final Task task) {
}

@Override
public ScheduledTask runAt(final long timestamp, final Task task) {
public ScheduledTask runAfter(final long timestamp, final Task task) {
if (alwaysAsync) {
return runAtAsync(timestamp, task);
return runAfterAsync(timestamp, task);
} else {
return processorActorService.runAt(timestamp, task);
return processorActorService.runAfter(timestamp, task);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public ScheduledTask runDelayed(final Duration delay, final Task task) {
}

@Override
public ScheduledTask runAt(final long timestamp, final Task task) {
public ScheduledTask runAfter(final long timestamp, final Task task) {
if (actorControl == null) {
LOG.warn("ProcessingScheduleService hasn't been opened yet, ignore scheduled task.");
return NOOP_SCHEDULED_TASK;
}
final var scheduledTimer = actorControl.runAt(timestamp, toRunnable(task));
final var scheduledTimer = actorControl.runAfter(timestamp, toRunnable(task));
return scheduledTimer::cancel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,9 @@ public ScheduledTask runDelayed(final Duration delay, final Task task) {
}

@Override
public ScheduledTask runAt(final long timestamp, final Task task) {
public ScheduledTask runAfter(final long timestamp, final Task task) {
final var futureScheduledTask =
actor.call(() -> processingScheduleService.runAt(timestamp, task));
actor.call(() -> processingScheduleService.runAfter(timestamp, task));
return () ->
actor.run(
() ->
Expand Down Expand Up @@ -535,7 +535,7 @@ void shouldNotExecuteScheduledTaskBeforeTimestamp() {
final var mockedTask = spy(new DummyTask());

// when
scheduleService.runAt(90, mockedTask);
scheduleService.runAfter(90, mockedTask);
actorScheduler.workUntilDone();

// then
Expand All @@ -548,7 +548,7 @@ void shouldExecuteScheduledTaskAtTimestamp() {
final var mockedTask = spy(new DummyTask());

// when
scheduleService.runAt(100, mockedTask);
scheduleService.runAfter(100, mockedTask);
actorScheduler.workUntilDone();

// then
Expand All @@ -562,8 +562,8 @@ void shouldExecuteScheduledTaskInOrderOfTimestamp() {
final var mockedTask2 = spy(new DummyTask());

// when
scheduleService.runAt(100, mockedTask);
scheduleService.runAt(90, mockedTask2);
scheduleService.runAfter(100, mockedTask);
scheduleService.runAfter(90, mockedTask2);
actorScheduler.workUntilDone();

// then
Expand All @@ -580,7 +580,7 @@ void shouldNotExecuteScheduledTaskIfNotInProcessingPhase() {
final var mockedTask = spy(new DummyTask());

// when
scheduleService.runAt(100, mockedTask);
scheduleService.runAfter(100, mockedTask);
// The task will be resubmitted infinitely. So workUntilDone will never return.
actorScheduler.resume();

Expand All @@ -595,7 +595,7 @@ void shouldNotExecuteScheduledTaskIfAborted() {
final var mockedTask = spy(new DummyTask());

// when
scheduleService.runAt(100, mockedTask);
scheduleService.runAfter(100, mockedTask);
actorScheduler.workUntilDone();

// then
Expand All @@ -609,7 +609,7 @@ void shouldExecuteScheduledTaskInProcessing() {
final var mockedTask = spy(new DummyTask());

// when
scheduleService.runAt(100, mockedTask);
scheduleService.runAfter(100, mockedTask);
// The task will be resubmitted infinitely. So workUntilDone will never return.
actorScheduler.resume();
verify(mockedTask, never()).execute(any());
Expand All @@ -632,7 +632,7 @@ void shouldNotExecuteTasksWhenScheduledOnClosedActor() {
final var mockedTask = spy(new DummyTask());

// when
notOpenScheduleService.runAt(100, mockedTask);
notOpenScheduleService.runAfter(100, mockedTask);
actorScheduler.workUntilDone();

// then
Expand All @@ -644,7 +644,7 @@ void shouldWriteRecordAfterTaskWasExecuted() {
// given

// when
scheduleService.runAt(
scheduleService.runAfter(
100,
(builder) -> {
builder.appendCommandRecord(1, ACTIVATE_ELEMENT, Records.processInstance(1));
Expand Down

0 comments on commit 176af26

Please sign in to comment.