From 206b66b39174964aa0a4d811b1ebd84b3816a1ce Mon Sep 17 00:00:00 2001
From: Piotr Nowojski
Date: Tue, 31 Oct 2017 15:38:32 +0100
Subject: [PATCH] [FLINK-7732][kafka-consumer] Do not commit to kafka Flink's sentinel offsets

---
 .../kafka/internals/Kafka08Fetcher.java       |  2 +-
 .../kafka/internal/Kafka09Fetcher.java        |  6 ++-
 .../kafka/internals/AbstractFetcher.java      | 18 ++++-
 .../KafkaTopicPartitionStateSentinel.java     |  3 ++
 .../kafka/internals/AbstractFetcherTest.java  | 48 ++++++++++++++++++-
 5 files changed, 72 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 7359e914cf0af..8bcd663d45ddf 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -348,7 +348,7 @@ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition parti
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitInternalOffsetsToKafka(
+	protected void doCommitInternalOffsetsToKafka(
 			Map<KafkaTopicPartition, Long> offsets,
 			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
 
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index cef70fe6913f8..51f69cdda3e7c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -44,6 +44,8 @@
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
  *
@@ -212,7 +214,7 @@ public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition)
 	}
 
 	@Override
-	public void commitInternalOffsetsToKafka(
+	protected void doCommitInternalOffsetsToKafka(
 			Map<KafkaTopicPartition, Long> offsets,
 			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
 
@@ -224,6 +226,8 @@ public void commitInternalOffsetsToKafka(
 		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
 			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
 			if (lastProcessedOffset != null) {
+				checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
+
 				// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
 				// This does not affect Flink's checkpoints/saved state.
 				long offsetToCommit = lastProcessedOffset + 1;
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 11f97b2819fef..a128174ff2459 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -36,6 +36,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -242,10 +243,25 @@ protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates()
 	 * @param commitCallback The callback that the user should trigger when a commit request completes or fails.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitInternalOffsetsToKafka(
+	public final void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+		// Ignore sentinels. They might appear here if snapshot has started before actual offsets values
+		// replaced sentinels
+		doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
+	}
+
+	protected abstract void doCommitInternalOffsetsToKafka(
 			Map<KafkaTopicPartition, Long> offsets,
 			@Nonnull KafkaCommitCallback commitCallback) throws Exception;
 
+	private Map<KafkaTopicPartition, Long> filterOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
+		return offsets.entrySet()
+			.stream()
+			.filter(entry -> !KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue()))
+			.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+	}
+
 	/**
 	 * Creates the Kafka version specific representation of the given
 	 * topic partition.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
index c218618c8bd74..3857991d61df2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -52,4 +52,7 @@ public class KafkaTopicPartitionStateSentinel {
 	 */
 	public static final long GROUP_OFFSET = -915623761773L;
 
+	public static boolean isSentinel(long offset) {
+		return offset < 0;
+	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 10631021e1e85..46894a159166e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -32,8 +32,10 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -46,6 +48,42 @@
 @SuppressWarnings("serial")
 public class AbstractFetcherTest {
 
+	@Test
+	public void testIgnorePartitionStateSentinelInSnapshot() throws Exception {
+		final String testTopic = "test topic name";
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 2), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 3), KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+			sourceContext,
+			originalPartitions,
+			null,
+			null,
+			mock(TestProcessingTimeService.class),
+			0);
+
+		synchronized (sourceContext.getCheckpointLock()) {
+			HashMap<KafkaTopicPartition, Long> currentState = fetcher.snapshotCurrentState();
+			fetcher.commitInternalOffsetsToKafka(currentState, new KafkaCommitCallback() {
+				@Override
+				public void onSuccess() {
+				}
+
+				@Override
+				public void onException(Throwable cause) {
+					throw new RuntimeException("Callback failed", cause);
+				}
+			});
+
+			assertTrue(fetcher.getLastCommittedOffsets().isPresent());
+			assertEquals(Collections.emptyMap(), fetcher.getLastCommittedOffsets().get());
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Record emitting tests
 	// ------------------------------------------------------------------------
@@ -327,6 +365,7 @@ public void testPeriodicWatermarks() throws Exception {
 	// ------------------------------------------------------------------------
 
 	private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+		protected Optional<Map<KafkaTopicPartition, Long>> lastCommittedOffsets = Optional.empty();
 
 		protected TestFetcher(
 				SourceContext<T> sourceContext,
@@ -362,10 +401,15 @@ public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
 		}
 
 		@Override
-		public void commitInternalOffsetsToKafka(
+		protected void doCommitInternalOffsetsToKafka(
 				Map<KafkaTopicPartition, Long> offsets,
 				@Nonnull KafkaCommitCallback callback) throws Exception {
-			throw new UnsupportedOperationException();
+			lastCommittedOffsets = Optional.of(offsets);
+			callback.onSuccess();
+		}
+
+		public Optional<Map<KafkaTopicPartition, Long>> getLastCommittedOffsets() {
+			return lastCommittedOffsets;
 		}
 	}
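
Note (not part of the patch): the change above splits offset committing into a template method. The final commitInternalOffsetsToKafka() in AbstractFetcher drops sentinel offsets (which are always negative) and only then delegates to the version-specific doCommitInternalOffsetsToKafka(). The stand-alone Java sketch below illustrates that behaviour under simplified assumptions: only GROUP_OFFSET and the isSentinel() check are taken from the patch; the class name, String-keyed map, and println are illustrative stand-ins, not Flink code.

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Simplified stand-in for the pattern added in AbstractFetcher: the public
// commit entry point is final, filters out sentinel offsets, and delegates
// the remaining real offsets to an overridable, version-specific hook.
public class SentinelFilteringSketch {

	// Sentinel value copied from KafkaTopicPartitionStateSentinel; all
	// sentinels are negative, real Kafka offsets are always >= 0.
	static final long GROUP_OFFSET = -915623761773L;

	static boolean isSentinel(long offset) {
		return offset < 0;
	}

	// Mirrors the final commitInternalOffsetsToKafka(...) wrapper.
	public final void commitOffsets(Map<String, Long> offsets) {
		doCommitOffsets(filterOutSentinels(offsets));
	}

	// Mirrors the protected doCommitInternalOffsetsToKafka(...) hook that the
	// version-specific fetchers implement.
	protected void doCommitOffsets(Map<String, Long> offsets) {
		System.out.println("committing: " + offsets);
	}

	private Map<String, Long> filterOutSentinels(Map<String, Long> offsets) {
		return offsets.entrySet().stream()
			.filter(entry -> !isSentinel(entry.getValue()))
			.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
	}

	public static void main(String[] args) {
		Map<String, Long> offsets = new HashMap<>();
		offsets.put("test-topic-0", 42L);          // real offset, survives the filter
		offsets.put("test-topic-1", GROUP_OFFSET); // sentinel, silently dropped
		new SentinelFilteringSketch().commitOffsets(offsets);
		// prints: committing: {test-topic-0=42}
	}
}

The test added in AbstractFetcherTest exercises exactly this path: a snapshot that still contains only sentinel values results in an empty map being handed to doCommitInternalOffsetsToKafka().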