Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

package org.apache.samza.scheduler;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Per-task scheduler for keyed timers.
Expand All @@ -36,7 +37,7 @@
* 3) triggers listener whenever a timer fires.
*/
public class EpochTimeScheduler {

private static final Logger LOG = LoggerFactory.getLogger(EpochTimeScheduler.class);
/**
* For run loop to listen to timer firing so it can schedule the callbacks.
*/
Expand All @@ -57,9 +58,33 @@ private EpochTimeScheduler(ScheduledExecutorService executor) {
this.executor = executor;
}

@VisibleForTesting
Map<Object, ScheduledFuture> getScheduledFutures() {
return scheduledFutures;
}

public <K> void setTimer(K key, long timestamp, ScheduledCallback<K> callback) {
checkState(!scheduledFutures.containsKey(key),
String.format("Duplicate key %s registration for the same timer", key));
if (scheduledFutures.containsKey(key)) {
LOG.warn("Registering duplicate callback for key: {}. Attempting to cancel the previous callback", key);
ScheduledFuture<?> scheduledFuture = scheduledFutures.get(key);

/*
* We can have a race between the time we check for the presence of the key and the time we attempt to cancel;
* Hence we check for non-null criteria to ensure the executor hasn't kicked off the callback for the key which
* removes the future from the map before invoking onTimer.
* 1. In the event that callback is running then we will not attempt to interrupt the action and
* cancel will return as unsuccessful.
* 2. In case of the callback successfully executed, we want to allow duplicate registration to keep the
* behavior consistent with the scenario where the callback is already executed or in progress even before
* we entered this condition.
*/
if (scheduledFuture != null
&& !scheduledFuture.cancel(false)
&& !scheduledFuture.isDone()) {
LOG.warn("Failed to cancel the previous callback successfully. Ignoring the current request to register new callback");
return;
}
}

final long delay = timestamp - System.currentTimeMillis();
final ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.apache.samza.task.MessageCollector;
Expand Down Expand Up @@ -56,6 +58,125 @@ private void fireTimers(EpochTimeScheduler factory) {
});
}

@Test
@SuppressWarnings("unchecked")
public void testDuplicateTimerWithCancelableCallback() {
final String timerKey = "timer-1";
ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class);
ScheduledExecutorService executor = mock(ScheduledExecutorService.class);

when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(true);
when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
.thenReturn(mockScheduledFuture1)
.thenAnswer(invocation -> {
Object[] args = invocation.getArguments();
Runnable runnable = (Runnable) args[0];
runnable.run();
return mockScheduledFuture2;
});

EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
long timestamp = System.currentTimeMillis() + 10000;

ScheduledCallback<String> expectedScheduledCallback = mock(ScheduledCallback.class);
scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback);

// verify the interactions with the scheduled future and the scheduler
verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(), anyObject());
verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());

// verify the ready timer and its callback contents to ensure the second invocation callback overwrites the
// first callback
Set<Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback>> readyTimers =
scheduler.removeReadyTimers().entrySet();
assertEquals("Only one timer should be ready to be fired", readyTimers.size(), 1);

Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback> timerEntry = readyTimers.iterator().next();
assertEquals("Expected the scheduled callback from the second invocation",
timerEntry.getValue(),
expectedScheduledCallback);
assertEquals("Expected timer-1 as the key for ready timer",
timerEntry.getKey().getKey(),
timerKey);
}

@Test
@SuppressWarnings("unchecked")
public void testDuplicateTimerWithUnsuccessfulCancellation() {
final String timerKey = "timer-1";
ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
ScheduledExecutorService executor = mock(ScheduledExecutorService.class);

when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false);
when(mockScheduledFuture1.isDone()).thenReturn(false);
when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
.thenReturn(mockScheduledFuture1);

EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
long timestamp = System.currentTimeMillis() + 10000;

scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));

// verify the interactions with the scheduled future and the scheduler
verify(executor, times(1)).schedule((Runnable) anyObject(), anyLong(), anyObject());
verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
verify(mockScheduledFuture1, times(1)).isDone();

Map<Object, ScheduledFuture> scheduledFutures = scheduler.getScheduledFutures();
assertTrue("Expected the timer to be in the queue", scheduledFutures.containsKey(timerKey));
assertEquals("Expected the scheduled callback from the first invocation",
scheduledFutures.get(timerKey),
mockScheduledFuture1);
}

@Test
public void testDuplicateTimerWithFinishedCallbacks() {
final String timerKey = "timer-1";
ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class);
ScheduledExecutorService executor = mock(ScheduledExecutorService.class);

when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false);
when(mockScheduledFuture1.isDone()).thenReturn(true);
when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
.thenReturn(mockScheduledFuture1)
.thenAnswer(invocation -> {
Object[] args = invocation.getArguments();
Runnable runnable = (Runnable) args[0];
runnable.run();
return mockScheduledFuture2;
});

EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
long timestamp = System.currentTimeMillis() + 10000;

ScheduledCallback<String> expectedScheduledCallback = mock(ScheduledCallback.class);
scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback);

// verify the interactions with the scheduled future and the scheduler
verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(), anyObject());
verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
verify(mockScheduledFuture1, times(1)).isDone();

// verify the ready timer and its callback contents to ensure the second invocation callback overwrites the
// first callback
Set<Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback>> readyTimers =
scheduler.removeReadyTimers().entrySet();
assertEquals("Only one timer should be ready to be fired", readyTimers.size(), 1);

Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback> timerEntry = readyTimers.iterator().next();
assertEquals("Expected the scheduled callback from the second invocation",
timerEntry.getValue(),
expectedScheduledCallback);
assertEquals("Expected timer-1 as the key for ready timer",
timerEntry.getKey().getKey(),
timerKey);
}

@Test
public void testSingleTimer() {
EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
Expand Down