From 0ba9232b3f827ef1b98f11d6f9a6e74b015e139f Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai" <tzulitai@apache.org>
Date: Fri, 10 Mar 2017 21:11:42 +0800
Subject: [PATCH] [FLINK-3123] [kafka] Allow custom specific start offsets for
 FlinkKafkaConsumer

---
 docs/dev/connectors/kafka.md                  |  35 +++++-
 .../connectors/kafka/Kafka010ITCase.java      |   4 +
 .../connectors/kafka/Kafka08ITCase.java       |   9 +-
 .../connectors/kafka/Kafka09ITCase.java       |   4 +
 .../kafka/FlinkKafkaConsumerBase.java         | 101 +++++++++++++++++-
 .../connectors/kafka/config/StartupMode.java  |   9 +-
 .../KafkaConsumerPartitionAssignmentTest.java |  33 ++++--
 .../kafka/KafkaConsumerTestBase.java          |  82 ++++++++++++--
 8 files changed, 251 insertions(+), 26 deletions(-)

diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 06e40b257da36..6d58b0c4b43af 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -220,7 +220,40 @@ All versions of the Flink Kafka Consumer have the above explicit configuration methods
 record. Under these modes, committed offsets in Kafka will be ignored and
 not used as starting positions.
 
-Note that these settings do not affect the start position when the job is
+You can also specify the exact offsets the consumer should start from for each partition:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
+{% endhighlight %}
+</div>
+</div>
+
+The above example configures the consumer to start from the specified offsets for
+partitions 0, 1, and 2 of topic `myTopic`. Each value should be the offset of the
+next record that the consumer should read for that partition. Note that if the
+consumer needs to read a partition which does not have a specified offset within
+the provided offsets map, it will fall back to the default group offsets behaviour
+(i.e. `setStartFromGroupOffsets()`) for that particular partition.
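For a fuller picture, here is a minimal sketch of how the specific-offsets mode wires into a job. The consumer construction is illustrative (it assumes the 0.8 consumer, `SimpleStringSchema`, and ordinary consumer properties in `props`); the only call introduced by this patch is `setStartFromSpecificOffsets`:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("myTopic", new SimpleStringSchema(), props);

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
// partitions without an entry (e.g. partition 1) fall back to group offsets
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

DataStream<String> stream = env.addSource(myConsumer);
{% endhighlight %}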
+
+Note that these start position configuration methods do not affect the start position when the job is
 automatically restored from a failure or manually restored using a savepoint.
 On restore, the start position of each Kafka partition is determined by the
 offsets stored in the savepoint or checkpoint
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index a375fb612b889..208516934fae9 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -147,6 +147,10 @@ public void testStartFromGroupOffsets() throws Exception {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
 
 	// --- offset committing ---
 
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 3fc00e9a9092a..2e7c368864202 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -90,7 +90,7 @@ public void testInvalidOffset() throws Exception {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.getConfig().disableSysoutLogging();
 
-		readSequence(env, StartupMode.GROUP_OFFSETS, standardProps, parallelism, topic, valuesCount, startFrom);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, standardProps, parallelism, topic, valuesCount, startFrom);
 
 		deleteTestTopic(topic);
 	}
@@ -136,6 +136,11 @@ public void testStartFromGroupOffsets() throws Exception {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
+
 	// --- offset committing ---
 
 	@Test(timeout = 60000)
@@ -200,7 +205,7 @@ public void testOffsetAutocommitTest() throws Exception {
 		readProps.setProperty("auto.commit.interval.ms", "500");
 
 		// read so that the offset can be committed to ZK
-		readSequence(env, StartupMode.GROUP_OFFSETS, readProps, parallelism, topicName, 100, 0);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, parallelism, topicName, 100, 0);
 
 		// get the offset
 		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 6added7c1045d..ca9965c0a8aed 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -127,6 +127,10 @@ public void testStartFromGroupOffsets() throws Exception {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
 
 	// --- offset committing ---
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 144ede86e0e18..027751c3e4bb5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -40,11 +40,13 @@
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -104,6 +106,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 	/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
 	protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
 
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */
+	protected Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
 	// ------------------------------------------------------------------------
 	//  runtime state (used individually by each parallel subtask)
 	// ------------------------------------------------------------------------
@@ -210,23 +215,33 @@ public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T>
 	/**
 	 * Specifies the consumer to start reading from the earliest offset for all partitions.
-	 * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * This method does not affect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
 	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
 		this.startupMode = StartupMode.EARLIEST;
+		this.specificStartupOffsets = null;
 		return this;
 	}
 
 	/**
 	 * Specifies the consumer to start reading from the latest offset for all partitions.
-	 * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * This method does not affect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
 	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
 		this.startupMode = StartupMode.LATEST;
+		this.specificStartupOffsets = null;
 		return this;
 	}
 
@@ -236,10 +251,41 @@ public FlinkKafkaConsumerBase<T> setStartFromLatest() {
 	 * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
 	 * set in the configuration properties will be used for the partition.
 	 *
+	 * This method does not affect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
+	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
 		this.startupMode = StartupMode.GROUP_OFFSETS;
+		this.specificStartupOffsets = null;
+		return this;
+	}
+
+	/**
+	 * Specifies the consumer to start reading partitions from specific offsets, set independently for each partition.
+	 * The specified offset should be the offset of the next record that will be read from each partition.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the
+	 * consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided
+	 * map of offsets, the consumer will fall back to the default group offset behaviour (see
+	 * {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+	 *
+	 * If the specified offset for a partition is invalid, or if the partition is defaulted to group offsets but
+	 * no group offset can be found for it, then the "auto.offset.reset" behaviour set in the configuration
+	 * properties will be used for the partition.
+	 *
+	 * This method does not affect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+		this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+		this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
 		return this;
 	}
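Taken together, the four start-position setters are mutually exclusive: each call overwrites `startupMode` and clears (or sets) `specificStartupOffsets`, so the last call before the job starts wins. A usage sketch, with the consumer construction purely illustrative (`props` stands for ordinary Kafka consumer properties):

{% highlight java %}
// the setters return FlinkKafkaConsumerBase<T>, so they chain off the constructor
FlinkKafkaConsumerBase<String> consumer =
    new FlinkKafkaConsumer08<>("myTopic", new SimpleStringSchema(), props)
        .setStartFromEarliest();

// a later call overrides the earlier choice and nulls out any specific-offsets map
consumer.setStartFromGroupOffsets();
{% endhighlight %}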
@@ -269,7 +315,8 @@ public void open(Configuration configuration) {
 				kafkaTopicPartitions,
 				getRuntimeContext().getIndexOfThisSubtask(),
 				getRuntimeContext().getNumberOfParallelSubtasks(),
-				startupMode);
+				startupMode,
+				specificStartupOffsets);
 
 		if (subscribedPartitionsToStartOffsets.size() != 0) {
 			switch (startupMode) {
@@ -285,6 +332,28 @@
 						subscribedPartitionsToStartOffsets.size(),
 						subscribedPartitionsToStartOffsets.keySet());
 					break;
+				case SPECIFIC_OFFSETS:
+					LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
+						getRuntimeContext().getIndexOfThisSubtask(),
+						subscribedPartitionsToStartOffsets.size(),
+						specificStartupOffsets,
+						subscribedPartitionsToStartOffsets.keySet());
+
+					List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+					for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+						if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+							partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
+						}
+					}
+
+					if (partitionsDefaultedToGroupOffsets.size() > 0) {
+						LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
+								"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							partitionsDefaultedToGroupOffsets.size(),
+							partitionsDefaultedToGroupOffsets);
+					}
+					break;
 				default:
 				case GROUP_OFFSETS:
 					LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
@@ -550,6 +619,8 @@ public TypeInformation<T> getProducedType() {
 	 * @param indexOfThisSubtask the index of this consumer instance
 	 * @param numParallelSubtasks total number of parallel consumer instances
 	 * @param startupMode the configured startup mode for the consumer
+	 * @param specificStartupOffsets specific partition offsets to start from
+	 *                               (only relevant if startupMode is {@link StartupMode#SPECIFIC_OFFSETS})
 	 *
 	 * Note: This method is also exposed for testing.
 	 */
@@ -558,11 +629,31 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
 			List<KafkaTopicPartition> kafkaTopicPartitions,
 			int indexOfThisSubtask,
 			int numParallelSubtasks,
-			StartupMode startupMode) {
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
 
 		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
 			if (i % numParallelSubtasks == indexOfThisSubtask) {
-				subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
+					subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+				} else {
+					if (specificStartupOffsets == null) {
+						throw new IllegalArgumentException(
+							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
+								", but no specific offsets were specified");
+					}
+
+					KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
+
+					Long specificOffset = specificStartupOffsets.get(partition);
+					if (specificOffset != null) {
+						// since the specified offsets represent the next record to read, we subtract
+						// one from the value so that the initial state of the consumer is correct
+						subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);
+					} else {
+						subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+					}
+				}
 			}
 		}
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index f796e62255c0e..8fc2fe0d3eff0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -30,7 +30,14 @@ public enum StartupMode {
 	EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
 
 	/** Start from the latest offset */
-	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
+
+	/**
+	 * Start from user-supplied specific offsets for each partition.
+	 * Since this mode will have specific offsets to start with, we do not need a sentinel value;
+	 * Long.MIN_VALUE is simply used as a placeholder.
+	 */
+	SPECIFIC_OFFSETS(Long.MIN_VALUE);
 
 	/** The sentinel offset value corresponding to this startup mode */
 	private long stateSentinel;
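To make the new SPECIFIC_OFFSETS branch concrete, here is a condensed, self-contained restatement of the logic above. This is a sketch, not the Flink source: plain strings stand in for `KafkaTopicPartition`, and the constant below stands in for `KafkaTopicPartitionStateSentinel.GROUP_OFFSET` (its value here is illustrative):

{% highlight java %}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SpecificOffsetsSketch {
    // stand-in for KafkaTopicPartitionStateSentinel.GROUP_OFFSET
    static final long GROUP_OFFSET_SENTINEL = Long.MIN_VALUE + 1;

    // mirrors the SPECIFIC_OFFSETS branch of initializeSubscribedPartitionsToStartOffsets
    static Map<String, Long> assign(List<String> partitions, int subtaskIndex, int numSubtasks,
                                    Map<String, Long> specificOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % numSubtasks == subtaskIndex) { // round-robin ownership
                String p = partitions.get(i);
                Long nextRecord = specificOffsets.get(p);
                // a user-specified offset names the NEXT record to read, while the internal
                // state stores the LAST consumed offset, hence the "- 1"; partitions without
                // an entry get the group-offset sentinel and are resolved later
                result.put(p, nextRecord != null ? nextRecord - 1 : GROUP_OFFSET_SENTINEL);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> specific = new HashMap<>();
        specific.put("myTopic-0", 23L);
        // partition 0 is stored as 22 so that record 23 is read next;
        // partition 1 has no entry and falls back to its committed group offset
        System.out.println(assign(Arrays.asList("myTopic-0", "myTopic-1"), 0, 1, specific));
    }
}
{% endhighlight %}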
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 379d53a4bca2c..c24640dfd2a38 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -56,7 +56,8 @@ public void testPartitionsEqualConsumers() {
 					inPartitions,
 					i,
 					inPartitions.size(),
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -95,7 +96,8 @@ public void testMultiplePartitionsPerConsumers() {
 					partitions,
 					i,
 					numConsumers,
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -138,7 +140,8 @@ public void testPartitionsFewerThanConsumers() {
 					inPartitions,
 					i,
 					numConsumers,
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -169,7 +172,8 @@ public void testAssignEmptyPartitions() {
 				ep,
 				2,
 				4,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 
 			subscribedPartitionsToStartOffsets = new HashMap<>();
@@ -178,7 +182,8 @@
 				ep,
 				0,
 				1,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 		} catch (Exception e) {
@@ -218,21 +223,24 @@ public void testGrowingPartitionsRemainsStable() {
 				initialPartitions,
 				0,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets2,
 				initialPartitions,
 				1,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets3,
 				initialPartitions,
 				2,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
 			List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
@@ -274,21 +282,24 @@ public void testGrowingPartitionsRemainsStable() {
 				newPartitions,
 				0,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets2,
 				newPartitions,
 				1,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets3,
 				newPartitions,
 				2,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
 			List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
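The `testGrowingPartitionsRemainsStable` cases above hinge on the round-robin rule `i % numParallelSubtasks == indexOfThisSubtask` being stable under growth: new partitions are appended to the end of the list, so existing assignments never move between subtasks. A tiny standalone illustration (class and method names are hypothetical):

{% highlight java %}
import java.util.ArrayList;
import java.util.List;

class AssignmentStability {
    // partitions owned by one subtask under the rule p % numSubtasks == subtask
    static List<Integer> owned(int numPartitions, int subtask, int numSubtasks) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numSubtasks == subtask) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // with 3 subtasks, growing from 4 to 8 partitions only APPENDS partitions to
        // each subtask; no previously assigned partition moves to another subtask
        System.out.println(owned(4, 1, 3)); // [1]
        System.out.println(owned(8, 1, 3)); // [1, 4, 7]
    }
}
{% endhighlight %}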
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 580c5075e4cf8..ddac61c655ede 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -62,6 +62,7 @@
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -349,7 +350,7 @@ public Object map(String value) throws Exception {
 			(o3 != null) ? o3.intValue() : 0
 		));
 
-		readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+		readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
@@ -465,7 +466,7 @@ public void runStartFromEarliestOffsets() throws Exception {
 		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
 		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
 
-		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+		readSequence(env, StartupMode.EARLIEST, null, readProps, parallelism, topicName, recordsInEachPartition, 0);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
@@ -619,7 +620,7 @@ public void cancel() {
 	 *  partition 2 --> start from offset 43, read to offset 49 (7 records)
 	 */
 	public void runStartFromGroupOffsets() throws Exception {
-		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		// 3 partitions with 50 records each (offsets 0-49)
 		final int parallelism = 3;
 		final int recordsInEachPartition = 50;
 
@@ -645,7 +646,71 @@
 		partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49
 		partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43)); // partition 2 should read offset 43-49
 
-		readSequence(env, StartupMode.GROUP_OFFSETS, readProps, topicName, partitionsToValueCountAndStartOffsets);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test ensures that the consumer correctly uses user-supplied specific offsets when explicitly configured to
+	 * start from specific offsets. For partitions for which no specific offset is provided, the starting position
+	 * should fall back to the group offsets behaviour.
+	 *
+	 * 4 partitions will have 50 records with offsets 0 to 49.
+	 * The supplied specific offsets map is:
+	 *  partition 0 --> start from offset 19
+	 *  partition 1 --> not set
+	 *  partition 2 --> start from offset 22
+	 *  partition 3 --> not set
+	 *  partition 4 --> start from offset 26 (this should be ignored because the partition does not exist)
+	 *
+	 * The partitions and their committed group offsets are set up as:
+	 *  partition 0 --> committed offset 23
+	 *  partition 1 --> committed offset 31
+	 *  partition 2 --> committed offset 43
+	 *  partition 3 --> no committed offset
+	 *
+	 * When configured to start from these specific offsets, each partition should read:
+	 *  partition 0 --> start from offset 19, read to offset 49 (31 records)
+	 *  partition 1 --> fall back to group offsets, so start from offset 31, read to offset 49 (19 records)
+	 *  partition 2 --> start from offset 22, read to offset 49 (28 records)
+	 *  partition 3 --> fall back to group offsets, but since there is no group offset for this partition,
+	 *                  will default to "auto.offset.reset" (set to "earliest"),
+	 *                  so start from offset 0, read to offset 49 (50 records)
+	 */
+	public void runStartFromSpecificOffsets() throws Exception {
+		// 4 partitions with 50 records each (offsets 0-49)
+		final int parallelism = 4;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "earliest"); // partition 3 should default back to this behaviour
+
+		Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L);
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L);
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 4), 26L); // non-existing partition, should be ignored
+
+		// only the committed offset for partition 1 should be used, because partition 1 has no entry in the specific offsets map
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>();
+		partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(31, 19)); // partition 0 should read offsets 19-49
+		partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(19, 31)); // partition 1 should read offsets 31-49
+		partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offsets 22-49
+		partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0));  // partition 3 should read offsets 0-49
+
+		readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
 	}
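As a quick sanity check on the record counts asserted above, assuming each partition is read through its last offset, 49, inclusive:

{% highlight java %}
public class SpecificOffsetsCounts {
    public static void main(String[] args) {
        // records read = lastOffset - startOffset + 1, with lastOffset = 49
        System.out.println(49 - 19 + 1); // 31 -> partition 0, specific offset 19
        System.out.println(49 - 31 + 1); // 19 -> partition 1, committed group offset 31
        System.out.println(49 - 22 + 1); // 28 -> partition 2, specific offset 22
        System.out.println(49 - 0 + 1);  // 50 -> partition 3, "auto.offset.reset" = "earliest"
    }
}
{% endhighlight %}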
@@ -1781,6 +1846,7 @@ public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
 	 */
 	protected void readSequence(final StreamExecutionEnvironment env,
 								final StartupMode startupMode,
+								final Map<KafkaTopicPartition, Long> specificStartupOffsets,
 								final Properties cc,
 								final String topicName,
 								final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
@@ -1807,6 +1873,9 @@ protected void readSequence(final StreamExecutionEnvironment env,
 			case LATEST:
 				consumer.setStartFromLatest();
 				break;
+			case SPECIFIC_OFFSETS:
+				consumer.setStartFromSpecificOffsets(specificStartupOffsets);
+				break;
 			case GROUP_OFFSETS:
 				consumer.setStartFromGroupOffsets();
 				break;
@@ -1874,11 +1943,12 @@ public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
 	}
 
 	/**
-	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Properties, String, Map)} to
+	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Map, Properties, String, Map)} to
 	 * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic.
 	 */
 	protected void readSequence(final StreamExecutionEnvironment env,
 								final StartupMode startupMode,
+								final Map<KafkaTopicPartition, Long> specificStartupOffsets,
 								final Properties cc,
 								final int sourceParallelism,
 								final String topicName,
@@ -1888,7 +1958,7 @@ protected void readSequence(final StreamExecutionEnvironment env,
 		for (int i = 0; i < sourceParallelism; i++) {
 			partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
 		}
-		readSequence(env, startupMode, cc, topicName, partitionsToValuesCountAndStartOffset);
+		readSequence(env, startupMode, specificStartupOffsets, cc, topicName, partitionsToValuesCountAndStartOffset);
 	}
 
 	protected String writeSequence(