Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -273,17 +273,6 @@ private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
}
}

/** Advances input watermark to the given value and fires event-time timers accordingly.
*
* @deprecated Use advanceInputWatermark without callback and fireEventTimers.
*/
@Deprecated
public void advanceInputWatermark(
TimerCallback timerCallback, Instant newInputWatermark) throws Exception {
advanceInputWatermark(newInputWatermark);
advanceAndFire(timerCallback, newInputWatermark, TimeDomain.EVENT_TIME);
}

/** Advances processing time to the given value and fires processing-time timers accordingly.
*
* @deprecated Use advanceProcessingTime without callback and fireProcessingTimers.
Expand All @@ -295,21 +284,6 @@ public void advanceProcessingTime(
advanceAndFire(timerCallback, newProcessingTime, TimeDomain.PROCESSING_TIME);
}

/**
* Advances synchronized processing time to the given value and fires processing-time timers
* accordingly.
*
* @deprecated Use advanceInputWatermark without callback and fireSynchronizedProcessingTimers.
*/
@Deprecated
public void advanceSynchronizedProcessingTime(
TimerCallback timerCallback, Instant newSynchronizedProcessingTime)
throws Exception {
advanceSynchronizedProcessingTime(newSynchronizedProcessingTime);
advanceAndFire(
timerCallback, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
}

@Deprecated
private void advanceAndFire(
TimerCallback timerCallback, Instant currentTime, TimeDomain domain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@
*/
package org.apache.beam.sdk.util.state;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.times;
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
* Tests for {@link InMemoryTimerInternals}.
Expand All @@ -40,14 +38,6 @@ public class InMemoryTimerInternalsTest {

private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");

@Mock
private TimerCallback timerCallback;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}

@Test
public void testFiringTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
Expand Down Expand Up @@ -85,24 +75,24 @@ public void testFiringTimersWithCallback() throws Exception {
underTest.setTimer(processingTime1);
underTest.setTimer(processingTime2);

underTest.advanceProcessingTime(timerCallback, new Instant(20));
Mockito.verify(timerCallback).onTimer(processingTime1);
Mockito.verifyNoMoreInteractions(timerCallback);
underTest.advanceProcessingTime(new Instant(20));
assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
assertThat(underTest.removeNextProcessingTimer(), nullValue());

// Advancing just a little shouldn't refire
underTest.advanceProcessingTime(timerCallback, new Instant(21));
Mockito.verifyNoMoreInteractions(timerCallback);
underTest.advanceProcessingTime(new Instant(21));
assertThat(underTest.removeNextProcessingTimer(), nullValue());

// Adding the timer and advancing a little should refire
// Adding the timer and advancing a little should fire again
underTest.setTimer(processingTime1);
underTest.advanceProcessingTime(timerCallback, new Instant(21));
Mockito.verify(timerCallback, times(2)).onTimer(processingTime1);
Mockito.verifyNoMoreInteractions(timerCallback);
underTest.advanceProcessingTime(new Instant(21));
assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
assertThat(underTest.removeNextProcessingTimer(), nullValue());

// And advancing the rest of the way should still have the other timer
underTest.advanceProcessingTime(timerCallback, new Instant(30));
Mockito.verify(timerCallback).onTimer(processingTime2);
Mockito.verifyNoMoreInteractions(timerCallback);
underTest.advanceProcessingTime(new Instant(30));
assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
assertThat(underTest.removeNextProcessingTimer(), nullValue());
}

@Test
Expand Down