[FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()

This closes #3378.
tzulitai committed Feb 27, 2017
1 parent 72f56d1 commit ed68fed
Showing 21 changed files with 510 additions and 473 deletions.
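The refactoring replaces the fetchers' separate partition list and restored-snapshot map with a single assignedPartitionsWithInitialOffsets map whose values are either concrete restored offsets or sentinel markers, resolved when the consumer opens. A minimal sketch of how such a map could be seeded in open(), assuming partitions keyed by a plain "topic-partition" string and made-up sentinel constants (Flink keeps the real ones in KafkaTopicPartitionStateSentinel); this is an illustration, not the actual Flink code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; simplified stand-ins for Flink's internal types.
public class StartOffsetSeedingSketch {

    // Made-up sentinel values; only their distinctness from real offsets matters here.
    static final long EARLIEST_OFFSET = Long.MIN_VALUE + 1;
    static final long LATEST_OFFSET   = Long.MIN_VALUE + 2;
    static final long GROUP_OFFSET    = Long.MIN_VALUE + 3;

    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    // In open(): every assigned partition gets either its restored offset or a sentinel
    // that the fetcher later replaces with a concrete starting offset.
    static Map<String, Long> seedInitialOffsets(
            List<String> assignedPartitions,
            Map<String, Long> restoredState,   // null when starting fresh
            StartupMode startupMode) {

        Map<String, Long> initialOffsets = new HashMap<>();
        for (String partition : assignedPartitions) {
            if (restoredState != null && restoredState.containsKey(partition)) {
                initialOffsets.put(partition, restoredState.get(partition));
            } else {
                switch (startupMode) {
                    case EARLIEST:
                        initialOffsets.put(partition, EARLIEST_OFFSET);
                        break;
                    case LATEST:
                        initialOffsets.put(partition, LATEST_OFFSET);
                        break;
                    default:
                        initialOffsets.put(partition, GROUP_OFFSET);
                }
            }
        }
        return initialOffsets;
    }
}

With the startup decision made up front, the fetcher subclasses no longer need the startupMode and restoredSnapshotState parameters removed in the hunks below.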
@@ -29,7 +29,7 @@
import org.apache.flink.util.SerializedValue;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Properties;

@@ -128,8 +128,7 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> thisSubtaskPartitions,
HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception {
@@ -138,8 +137,7 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>

return new Kafka010Fetcher<>(
sourceContext,
thisSubtaskPartitions,
restoredSnapshotState,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
@@ -151,7 +149,6 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
deserializer,
properties,
pollTimeout,
startupMode,
useMetrics);
}
}
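As shown above, createFetcher() now hands the fetcher only the combined map. A rough sketch of how a fetcher base class could turn that map into mutable per-partition state (simplified stand-in types, not the actual AbstractFetcher code):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for KafkaTopicPartitionState; illustration only.
class PartitionStateSketch {
    final String topicPartition;
    private volatile long offset;

    PartitionStateSketch(String topicPartition, long initialOffset) {
        this.topicPartition = topicPartition;
        this.offset = initialOffset;   // may still be a sentinel at this point
    }

    long getOffset()            { return offset; }
    void setOffset(long offset) { this.offset = offset; }
}

// Sketch: the base fetcher receives one map and creates a state object per entry,
// instead of taking a partition list plus an optional restored-snapshot map.
class FetcherBaseSketch {
    private final List<PartitionStateSketch> subscribedPartitionStates = new ArrayList<>();

    FetcherBaseSketch(Map<String, Long> assignedPartitionsWithInitialOffsets) {
        for (Map.Entry<String, Long> entry : assignedPartitionsWithInitialOffsets.entrySet()) {
            subscribedPartitionStates.add(new PartitionStateSketch(entry.getKey(), entry.getValue()));
        }
    }

    List<PartitionStateSketch> subscribedPartitionStates() {
        return subscribedPartitionStates;
    }
}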
@@ -22,7 +22,6 @@
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -32,8 +31,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
@@ -48,8 +46,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {

public Kafka010Fetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> assignedPartitions,
HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
@@ -61,13 +58,11 @@ public Kafka010Fetcher(
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
StartupMode startupMode,
boolean useMetrics) throws Exception
{
super(
sourceContext,
assignedPartitions,
restoredSnapshotState,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
@@ -79,7 +74,6 @@ public Kafka010Fetcher(
deserializer,
kafkaProperties,
pollTimeout,
startupMode,
useMetrics);
}

@@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;

/**
@@ -39,12 +40,12 @@ public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition>
}

@Override
public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
consumer.seekToBeginning(Collections.singletonList(partition));
}

@Override
public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
consumer.seekToEnd(partitions);
public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
consumer.seekToEnd(Collections.singletonList(partition));
}
}
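The call bridge switches from list-based seekPartitionsToBeginning/seekPartitionsToEnd to per-partition variants because, with sentinel-valued start offsets, only some of a subtask's partitions may need seeking. A hypothetical sketch of how a consumer thread might drive these seeks against the Kafka 0.10 consumer API, again with made-up sentinel constants:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch, not the actual KafkaConsumerThread logic.
public class InitialSeekSketch {

    // Made-up sentinel values standing in for KafkaTopicPartitionStateSentinel constants.
    static final long EARLIEST_OFFSET = Long.MIN_VALUE + 1;
    static final long LATEST_OFFSET   = Long.MIN_VALUE + 2;

    static void applyInitialOffsets(KafkaConsumer<?, ?> consumer, Map<TopicPartition, Long> initialOffsets) {
        for (Map.Entry<TopicPartition, Long> entry : initialOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long offset = entry.getValue();

            if (offset == EARLIEST_OFFSET) {
                consumer.seekToBeginning(Collections.singletonList(tp));
            } else if (offset == LATEST_OFFSET) {
                consumer.seekToEnd(Collections.singletonList(tp));
            } else {
                // a concrete offset of the last processed record: resume right after it
                consumer.seek(tp, offset + 1);
            }
        }
    }
}

Partitions still carrying a group-offset sentinel would presumably be left untouched, so the consumer falls back to its committed group offsets.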
@@ -24,10 +24,10 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -118,13 +118,13 @@ public Void answer(InvocationOnMock invocation) {

@SuppressWarnings("unchecked")
SourceContext<String> sourceContext = mock(SourceContext.class);
List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());

final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
topics,
null, /* no restored state */
partitionsWithInitialOffsets,
null, /* periodic assigner */
null, /* punctuated assigner */
new TestProcessingTimeService(),
Expand All @@ -136,7 +136,6 @@ public Void answer(InvocationOnMock invocation) {
schema,
new Properties(),
0L,
StartupMode.GROUP_OFFSETS,
false);

// ----- run the fetcher -----
@@ -256,13 +255,13 @@ public Void answer(InvocationOnMock invocation) {

@SuppressWarnings("unchecked")
SourceContext<String> sourceContext = mock(SourceContext.class);
List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());

final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
topics,
null, /* no restored state */
partitionsWithInitialOffsets,
null, /* periodic assigner */
null, /* punctuated assigner */
new TestProcessingTimeService(),
Expand All @@ -274,7 +273,6 @@ public Void answer(InvocationOnMock invocation) {
schema,
new Properties(),
0L,
StartupMode.GROUP_OFFSETS,
false);

// ----- run the fetcher -----
@@ -372,13 +370,13 @@ public void testCancellationWhenEmitBlocks() throws Exception {
// ----- build a fetcher -----

BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());

final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
topics,
null, /* no restored state */
partitionsWithInitialOffsets,
null, /* periodic watermark extractor */
null, /* punctuated watermark extractor */
new TestProcessingTimeService(),
Expand All @@ -390,7 +388,6 @@ public void testCancellationWhenEmitBlocks() throws Exception {
schema,
new Properties(),
0L,
StartupMode.GROUP_OFFSETS,
false);

// ----- run the fetcher -----
@@ -45,10 +45,10 @@
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

@@ -194,19 +194,23 @@ public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> d
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> thisSubtaskPartitions,
HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception {

boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));

return new Kafka08Fetcher<>(sourceContext,
thisSubtaskPartitions, restoredSnapshotState,
watermarksPeriodic, watermarksPunctuated,
runtimeContext, deserializer, kafkaProperties,
autoCommitInterval, startupMode, useMetrics);
return new Kafka08Fetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext,
deserializer,
kafkaProperties,
autoCommitInterval,
useMetrics);
}

@Override
Expand Down
@@ -91,27 +91,23 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {

public Kafka08Fetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> assignedPartitions,
HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long autoCommitInterval,
StartupMode startupMode,
boolean useMetrics) throws Exception
{
super(
sourceContext,
assignedPartitions,
restoredSnapshotState,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
startupMode,
useMetrics);

this.deserializer = checkNotNull(deserializer);
@@ -122,7 +118,7 @@ public Kafka08Fetcher(
this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

// initially, all these partitions are not assigned to a specific broker connection
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
unassignedPartitionsQueue.add(partition);
}
}
@@ -146,43 +142,32 @@ public void runFetchLoop() throws Exception {
PeriodicOffsetCommitter periodicCommitter = null;
try {

// if we're not restored from a checkpoint, all partitions will not have their offset set;
// depending on the configured startup mode, accordingly set the starting offsets
if (!isRestored) {
switch (startupMode) {
case EARLIEST:
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
partition.setOffset(OffsetRequest.EarliestTime());
}
break;
case LATEST:
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
partition.setOffset(OffsetRequest.LatestTime());
}
break;
default:
case GROUP_OFFSETS:
List<KafkaTopicPartition> partitions = new ArrayList<>(subscribedPartitions().length);
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
partitions.add(partition.getKafkaTopicPartition());
}

Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitions);
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
// the committed offset in ZK represents the next record to process,
// so we subtract it by 1 to correctly represent internal state
partition.setOffset(offset - 1);
} else {
// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
// we default to "auto.offset.reset" like the Kafka high-level consumer
LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
" resetting starting offset to 'auto.offset.reset'", partition);

partition.setOffset(invalidOffsetBehavior);
}
}
// offsets in the state may still be placeholder sentinel values if we are starting fresh, or the
// checkpoint / savepoint state we were restored with had not completely been replaced with actual offset
// values yet; replace those with actual offsets, according to what the sentinel values represent.
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
// this will be replaced by an actual offset in SimpleConsumerThread
partition.setOffset(OffsetRequest.EarliestTime());
} else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
// this will be replaced by an actual offset in SimpleConsumerThread
partition.setOffset(OffsetRequest.LatestTime());
} else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
Long committedOffset = zookeeperOffsetHandler.getCommittedOffset(partition.getKafkaTopicPartition());
if (committedOffset != null) {
// the committed offset in ZK represents the next record to process,
// so we subtract 1 from it to correctly represent the internal state
partition.setOffset(committedOffset - 1);
} else {
// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
// we default to "auto.offset.reset" like the Kafka high-level consumer
LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
" resetting starting offset to 'auto.offset.reset'", partition);

partition.setOffset(invalidOffsetBehavior);
}
} else {
// the partition already has a specific start offset and is ready to be consumed
}
}

@@ -191,7 +176,7 @@
LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);

periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
subscribedPartitions(), errorHandler, autoCommitInterval);
subscribedPartitionStates(), errorHandler, autoCommitInterval);
periodicCommitter.setName("Periodic Kafka partition offset committer");
periodicCommitter.setDaemon(true);
periodicCommitter.start();
@@ -388,7 +373,7 @@ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
}

// Set committed offsets in topic partition state
KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions();
KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
Long offset = offsets.get(partition.getKafkaTopicPartition());
if (offset != null) {

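Both the startup path and commitInternalOffsetsToKafka rely on the same off-by-one convention noted in the comments above: the offset committed to ZooKeeper/Kafka names the next record to read, while the fetcher's internal state tracks the last record emitted. A tiny worked sketch of that conversion, inferred from those comments rather than taken from Flink's code:

// Worked example of the off-by-one convention described in the comments above.
public class OffsetConventionSketch {

    // A committed offset of 42 means "record 42 is the next to read",
    // so the internal state starts at 41 (the last record already processed).
    static long committedToInternal(long committedOffset) {
        return committedOffset - 1;
    }

    // Conversely, having processed record n, the offset to commit is n + 1
    // ("the next record to read"), which presumably is what the commit path writes back.
    static long internalToCommitted(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        long zkCommitted = 42L;
        long internal = committedToInternal(zkCommitted);   // 41
        System.out.println("internal state after restore: " + internal);
        System.out.println("offset to commit after processing 41: " + internalToCommitted(internal)); // 42
    }
}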