From e26f4075af6f2c990e23dc9f8fc8be2233652a9f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 15 Dec 2016 16:02:23 -0800 Subject: [PATCH] Remove deprecated methods of InMemoryTimerInternals --- .../util/state/InMemoryTimerInternals.java | 26 ------------ .../state/InMemoryTimerInternalsTest.java | 40 +++++++------------ 2 files changed, 15 insertions(+), 51 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java index 159b5830b642..44f901648ddc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -273,17 +273,6 @@ private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) { } } - /** Advances input watermark to the given value and fires event-time timers accordingly. - * - * @deprecated Use advanceInputWatermark without callback and fireEventTimers. - */ - @Deprecated - public void advanceInputWatermark( - TimerCallback timerCallback, Instant newInputWatermark) throws Exception { - advanceInputWatermark(newInputWatermark); - advanceAndFire(timerCallback, newInputWatermark, TimeDomain.EVENT_TIME); - } - /** Advances processing time to the given value and fires processing-time timers accordingly. * * @deprecated Use advanceProcessingTime without callback and fireProcessingTimers. @@ -295,21 +284,6 @@ public void advanceProcessingTime( advanceAndFire(timerCallback, newProcessingTime, TimeDomain.PROCESSING_TIME); } - /** - * Advances synchronized processing time to the given value and fires processing-time timers - * accordingly. - * - * @deprecated Use advanceInputWatermark without callback and fireSynchronizedProcessingTimers. - */ - @Deprecated - public void advanceSynchronizedProcessingTime( - TimerCallback timerCallback, Instant newSynchronizedProcessingTime) - throws Exception { - advanceSynchronizedProcessingTime(newSynchronizedProcessingTime); - advanceAndFire( - timerCallback, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - } - @Deprecated private void advanceAndFire( TimerCallback timerCallback, Instant currentTime, TimeDomain domain) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java index 1e42864bf3ed..4a2763ccc76e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java @@ -17,20 +17,18 @@ */ package org.apache.beam.sdk.util.state; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.times; +import static org.junit.Assert.assertThat; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.joda.time.Instant; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; /** * Tests for {@link InMemoryTimerInternals}. @@ -40,14 +38,6 @@ public class InMemoryTimerInternalsTest { private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); - @Mock - private TimerCallback timerCallback; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - @Test public void testFiringTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); @@ -85,24 +75,24 @@ public void testFiringTimersWithCallback() throws Exception { underTest.setTimer(processingTime1); underTest.setTimer(processingTime2); - underTest.advanceProcessingTime(timerCallback, new Instant(20)); - Mockito.verify(timerCallback).onTimer(processingTime1); - Mockito.verifyNoMoreInteractions(timerCallback); + underTest.advanceProcessingTime(new Instant(20)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); // Advancing just a little shouldn't refire - underTest.advanceProcessingTime(timerCallback, new Instant(21)); - Mockito.verifyNoMoreInteractions(timerCallback); + underTest.advanceProcessingTime(new Instant(21)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); - // Adding the timer and advancing a little should refire + // Adding the timer and advancing a little should fire again underTest.setTimer(processingTime1); - underTest.advanceProcessingTime(timerCallback, new Instant(21)); - Mockito.verify(timerCallback, times(2)).onTimer(processingTime1); - Mockito.verifyNoMoreInteractions(timerCallback); + underTest.advanceProcessingTime(new Instant(21)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); // And advancing the rest of the way should still have the other timer - underTest.advanceProcessingTime(timerCallback, new Instant(30)); - Mockito.verify(timerCallback).onTimer(processingTime2); - Mockito.verifyNoMoreInteractions(timerCallback); + underTest.advanceProcessingTime(new Instant(30)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); } @Test