From 12af5d8b0e43b62935dc619258fb8f957b11d0bc Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai"
Date: Fri, 14 Jul 2017 19:51:03 +0800
Subject: [PATCH] [FLINK-7195] [kafka] Remove partition list querying when
 restoring state in FlinkKafkaConsumer

---
 .../kafka/FlinkKafkaConsumerBase.java         |  27 ++-
 .../kafka/FlinkKafkaConsumerBaseTest.java     | 113 ++++++++++-
 .../KafkaConsumerPartitionAssignmentTest.java | 176 ++++++++----------
 3 files changed, 203 insertions(+), 113 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 87bedce6b6574..20c1b83135c65 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -347,24 +347,17 @@ public void open(Configuration configuration) {
 				getRuntimeContext().getIndexOfThisSubtask());
 		}
 
-		// initialize subscribed partitions
-		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
-		Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");
-
-		subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
-
 		if (restoredState != null) {
-			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-				if (restoredState.containsKey(kafkaTopicPartition)) {
-					subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
-				}
-			}
+			subscribedPartitionsToStartOffsets = restoredState;
 
 			LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
 				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
 		} else {
-			initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets,
+			// initialize subscribed partitions
+			List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+			Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");
+
+			subscribedPartitionsToStartOffsets = initializeSubscribedPartitionsToStartOffsets(
 				kafkaTopicPartitions,
 				getRuntimeContext().getIndexOfThisSubtask(),
 				getRuntimeContext().getNumberOfParallelSubtasks(),
@@ -680,7 +673,6 @@ public TypeInformation<T> getProducedType() {
 	 * values. The method decides which partitions this consumer instance should subscribe to, and also
 	 * sets the initial offset each subscribed partition should be started from based on the configured startup mode.
 	 *
-	 * @param subscribedPartitionsToStartOffsets to subscribedPartitionsToStartOffsets to initialize
 	 * @param kafkaTopicPartitions the complete list of all Kafka partitions
 	 * @param indexOfThisSubtask the index of this consumer instance
 	 * @param numParallelSubtasks total number of parallel consumer instances
@@ -690,14 +682,15 @@ public TypeInformation<T> getProducedType() {
 	 *
 	 * Note: This method is also exposed for testing.
 	 */
-	protected static void initializeSubscribedPartitionsToStartOffsets(
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
+	protected static Map<KafkaTopicPartition, Long> initializeSubscribedPartitionsToStartOffsets(
 			List<KafkaTopicPartition> kafkaTopicPartitions,
 			int indexOfThisSubtask,
 			int numParallelSubtasks,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
 
+		Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
+
 		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
 			if (i % numParallelSubtasks == indexOfThisSubtask) {
 				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
@@ -722,6 +715,8 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
 				}
 			}
 		}
+
+		return subscribedPartitionsToStartOffsets;
 	}
 
 	/**
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index ccf2ed2044a0c..9651704609c87 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -34,6 +34,7 @@
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Assert;
 import org.junit.Test;
@@ -532,6 +533,107 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
 		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
 	}
 
+	@Test
+	public void testRestoredStateInsensitiveToMissingPartitions() throws Exception {
+		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 0),
+			new KafkaTopicPartition("test-topic", 1),
+			new KafkaTopicPartition("test-topic", 2));
+
+		// missing fetched partitions on restore
+		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = mockFetchedPartitionsOnStartup.subList(0, 2);
+
+		testRestoredStateInsensitiveToFetchedPartitions(mockFetchedPartitionsOnStartup, mockFetchedPartitionsOnRestore);
+	}
+
+	@Test
+	public void testRestoredStateInsensitiveToNewPartitions() throws Exception {
+		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 0),
+			new KafkaTopicPartition("test-topic", 1),
+			new KafkaTopicPartition("test-topic", 2));
+
+		// new partitions on restore
+		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = new ArrayList<>(mockFetchedPartitionsOnStartup);
+		mockFetchedPartitionsOnRestore.add(new KafkaTopicPartition("test-topic", 3));
+		mockFetchedPartitionsOnRestore.add(new KafkaTopicPartition("test-topic", 4));
+
+		testRestoredStateInsensitiveToFetchedPartitions(mockFetchedPartitionsOnStartup, mockFetchedPartitionsOnRestore);
+	}
+
+	@Test
+	public void testRestoredStateInsensitiveToDifferentPartitionOrdering() throws Exception {
+		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 0),
+			new KafkaTopicPartition("test-topic", 1),
+			new KafkaTopicPartition("test-topic", 2));
+
+		// different partition ordering on restore
+		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 0),
KafkaTopicPartition("test-topic", 0), + new KafkaTopicPartition("test-topic", 2), + new KafkaTopicPartition("test-topic", 1)); + + testRestoredStateInsensitiveToFetchedPartitions(mockFetchedPartitionsOnStartup, mockFetchedPartitionsOnRestore); + } + + private void testRestoredStateInsensitiveToFetchedPartitions( + List mockFetchedPartitionsOnStartup, + List mockFetchedPartitionsOnRestore) throws Exception { + + StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class); + when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing + when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1); + when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0); + + // startup run for consumer + DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer(mockFetchedPartitionsOnStartup); + consumer.setRuntimeContext(mockRuntimeContext); + + TestingListState listState = new TestingListState<>(); + + OperatorStateStore backend = mock(OperatorStateStore.class); + when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); + + StateInitializationContext initializationContext = mock(StateInitializationContext.class); + when(initializationContext.getOperatorStateStore()).thenReturn(backend); + when(initializationContext.isRestored()).thenReturn(false); + + consumer.initializeState(initializationContext); + consumer.open(new Configuration()); + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141)); + + HashMap startupSnapshot = new HashMap<>(); + + for (Serializable serializable : listState.get()) { + Tuple2 kafkaTopicPartitionLongTuple2 = (Tuple2) serializable; + startupSnapshot.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1); + } + + // restore run for consumer; re-initialize consumer, this + // time mocking partition fetching to have missing partitions + consumer = new DummyFlinkKafkaConsumer(mockFetchedPartitionsOnRestore); + consumer.setRuntimeContext(mockRuntimeContext); + + // re-initialize mock state init context to return true for isRestored + initializationContext = mock(StateInitializationContext.class); + when(initializationContext.getOperatorStateStore()).thenReturn(backend); + when(initializationContext.isRestored()).thenReturn(true); + + consumer.initializeState(initializationContext); + consumer.open(new Configuration()); + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141)); + + HashMap restoreSnapshot = new HashMap<>(); + for (Serializable serializable : listState.get()) { + Tuple2 kafkaTopicPartitionLongTuple2 = (Tuple2) serializable; + restoreSnapshot.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1); + } + + // no state should be missing regardless of what partitions were fetched on the restore run + Assert.assertEquals(startupSnapshot, restoreSnapshot); + } + // ------------------------------------------------------------------------ private static FlinkKafkaConsumerBase getConsumer( @@ -564,9 +666,16 @@ private static class DummyFlinkKafkaConsumer extends FlinkKafkaConsumerBase mockFetchedPartitions; + public DummyFlinkKafkaConsumer() { + this(Collections.emptyList()); + } + + @SuppressWarnings("unchecked") + public DummyFlinkKafkaConsumer(List mockFetchedPartitions) { super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class)); + this.mockFetchedPartitions = Preconditions.checkNotNull(mockFetchedPartitions); } @Override @@ -583,7 +692,7 @@ public 
 
 		@Override
 		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-			return Collections.emptyList();
+			return mockFetchedPartitions;
 		}
 
 		@Override
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index c24640dfd2a38..5f9fc8495bd81 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -27,7 +27,6 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -50,14 +49,13 @@ public void testPartitionsEqualConsumers() {
 				new KafkaTopicPartition("test-topic", 1));
 
 		for (int i = 0; i < inPartitions.size(); i++) {
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets,
-				inPartitions,
-				i,
-				inPartitions.size(),
-				StartupMode.GROUP_OFFSETS,
-				null);
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets =
+				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+					inPartitions,
+					i,
+					inPartitions.size(),
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 			List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -90,14 +88,13 @@ public void testMultiplePartitionsPerConsumers() {
 		final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
 
 		for (int i = 0; i < numConsumers; i++) {
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets,
-				partitions,
-				i,
-				numConsumers,
-				StartupMode.GROUP_OFFSETS,
-				null);
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets =
+				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+					partitions,
+					i,
+					numConsumers,
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 			List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -134,14 +131,13 @@ public void testPartitionsFewerThanConsumers() {
 		final int numConsumers = 2 * inPartitions.size() + 3;
 
 		for (int i = 0; i < numConsumers; i++) {
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets,
-				inPartitions,
-				i,
-				numConsumers,
-				StartupMode.GROUP_OFFSETS,
-				null);
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets =
+				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+					inPartitions,
+					i,
+					numConsumers,
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 			List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -166,24 +162,22 @@ public void testAssignEmptyPartitions() {
 		try {
 			List<KafkaTopicPartition> ep = new ArrayList<>();
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets,
-				ep,
-				2,
-				4,
-				StartupMode.GROUP_OFFSETS,
-				null);
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets =
+				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+					ep,
+					2,
+					4,
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 
-			subscribedPartitionsToStartOffsets = new HashMap<>();
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets,
-				ep,
-				0,
-				1,
-				StartupMode.GROUP_OFFSETS,
-				null);
+			subscribedPartitionsToStartOffsets =
+				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+					ep,
+					0,
+					1,
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 		} catch (Exception e) {
@@ -214,33 +208,29 @@ public void testGrowingPartitionsRemainsStable() {
 		final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
 		final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
 
-		Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets1 = new HashMap<>();
-		Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets2 = new HashMap<>();
-		Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets3 = new HashMap<>();
-
-		FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-			subscribedPartitionsToStartOffsets1,
-			initialPartitions,
-			0,
-			numConsumers,
-			StartupMode.GROUP_OFFSETS,
-			null);
-
-		FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-			subscribedPartitionsToStartOffsets2,
-			initialPartitions,
-			1,
-			numConsumers,
-			StartupMode.GROUP_OFFSETS,
-			null);
-
-		FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-			subscribedPartitionsToStartOffsets3,
-			initialPartitions,
-			2,
-			numConsumers,
-			StartupMode.GROUP_OFFSETS,
-			null);
+		Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets1 =
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				initialPartitions,
+				0,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS,
+				null);
+
+		Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets2 =
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				initialPartitions,
+				1,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS,
+				null);
+
+		Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets3 =
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				initialPartitions,
+				2,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 		List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
 		List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
 		List<KafkaTopicPartition> subscribedPartitions3 = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
 
@@ -273,33 +263,29 @@ public void testGrowingPartitionsRemainsStable() {
 
 		// grow the set of partitions and distribute anew
 
-		subscribedPartitionsToStartOffsets1 = new HashMap<>();
-		subscribedPartitionsToStartOffsets2 = new HashMap<>();
-		subscribedPartitionsToStartOffsets3 = new HashMap<>();
-
-		FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-			subscribedPartitionsToStartOffsets1,
-			newPartitions,
-			0,
-			numConsumers,
-			StartupMode.GROUP_OFFSETS,
-			null);
-
-		FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-			subscribedPartitionsToStartOffsets2,
-			newPartitions,
-			1,
-			numConsumers,
-			StartupMode.GROUP_OFFSETS,
-			null);
-
-		FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-			subscribedPartitionsToStartOffsets3,
-			newPartitions,
-			2,
-			numConsumers,
-			StartupMode.GROUP_OFFSETS,
-			null);
+		subscribedPartitionsToStartOffsets1 =
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				newPartitions,
+				0,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS,
+				null);
+
+		subscribedPartitionsToStartOffsets2 =
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				newPartitions,
+				1,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS,
+				null);
+
+		subscribedPartitionsToStartOffsets3 =
+			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+				newPartitions,
+				2,
+				numConsumers,
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 		List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
 		List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
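
Illustration (not part of the patch): a minimal, self-contained Java sketch of the open() behavior this change establishes. Types are simplified, so String stands in for KafkaTopicPartition, a constant stands in for the offsets that the real method derives from the configured StartupMode, and every name below is illustrative rather than Flink API. On a fresh start the consumer queries the partition list once and round-robins partitions across subtasks (partition i goes to subtask i % numParallelSubtasks); on restore it adopts the restored offsets map as-is and never queries the partition list.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the patched open() decision, with simplified types.
public class RestorePathSketch {

	// Assumption for this sketch only; the real consumer derives start
	// offsets from the configured StartupMode, not from a fixed constant.
	static final long PLACEHOLDER_START_OFFSET = 0L;

	// Mirrors the refactored helper: builds and *returns* the map instead of
	// filling a caller-provided one.
	static Map<String, Long> initializeSubscribedPartitionsToStartOffsets(
			List<String> kafkaTopicPartitions, int indexOfThisSubtask, int numParallelSubtasks) {
		Map<String, Long> subscribed = new HashMap<>(kafkaTopicPartitions.size());
		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
			// round-robin assignment: partition i belongs to subtask i % numParallelSubtasks
			if (i % numParallelSubtasks == indexOfThisSubtask) {
				subscribed.put(kafkaTopicPartitions.get(i), PLACEHOLDER_START_OFFSET);
			}
		}
		return subscribed;
	}

	// Mirrors the patched branch in open(): when restored state exists it is
	// used as-is, so the partition list is never queried on restore.
	static Map<String, Long> decideSubscription(
			Map<String, Long> restoredState,
			List<String> fetchedPartitions,
			int indexOfThisSubtask,
			int numParallelSubtasks) {
		if (restoredState != null) {
			return restoredState;
		}
		return initializeSubscribedPartitionsToStartOffsets(fetchedPartitions, indexOfThisSubtask, numParallelSubtasks);
	}

	public static void main(String[] args) {
		List<String> fetched = Arrays.asList("test-topic-0", "test-topic-1", "test-topic-2");

		// Fresh start, subtask 0 of 2: round-robin gives it partitions 0 and 2.
		System.out.println(decideSubscription(null, fetched, 0, 2));

		// Restore run: the restored map wins even if the fetched list is now
		// missing partitions, has new ones, or is reordered, which is the
		// property the three new tests in FlinkKafkaConsumerBaseTest check.
		Map<String, Long> restored = new HashMap<>();
		restored.put("test-topic-1", 42L);
		System.out.println(decideSubscription(restored, fetched, 0, 2));
	}
}

Returning a freshly built map, instead of filling one supplied by the caller, is what lets the restore branch skip getKafkaPartitions() entirely: restored state already fixes both the subscription and the start offsets.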