Skip to content

Commit

Permalink
merge: #8827
Browse files Browse the repository at this point in the history
8827: [Backport stable/1.3] fix: throw instead of silently overwriting timers r=oleschoenburg a=github-actions[bot]

# Description
Backport of #8785 to `stable/1.3`.

relates to #8776 #8776

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-cloud[bot] and lenaschoenburg committed Feb 22, 2022
2 parents d19ab52 + f1ca72c commit d4088b8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,29 @@
*/
package io.camunda.zeebe.util.sched;

import io.camunda.zeebe.util.Loggers;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import java.util.concurrent.TimeUnit;
import org.agrona.DeadlineTimerWheel;
import org.agrona.collections.Long2ObjectHashMap;
import org.slf4j.Logger;

public final class ActorTimerQueue extends DeadlineTimerWheel {
private static final Logger LOG = Loggers.ACTOR_LOGGER;
private static final int DEFAULT_TICKS_PER_WHEEL = 32;
private final Long2ObjectHashMap<TimerSubscription> timerJobMap = new Long2ObjectHashMap<>();

private final TimerHandler timerHandler =
new TimerHandler() {
@Override
public boolean onTimerExpiry(final TimeUnit timeUnit, final long now, final long timerId) {
final TimerSubscription timer = timerJobMap.remove(timerId);
(timeUnit, now, timerId) -> {
final TimerSubscription timer = timerJobMap.remove(timerId);

if (timer != null) {
timer.onTimerExpired(timeUnit, now);
}

return true;
if (timer != null) {
timer.onTimerExpired(timeUnit, now);
} else {
LOG.warn("Timer with id {} expired but is not known in this timer queue.", timerId);
}

return true;
};

public ActorTimerQueue(final ActorClock clock) {
Expand All @@ -52,6 +54,13 @@ public void schedule(final TimerSubscription timer, final ActorClock now) {

final long timerId = scheduleTimer(deadline);
timer.setTimerId(timerId);
if (timerJobMap.containsKey(timerId)) {
throw new IllegalStateException(
"Failed scheduling, timer with id "
+ timerId
+ " already exists: "
+ timerJobMap.get(timerId));
}

timerJobMap.put(timerId, timer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,24 @@ public void onTimerExpired(final TimeUnit timeUnit, final long now) {
public void run() {
thread.removeTimer(this);
}

@Override
public String toString() {
return "TimerSubscription{"
+ "timerId="
+ timerId
+ ", deadline="
+ deadline
+ ", timeUnit="
+ timeUnit
+ ", isRecurring="
+ isRecurring
+ ", isDone="
+ isDone
+ ", isCanceled="
+ isCanceled
+ ", thread="
+ thread
+ '}';
}
}

0 comments on commit d4088b8

Please sign in to comment.