diff --git a/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java index cbebbde69f..4b1e2812e1 100644 --- a/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java +++ b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java @@ -19,14 +19,15 @@ package org.apache.samza.scheduler; +import com.google.common.annotations.VisibleForTesting; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Per-task scheduler for keyed timers. @@ -36,7 +37,7 @@ * 3) triggers listener whenever a timer fires. */ public class EpochTimeScheduler { - + private static final Logger LOG = LoggerFactory.getLogger(EpochTimeScheduler.class); /** * For run loop to listen to timer firing so it can schedule the callbacks. */ @@ -57,9 +58,33 @@ private EpochTimeScheduler(ScheduledExecutorService executor) { this.executor = executor; } + @VisibleForTesting + Map getScheduledFutures() { + return scheduledFutures; + } + public void setTimer(K key, long timestamp, ScheduledCallback callback) { - checkState(!scheduledFutures.containsKey(key), - String.format("Duplicate key %s registration for the same timer", key)); + if (scheduledFutures.containsKey(key)) { + LOG.warn("Registering duplicate callback for key: {}. Attempting to cancel the previous callback", key); + ScheduledFuture scheduledFuture = scheduledFutures.get(key); + + /* + * We can have a race between the time we check for the presence of the key and the time we attempt to cancel; + * Hence we check for non-null criteria to ensure the executor hasn't kicked off the callback for the key which + * removes the future from the map before invoking onTimer. + * 1. In the event that callback is running then we will not attempt to interrupt the action and + * cancel will return as unsuccessful. + * 2. In case of the callback successfully executed, we want to allow duplicate registration to keep the + * behavior consistent with the scenario where the callback is already executed or in progress even before + * we entered this condition. + */ + if (scheduledFuture != null + && !scheduledFuture.cancel(false) + && !scheduledFuture.isDone()) { + LOG.warn("Failed to cancel the previous callback successfully. Ignoring the current request to register new callback"); + return; + } + } final long delay = timestamp - System.currentTimeMillis(); final ScheduledFuture scheduledFuture = executor.schedule(() -> { diff --git a/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java index 5db908c45e..4fd3dcf214 100644 --- a/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java +++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import org.apache.samza.task.MessageCollector; @@ -56,6 +58,125 @@ private void fireTimers(EpochTimeScheduler factory) { }); } + @Test + @SuppressWarnings("unchecked") + public void testDuplicateTimerWithCancelableCallback() { + final String timerKey = "timer-1"; + ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class); + ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class); + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(true); + when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject())) + .thenReturn(mockScheduledFuture1) + .thenAnswer(invocation -> { + Object[] args = invocation.getArguments(); + Runnable runnable = (Runnable) args[0]; + runnable.run(); + return mockScheduledFuture2; + }); + + EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor); + long timestamp = System.currentTimeMillis() + 10000; + + ScheduledCallback expectedScheduledCallback = mock(ScheduledCallback.class); + scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class)); + scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback); + + // verify the interactions with the scheduled future and the scheduler + verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(), anyObject()); + verify(mockScheduledFuture1, times(1)).cancel(anyBoolean()); + + // verify the ready timer and its callback contents to ensure the second invocation callback overwrites the + // first callback + Set, ScheduledCallback>> readyTimers = + scheduler.removeReadyTimers().entrySet(); + assertEquals("Only one timer should be ready to be fired", readyTimers.size(), 1); + + Map.Entry, ScheduledCallback> timerEntry = readyTimers.iterator().next(); + assertEquals("Expected the scheduled callback from the second invocation", + timerEntry.getValue(), + expectedScheduledCallback); + assertEquals("Expected timer-1 as the key for ready timer", + timerEntry.getKey().getKey(), + timerKey); + } + + @Test + @SuppressWarnings("unchecked") + public void testDuplicateTimerWithUnsuccessfulCancellation() { + final String timerKey = "timer-1"; + ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class); + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false); + when(mockScheduledFuture1.isDone()).thenReturn(false); + when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject())) + .thenReturn(mockScheduledFuture1); + + EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor); + long timestamp = System.currentTimeMillis() + 10000; + + scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class)); + scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class)); + + // verify the interactions with the scheduled future and the scheduler + verify(executor, times(1)).schedule((Runnable) anyObject(), anyLong(), anyObject()); + verify(mockScheduledFuture1, times(1)).cancel(anyBoolean()); + verify(mockScheduledFuture1, times(1)).isDone(); + + Map scheduledFutures = scheduler.getScheduledFutures(); + assertTrue("Expected the timer to be in the queue", scheduledFutures.containsKey(timerKey)); + assertEquals("Expected the scheduled callback from the first invocation", + scheduledFutures.get(timerKey), + mockScheduledFuture1); + } + + @Test + public void testDuplicateTimerWithFinishedCallbacks() { + final String timerKey = "timer-1"; + ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class); + ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class); + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false); + when(mockScheduledFuture1.isDone()).thenReturn(true); + when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject())) + .thenReturn(mockScheduledFuture1) + .thenAnswer(invocation -> { + Object[] args = invocation.getArguments(); + Runnable runnable = (Runnable) args[0]; + runnable.run(); + return mockScheduledFuture2; + }); + + EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor); + long timestamp = System.currentTimeMillis() + 10000; + + ScheduledCallback expectedScheduledCallback = mock(ScheduledCallback.class); + scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class)); + scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback); + + // verify the interactions with the scheduled future and the scheduler + verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(), anyObject()); + verify(mockScheduledFuture1, times(1)).cancel(anyBoolean()); + verify(mockScheduledFuture1, times(1)).isDone(); + + // verify the ready timer and its callback contents to ensure the second invocation callback overwrites the + // first callback + Set, ScheduledCallback>> readyTimers = + scheduler.removeReadyTimers().entrySet(); + assertEquals("Only one timer should be ready to be fired", readyTimers.size(), 1); + + Map.Entry, ScheduledCallback> timerEntry = readyTimers.iterator().next(); + assertEquals("Expected the scheduled callback from the second invocation", + timerEntry.getValue(), + expectedScheduledCallback); + assertEquals("Expected timer-1 as the key for ready timer", + timerEntry.getKey().getKey(), + timerKey); + } + @Test public void testSingleTimer() { EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());