From cb3f488877b7fb2ab0dfcc5c24fe53035bc765e7 Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai"
Date: Tue, 19 Dec 2017 16:10:44 -0800
Subject: [PATCH 1/3] [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to
 not rely on Java reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, made the unit tests overly
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally and
properly execute all operator life cycle methods, regardless of the
logic under test.
---
 .../kafka/FlinkKafkaConsumerBase.java         |   5 +
 .../kafka/FlinkKafkaConsumerBaseTest.java     | 447 ++++++++++--------
 2 files changed, 245 insertions(+), 207 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 865d66c3c1412..44e0d5486f953 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -882,4 +882,9 @@ TreeMap<KafkaTopicPartition, Long> getRestoredState() {
 	OffsetCommitMode getOffsetCommitMode() {
 		return offsetCommitMode;
 	}
+
+	@VisibleForTesting
+	LinkedMap getPendingOffsetsToCommit() {
+		return pendingOffsetsToCommit;
+	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 168cfd5c94af8..cc251364bd6d9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -23,7 +23,10 @@
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -44,16 +47,13 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
-import org.apache.commons.collections.map.LinkedMap;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
 import java.io.Serializable;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -86,30 +86,33 @@ public class FlinkKafkaConsumerBaseTest {
 	 * Tests that not both types of timestamp extractors / watermark generators can be used.
 	 */
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testEitherWatermarkExtractor() {
 		try {
-			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
+			new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+				.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
 		try {
-			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
+			new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+				.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
-		@SuppressWarnings("unchecked")
 		final AssignerWithPeriodicWatermarks periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
-		@SuppressWarnings("unchecked")
 		final AssignerWithPunctuatedWatermarks punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 
-		DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer<>();
+		DummyFlinkKafkaConsumer c1 =
+			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
 		c1.assignTimestampsAndWatermarks(periodicAssigner);
 		try {
 			c1.assignTimestampsAndWatermarks(punctuatedAssigner);
 			fail();
 		} catch (IllegalStateException ignored) {}
 
-		DummyFlinkKafkaConsumer c2 = new DummyFlinkKafkaConsumer<>();
+		DummyFlinkKafkaConsumer c2 =
+			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
 		c2.assignTimestampsAndWatermarks(punctuatedAssigner);
 		try {
 			c2.assignTimestampsAndWatermarks(periodicAssigner);
@@ -123,17 +126,17 @@ public void testEitherWatermarkExtractor() {
 	@Test
 	public void ignoreCheckpointWhenNotRunning() throws Exception {
 		@SuppressWarnings("unchecked")
-		final AbstractFetcher fetcher = mock(AbstractFetcher.class);
+		final FlinkKafkaConsumerBase consumer =
+			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
 
-		FlinkKafkaConsumerBase consumer = getConsumer(fetcher, new LinkedMap(), false);
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
-		when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
+		setupConsumer(consumer, false, listState, true, 0, 1);
 
+		// snapshot before the fetcher starts running
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
 
+		// no state should have been checkpointed
 		assertFalse(listState.get().iterator().hasNext());
-		consumer.notifyCheckpointComplete(66L);
 	}
 
 	/**
@@ -142,32 +145,16 @@ public void ignoreCheckpointWhenNotRunning() throws Exception {
 	 */
 	@Test
 	public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-
-		TestingListState restoredListState = new TestingListState<>();
-		restoredListState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L));
-		restoredListState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
-
-		FlinkKafkaConsumerBase consumer = getConsumer(null, new LinkedMap(), true);
-		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
-		when(context.getIndexOfThisSubtask()).thenReturn(0);
-		consumer.setRuntimeContext(context);
-
-		// mock old 1.2 state (empty)
-		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState());
-		// mock 1.3 state
-		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);
-
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-		when(initializationContext.isRestored()).thenReturn(true);
-
-		consumer.initializeState(initializationContext);
+		@SuppressWarnings("unchecked")
+		final FlinkKafkaConsumerBase consumer = new DummyFlinkKafkaConsumer<>(
+			mock(AbstractFetcher.class),
+			mock(AbstractPartitionDiscoverer.class),
+			false);
 
-		consumer.open(new Configuration());
+		final TestingListState<Tuple2<KafkaTopicPartition, Long>> restoredListState = new TestingListState<>();
+		setupConsumer(consumer, true, restoredListState, true, 0, 1);
 
+		// snapshot before the fetcher starts running
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
 
 		// ensure that the list was cleared and refilled. while this is an implementation detail, we use it here
@@ -192,67 +179,78 @@
 	@Test
 	public void testConfigureOnCheckpointsCommitMode() throws Exception {
+		@SuppressWarnings("unchecked")
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(
+			mock(AbstractFetcher.class),
+			mock(AbstractPartitionDiscoverer.class),
+			true); // auto-commit enabled; this should be ignored in this case
+
+		setupConsumer(
+			consumer,
+			false,
+			null,
+			true, // enable checkpointing; auto commit should be ignored
+			0,
+			1);
 
-		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
-		consumer.setIsAutoCommitEnabled(true); // this should be ignored
-
-		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-		when(context.getIndexOfThisSubtask()).thenReturn(0);
-		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
-		when(context.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing, auto commit should be ignored
-		consumer.setRuntimeContext(context);
-
-		consumer.open(new Configuration());
 		assertEquals(OffsetCommitMode.ON_CHECKPOINTS, consumer.getOffsetCommitMode());
 	}
 
 	@Test
 	public void testConfigureAutoCommitMode() throws Exception {
+		@SuppressWarnings("unchecked")
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(
+			mock(AbstractFetcher.class),
+			mock(AbstractPartitionDiscoverer.class),
+			true);
+
+		setupConsumer(
+			consumer,
+			false,
+			null,
+			false, // disable checkpointing; auto commit should be respected
+			0,
+			1);
 
-		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
-		consumer.setIsAutoCommitEnabled(true);
-
-		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-		when(context.getIndexOfThisSubtask()).thenReturn(0);
-		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
-		when(context.isCheckpointingEnabled()).thenReturn(false); // disable checkpointing, auto commit should be respected
-		consumer.setRuntimeContext(context);
-
-		consumer.open(new Configuration());
 		assertEquals(OffsetCommitMode.KAFKA_PERIODIC, consumer.getOffsetCommitMode());
 	}
 
 	@Test
 	public void testConfigureDisableOffsetCommitWithCheckpointing() throws Exception {
-
-		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
-		consumer.setIsAutoCommitEnabled(true); // this should be ignored
-
-		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-		when(context.getIndexOfThisSubtask()).thenReturn(0);
-		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
-		when(context.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing, auto commit should be ignored
-		consumer.setRuntimeContext(context);
-
+		@SuppressWarnings("unchecked")
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(
+			mock(AbstractFetcher.class),
+			mock(AbstractPartitionDiscoverer.class),
+			true); // auto-commit enabled; this should be ignored in this case
 		consumer.setCommitOffsetsOnCheckpoints(false); // disabling offset committing should override everything
 
-		consumer.open(new Configuration());
+		setupConsumer(
+			consumer,
+			false,
+			null,
+			true, // enable checkpointing; auto commit should be ignored
+			0,
+			1);
+
 		assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode());
 	}
 
 	@Test
 	public void testConfigureDisableOffsetCommitWithoutCheckpointing() throws Exception {
+		@SuppressWarnings("unchecked")
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(
+			mock(AbstractFetcher.class),
+			mock(AbstractPartitionDiscoverer.class),
+			false);
+
+		setupConsumer(
+			consumer,
+			false,
+			null,
+			false, // disable checkpointing; auto commit should be respected
+			0,
+			1);
 
-		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
-		consumer.setIsAutoCommitEnabled(false);
-
-		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-		when(context.getIndexOfThisSubtask()).thenReturn(0);
-		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
-		when(context.isCheckpointingEnabled()).thenReturn(false); // disable checkpointing, auto commit should be respected
-		consumer.setRuntimeContext(context);
-
-		consumer.open(new Configuration());
 		assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode());
 	}
 
@@ -278,36 +276,37 @@ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
 		// --------------------------------------------------------------------
 
-		final AbstractFetcher fetcher = mock(AbstractFetcher.class);
+		final OneShotLatch runLatch = new OneShotLatch();
+		final OneShotLatch stopLatch = new OneShotLatch();
+		final AbstractFetcher fetcher = getRunnableMockFetcher(runLatch, stopLatch);
 		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
 
-		final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
-		FlinkKafkaConsumerBase consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
-		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
-		when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing
-		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
-		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-		consumer.setRuntimeContext(mockRuntimeContext);
-
-		assertEquals(0, pendingOffsetsToCommit.size());
+		final FlinkKafkaConsumerBase consumer = new DummyFlinkKafkaConsumer<>(
+			fetcher,
+			mock(AbstractPartitionDiscoverer.class),
+			false);
 
-		OperatorStateStore backend = mock(OperatorStateStore.class);
+		final TestingListState listState = new TestingListState<>();
 
-		TestingListState listState = new TestingListState<>();
-		// mock old 1.2 state (empty)
-		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState());
-		// mock 1.3 state
-		when(backend.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		// setup and run the consumer; wait until the consumer reaches the main fetch loop before continuing test
+		setupConsumer(consumer, false, listState, true, 0, 1);
 
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-		when(initializationContext.isRestored()).thenReturn(false, true, true, true);
+		final CheckedThread runThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				consumer.run(mock(SourceFunction.SourceContext.class));
+			}
 
-		consumer.initializeState(initializationContext);
+			@Override
+			public void sync() throws Exception {
+				stopLatch.trigger();
+				super.sync();
+			}
+		};
+		runThread.start();
+		runLatch.await();
 
-		consumer.open(new Configuration());
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
 
 		// checkpoint 1
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
@@ -320,8 +319,8 @@ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
 		}
 
 		assertEquals(state1, snapshot1);
-		assertEquals(1, pendingOffsetsToCommit.size());
-		assertEquals(state1, pendingOffsetsToCommit.get(138L));
+		assertEquals(1, consumer.getPendingOffsetsToCommit().size());
+		assertEquals(state1, consumer.getPendingOffsetsToCommit().get(138L));
 
 		// checkpoint 2
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
@@ -334,13 +333,13 @@ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
 		}
 
 		assertEquals(state2, snapshot2);
-		assertEquals(2, pendingOffsetsToCommit.size());
-		assertEquals(state2, pendingOffsetsToCommit.get(140L));
+		assertEquals(2, consumer.getPendingOffsetsToCommit().size());
+		assertEquals(state2, consumer.getPendingOffsetsToCommit().get(140L));
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		assertEquals(1, pendingOffsetsToCommit.size());
-		assertTrue(pendingOffsetsToCommit.containsKey(140L));
+		assertEquals(1, consumer.getPendingOffsetsToCommit().size());
+		assertTrue(consumer.getPendingOffsetsToCommit().containsKey(140L));
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -353,37 +352,35 @@ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
 		}
 
 		assertEquals(state3, snapshot3);
-		assertEquals(2, pendingOffsetsToCommit.size());
-		assertEquals(state3, pendingOffsetsToCommit.get(141L));
+		assertEquals(2, consumer.getPendingOffsetsToCommit().size());
+		assertEquals(state3, consumer.getPendingOffsetsToCommit().get(141L));
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		assertEquals(0, pendingOffsetsToCommit.size());
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		assertEquals(0, pendingOffsetsToCommit.size());
-
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		listState = new TestingListState<>();
-		when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
 
 		// create 500 snapshots
 		for (int i = 100; i < 600; i++) {
 			consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
 			listState.clear();
 		}
-		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size());
+		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, consumer.getPendingOffsetsToCommit().size());
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
-		assertEquals(1, pendingOffsetsToCommit.size());
+		assertEquals(1, consumer.getPendingOffsetsToCommit().size());
 
 		// access invalid checkpoint
 		consumer.notifyCheckpointComplete(590);
 
 		// and the last
 		consumer.notifyCheckpointComplete(599);
-		assertEquals(0, pendingOffsetsToCommit.size());
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
+
+		runThread.sync();
 	}
 
 	@Test
@@ -407,38 +404,38 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
 		// --------------------------------------------------------------------
 
-		final AbstractFetcher fetcher = mock(AbstractFetcher.class);
+		final OneShotLatch runLatch = new OneShotLatch();
+		final OneShotLatch stopLatch = new OneShotLatch();
+		final AbstractFetcher fetcher = getRunnableMockFetcher(runLatch, stopLatch);
 		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
 
-		final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
-		FlinkKafkaConsumerBase consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
-		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
-		when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing
-		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
-		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-		consumer.setRuntimeContext(mockRuntimeContext);
-
+		final FlinkKafkaConsumerBase consumer = new DummyFlinkKafkaConsumer<>(
+			fetcher,
+			mock(AbstractPartitionDiscoverer.class),
+			false);
 		consumer.setCommitOffsetsOnCheckpoints(false); // disable offset committing
 
-		assertEquals(0, pendingOffsetsToCommit.size());
-
-		OperatorStateStore backend = mock(OperatorStateStore.class);
+		final TestingListState listState = new TestingListState<>();
 
-		TestingListState listState = new TestingListState<>();
-		// mock old 1.2 state (empty)
-		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState());
-		// mock 1.3 state
-		when(backend.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		// setup and run the consumer; wait until the consumer reaches the main fetch loop before continuing test
+		setupConsumer(consumer, false, listState, true, 0, 1);
 
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-		when(initializationContext.isRestored()).thenReturn(false, true, true, true);
+		final CheckedThread runThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				consumer.run(mock(SourceFunction.SourceContext.class));
+			}
 
-		consumer.initializeState(initializationContext);
+			@Override
+			public void sync() throws Exception {
+				stopLatch.trigger();
+				super.sync();
+			}
+		};
+		runThread.start();
+		runLatch.await();
 
-		consumer.open(new Configuration());
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
 
 		// checkpoint 1
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
@@ -451,7 +448,7 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
 		}
 
 		assertEquals(state1, snapshot1);
-		assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size()); // pending offsets to commit should not be updated
 
 		// checkpoint 2
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
@@ -464,7 +461,7 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
 		}
 
 		assertEquals(state2, snapshot2);
-		assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size()); // pending offsets to commit should not be updated
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
@@ -481,7 +478,7 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
 		}
 
 		assertEquals(state3, snapshot3);
-		assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size()); // pending offsets to commit should not be updated
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
@@ -490,16 +487,12 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
 		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		listState = new TestingListState<>();
-		when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
-
 		// create 500 snapshots
 		for (int i = 100; i < 600; i++) {
 			consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
 			listState.clear();
 		}
-		assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+		assertEquals(0, consumer.getPendingOffsetsToCommit().size()); // pending offsets to commit should not be updated
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
@@ -532,7 +525,7 @@ public void testScaleDown() throws Exception {
 	 * of topics fetched from Kafka.
 	 */
 	@SuppressWarnings("unchecked")
-	void testRescaling(
+	private void testRescaling(
 			final int initialParallelism,
 			final int numPartitions,
 			final int restoredParallelism,
@@ -554,8 +547,14 @@ void testRescaling(
 			new AbstractStreamOperatorTestHarness[initialParallelism];
 
 		for (int i = 0; i < initialParallelism; i++) {
-			consumers[i] = new DummyFlinkKafkaConsumer<>(
-				Collections.singletonList("test-topic"), mockFetchedPartitionsOnStartup);
+			TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
+				new KafkaTopicsDescriptor(Collections.singletonList("test-topic"), null),
+				i,
+				initialParallelism,
+				TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList("test-topic")),
+				TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockFetchedPartitionsOnStartup));
+
+			consumers[i] = new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), partitionDiscoverer, false);
 			testHarnesses[i] = createTestHarness(consumers[i], initialParallelism, i);
 
 			// initializeState() is always called, null signals that we didn't restore
@@ -602,8 +601,14 @@ void testRescaling(
 			new AbstractStreamOperatorTestHarness[restoredParallelism];
 
 		for (int i = 0; i < restoredParallelism; i++) {
-			restoredConsumers[i] = new DummyFlinkKafkaConsumer<>(
-				Collections.singletonList("test-topic"), mockFetchedPartitionsAfterRestore);
+			TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
+				new KafkaTopicsDescriptor(Collections.singletonList("test-topic"), null),
+				i,
+				restoredParallelism,
+				TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList("test-topic")),
+				TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockFetchedPartitionsAfterRestore));
+
+			restoredConsumers[i] = new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), partitionDiscoverer, false);
 			restoredTestHarnesses[i] = createTestHarness(restoredConsumers[i], restoredParallelism, i);
 
 			// initializeState() is always called, null signals that we didn't restore
@@ -630,28 +635,6 @@ void testRescaling(
 
 	// ------------------------------------------------------------------------
 
-	private static FlinkKafkaConsumerBase getConsumer(
-			AbstractFetcher fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception {
-		FlinkKafkaConsumerBase consumer = new DummyFlinkKafkaConsumer<>();
-		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
-		Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
-		consumer.setRuntimeContext(mockRuntimeContext);
-
-		Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
-		fetcherField.setAccessible(true);
-		fetcherField.set(consumer, fetcher);
-
-		Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
-		mapField.setAccessible(true);
-		mapField.set(consumer, pendingOffsetsToCommit);
-
-		Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
-		runningField.setAccessible(true);
-		runningField.set(consumer, running);
-
-		return consumer;
-	}
-
 	private static AbstractStreamOperatorTestHarness createTestHarness(
 			SourceFunction source, int numSubtasks, int subtaskIndex) throws Exception {
@@ -667,25 +650,33 @@ private static AbstractStreamOperatorTestHarness createTestHarness(
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * An instantiable dummy {@link FlinkKafkaConsumerBase} that supports injecting
+	 * mocks for {@link FlinkKafkaConsumerBase#kafkaFetcher}, {@link FlinkKafkaConsumerBase#partitionDiscoverer},
+	 * and {@link FlinkKafkaConsumerBase#getIsAutoCommitEnabled()}.
+	 */
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
 		private static final long serialVersionUID = 1L;
 
-		boolean isAutoCommitEnabled = false;
-
-		private List fixedMockGetAllTopicsReturnSequence;
-		private List fixedMockGetAllPartitionsForTopicsReturnSequence;
-
-		public DummyFlinkKafkaConsumer() {
-			this(Collections.singletonList("dummy-topic"), Collections.singletonList(new KafkaTopicPartition("dummy-topic", 0)));
-		}
+		private AbstractFetcher testFetcher;
+		private AbstractPartitionDiscoverer testPartitionDiscoverer;
+		private boolean isAutoCommitEnabled;
 
 		@SuppressWarnings("unchecked")
-		public DummyFlinkKafkaConsumer(
-				List fixedMockGetAllTopicsReturnSequence,
-				List fixedMockGetAllPartitionsForTopicsReturnSequence) {
-			super(Arrays.asList("dummy-topic"), null, (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class), 0);
-			this.fixedMockGetAllTopicsReturnSequence = Preconditions.checkNotNull(fixedMockGetAllTopicsReturnSequence);
-			this.fixedMockGetAllPartitionsForTopicsReturnSequence = Preconditions.checkNotNull(fixedMockGetAllPartitionsForTopicsReturnSequence);
+		DummyFlinkKafkaConsumer(
+				AbstractFetcher testFetcher,
+				AbstractPartitionDiscoverer testPartitionDiscoverer,
+				boolean isAutoCommitEnabled) {
+
+			super(
+				Collections.singletonList("dummy-topic"),
+				null,
+				(KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class),
+				PARTITION_DISCOVERY_DISABLED);
+
+			this.testFetcher = testFetcher;
+			this.testPartitionDiscoverer = testPartitionDiscoverer;
+			this.isAutoCommitEnabled = isAutoCommitEnabled;
 		}
 
 		@Override
@@ -697,7 +688,7 @@ public DummyFlinkKafkaConsumer(
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext,
 				OffsetCommitMode offsetCommitMode) throws Exception {
-			return mock(AbstractFetcher.class);
+			return this.testFetcher;
 		}
 
 		@Override
@@ -705,21 +696,12 @@ protected AbstractPartitionDiscoverer createPartitionDiscoverer(
 				KafkaTopicsDescriptor topicsDescriptor,
 				int indexOfThisSubtask,
 				int numParallelSubtasks) {
-			return new TestPartitionDiscoverer(
-				topicsDescriptor,
-				indexOfThisSubtask,
-				numParallelSubtasks,
-				TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(fixedMockGetAllTopicsReturnSequence),
-				TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(fixedMockGetAllPartitionsForTopicsReturnSequence));
+			return this.testPartitionDiscoverer;
 		}
 
 		@Override
 		protected boolean getIsAutoCommitEnabled() {
-			return isAutoCommitEnabled;
-		}
-
-		public void setIsAutoCommitEnabled(boolean isAutoCommitEnabled) {
-			this.isAutoCommitEnabled = isAutoCommitEnabled;
+			return this.isAutoCommitEnabled;
 		}
 	}
 
@@ -748,8 +730,59 @@ public List getList() {
 			return list;
 		}
 
-		public boolean isClearCalled() {
+		boolean isClearCalled() {
 			return clearCalled;
 		}
 	}
+
+	/**
+	 * Returns a mock {@link AbstractFetcher}, with run / stop latches injected in
+	 * the {@link AbstractFetcher#runFetchLoop()} method.
+ */ + private static AbstractFetcher getRunnableMockFetcher( + OneShotLatch runLatch, + OneShotLatch stopLatch) throws Exception { + + @SuppressWarnings("unchecked") + final AbstractFetcher fetcher = mock(AbstractFetcher.class); + + Mockito.doAnswer(invocationOnMock -> { + runLatch.trigger(); + stopLatch.await(); + return null; + }).when(fetcher).runFetchLoop(); + + return fetcher; + } + + @SuppressWarnings("unchecked") + private static void setupConsumer( + FlinkKafkaConsumerBase consumer, + boolean isRestored, + ListState restoredListState, + boolean isCheckpointingEnabled, + int subtaskIndex, + int totalNumSubtasks) throws Exception { + + final StreamingRuntimeContext runtimeContext = Mockito.mock(StreamingRuntimeContext.class); + when(runtimeContext.isCheckpointingEnabled()).thenReturn(isCheckpointingEnabled); + when(runtimeContext.getIndexOfThisSubtask()).thenReturn(subtaskIndex); + when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(totalNumSubtasks); + when(runtimeContext.getMetricGroup()).thenReturn(mock(MetricGroup.class)); + + final OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + when(operatorStateStore.getUnionListState((ListStateDescriptor) Matchers.any(ListStateDescriptor.class))) + .thenReturn(restoredListState); + // also mock the legacy 1.2 Kafka consumer state (return empty state) + when(operatorStateStore.getSerializableListState(Matchers.anyString())).thenReturn(new TestingListState<>()); + + final FunctionInitializationContext initializationContext = Mockito.mock(FunctionInitializationContext.class); + when(initializationContext.isRestored()).thenReturn(isRestored); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + + // run setup procedure in operator life cycle + consumer.setRuntimeContext(runtimeContext); + consumer.initializeState(initializationContext); + consumer.open(new Configuration()); + } } From 689807119bba4e2c7fd0be54d12708fa847bbe11 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Thu, 11 Jan 2018 21:11:38 +0800 Subject: [PATCH 2/3] fixup! 
Subject: [PATCH 2/3] fixup! Use proper mocks for consumer setup

---
 .../kafka/FlinkKafkaConsumerBaseTest.java     | 143 +++++++++++++++---
 1 file changed, 126 insertions(+), 17 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index cc251364bd6d9..b0a6f8a53c36e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -18,6 +18,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -26,12 +29,16 @@
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
@@ -764,25 +771,127 @@ private static void setupConsumer(
 			int subtaskIndex,
 			int totalNumSubtasks) throws Exception {
 
-		final StreamingRuntimeContext runtimeContext = Mockito.mock(StreamingRuntimeContext.class);
-		when(runtimeContext.isCheckpointingEnabled()).thenReturn(isCheckpointingEnabled);
-		when(runtimeContext.getIndexOfThisSubtask()).thenReturn(subtaskIndex);
-		when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(totalNumSubtasks);
-		when(runtimeContext.getMetricGroup()).thenReturn(mock(MetricGroup.class));
+		// run setup procedure in operator life cycle
+		consumer.setRuntimeContext(new MockRuntimeContext(isCheckpointingEnabled, totalNumSubtasks, subtaskIndex));
+		consumer.initializeState(new MockFunctionInitializationContext(isRestored, new MockOperatorStateStore(restoredListState)));
+		consumer.open(new Configuration());
+	}
 
-		final OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		when(operatorStateStore.getUnionListState((ListStateDescriptor) Matchers.any(ListStateDescriptor.class)))
-			.thenReturn(restoredListState);
-		// also mock the legacy 1.2 Kafka consumer state (return empty state)
-		when(operatorStateStore.getSerializableListState(Matchers.anyString())).thenReturn(new TestingListState<>());
+	private static class MockRuntimeContext extends StreamingRuntimeContext {
+
+		private final boolean isCheckpointingEnabled;
+
+		private final int numParallelSubtasks;
+		private final int subtaskIndex;
+
+		private MockRuntimeContext(
+				boolean isCheckpointingEnabled,
+				int numParallelSubtasks,
+				int subtaskIndex) {
+
+			super(
+				new MockStreamOperator(),
+				new MockEnvironment("mockTask", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+				Collections.<String, Accumulator<?, ?>>emptyMap());
+
+			this.isCheckpointingEnabled = isCheckpointingEnabled;
+			this.numParallelSubtasks = numParallelSubtasks;
+			this.subtaskIndex = subtaskIndex;
+		}
+
+		@Override
+		public MetricGroup getMetricGroup() {
+			return new UnregisteredMetricsGroup();
+		}
+
+		@Override
+		public boolean isCheckpointingEnabled() {
+			return isCheckpointingEnabled;
+		}
+
+		@Override
+		public int getIndexOfThisSubtask() {
+			return subtaskIndex;
+		}
+
+		@Override
+		public int getNumberOfParallelSubtasks() {
+			return numParallelSubtasks;
+		}
+
+		// ------------------------------------------------------------------------
+
+		private static class MockStreamOperator extends AbstractStreamOperator {
+			private static final long serialVersionUID = -1153976702711944427L;
+
+			@Override
+			public ExecutionConfig getExecutionConfig() {
+				return new ExecutionConfig();
+			}
+		}
+	}
+
+	private static class MockOperatorStateStore implements OperatorStateStore {
+
+		private final ListState mockRestoredUnionListState;
+
+		private MockOperatorStateStore(ListState restoredUnionListState) {
+			this.mockRestoredUnionListState = restoredUnionListState;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public ListState getUnionListState(ListStateDescriptor stateDescriptor) throws Exception {
+			return (ListState) mockRestoredUnionListState;
+		}
+
+		@Override
+		public ListState getSerializableListState(String stateName) throws Exception {
+			// return empty state for the legacy 1.2 Kafka consumer state
+			return new TestingListState<>();
+		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public ListState getOperatorState(ListStateDescriptor stateDescriptor) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public ListState getListState(ListStateDescriptor stateDescriptor) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Set getRegisteredStateNames() {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	private static class MockFunctionInitializationContext implements FunctionInitializationContext {
+
+		private final boolean isRestored;
+		private final OperatorStateStore operatorStateStore;
+
+		private MockFunctionInitializationContext(boolean isRestored, OperatorStateStore operatorStateStore) {
+			this.isRestored = isRestored;
+			this.operatorStateStore = operatorStateStore;
+		}
+
+		@Override
+		public boolean isRestored() {
+			return isRestored;
+		}
+
+		@Override
+		public OperatorStateStore getOperatorStateStore() {
+			return operatorStateStore;
+		}
+
+		@Override
+		public KeyedStateStore getKeyedStateStore() {
+			throw new UnsupportedOperationException();
+		}
+	}
 }

From 40bed45a6bbc42e0ae290c67cdda397328249e0f Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai"
Date: Thu, 11 Jan 2018 21:20:10 +0800
Subject: [PATCH 3/3] fixup! Add convenience DummyFlinkKafkaConsumer
 constructors to deduplicate code

---
 .../kafka/FlinkKafkaConsumerBaseTest.java     | 52 ++++++++-----------
 1 file changed, 22 insertions(+), 30 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b0a6f8a53c36e..fcfdd36481f35 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -96,30 +96,26 @@ public class FlinkKafkaConsumerBaseTest {
 	@SuppressWarnings("unchecked")
 	public void testEitherWatermarkExtractor() {
 		try {
-			new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
-				.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
+			new DummyFlinkKafkaConsumer().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
 		try {
-			new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
-				.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
+			new DummyFlinkKafkaConsumer().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
 		final AssignerWithPeriodicWatermarks periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
 		final AssignerWithPunctuatedWatermarks punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 
-		DummyFlinkKafkaConsumer c1 =
-			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
+		DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer<>();
 		c1.assignTimestampsAndWatermarks(periodicAssigner);
 		try {
 			c1.assignTimestampsAndWatermarks(punctuatedAssigner);
 			fail();
 		} catch (IllegalStateException ignored) {}
 
-		DummyFlinkKafkaConsumer c2 =
-			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
+		DummyFlinkKafkaConsumer c2 = new DummyFlinkKafkaConsumer<>();
 		c2.assignTimestampsAndWatermarks(punctuatedAssigner);
 		try {
 			c2.assignTimestampsAndWatermarks(periodicAssigner);
@@ -133,8 +129,7 @@ public void testEitherWatermarkExtractor() {
 	@Test
 	public void ignoreCheckpointWhenNotRunning() throws Exception {
 		@SuppressWarnings("unchecked")
-		final FlinkKafkaConsumerBase consumer =
-			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
+		final FlinkKafkaConsumerBase consumer = new DummyFlinkKafkaConsumer<>();
 
 		final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
 		setupConsumer(consumer, false, listState, true, 0, 1);
@@ -153,10 +148,7 @@ public void ignoreCheckpointWhenNotRunning() throws Exception {
 	@Test
 	public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
 		@SuppressWarnings("unchecked")
-		final FlinkKafkaConsumerBase consumer = new DummyFlinkKafkaConsumer<>(
-			mock(AbstractFetcher.class),
-			mock(AbstractPartitionDiscoverer.class),
-			false);
+		final FlinkKafkaConsumerBase consumer = new DummyFlinkKafkaConsumer<>();
 
 		final TestingListState<Tuple2<KafkaTopicPartition, Long>> restoredListState = new TestingListState<>();
 		setupConsumer(consumer, true, restoredListState, true, 0, 1);
@@ -187,10 +179,8 @@ public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
 	@Test
 	public void testConfigureOnCheckpointsCommitMode() throws Exception {
 		@SuppressWarnings("unchecked")
-		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(
-			mock(AbstractFetcher.class),
-			mock(AbstractPartitionDiscoverer.class),
-			true); // auto-commit enabled; this should be ignored in this case
+		// auto-commit enabled; this should be ignored in this case
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(true);
 
 		setupConsumer(
 			consumer,
@@ -206,10 +196,7 @@ public void testConfigureOnCheckpointsCommitMode() throws Exception {
 	@Test
 	public void testConfigureAutoCommitMode() throws Exception {
 		@SuppressWarnings("unchecked")
-		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(
-			mock(AbstractFetcher.class),
-			mock(AbstractPartitionDiscoverer.class),
-			true);
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(true);
 
 		setupConsumer(
 			consumer,
@@ -225,10 +212,8 @@ public void testConfigureAutoCommitMode() throws Exception {
 	@Test
 	public void testConfigureDisableOffsetCommitWithCheckpointing() throws Exception {
 		@SuppressWarnings("unchecked")
-		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(
-			mock(AbstractFetcher.class),
-			mock(AbstractPartitionDiscoverer.class),
-			true); // auto-commit enabled; this should be ignored in this case
+		// auto-commit enabled; this should be ignored in this case
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(true);
 		consumer.setCommitOffsetsOnCheckpoints(false); // disabling offset committing should override everything
 
 		setupConsumer(
@@ -245,10 +230,7 @@ public void testConfigureDisableOffsetCommitWithCheckpointing() throws Exception
 	@Test
 	public void testConfigureDisableOffsetCommitWithoutCheckpointing() throws Exception {
 		@SuppressWarnings("unchecked")
-		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(
-			mock(AbstractFetcher.class),
-			mock(AbstractPartitionDiscoverer.class),
-			false);
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(false);
 
 		setupConsumer(
 			consumer,
@@ -669,6 +651,16 @@ private static class DummyFlinkKafkaConsumer extends FlinkKafkaConsumerBase
 			testFetcher,
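
Taken together, the series replaces reflection-based field injection with plain constructor injection plus a full run of the operator life cycle. The sketch below restates that pattern outside the diff for readability. It is illustrative only, not part of any of the three commits, and assumes the test helpers introduced above: DummyFlinkKafkaConsumer, setupConsumer(...), and getRunnableMockFetcher(...).

	// Illustrative sketch only; assumes the helpers defined in this patch series.
	@Test
	public void exampleLifeCycleDrivenTest() throws Exception {
		final OneShotLatch runLatch = new OneShotLatch();
		final OneShotLatch stopLatch = new OneShotLatch();

		// mock fetcher whose runFetchLoop() blocks until the test triggers stopLatch
		final AbstractFetcher fetcher = getRunnableMockFetcher(runLatch, stopLatch);

		// mocks are injected through the constructor instead of via reflection
		final FlinkKafkaConsumerBase consumer = new DummyFlinkKafkaConsumer<>(
			fetcher,
			mock(AbstractPartitionDiscoverer.class),
			false);

		// execute the real life cycle: setRuntimeContext() -> initializeState() -> open()
		final TestingListState listState = new TestingListState<>();
		setupConsumer(consumer, false, listState, true, 0, 1);

		// drive run() on its own thread and wait until the main fetch loop is reached
		final CheckedThread runThread = new CheckedThread() {
			@Override
			public void go() throws Exception {
				consumer.run(mock(SourceFunction.SourceContext.class));
			}
		};
		runThread.start();
		runLatch.await();

		// exercise snapshotState() / notifyCheckpointComplete() against the running consumer here

		stopLatch.trigger(); // let runFetchLoop() return
		runThread.sync();
	}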