diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index a727f85fbbdc9..0f700ab7f0051 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -164,11 +164,64 @@ For convenience, Flink provides the following schemas:
 The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
 an optional "metadata" field that exposes the offset/partition/topic for this message.
 
+### Kafka Consumers Start Position Configuration
+
+The Flink Kafka Consumer allows configuring how the start positions for Kafka
+partitions are determined.
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
+myConsumer.setStartFromEarliest();     // start from the earliest record possible
+myConsumer.setStartFromLatest();       // start from the latest record
+myConsumer.setStartFromGroupOffsets(); // the default behaviour
+
+DataStream<String> stream = env.addSource(myConsumer);
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val myConsumer = new FlinkKafkaConsumer08[String](...)
+myConsumer.setStartFromEarliest()      // start from the earliest record possible
+myConsumer.setStartFromLatest()        // start from the latest record
+myConsumer.setStartFromGroupOffsets()  // the default behaviour
+
+val stream = env.addSource(myConsumer)
+...
+{% endhighlight %}
+</div>
+</div>
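+
+As a minimal sketch (the broker/ZooKeeper addresses, topic name, and property values below are
+placeholders), the start position is set on the consumer instance, while the consumer properties
+passed to the constructor still supply the `group.id` used by `setStartFromGroupOffsets()` and
+the `auto.offset.reset` fallback described below:
+
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+// fallback used when no committed offset is found for a partition
+// ("smallest"/"largest" for Kafka 0.8, "earliest"/"latest" for Kafka 0.9+)
+properties.setProperty("auto.offset.reset", "smallest");
+
+FlinkKafkaConsumer08<String> myConsumer =
+    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
+myConsumer.setStartFromGroupOffsets(); // read from the group's committed offsets
+
+DataStream<String> stream = env.addSource(myConsumer);
+{% endhighlight %}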
+ +All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position. + + * `setStartFromGroupOffsets` (default behaviour): Start reading partitions from + the consumer group's (`group.id` setting in the consumer properties) committed + offsets in Kafka brokers (or Zookeeper for Kafka 0.8). If offsets could not be + found for a partition, the `auto.offset.reset` setting in the properties will be used. + * `setStartFromEarliest()` / `setStartFromLatest()`: Start from the earliest / latest + record. Under these modes, committed offsets in Kafka will be ignored and + not used as starting positions. + +Note that these settings do not affect the start position when the job is +automatically restored from a failure or manually restored using a savepoint. +On restore, the start position of each Kafka partition is determined by the +offsets stored in the savepoint or checkpoint +(please see the next section for information about checkpointing to enable +fault tolerance for the consumer). + ### Kafka Consumers and Fault Tolerance With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore -the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that where +the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint. The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure. diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java index a9ce336472b45..3a58216f5a887 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -29,6 +29,7 @@ import org.apache.flink.util.SerializedValue; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Properties; @@ -128,6 +129,7 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema protected AbstractFetcher createFetcher( SourceContext sourceContext, List thisSubtaskPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { @@ -137,6 +139,7 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema return new Kafka010Fetcher<>( sourceContext, thisSubtaskPartitions, + restoredSnapshotState, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), @@ -148,6 +151,7 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema deserializer, properties, pollTimeout, + startupMode, useMetrics); } } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java index 71dd29ae6597c..efb6f88d0beae 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -31,6 +32,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import java.util.HashMap; import java.util.List; import java.util.Properties; @@ -47,6 +49,7 @@ public class Kafka010Fetcher extends Kafka09Fetcher { public Kafka010Fetcher( SourceContext sourceContext, List assignedPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, @@ -58,11 +61,13 @@ public Kafka010Fetcher( KeyedDeserializationSchema deserializer, Properties kafkaProperties, long pollTimeout, + StartupMode startupMode, boolean useMetrics) throws Exception { super( sourceContext, assignedPartitions, + restoredSnapshotState, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, @@ -74,6 +79,7 @@ public Kafka010Fetcher( deserializer, kafkaProperties, pollTimeout, + startupMode, useMetrics); } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java index a81b098b87453..1e0bc5b674248 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java @@ -37,4 +37,14 @@ public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge { public void assignPartitions(KafkaConsumer consumer, List topicPartitions) throws Exception { consumer.assign(topicPartitions); } + + @Override + public void seekPartitionsToBeginning(KafkaConsumer consumer, List partitions) { + consumer.seekToBeginning(partitions); + } + + @Override + public void seekPartitionsToEnd(KafkaConsumer consumer, List partitions) { + consumer.seekToEnd(partitions); + } } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java index 6ee0429e5d089..3bc154e148eeb 100644 --- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java @@ -22,9 +22,9 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internal.Handover; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; @@ -122,20 +122,22 @@ public Void answer(InvocationOnMock invocation) { KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic assigner */ - null, /* punctuated assigner */ - new TestProcessingTimeService(), - 10, - getClass().getClassLoader(), - false, /* checkpointing */ - "taskname-with-subtask", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); + sourceContext, + topics, + null, /* no restored state */ + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10, + getClass().getClassLoader(), + false, /* checkpointing */ + "taskname-with-subtask", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + StartupMode.GROUP_OFFSETS, + false); // ----- run the fetcher ----- @@ -256,23 +258,24 @@ public Void answer(InvocationOnMock invocation) { SourceContext sourceContext = mock(SourceContext.class); List topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic assigner */ - null, /* punctuated assigner */ - new TestProcessingTimeService(), - 10, - getClass().getClassLoader(), - false, /* checkpointing */ - "taskname-with-subtask", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); + sourceContext, + topics, + null, /* no restored state */ + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10, + getClass().getClassLoader(), + false, /* checkpointing */ + "taskname-with-subtask", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + StartupMode.GROUP_OFFSETS, + false); // ----- run the fetcher ----- @@ -374,20 +377,22 @@ public void testCancellationWhenEmitBlocks() throws Exception { KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic watermark extractor */ - null, /* punctuated watermark extractor */ - new TestProcessingTimeService(), - 10, /* watermark interval */ - this.getClass().getClassLoader(), - true, /* checkpointing */ - 
"task_name", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); + sourceContext, + topics, + null, /* no restored state */ + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + StartupMode.GROUP_OFFSETS, + false); // ----- run the fetcher ----- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java index 08511c9a9ce1d..a375fb612b889 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java @@ -51,7 +51,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase { // Suite of Tests // ------------------------------------------------------------------------ - @Test(timeout = 60000) public void testFailOnNoBroker() throws Exception { runFailOnNoBrokerTest(); @@ -131,6 +130,24 @@ public void testMetricsAndEndOfStream() throws Exception { runEndOfStreamTest(); } + // --- startup mode --- + + @Test(timeout = 60000) + public void testStartFromEarliestOffsets() throws Exception { + runStartFromEarliestOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromLatestOffsets() throws Exception { + runStartFromLatestOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromGroupOffsets() throws Exception { + runStartFromGroupOffsets(); + } + + // --- offset committing --- @Test(timeout = 60000) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index f15fd45f7ec49..bc1faaf09ac82 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -47,6 +47,8 @@ import java.net.BindException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.Properties; import java.util.UUID; @@ -129,8 +131,8 @@ public DataStreamSink produceIntoKafka(DataStream stream, String topic } @Override - public KafkaOffsetHandler createOffsetHandler(Properties props) { - return new KafkaOffsetHandlerImpl(props); + public KafkaOffsetHandler createOffsetHandler() { + return new KafkaOffsetHandlerImpl(); } @Override @@ -401,7 +403,12 @@ private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler { private final KafkaConsumer offsetClient; - public KafkaOffsetHandlerImpl(Properties props) { + public KafkaOffsetHandlerImpl() { + Properties props = new Properties(); + props.putAll(standardProps); + props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + offsetClient = new 
KafkaConsumer<>(props); } @@ -411,6 +418,13 @@ public Long getCommittedOffset(String topicName, int partition) { return (committed != null) ? committed.offset() : null; } + @Override + public void setCommittedOffset(String topicName, int partition, long offset) { + Map partitionAndOffset = new HashMap<>(); + partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset)); + offsetClient.commitSync(partitionAndOffset); + } + @Override public void close() { offsetClient.close(); diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index 0f11c728eba14..c0e4dd79c41b4 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.connectors.kafka; -import kafka.api.OffsetRequest; import kafka.cluster.Broker; import kafka.common.ErrorMapping; import kafka.javaapi.PartitionMetadata; @@ -49,6 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.HashMap; import java.util.Properties; import java.util.Random; @@ -112,9 +112,6 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { /** The properties to parametrize the Kafka consumer and ZooKeeper client */ private final Properties kafkaProperties; - /** The behavior when encountering an invalid offset (see {@link OffsetRequest}) */ - private final long invalidOffsetBehavior; - /** The interval in which to automatically commit (-1 if deactivated) */ private final long autoCommitInterval; @@ -188,7 +185,9 @@ public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema d // validate the zookeeper properties validateZooKeeperConfig(props); - this.invalidOffsetBehavior = getInvalidOffsetBehavior(props); + // eagerly check for invalid "auto.offset.reset" values before launching the job + validateAutoOffsetResetValue(props); + this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000); } @@ -196,16 +195,18 @@ public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema d protected AbstractFetcher createFetcher( SourceContext sourceContext, List thisSubtaskPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false")); - return new Kafka08Fetcher<>(sourceContext, thisSubtaskPartitions, + return new Kafka08Fetcher<>(sourceContext, + thisSubtaskPartitions, restoredSnapshotState, watermarksPeriodic, watermarksPunctuated, runtimeContext, deserializer, kafkaProperties, - invalidOffsetBehavior, autoCommitInterval, useMetrics); + autoCommitInterval, startupMode, useMetrics); } @Override @@ -384,16 +385,19 @@ private static void validateSeedBrokers(String[] seedBrokers, Exception exceptio } } - private static long getInvalidOffsetBehavior(Properties config) { + /** + * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting + * the job. 
Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception + * right after a task is started. + * + * @param config kafka consumer properties to check + */ + private static void validateAutoOffsetResetValue(Properties config) { final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); - if (val.equals("none")) { + if (!(val.equals("largest") || val.equals("latest") || val.equals("earliest") || val.equals("smallest"))) { + // largest/smallest is kafka 0.8, latest/earliest is kafka 0.9 throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - + "' value 'none'. Possible values: 'latest', 'largest', or 'earliest'."); - } - else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9 - return OffsetRequest.LatestTime(); - } else { - return OffsetRequest.EarliestTime(); + + "' value '" + val + "'. Possible values: 'latest', 'largest', 'earliest', or 'smallest'."); } } } diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java index 5a0aed38f859b..ad520d880bd29 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java @@ -18,9 +18,12 @@ package org.apache.flink.streaming.connectors.kafka.internals; +import kafka.api.OffsetRequest; import kafka.common.TopicAndPartition; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; @@ -89,29 +92,32 @@ public class Kafka08Fetcher extends AbstractFetcher { public Kafka08Fetcher( SourceContext sourceContext, List assignedPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext, KeyedDeserializationSchema deserializer, Properties kafkaProperties, - long invalidOffsetBehavior, long autoCommitInterval, + StartupMode startupMode, boolean useMetrics) throws Exception { super( sourceContext, assignedPartitions, + restoredSnapshotState, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), runtimeContext.getUserCodeClassLoader(), + startupMode, useMetrics); this.deserializer = checkNotNull(deserializer); this.kafkaConfig = checkNotNull(kafkaProperties); this.runtimeContext = runtimeContext; - this.invalidOffsetBehavior = invalidOffsetBehavior; + this.invalidOffsetBehavior = getInvalidOffsetBehavior(kafkaProperties); this.autoCommitInterval = autoCommitInterval; this.unassignedPartitionsQueue = new ClosableBlockingQueue<>(); @@ -139,23 +145,44 @@ public void runFetchLoop() throws Exception { PeriodicOffsetCommitter periodicCommitter = null; try { - // read offsets from ZooKeeper for partitions that did not restore offsets - { - List partitionsWithNoOffset = new ArrayList<>(); - for 
(KafkaTopicPartitionState partition : subscribedPartitions()) { - if (!partition.isOffsetDefined()) { - partitionsWithNoOffset.add(partition.getKafkaTopicPartition()); - } - } - Map zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset); - for (KafkaTopicPartitionState partition : subscribedPartitions()) { - Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition()); - if (zkOffset != null) { - // the offset in ZK represents the "next record to process", so we need to subtract it by 1 - // to correctly represent our internally checkpointed offsets - partition.setOffset(zkOffset - 1); - } + // if we're not restored from a checkpoint, all partitions will not have their offset set; + // depending on the configured startup mode, accordingly set the starting offsets + if (!isRestored) { + switch (startupMode) { + case EARLIEST: + for (KafkaTopicPartitionState partition : subscribedPartitions()) { + partition.setOffset(OffsetRequest.EarliestTime()); + } + break; + case LATEST: + for (KafkaTopicPartitionState partition : subscribedPartitions()) { + partition.setOffset(OffsetRequest.LatestTime()); + } + break; + default: + case GROUP_OFFSETS: + List partitions = new ArrayList<>(subscribedPartitions().length); + for (KafkaTopicPartitionState partition : subscribedPartitions()) { + partitions.add(partition.getKafkaTopicPartition()); + } + + Map zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitions); + for (KafkaTopicPartitionState partition : subscribedPartitions()) { + Long offset = zkOffsets.get(partition.getKafkaTopicPartition()); + if (offset != null) { + // the committed offset in ZK represents the next record to process, + // so we subtract it by 1 to correctly represent internal state + partition.setOffset(offset - 1); + } else { + // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS, + // we default to "auto.offset.reset" like the Kafka high-level consumer + LOG.warn("No group offset can be found for partition {} in Zookeeper;" + + " resetting starting offset to 'auto.offset.reset'", partition); + + partition.setOffset(invalidOffsetBehavior); + } + } } } @@ -487,4 +514,21 @@ private static Map>> find return leaderToPartitions; } + + /** + * Retrieve the behaviour of "auto.offset.reset" from the config properties. + * A partition needs to fallback to "auto.offset.reset" as default offset when + * we can't find offsets in ZK to start from in {@link StartupMode#GROUP_OFFSETS} startup mode. 
+ * + * @param config kafka consumer properties + * @return either OffsetRequest.LatestTime() or OffsetRequest.EarliestTime() + */ + private static long getInvalidOffsetBehavior(Properties config) { + final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); + if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9 + return OffsetRequest.LatestTime(); + } else { + return OffsetRequest.EarliestTime(); + } + } } diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java index 35e491acfd75e..e9cfdacf704ed 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka.internals; import kafka.api.FetchRequestBuilder; +import kafka.api.OffsetRequest; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; @@ -110,6 +111,8 @@ public SimpleConsumerThread( this.owner = owner; this.errorHandler = errorHandler; this.broker = broker; + // all partitions should have been assigned a starting offset by the fetcher + checkAllPartitionsHaveDefinedStartingOffsets(seedPartitions); this.partitions = seedPartitions; this.deserializer = requireNonNull(deserializer); this.unassignedPartitions = requireNonNull(unassignedPartitions); @@ -144,10 +147,10 @@ public void run() { // create the Kafka consumer that we actually use for fetching consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); - // make sure that all partitions have some offsets to start with - // those partitions that do not have an offset from a checkpoint need to get - // their start offset from ZooKeeper - getMissingOffsetsFromKafka(partitions); + // replace earliest of latest starting offsets with actual offset values fetched from Kafka + requestAndSetEarliestOrLatestOffsetsFromKafka(consumer, partitions); + + LOG.info("Starting to consume {} partitions with consumer thread {}", partitions.size(), getName()); // Now, the actual work starts :-) int offsetOutOfRangeCount = 0; @@ -160,9 +163,12 @@ public void run() { List> newPartitions = newPartitionsQueue.pollBatch(); if (newPartitions != null) { // found some new partitions for this thread's broker - - // check if the new partitions need an offset lookup - getMissingOffsetsFromKafka(newPartitions); + + // the new partitions should already be assigned a starting offset + checkAllPartitionsHaveDefinedStartingOffsets(newPartitions); + // if the new partitions are to start from earliest or latest offsets, + // we need to replace them with actual values from Kafka + requestAndSetEarliestOrLatestOffsetsFromKafka(consumer, newPartitions); // add the new partitions (and check they are not already in there) for (KafkaTopicPartitionState newPartition: newPartitions) { @@ -300,7 +306,7 @@ else if (code != ErrorMapping.NoError()) { } // get valid offsets for these partitions and try again. 
LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor); - getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); + requestAndSetSpecificTimeOffsetsFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor); continue; // jump back to create a new fetch request. The offset has not been touched. @@ -408,26 +414,6 @@ else if (partitionsRemoved) { } } - private void getMissingOffsetsFromKafka( - List> partitions) throws IOException - { - // collect which partitions we should fetch offsets for - List> partitionsToGetOffsetsFor = new ArrayList<>(); - for (KafkaTopicPartitionState part : partitions) { - if (!part.isOffsetDefined()) { - // retrieve the offset from the consumer - partitionsToGetOffsetsFor.add(part); - } - } - - if (partitionsToGetOffsetsFor.size() > 0) { - getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); - - LOG.info("No checkpoint/savepoint offsets found for some partitions. " + - "Fetched the following start offsets {}", partitionsToGetOffsetsFor); - } - } - /** * Cancels this fetch thread. The thread will release all resources and terminate. */ @@ -447,15 +433,13 @@ public void cancel() { // ------------------------------------------------------------------------ /** - * Request latest offsets for a set of partitions, via a Kafka consumer. - * - *

This method retries three times if the response has an error. + * Request offsets before a specific time for a set of partitions, via a Kafka consumer. * * @param consumer The consumer connected to lead broker * @param partitions The list of partitions we need offsets for * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest) */ - private static void getLastOffsetFromKafka( + private static void requestAndSetSpecificTimeOffsetsFromKafka( SimpleConsumer consumer, List> partitions, long whichTime) throws IOException @@ -465,26 +449,65 @@ private static void getLastOffsetFromKafka( requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1)); } + requestAndSetOffsetsFromKafka(consumer, partitions, requestInfo); + } + + /** + * For a set of partitions, if a partition is set with the special offsets {@link OffsetRequest#EarliestTime()} + * or {@link OffsetRequest#LatestTime()}, replace them with actual offsets requested via a Kafka consumer. + * + * @param consumer The consumer connected to lead broker + * @param partitions The list of partitions we need offsets for + */ + private static void requestAndSetEarliestOrLatestOffsetsFromKafka( + SimpleConsumer consumer, + List> partitions) throws Exception + { + Map requestInfo = new HashMap<>(); + for (KafkaTopicPartitionState part : partitions) { + if (part.getOffset() == OffsetRequest.EarliestTime() || part.getOffset() == OffsetRequest.LatestTime()) { + requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(part.getOffset(), 1)); + } + } + + requestAndSetOffsetsFromKafka(consumer, partitions, requestInfo); + } + + /** + * Request offsets from Kafka with a specified set of partition's offset request information. + * The returned offsets are used to set the internal partition states. + * + *

This method retries three times if the response has an error. + * + * @param consumer The consumer connected to lead broker + * @param partitionStates the partition states, will be set with offsets fetched from Kafka request + * @param partitionToRequestInfo map of each partition to its offset request info + */ + private static void requestAndSetOffsetsFromKafka( + SimpleConsumer consumer, + List> partitionStates, + Map partitionToRequestInfo) throws IOException + { int retries = 0; OffsetResponse response; while (true) { kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( - requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); + partitionToRequestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); response = consumer.getOffsetsBefore(request); if (response.hasError()) { StringBuilder exception = new StringBuilder(); - for (KafkaTopicPartitionState part : partitions) { + for (KafkaTopicPartitionState part : partitionStates) { short code; if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) { exception.append("\nException for topic=").append(part.getTopic()) - .append(" partition=").append(part.getPartition()).append(": ") - .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code))); + .append(" partition=").append(part.getPartition()).append(": ") + .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code))); } } if (++retries >= 3) { - throw new IOException("Unable to get last offset for partitions " + partitions + ": " - + exception.toString()); + throw new IOException("Unable to get last offset for partitions " + partitionStates + ": " + + exception.toString()); } else { LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception); } @@ -493,12 +516,25 @@ private static void getLastOffsetFromKafka( } } - for (KafkaTopicPartitionState part: partitions) { - final long offset = response.offsets(part.getTopic(), part.getPartition())[0]; - - // the offset returned is that of the next record to fetch. because our state reflects the latest - // successfully emitted record, we subtract one - part.setOffset(offset - 1); + for (KafkaTopicPartitionState part: partitionStates) { + // there will be offsets only for partitions that were requested for + if (partitionToRequestInfo.containsKey(part.getKafkaPartitionHandle())) { + final long offset = response.offsets(part.getTopic(), part.getPartition())[0]; + + // the offset returned is that of the next record to fetch. 
because our state reflects the latest + // successfully emitted record, we subtract one + part.setOffset(offset - 1); + } + } + } + + private static void checkAllPartitionsHaveDefinedStartingOffsets( + List> partitions) + { + for (KafkaTopicPartitionState part : partitions) { + if (!part.isOffsetDefined()) { + throw new IllegalArgumentException("SimpleConsumerThread received a partition with undefined starting offset"); + } } } -} \ No newline at end of file +} diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 0cdf465268545..334bd2b2ff1ab 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -20,6 +20,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; import org.junit.Test; @@ -81,9 +82,9 @@ public void testFailOnDeploy() throws Exception { @Test(timeout = 60000) public void testInvalidOffset() throws Exception { - + final int parallelism = 1; - + // write 20 messages into topic: final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1); @@ -98,8 +99,8 @@ public void testInvalidOffset() throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env.getConfig().disableSysoutLogging(); - - readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom); + + readSequence(env, StartupMode.GROUP_OFFSETS, standardProps, parallelism, topic, valuesCount, startFrom); deleteTestTopic(topic); } @@ -128,6 +129,23 @@ public void testBrokerFailure() throws Exception { runBrokerFailureTest(); } + // --- startup mode --- + + @Test(timeout = 60000) + public void testStartFromEarliestOffsets() throws Exception { + runStartFromEarliestOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromLatestOffsets() throws Exception { + runStartFromLatestOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromGroupOffsets() throws Exception { + runStartFromGroupOffsets(); + } + // --- offset committing --- @Test(timeout = 60000) @@ -192,7 +210,7 @@ public void testOffsetAutocommitTest() throws Exception { readProps.setProperty("auto.commit.interval.ms", "500"); // read so that the offset can be committed to ZK - readSequence(env, readProps, parallelism, topicName, 100, 0); + readSequence(env, StartupMode.GROUP_OFFSETS, readProps, parallelism, topicName, 100, 0); // get the offset CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 62354490fcba6..6c2672a02886f 100644 --- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -33,7 +33,6 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; @@ -128,8 +127,8 @@ public DataStreamSink produceIntoKafka(DataStream stream, String topic } @Override - public KafkaOffsetHandler createOffsetHandler(Properties props) { - return new KafkaOffsetHandlerImpl(props); + public KafkaOffsetHandler createOffsetHandler() { + return new KafkaOffsetHandlerImpl(); } @Override @@ -378,9 +377,9 @@ private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler { private final CuratorFramework offsetClient; private final String groupId; - public KafkaOffsetHandlerImpl(Properties props) { + public KafkaOffsetHandlerImpl() { offsetClient = createCuratorClient(); - groupId = props.getProperty("group.id"); + groupId = standardProps.getProperty("group.id"); } @Override @@ -392,6 +391,15 @@ public Long getCommittedOffset(String topicName, int partition) { } } + @Override + public void setCommittedOffset(String topicName, int partition, long offset) { + try { + ZookeeperOffsetHandler.setOffsetInZooKeeper(offsetClient, groupId, topicName, partition, offset); + } catch (Exception e) { + throw new RuntimeException("Exception when writing offsets to Zookeeper", e); + } + } + @Override public void close() { offsetClient.close(); diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 2b816c4418d2c..9a61b912b9b88 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.HashMap; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -171,6 +172,7 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d protected AbstractFetcher createFetcher( SourceContext sourceContext, List thisSubtaskPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { @@ -180,6 +182,7 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d return new Kafka09Fetcher<>( sourceContext, thisSubtaskPartitions, + restoredSnapshotState, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), @@ -191,6 +194,7 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d deserializer, properties, pollTimeout, + startupMode, useMetrics); } diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index d495327b5ee75..b7c9bc246cc7e 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; @@ -71,6 +72,7 @@ public class Kafka09Fetcher extends AbstractFetcher { public Kafka09Fetcher( SourceContext sourceContext, List assignedPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, @@ -82,16 +84,19 @@ public Kafka09Fetcher( KeyedDeserializationSchema deserializer, Properties kafkaProperties, long pollTimeout, + StartupMode startupMode, boolean useMetrics) throws Exception { super( sourceContext, assignedPartitions, + restoredSnapshotState, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, + startupMode, useMetrics); this.deserializer = deserializer; @@ -114,6 +119,8 @@ public Kafka09Fetcher( createCallBridge(), getFetcherName() + " for " + taskNameWithSubtasks, pollTimeout, + startupMode, + isRestored, useMetrics); } @@ -141,7 +148,6 @@ public void runFetchLoop() throws Exception { records.records(partition.getKafkaPartitionHandle()); for (ConsumerRecord record : partitionRecords) { - final T value = deserializer.deserialize( record.key(), record.value(), record.topic(), record.partition(), record.offset()); diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java index c17aae62d402e..a97b3cf44a67a 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java @@ -29,7 +29,7 @@ * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10, * for example changing {@code assign(List)} to {@code assign(Collection)}. * - * Because of that, we need to two versions whose compiled code goes against different method signatures. + * Because of that, we need to have two versions whose compiled code goes against different method signatures. 
* Even though the source of subclasses may look identical, the byte code will be different, because they * are compiled against different dependencies. */ @@ -38,4 +38,17 @@ public class KafkaConsumerCallBridge { public void assignPartitions(KafkaConsumer consumer, List topicPartitions) throws Exception { consumer.assign(topicPartitions); } + + public void seekPartitionsToBeginning(KafkaConsumer consumer, List partitions) { + for (TopicPartition partition : partitions) { + consumer.seekToBeginning(partition); + } + } + + public void seekPartitionsToEnd(KafkaConsumer consumer, List partitions) { + for (TopicPartition partition : partitions) { + consumer.seekToEnd(partition); + } + } + } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java index 9cfa8404b8703..03fe2c6d802c6 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka.internal; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -79,6 +80,12 @@ public class KafkaConsumerThread extends Thread { /** The maximum number of milliseconds to wait for a fetch batch */ private final long pollTimeout; + /** The configured startup mode (relevant only if we're restored from checkpoint / savepoint) */ + private final StartupMode startupMode; + + /** Flag whether or not we're restored from checkpoint / savepoint */ + private final boolean isRestored; + /** Flag whether to add Kafka's metrics to the Flink metrics */ private final boolean useMetrics; @@ -101,6 +108,8 @@ public KafkaConsumerThread( KafkaConsumerCallBridge consumerCallBridge, String threadName, long pollTimeout, + StartupMode startupMode, + boolean isRestored, boolean useMetrics) { super(threadName); @@ -109,9 +118,24 @@ public KafkaConsumerThread( this.log = checkNotNull(log); this.handover = checkNotNull(handover); this.kafkaProperties = checkNotNull(kafkaProperties); - this.subscribedPartitions = checkNotNull(subscribedPartitions); this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup); this.consumerCallBridge = checkNotNull(consumerCallBridge); + this.startupMode = checkNotNull(startupMode); + + this.subscribedPartitions = checkNotNull(subscribedPartitions); + this.isRestored = isRestored; + + // if we are restoring from a checkpoint / savepoint, all + // subscribed partitions' state should have defined offsets + if (isRestored) { + for (KafkaTopicPartitionState subscribedPartition : subscribedPartitions) { + if (!subscribedPartition.isOffsetDefined()) { + throw new IllegalArgumentException("Restoring from a checkpoint / savepoint, but found a " + + "partition state " + subscribedPartition + " that does not have a defined offset."); + } + } + } + this.pollTimeout = pollTimeout; this.useMetrics = useMetrics; @@ -171,28 +195,39 @@ public void run() { return; 
} - // seek the consumer to the initial offsets - for (KafkaTopicPartitionState partition : subscribedPartitions) { - if (partition.isOffsetDefined()) { - log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " + - "seeking the consumer to position {}", - partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1); + if (isRestored) { + for (KafkaTopicPartitionState partition : subscribedPartitions) { + log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " + + "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1); consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); } - else { - // for partitions that do not have offsets restored from a checkpoint/savepoint, - // we need to define our internal offset state for them using the initial offsets retrieved from Kafka - // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint - - long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle()); - - log.info("Partition {} has no initial offset; the consumer has position {}, " + - "so the initial offset will be set to {}", - partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1); + } else { + List partitionList = convertKafkaPartitions(subscribedPartitions); + + // fetch offsets from Kafka, depending on the configured startup mode + switch (startupMode) { + case EARLIEST: + log.info("Setting starting point as earliest offset for partitions {}", partitionList); + + consumerCallBridge.seekPartitionsToBeginning(consumer, partitionList); + break; + case LATEST: + log.info("Setting starting point as latest offset for partitions {}", partitionList); + + consumerCallBridge.seekPartitionsToEnd(consumer, partitionList); + break; + default: + case GROUP_OFFSETS: + log.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}", + kafkaProperties.getProperty("group.id"), partitionList); + } + // on startup, all partition states will not have defined offsets; + // set the initial states with the offsets fetched from Kafka + for (KafkaTopicPartitionState partition : subscribedPartitions) { // the fetched offset represents the next record to process, so we need to subtract it by 1 - partition.setOffset(fetchedOffset - 1); + partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1); } } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java index 7a823656184b7..4526aa0d7749d 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internal.Handover; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; @@ -123,6 +124,7 @@ public Void answer(InvocationOnMock invocation) { final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( sourceContext, topics, + null, /* no restored state */ null, /* periodic watermark extractor */ null, /* punctuated watermark extractor */ new TestProcessingTimeService(), @@ -134,6 +136,7 @@ public Void answer(InvocationOnMock invocation) { schema, new Properties(), 0L, + StartupMode.GROUP_OFFSETS, false); // ----- run the fetcher ----- @@ -259,6 +262,7 @@ public Void answer(InvocationOnMock invocation) { final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( sourceContext, topics, + null, /* no restored state */ null, /* periodic watermark extractor */ null, /* punctuated watermark extractor */ new TestProcessingTimeService(), @@ -270,9 +274,9 @@ public Void answer(InvocationOnMock invocation) { schema, new Properties(), 0L, + StartupMode.GROUP_OFFSETS, false); - // ----- run the fetcher ----- final AtomicReference error = new AtomicReference<>(); @@ -374,6 +378,7 @@ public void testCancellationWhenEmitBlocks() throws Exception { final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( sourceContext, topics, + null, /* no restored state */ null, /* periodic watermark extractor */ null, /* punctuated watermark extractor */ new TestProcessingTimeService(), @@ -385,6 +390,7 @@ public void testCancellationWhenEmitBlocks() throws Exception { schema, new Properties(), 0L, + StartupMode.GROUP_OFFSETS, false); diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index d18e2a9a56361..6added7c1045d 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -110,6 +110,24 @@ public void testMetrics() throws Throwable { runMetricsTest(); } + // --- startup mode --- + + @Test(timeout = 60000) + public void testStartFromEarliestOffsets() throws Exception { + runStartFromEarliestOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromLatestOffsets() throws Exception { + runStartFromLatestOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromGroupOffsets() throws Exception { + runStartFromGroupOffsets(); + } + + // --- offset committing --- @Test(timeout = 60000) diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 1802e0c9b78fe..99c11c4b370a1 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -48,6 +48,8 @@ import java.io.File; import java.net.BindException; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -118,8 +120,8 @@ public DataStreamSink produceIntoKafka(DataStream stream, String topic } @Override - public 
KafkaOffsetHandler createOffsetHandler(Properties props) { - return new KafkaOffsetHandlerImpl(props); + public KafkaOffsetHandler createOffsetHandler() { + return new KafkaOffsetHandlerImpl(); } @Override @@ -420,7 +422,12 @@ private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler { private final KafkaConsumer offsetClient; - public KafkaOffsetHandlerImpl(Properties props) { + public KafkaOffsetHandlerImpl() { + Properties props = new Properties(); + props.putAll(standardProps); + props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + offsetClient = new KafkaConsumer<>(props); } @@ -430,6 +437,13 @@ public Long getCommittedOffset(String topicName, int partition) { return (committed != null) ? committed.offset() : null; } + @Override + public void setCommittedOffset(String topicName, int partition, long offset) { + Map partitionAndOffset = new HashMap<>(); + partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset)); + offsetClient.commitSync(partitionAndOffset); + } + @Override public void close() { offsetClient.close(); diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 2918080cb2dc4..1121d1b34fdc1 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.commons.collections.map.LinkedMap; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -36,6 +37,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; @@ -89,7 +91,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti protected final KeyedDeserializationSchema deserializer; /** The set of topic partitions that the source will read */ - protected List subscribedPartitions; + private List subscribedPartitions; /** Optional timestamp extractor / watermark generator that will be run per Kafka partition, * to exploit per-partition timestamp characteristics. 
@@ -103,6 +105,9 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti private transient ListState> offsetsStateForCheckpoint; + /** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */ + protected StartupMode startupMode = StartupMode.GROUP_OFFSETS; + // ------------------------------------------------------------------------ // runtime state (used individually by each parallel subtask) // ------------------------------------------------------------------------ @@ -114,7 +119,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti private transient volatile AbstractFetcher kafkaFetcher; /** The offsets to restore to, if the consumer restores state from a checkpoint */ - private transient volatile HashMap restoreToOffset; + private transient volatile HashMap restoredState; /** Flag indicating whether the consumer is still running **/ private volatile boolean running = true; @@ -218,6 +223,41 @@ public FlinkKafkaConsumerBase assignTimestampsAndWatermarks(AssignerWithPerio } } + /** + * Specifies the consumer to start reading from the earliest offset for all partitions. + * This ignores any committed group offsets in Zookeeper / Kafka brokers. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase setStartFromEarliest() { + this.startupMode = StartupMode.EARLIEST; + return this; + } + + /** + * Specifies the consumer to start reading from the latest offset for all partitions. + * This ignores any committed group offsets in Zookeeper / Kafka brokers. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase setStartFromLatest() { + this.startupMode = StartupMode.LATEST; + return this; + } + + /** + * Specifies the consumer to start reading from any committed group offsets found + * in Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration + * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset" + * set in the configuration properties will be used for the partition. + * + * @return The consumer object, to allow function chaining. 
+ */ + public FlinkKafkaConsumerBase setStartFromGroupOffsets() { + this.startupMode = StartupMode.GROUP_OFFSETS; + return this; + } + // ------------------------------------------------------------------------ // Work methods // ------------------------------------------------------------------------ @@ -231,17 +271,12 @@ public void run(SourceContext sourceContext) throws Exception { // we need only do work, if we actually have partitions assigned if (!subscribedPartitions.isEmpty()) { - // (1) create the fetcher that will communicate with the Kafka brokers + // create the fetcher that will communicate with the Kafka brokers final AbstractFetcher fetcher = createFetcher( - sourceContext, subscribedPartitions, + sourceContext, subscribedPartitions, restoredState, periodicWatermarkAssigner, punctuatedWatermarkAssigner, (StreamingRuntimeContext) getRuntimeContext()); - // (2) set the fetcher to the restored checkpoint offsets - if (restoreToOffset != null) { - fetcher.restoreOffsets(restoreToOffset); - } - // publish the reference, for snapshot-, commit-, and cancel calls // IMPORTANT: We can only do that now, because only now will calls to // the fetchers 'snapshotCurrentState()' method return at least @@ -321,18 +356,18 @@ public void initializeState(FunctionInitializationContext context) throws Except offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); if (context.isRestored()) { - if (restoreToOffset == null) { - restoreToOffset = new HashMap<>(); + if (restoredState == null) { + restoredState = new HashMap<>(); for (Tuple2 kafkaOffset : offsetsStateForCheckpoint.get()) { - restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + restoredState.put(kafkaOffset.f0, kafkaOffset.f1); } LOG.info("Setting restore state in the FlinkKafkaConsumer."); if (LOG.isDebugEnabled()) { - LOG.debug("Using the following offsets: {}", restoreToOffset); + LOG.debug("Using the following offsets: {}", restoredState); } - } else if (restoreToOffset.isEmpty()) { - restoreToOffset = null; + } else if (restoredState.isEmpty()) { + restoredState = null; } } else { LOG.info("No restore state for FlinkKafkaConsumer."); @@ -352,9 +387,9 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { // the fetcher has not yet been initialized, which means we need to return the // originally restored offsets or the assigned partitions - if (restoreToOffset != null) { + if (restoredState != null) { - for (Map.Entry kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) { + for (Map.Entry kafkaTopicPartitionLongEntry : restoredState.entrySet()) { offsetsStateForCheckpoint.add( Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); } @@ -367,7 +402,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { // the map cannot be asynchronously updated, because only one checkpoint call can happen // on this function at a time: either snapshotState() or notifyCheckpointComplete() - pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset); + pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState); } else { HashMap currentOffsets = fetcher.snapshotCurrentState(); @@ -393,11 +428,11 @@ public void restoreState(HashMap restoredOffsets) { LOG.info("{} (taskIdx={}) restoring offsets from an older version.", getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask()); - restoreToOffset = restoredOffsets; + restoredState = restoredOffsets; if 
(LOG.isDebugEnabled()) { LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}", - getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset); + getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredState); } } @@ -470,6 +505,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { protected abstract AbstractFetcher createFetcher( SourceContext sourceContext, List thisSubtaskPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception; @@ -492,9 +528,9 @@ public TypeInformation getProducedType() { private void assignTopicPartitions(List kafkaTopicPartitions) { subscribedPartitions = new ArrayList<>(); - if (restoreToOffset != null) { + if (restoredState != null) { for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) { - if (restoreToOffset.containsKey(kafkaTopicPartition)) { + if (restoredState.containsKey(kafkaTopicPartition)) { subscribedPartitions.add(kafkaTopicPartition); } } @@ -569,4 +605,14 @@ protected static void logPartitionInfo(Logger logger, List logger.info(sb.toString()); } + + @VisibleForTesting + List getSubscribedPartitions() { + return subscribedPartitions; + } + + @VisibleForTesting + HashMap getRestoredState() { + return restoredState; + } } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java new file mode 100644 index 0000000000000..331c1a6768ff7 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.config; + +/** + * Startup modes for the Kafka Consumer. 
+ */ +public enum StartupMode { + + /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default) */ + GROUP_OFFSETS, + + /** Start from the earliest offset possible */ + EARLIEST, + + /** Start from the latest offset */ + LATEST + +} diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 821eb03abf311..b27e9961a150c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.util.SerializedValue; import java.io.IOException; @@ -67,9 +68,15 @@ public abstract class AbstractFetcher { /** The mode describing whether the fetcher also generates timestamps and watermarks */ protected final int timestampWatermarkMode; + /** The startup mode for the consumer (only relevant if the consumer wasn't restored) */ + protected final StartupMode startupMode; + /** Flag whether to register metrics for the fetcher */ protected final boolean useMetrics; + /** Flag whether or not the consumer state was restored from a checkpoint / savepoint */ + protected final boolean isRestored; + /** Only relevant for punctuated watermarks: The current cross partition watermark */ private volatile long maxWatermarkSoFar = Long.MIN_VALUE; @@ -78,15 +85,18 @@ public abstract class AbstractFetcher { protected AbstractFetcher( SourceContext sourceContext, List assignedPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, + StartupMode startupMode, boolean useMetrics) throws Exception { this.sourceContext = checkNotNull(sourceContext); this.checkpointLock = sourceContext.getCheckpointLock(); + this.startupMode = checkNotNull(startupMode); this.useMetrics = useMetrics; // figure out what we watermark mode we will be using @@ -112,6 +122,18 @@ protected AbstractFetcher( timestampWatermarkMode, watermarksPeriodic, watermarksPunctuated, userCodeClassLoader); + + if (restoredSnapshotState != null) { + for (KafkaTopicPartitionState partition : allPartitions) { + Long offset = restoredSnapshotState.get(partition.getKafkaTopicPartition()); + if (offset != null) { + partition.setOffset(offset); + } + } + this.isRestored = true; + } else { + this.isRestored = false; + } // if we have periodic watermarks, kick off the interval scheduler if (timestampWatermarkMode == PERIODIC_WATERMARKS) { @@ -192,20 +214,6 @@ public HashMap snapshotCurrentState() { return state; } - /** - * Restores the partition offsets. 
- * - * @param snapshotState The offsets for the partitions - */ - public void restoreOffsets(Map snapshotState) { - for (KafkaTopicPartitionState partition : allPartitions) { - Long offset = snapshotState.get(partition.getKafkaTopicPartition()); - if (offset != null) { - partition.setOffset(offset); - } - } - } - // ------------------------------------------------------------------------ // emitting records // ------------------------------------------------------------------------ diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index fa931387647f8..38a3ce8e5fbe4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -17,14 +17,11 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; @@ -32,21 +29,14 @@ import org.apache.flink.util.SerializedValue; import org.junit.Assert; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import java.io.Serializable; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyMapOf; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; /** @@ -58,55 +48,13 @@ */ public class FlinkKafkaConsumerBaseMigrationTest { - private static String getResourceFilename(String filename) { - ClassLoader cl = FlinkKafkaConsumerBaseMigrationTest.class.getClassLoader(); - URL resource = cl.getResource(filename); - if (resource == null) { - throw new NullPointerException("Missing snapshot resource."); - } - return resource.getFile(); - } - + /** Test restoring from a legacy empty state, when no partitions could be found for topics.
*/ @Test public void testRestoreFromFlink11WithEmptyStateNoPartitions() throws Exception { - // -------------------------------------------------------------------- - // prepare fake states - // -------------------------------------------------------------------- + final DummyFlinkKafkaConsumer consumerFunction = + new DummyFlinkKafkaConsumer<>(Collections.emptyList()); - final OneShotLatch latch = new OneShotLatch(); - final AbstractFetcher fetcher = mock(AbstractFetcher.class); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - latch.trigger(); - Assert.fail("This should never be called"); - return null; - } - }).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class)); - - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - latch.trigger(); - Assert.fail("This should never be called"); - return null; - } - }).when(fetcher).runFetchLoop(); - - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>( - new FetcherFactory() { - private static final long serialVersionUID = -2803131905656983619L; - - @Override - public AbstractFetcher createFetcher() { - return fetcher; - } - }, - Collections.emptyList()); - - StreamSource> consumerOperator = - new StreamSource<>(consumerFunction); + StreamSource> consumerOperator = new StreamSource<>(consumerFunction); final AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); @@ -114,88 +62,30 @@ public Void answer(InvocationOnMock invocation) throws Throwable { testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); testHarness.setup(); + // restore state from binary snapshot file using legacy method testHarness.initializeStateFromLegacyCheckpoint( getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state")); testHarness.open(); - final Throwable[] error = new Throwable[1]; - - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - consumerFunction.run(new DummySourceContext() { - @Override - public void collect(String element) { - latch.trigger(); - Assert.fail("This should never be called."); - } - - @Override - public void emitWatermark(Watermark mark) { - latch.trigger(); - assertEquals(Long.MAX_VALUE, mark.getTimestamp()); - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); + // assert that no partitions were found and the subscribed partition list is empty + Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); + Assert.assertTrue(consumerFunction.getSubscribedPartitions().isEmpty()); - if (!latch.isTriggered()) { - latch.await(); - } + // assert that no state was restored + Assert.assertTrue(consumerFunction.getRestoredState() == null); consumerOperator.close(); - consumerOperator.cancel(); - runner.interrupt(); - runner.join(); - - Assert.assertNull(error[0]); } + /** Test restoring from an empty state taken using Flink 1.1, when some partitions could be found for topics.
*/ @Test public void testRestoreFromFlink11WithEmptyStateWithPartitions() throws Exception { - final OneShotLatch latch = new OneShotLatch(); - final AbstractFetcher fetcher = mock(AbstractFetcher.class); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - latch.trigger(); - Assert.fail("This should never be called"); - return null; - } - }).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class)); - - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - latch.trigger(); - return null; - } - }).when(fetcher).runFetchLoop(); - final List partitions = new ArrayList<>(); partitions.add(new KafkaTopicPartition("abc", 13)); partitions.add(new KafkaTopicPartition("def", 7)); - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>( - new FetcherFactory() { - private static final long serialVersionUID = -2803131905656983619L; - - @Override - public AbstractFetcher createFetcher() { - return fetcher; - } - }, - partitions); + final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); StreamSource> consumerOperator = new StreamSource<>(consumerFunction); @@ -206,89 +96,31 @@ public Void answer(InvocationOnMock invocation) throws Throwable { testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); testHarness.setup(); + // restore state from binary snapshot file using legacy method testHarness.initializeStateFromLegacyCheckpoint( getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state")); testHarness.open(); - final Throwable[] error = new Throwable[1]; - - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - consumerFunction.run(new DummySourceContext() { - @Override - public void collect(String element) { - latch.trigger(); - Assert.fail("This should never be called."); - } - - @Override - public void emitWatermark(Watermark mark) { - latch.trigger(); - Assert.fail("This should never be called."); - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); + // assert that there are subscribed partitions and that they are identical to the expected list + Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); + Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty()); + Assert.assertTrue(consumerFunction.getSubscribedPartitions().equals(partitions)); - if (!latch.isTriggered()) { - latch.await(); - } + // assert that no state was restored + Assert.assertTrue(consumerFunction.getRestoredState() == null); consumerOperator.close(); - - runner.join(); - - Assert.assertNull(error[0]); + consumerOperator.cancel(); } + /** Test restoring from a non-empty state taken using Flink 1.1, when some partitions could be found for topics.
*/ @Test public void testRestoreFromFlink11() throws Exception { - // -------------------------------------------------------------------- - // prepare fake states - // -------------------------------------------------------------------- - - final HashMap state1 = new HashMap<>(); - state1.put(new KafkaTopicPartition("abc", 13), 16768L); - state1.put(new KafkaTopicPartition("def", 7), 987654321L); - - final OneShotLatch latch = new OneShotLatch(); - final AbstractFetcher fetcher = mock(AbstractFetcher.class); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Map map = (HashMap) invocationOnMock.getArguments()[0]; - - latch.trigger(); - assertEquals(state1, map); - return null; - } - }).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class)); - - final List partitions = new ArrayList<>(); partitions.add(new KafkaTopicPartition("abc", 13)); partitions.add(new KafkaTopicPartition("def", 7)); - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>( - new FetcherFactory() { - private static final long serialVersionUID = -2803131905656983619L; - - @Override - public AbstractFetcher createFetcher() { - return fetcher; - } - }, - partitions); + final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); StreamSource> consumerOperator = new StreamSource<>(consumerFunction); @@ -299,91 +131,60 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); testHarness.setup(); + // restore state from binary snapshot file using legacy method testHarness.initializeStateFromLegacyCheckpoint( getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot")); testHarness.open(); - final Throwable[] error = new Throwable[1]; + // assert that there are subscribed partitions and that they are identical to the expected list + Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); + Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty()); + Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions()); - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - consumerFunction.run(new DummySourceContext() { - @Override - public void collect(String element) { - //latch.trigger(); - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); + // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot" + final HashMap expectedState = new HashMap<>(); + expectedState.put(new KafkaTopicPartition("abc", 13), 16768L); + expectedState.put(new KafkaTopicPartition("def", 7), 987654321L); - if (!latch.isTriggered()) { - latch.await(); - } + // assert that state is correctly restored from legacy checkpoint + Assert.assertTrue(consumerFunction.getRestoredState() != null); + Assert.assertEquals(expectedState, consumerFunction.getRestoredState()); consumerOperator.close(); - - runner.join(); - - Assert.assertNull(error[0]); - } - - private abstract static class DummySourceContext - implements SourceFunction.SourceContext { - - private final Object lock = new Object(); - - @Override - public void collectWithTimestamp(String element, long timestamp) { - } - - @Override - public void emitWatermark(Watermark mark) { - } - - @Override - public Object getCheckpointLock() { - return lock; - } - - @Override - public void close() { - } + consumerOperator.cancel(); }
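The migration tests above only verify what ends up in the consumer's restored state and subscribed partitions; acting on that state is left to the version-specific fetchers, whose changes are not fully shown in this excerpt. The AbstractFetcher constructor seeds the partition states from restoredSnapshotState, while startupMode only matters when nothing was restored. A minimal sketch, assuming the Kafka 0.9-era consumer API and hypothetical names (StartPositionSketch, applyStartPosition), of how that decision can translate into initial positions:

import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Illustration only, not the actual Kafka09Fetcher code: translate the
 * restored-offsets / startup-mode decision into initial consumer positions.
 */
class StartPositionSketch {

    static void applyStartPosition(
            KafkaConsumer<byte[], byte[]> consumer,
            Map<TopicPartition, Long> restoredOffsets, // null if the job was not restored
            StartupMode startupMode) {

        for (TopicPartition tp : consumer.assignment()) {
            Long restored = (restoredOffsets != null) ? restoredOffsets.get(tp) : null;
            if (restored != null) {
                // restored state always wins: continue right after the checkpointed offset
                consumer.seek(tp, restored + 1);
            } else {
                switch (startupMode) {
                    case EARLIEST:
                        consumer.seekToBeginning(tp); // varargs signature in the 0.9 / 0.10.0 clients
                        break;
                    case LATEST:
                        consumer.seekToEnd(tp);
                        break;
                    case GROUP_OFFSETS:
                    default:
                        // no explicit seek: the client resumes from the committed group offset,
                        // or falls back to "auto.offset.reset" if none exists
                        break;
                }
            }
        }
    }
}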
// ------------------------------------------------------------------------ - private interface FetcherFactory extends Serializable { - AbstractFetcher createFetcher(); + private static String getResourceFilename(String filename) { + ClassLoader cl = FlinkKafkaConsumerBaseMigrationTest.class.getClassLoader(); + URL resource = cl.getResource(filename); + if (resource == null) { + throw new NullPointerException("Missing snapshot resource."); + } + return resource.getFile(); } private static class DummyFlinkKafkaConsumer extends FlinkKafkaConsumerBase { private static final long serialVersionUID = 1L; - private final FetcherFactory fetcherFactory; - private final List partitions; @SuppressWarnings("unchecked") - DummyFlinkKafkaConsumer( - FetcherFactory fetcherFactory, - List partitions) { + DummyFlinkKafkaConsumer(List partitions) { super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class)); - this.fetcherFactory = fetcherFactory; this.partitions = partitions; } @Override - protected AbstractFetcher createFetcher(SourceContext sourceContext, List thisSubtaskPartitions, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { - return fetcherFactory.createFetcher(); + protected AbstractFetcher createFetcher( + SourceContext sourceContext, + List thisSubtaskPartitions, + HashMap restoredSnapshotState, + SerializedValue> watermarksPeriodic, + SerializedValue> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + return mock(AbstractFetcher.class); } @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index b96ba30e03595..980a025f4954c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -361,16 +361,19 @@ public DummyFlinkKafkaConsumer() { } @Override - protected AbstractFetcher createFetcher(SourceContext sourceContext, List thisSubtaskPartitions, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { - AbstractFetcher fetcher = mock(AbstractFetcher.class); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Assert.fail("Trying to restore offsets even though there was no restore state."); - return null; - } - }).when(fetcher).restoreOffsets(any(HashMap.class)); - return fetcher; + @SuppressWarnings("unchecked") + protected AbstractFetcher createFetcher( + SourceContext sourceContext, + List thisSubtaskPartitions, + HashMap restoredSnapshotState, + SerializedValue> watermarksPeriodic, + SerializedValue> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + if (restoredSnapshotState != null) { + Assert.fail("Trying to restore offsets even though there was no restore state."); + return null; + } + return mock(AbstractFetcher.class); } @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index d7fab8830cf71..cb8b0d0e5cf4e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -61,6 +61,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; @@ -234,7 +235,7 @@ public void run() { final Long l50 = 50L; // the final committed offset in Kafka should be 50 final long deadline = 30000 + System.currentTimeMillis(); - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); do { Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); @@ -281,7 +282,7 @@ public void runStartFromKafkaCommitOffsets() throws Exception { final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1); - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); Long o1; Long o2; @@ -348,7 +349,7 @@ public Object map(String value) throws Exception { (o3 != null) ? o3.intValue() : 0 )); - readSequence(env2, standardProps, topicName, partitionsToValuesCountAndStartOffset); + readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, topicName, partitionsToValuesCountAndStartOffset); kafkaOffsetHandler.close(); deleteTestTopic(topicName); @@ -402,7 +403,7 @@ public void run() { }; runner.start(); - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); final Long l50 = 50L; // the final committed offset in Kafka should be 50 final long deadline = 30000 + System.currentTimeMillis(); @@ -438,6 +439,217 @@ public void run() { kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + + /** + * This test ensures that when explicitly set to start from earliest record, the consumer + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. 
+ */ + public void runStartFromEarliestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } + + /** + * This test ensures that when explicitly set to start from latest record, the consumer + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. + */ + public void runStartFromLatestOffsets() throws Exception { + // 50 records written to each of 3 partitions before launching a latest-starting consuming job + final int parallelism = 3; + final int recordsInEachPartition = 50; + + // an extra 200 records will be written to each partition + final int extraRecordsInEachPartition = 200; + + // all data already in the topic before the consuming topology starts should be ignored + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + // job names for the topologies for writing and consuming the extra records + final String consumeExtraRecordsJobName = "Consume Extra Records Job"; + final String writeExtraRecordsJobName = "Write Extra Records Job"; + + // serialization / deserialization schemas for writing and consuming the extra records + final TypeInformation> resultType = + TypeInformation.of(new TypeHint>() {}); + + final KeyedSerializationSchema> serSchema = + new KeyedSerializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); + + final KeyedDeserializationSchema> deserSchema = + new KeyedDeserializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); + + // set up and run the latest-consuming job + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + final Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored + + FlinkKafkaConsumerBase> latestReadingConsumer = + kafkaServer.getConsumer(topicName, deserSchema,
readProps); + latestReadingConsumer.setStartFromLatest(); + + env + .addSource(latestReadingConsumer).setParallelism(parallelism) + .flatMap(new FlatMapFunction, Object>() { + @Override + public void flatMap(Tuple2 value, Collector out) throws Exception { + if (value.f1 - recordsInEachPartition < 0) { + throw new RuntimeException("test failed; consumed a record that was previously written: " + value); + } + } + }).setParallelism(1) + .addSink(new DiscardingSink<>()); + + final AtomicReference error = new AtomicReference<>(); + Thread consumeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + env.execute(consumeExtraRecordsJobName); + } catch (Throwable t) { + if (!(t.getCause() instanceof JobCancellationException)) { + error.set(t); + } + } + } + }); + consumeThread.start(); + + // wait until the consuming job has started, to be extra safe + JobManagerCommunicationUtils.waitUntilJobIsRunning( + flink.getLeaderGateway(timeout), + consumeExtraRecordsJobName); + + // setup the extra records writing job + final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + + DataStream> extraRecordsStream = env2 + .addSource(new RichParallelSourceFunction>() { + + private boolean running = true; + + @Override + public void run(SourceContext> ctx) throws Exception { + int count = recordsInEachPartition; // the extra records should start from the last written value + int partition = getRuntimeContext().getIndexOfThisSubtask(); + + while (running && count < recordsInEachPartition + extraRecordsInEachPartition) { + ctx.collect(new Tuple2<>(partition, count)); + count++; + } + } + + @Override + public void cancel() { + running = false; + } + }).setParallelism(parallelism); + + kafkaServer.produceIntoKafka(extraRecordsStream, topicName, serSchema, readProps, null); + + try { + env2.execute(writeExtraRecordsJobName); + } + catch (Exception e) { + throw new RuntimeException("Writing extra records failed", e); + } + + // cancel the consume job after all extra records are written + JobManagerCommunicationUtils.cancelCurrentJob( + flink.getLeaderGateway(timeout), + consumeExtraRecordsJobName); + consumeThread.join(); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + + // check whether the consuming thread threw any test errors; + // test will fail here if the consume job had incorrectly read any records other than the extra records + final Throwable consumerError = error.get(); + if (consumerError != null) { + throw new Exception("Exception in the consuming thread", consumerError); + } + } + + /** + * This test ensures that the consumer correctly uses group offsets in Kafka, and defaults to "auto.offset.reset" + * behaviour when necessary, when explicitly configured to start from group offsets. 
+ * + * The partitions and their committed group offsets are set up as: + * partition 0 --> committed offset 23 + * partition 1 --> no committed offset + * partition 2 --> committed offset 43 + * + * When configured to start from group offsets, each partition should read: + * partition 0 --> start from offset 23, read to offset 49 (27 records) + * partition 1 --> default to "auto.offset.reset" (set to earliest), so start from offset 0, read to offset 49 (50 records) + * partition 2 --> start from offset 43, read to offset 49 (7 records) + */ + public void runStartFromGroupOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromGroupOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); + + // the committed group offsets should be used as starting points + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); + + // only partitions 0 and 2 have group offsets committed + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + Map> partitionsToValueCountAndStartOffsets = new HashMap<>(); + partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(27, 23)); // partition 0 should read offset 23-49 + partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49 + partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43)); // partition 2 should read offset 43-49 + + readSequence(env, StartupMode.GROUP_OFFSETS, readProps, topicName, partitionsToValueCountAndStartOffsets); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } /** * Ensure Kafka is working on both producer and consumer side. @@ -1014,27 +1226,6 @@ public void flatMap(Tuple3 value, Collector o } } - /** - * Serialization scheme forwarding byte[] records. - */ - private static class ByteArraySerializationSchema implements KeyedSerializationSchema { - - @Override - public byte[] serializeKey(byte[] element) { - return null; - } - - @Override - public byte[] serializeValue(byte[] element) { - return element; - } - - @Override - public String getTargetTopic(byte[] element) { - return null; - } - } - private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema>, KeyedSerializationSchema> { @@ -1588,7 +1779,9 @@ public TypeInformation> getProducedType() { * The method allows to individually specify the expected starting offset and total read value count of each partition. * The job will be considered successful only if all partition read results match the start offset and value count criteria.
*/ - protected void readSequence(StreamExecutionEnvironment env, Properties cc, + protected void readSequence(final StreamExecutionEnvironment env, + final StartupMode startupMode, + final Properties cc, final String topicName, final Map> partitionsToValuesCountAndStartOffset) throws Exception { final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size(); @@ -1607,6 +1800,17 @@ protected void readSequence(StreamExecutionEnvironment env, Properties cc, // create the consumer cc.putAll(secureProps); FlinkKafkaConsumerBase> consumer = kafkaServer.getConsumer(topicName, deser, cc); + switch (startupMode) { + case EARLIEST: + consumer.setStartFromEarliest(); + break; + case LATEST: + consumer.setStartFromLatest(); + break; + case GROUP_OFFSETS: + consumer.setStartFromGroupOffsets(); + break; + } DataStream> source = env .addSource(consumer).setParallelism(sourceParallelism) @@ -1670,18 +1874,21 @@ public void flatMap(Tuple2 value, Collector out) thro } /** - * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, String, Map)} to + * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Properties, String, Map)} to * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic. */ - protected void readSequence(StreamExecutionEnvironment env, Properties cc, + protected void readSequence(final StreamExecutionEnvironment env, + final StartupMode startupMode, + final Properties cc, final int sourceParallelism, final String topicName, - final int valuesCount, final int startFrom) throws Exception { + final int valuesCount, + final int startFrom) throws Exception { HashMap> partitionsToValuesCountAndStartOffset = new HashMap<>(); for (int i = 0; i < sourceParallelism; i++) { partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom)); } - readSequence(env, cc, topicName, partitionsToValuesCountAndStartOffset); + readSequence(env, startupMode, cc, topicName, partitionsToValuesCountAndStartOffset); } protected String writeSequence( diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index dccf698d85884..6a1f702475111 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -252,7 +252,6 @@ public void runFailOnAutoOffsetResetNone() throws Exception { try { env.execute("Test auto offset reset none"); } catch(Throwable e) { - System.out.println("MESSAGE: " + e.getCause().getCause().getMessage()); // check if correct exception has been thrown if(!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8 && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9 diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 10c7b86f2b56b..7f2a816717fc0 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -94,10 +94,11 @@ public abstract DataStreamSink produceIntoKafka(DataStream stream, Str public interface KafkaOffsetHandler { Long getCommittedOffset(String topicName, int partition); + void setCommittedOffset(String topicName, int partition, long offset); void close(); } - public abstract KafkaOffsetHandler createOffsetHandler(Properties props); + public abstract KafkaOffsetHandler createOffsetHandler(); // -- leader failure simulation diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java index 0b3507a39751c..f2091f0b27171 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -31,6 +32,7 @@ import javax.annotation.Nullable; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,6 +56,7 @@ public void testPunctuatedWatermarks() throws Exception { TestFetcher fetcher = new TestFetcher<>( sourceContext, originalPartitions, + null, null, /* periodic watermark assigner */ new SerializedValue>(new PunctuatedTestExtractor()), processingTimeProvider, @@ -128,6 +131,7 @@ public void testPeriodicWatermarks() throws Exception { TestFetcher fetcher = new TestFetcher<>( sourceContext, originalPartitions, + null, new SerializedValue>(new PeriodicTestExtractor()), null, /* punctuated watermarks assigner*/ processingTimeService, @@ -199,12 +203,23 @@ private static final class TestFetcher extends AbstractFetcher { protected TestFetcher( SourceContext sourceContext, List assignedPartitions, + HashMap restoredSnapshotState, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval) throws Exception { - super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false); + super( + sourceContext, + assignedPartitions, + restoredSnapshotState, + watermarksPeriodic, + watermarksPunctuated, + processingTimeProvider, + autoWatermarkInterval, + TestFetcher.class.getClassLoader(), + StartupMode.LATEST, + false); } @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java index acdad5a2613bd..131325f4237f9 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java @@ -34,7 +34,6 @@ public class JobManagerCommunicationUtils { private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); - public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception { while (true) { // find the jobID @@ -53,6 +52,32 @@ public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Excep } } + public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception { + while (true) { + Future listResponse = jobManager.ask( + JobManagerMessages.getRequestRunningJobsStatus(), + askTimeout); + + List jobs; + try { + Object result = Await.result(listResponse, askTimeout); + jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); + } + catch (Exception e) { + throw new Exception("Could not wait for job to start - failed to retrieve running jobs from the JobManager.", e); + } + + // see if the running jobs contain the requested job + for (JobStatusMessage job : jobs) { + if (job.getJobName().equals(name)) { + return; + } + } + + Thread.sleep(50); + } + } + public static void cancelCurrentJob(ActorGateway jobManager) throws Exception { cancelCurrentJob(jobManager, null); }
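As a usage reference, the commit path behind the new KafkaOffsetHandler.setCommittedOffset hook is the plain Kafka client commitSync call shown in KafkaTestEnvironmentImpl above. A minimal standalone sketch, with broker address, group id and topic name as placeholder values:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/** Standalone illustration of committing and reading back a group offset for one partition. */
public class CommitOffsetSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "flink-tests");             // placeholder
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition partition0 = new TopicPartition("testTopic", 0); // placeholder topic

        try (KafkaConsumer<byte[], byte[]> offsetClient = new KafkaConsumer<>(props)) {
            // commit offset 23 for partition 0, as the startup-mode tests above do through the handler
            offsetClient.commitSync(Collections.singletonMap(partition0, new OffsetAndMetadata(23)));

            // read it back; committed() returns null when no offset exists for the group
            OffsetAndMetadata committed = offsetClient.committed(partition0);
            System.out.println("committed offset: " + (committed == null ? "none" : committed.offset()));
        }
    }
}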