
Commit

[FLINK-4280] [kafka] Explicit start position configuration for FlinkKafkaConsumer

This closes #2509.
tzulitai committed Feb 15, 2017
1 parent 6c310a7 commit 7477c5b
Showing 29 changed files with 944 additions and 491 deletions.
55 changes: 54 additions & 1 deletion docs/dev/connectors/kafka.md
@@ -164,11 +164,64 @@ For convenience, Flink provides the following schemas:
The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
an optional "metadata" field that exposes the offset/partition/topic for this message.


### Kafka Consumers Start Position Configuration

The Flink Kafka Consumer allows configuring how the start positions for Kafka
partitions are determined.

Example:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
...
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromGroupOffsets() // the default behaviour

val stream = env.addSource(myConsumer)
...
{% endhighlight %}
</div>
</div>

All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position.

* `setStartFromGroupOffsets()` (default behaviour): Start reading partitions from
the consumer group's (`group.id` setting in the consumer properties) committed
offsets in Kafka brokers (or Zookeeper for Kafka 0.8). If no committed offset can be
found for a partition, the `auto.offset.reset` setting in the properties will be used
(see the sketch after this list).
* `setStartFromEarliest()` / `setStartFromLatest()`: Start from the earliest / latest
record. Under these modes, committed offsets in Kafka will be ignored and
not used as starting positions.
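
The group-offsets behaviour is driven entirely by the consumer properties. The following is a
minimal sketch of such a configuration; the broker/Zookeeper addresses, topic name, and group id
are placeholder values, and the exact property keys depend on the Kafka version in use:

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
// committed offsets are looked up for this consumer group
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
// default: start from the group's committed offsets;
// falls back to the auto.offset.reset setting if none are found
myConsumer.setStartFromGroupOffsets();
{% endhighlight %}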

Note that these settings do not affect the start position when the job is
automatically restored from a failure or manually restored using a savepoint.
On restore, the start position of each Kafka partition is determined by the
offsets stored in the savepoint or checkpoint
(please see the next section for information about checkpointing to enable
fault tolerance for the consumer).

### Kafka Consumers and Fault Tolerance


With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
stored in the checkpoint.


The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
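
To use fault tolerant Kafka Consumers, checkpointing needs to be enabled at the execution
environment. A minimal sketch; the 5000 ms interval is only an example value:

{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
{% endhighlight %}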
@@ -29,6 +29,7 @@
import org.apache.flink.util.SerializedValue;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

@@ -128,6 +129,7 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            List<KafkaTopicPartition> thisSubtaskPartitions,
            HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext) throws Exception {
@@ -137,6 +139,7 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
        return new Kafka010Fetcher<>(
                sourceContext,
                thisSubtaskPartitions,
                restoredSnapshotState,
                watermarksPeriodic,
                watermarksPunctuated,
                runtimeContext.getProcessingTimeService(),
@@ -148,6 +151,7 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
                deserializer,
                properties,
                pollTimeout,
                startupMode,
                useMetrics);
    }
}
@@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -31,6 +32,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.List;
import java.util.Properties;

@@ -47,6 +49,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
    public Kafka010Fetcher(
            SourceContext<T> sourceContext,
            List<KafkaTopicPartition> assignedPartitions,
            HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
@@ -58,11 +61,13 @@ public Kafka010Fetcher(
            KeyedDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            StartupMode startupMode,
            boolean useMetrics) throws Exception
    {
        super(
                sourceContext,
                assignedPartitions,
                restoredSnapshotState,
                watermarksPeriodic,
                watermarksPunctuated,
                processingTimeProvider,
@@ -74,6 +79,7 @@ public Kafka010Fetcher(
                deserializer,
                kafkaProperties,
                pollTimeout,
                startupMode,
                useMetrics);
    }


@@ -37,4 +37,14 @@ public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
    public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
        consumer.assign(topicPartitions);
    }

    @Override
    public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
        consumer.seekToBeginning(partitions);
    }

    @Override
    public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
    }
}
@@ -22,9 +22,9 @@
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -122,20 +122,22 @@ public Void answer(InvocationOnMock invocation) {
        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());


        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                sourceContext,
                topics,
                null, /* no restored state */
                null, /* periodic assigner */
                null, /* punctuated assigner */
                new TestProcessingTimeService(),
                10,
                getClass().getClassLoader(),
                false, /* checkpointing */
                "taskname-with-subtask",
                new UnregisteredMetricsGroup(),
                schema,
                new Properties(),
                0L,
                StartupMode.GROUP_OFFSETS,
                false);


        // ----- run the fetcher -----


@@ -256,23 +258,24 @@ public Void answer(InvocationOnMock invocation) {
        SourceContext<String> sourceContext = mock(SourceContext.class);
        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());


        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                sourceContext,
                topics,
                null, /* no restored state */
                null, /* periodic assigner */
                null, /* punctuated assigner */
                new TestProcessingTimeService(),
                10,
                getClass().getClassLoader(),
                false, /* checkpointing */
                "taskname-with-subtask",
                new UnregisteredMetricsGroup(),
                schema,
                new Properties(),
                0L,
                StartupMode.GROUP_OFFSETS,
                false);




        // ----- run the fetcher -----
@@ -374,20 +377,22 @@ public void testCancellationWhenEmitBlocks() throws Exception {
        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());


        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                sourceContext,
                topics,
                null, /* no restored state */
                null, /* periodic watermark extractor */
                null, /* punctuated watermark extractor */
                new TestProcessingTimeService(),
                10, /* watermark interval */
                this.getClass().getClassLoader(),
                true, /* checkpointing */
                "task_name",
                new UnregisteredMetricsGroup(),
                schema,
                new Properties(),
                0L,
                StartupMode.GROUP_OFFSETS,
                false);




        // ----- run the fetcher -----
@@ -51,7 +51,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
    // Suite of Tests
    // ------------------------------------------------------------------------



    @Test(timeout = 60000)
    public void testFailOnNoBroker() throws Exception {
        runFailOnNoBrokerTest();
@@ -131,6 +130,24 @@ public void testMetricsAndEndOfStream() throws Exception {
        runEndOfStreamTest();
    }


    // --- startup mode ---

    @Test(timeout = 60000)
    public void testStartFromEarliestOffsets() throws Exception {
        runStartFromEarliestOffsets();
    }

    @Test(timeout = 60000)
    public void testStartFromLatestOffsets() throws Exception {
        runStartFromLatestOffsets();
    }

    @Test(timeout = 60000)
    public void testStartFromGroupOffsets() throws Exception {
        runStartFromGroupOffsets();
    }


    // --- offset committing ---


    @Test(timeout = 60000)
@@ -47,6 +47,8 @@
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;


@@ -129,8 +131,8 @@ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic
    }


    @Override
    public KafkaOffsetHandler createOffsetHandler() {
        return new KafkaOffsetHandlerImpl();
    }


    @Override
@@ -401,7 +403,12 @@ private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {


        private final KafkaConsumer<byte[], byte[]> offsetClient;


        public KafkaOffsetHandlerImpl() {
            Properties props = new Properties();
            props.putAll(standardProps);
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            offsetClient = new KafkaConsumer<>(props);
        }


@@ -411,6 +418,13 @@ public Long getCommittedOffset(String topicName, int partition) {
            return (committed != null) ? committed.offset() : null;
        }


        @Override
        public void setCommittedOffset(String topicName, int partition, long offset) {
            Map<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<>();
            partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset));
            offsetClient.commitSync(partitionAndOffset);
        }

        @Override
        public void close() {
            offsetClient.close();

