diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 2918080cb2dc4..5ffdd796d27a7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -101,7 +101,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
 	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
 
-	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+	private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
 
 	// ------------------------------------------------------------------------
 	//  runtime state (used individually by each parallel subtask)
@@ -113,8 +113,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 	/** The fetcher implements the connections to the Kafka brokers */
 	private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
 
-	/** The offsets to restore to, if the consumer restores state from a checkpoint */
-	private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
+	/** The offsets and watermarks to restore to, if the consumer restores state from a checkpoint */
+	private transient volatile HashMap<KafkaTopicPartition, Tuple2<Long, Long>> restoreToOffsetAndWatermark;
 
 	/** Flag indicating whether the consumer is still running **/
 	private volatile boolean running = true;
@@ -238,8 +238,8 @@ public void run(SourceContext<T> sourceContext) throws Exception {
 				(StreamingRuntimeContext) getRuntimeContext());
 
 		// (2) set the fetcher to the restored checkpoint offsets
-		if (restoreToOffset != null) {
-			fetcher.restoreOffsets(restoreToOffset);
+		if (restoreToOffsetAndWatermark != null) {
+			fetcher.restoreOffsetsAndWatermarks(restoreToOffsetAndWatermark);
 		}
 
 		// publish the reference, for snapshot-, commit-, and cancel calls
@@ -318,21 +318,21 @@ public void close() throws Exception {
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 
 		OperatorStateStore stateStore = context.getOperatorStateStore();
-		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+		offsetsAndWatermarksStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
 		if (context.isRestored()) {
-			if (restoreToOffset == null) {
-				restoreToOffset = new HashMap<>();
-				for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
-					restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+			if (restoreToOffsetAndWatermark == null) {
+				restoreToOffsetAndWatermark = new HashMap<>();
+				for (Tuple2<KafkaTopicPartition, Tuple2<Long, Long>> kafkaOffset : offsetsAndWatermarksStateForCheckpoint.get()) {
+					restoreToOffsetAndWatermark.put(kafkaOffset.f0, kafkaOffset.f1);
 				}
 
 				LOG.info("Setting restore state in the FlinkKafkaConsumer.");
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Using the following offsets: {}", restoreToOffset);
+					LOG.debug("Using the following offsets and watermarks: {}", restoreToOffsetAndWatermark);
 				}
-			} else if (restoreToOffset.isEmpty()) {
-				restoreToOffset = null;
+			} else if (restoreToOffsetAndWatermark.isEmpty()) {
+				restoreToOffsetAndWatermark = null;
 			}
 		} else {
 			LOG.info("No restore state for FlinkKafkaConsumer.");
@@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
 			LOG.debug("snapshotState() called on closed source");
 		} else {
 
-			offsetsStateForCheckpoint.clear();
+			offsetsAndWatermarksStateForCheckpoint.clear();
 
 			final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
 			if (fetcher == null) {
 				// the fetcher has not yet been initialized, which means we need to return the
-				// originally restored offsets or the assigned partitions
+				// originally restored offsets and watermarks or the assigned partitions
 
-				if (restoreToOffset != null) {
+				if (restoreToOffsetAndWatermark != null) {
-					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
-						offsetsStateForCheckpoint.add(
-								Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+					for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
+						offsetsAndWatermarksStateForCheckpoint.add(
+								Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
 					}
 				} else if (subscribedPartitions != null) {
 					for (KafkaTopicPartition subscribedPartition : subscribedPartitions) {
-						offsetsStateForCheckpoint.add(
-								Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
+						offsetsAndWatermarksStateForCheckpoint.add(
+								Tuple2.of(subscribedPartition, Tuple2.of(KafkaTopicPartitionState.OFFSET_NOT_SET, Long.MIN_VALUE)));
 					}
 				}
 
 				// the map cannot be asynchronously updated, because only one checkpoint call can happen
 				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-				pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
+				pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffsetAndWatermark);
 			} else {
-				HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
+				HashMap<KafkaTopicPartition, Tuple2<Long, Long>> currentOffsetsAndWatermarks = fetcher.snapshotCurrentState();
 
 				// the map cannot be asynchronously updated, because only one checkpoint call can happen
 				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-				pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+				pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsetsAndWatermarks);
 
-				for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
-					offsetsStateForCheckpoint.add(
-							Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+				for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : currentOffsetsAndWatermarks.entrySet()) {
+					offsetsAndWatermarksStateForCheckpoint.add(
+							Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
 				}
 			}
@@ -393,11 +393,14 @@ public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
 
 		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
 				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
 
-		restoreToOffset = restoredOffsets;
+		restoreToOffsetAndWatermark = new HashMap<>();
+		for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionOffset : restoredOffsets.entrySet()) {
+			restoreToOffsetAndWatermark.put(kafkaTopicPartitionOffset.getKey(), Tuple2.of(kafkaTopicPartitionOffset.getValue(), Long.MIN_VALUE));
+		}
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
-					getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
+					getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredOffsets);
 		}
 	}
@@ -427,18 +430,24 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 			}
 
 			@SuppressWarnings("unchecked")
-			HashMap<KafkaTopicPartition, Long> offsets =
-					(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
+			HashMap<KafkaTopicPartition, Tuple2<Long, Long>> offsetsAndWatermarks =
+					(HashMap<KafkaTopicPartition, Tuple2<Long, Long>>) pendingOffsetsToCommit.remove(posInMap);
 
 			// remove older checkpoints in map
 			for (int i = 0; i < posInMap; i++) {
 				pendingOffsetsToCommit.remove(0);
 			}
 
-			if (offsets == null || offsets.size() == 0) {
+			if (offsetsAndWatermarks == null || offsetsAndWatermarks.size() == 0) {
 				LOG.debug("Checkpoint state was empty.");
 				return;
 			}
 
+			// separate the offsets from the watermarks
+			HashMap<KafkaTopicPartition, Long> offsets = new HashMap<>();
+			for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> offsetAndWatermark : offsetsAndWatermarks.entrySet()) {
+				offsets.put(offsetAndWatermark.getKey(), offsetAndWatermark.getValue().f0);
+			}
+
 			fetcher.commitInternalOffsetsToKafka(offsets);
 		} catch (Exception e) {
@@ -492,9 +501,9 @@ public TypeInformation<T> getProducedType() {
 	private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
 		subscribedPartitions = new ArrayList<>();
 
-		if (restoreToOffset != null) {
+		if (restoreToOffsetAndWatermark != null) {
 			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-				if (restoreToOffset.containsKey(kafkaTopicPartition)) {
+				if (restoreToOffsetAndWatermark.containsKey(kafkaTopicPartition)) {
 					subscribedPartitions.add(kafkaTopicPartition);
 				}
 			}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 821eb03abf311..b35d2923b17e6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -175,34 +176,115 @@ protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
 	 *
 	 * <p>Important: This method must be called under the checkpoint lock.
 	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
 	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
 
-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+				}
+
+				return state;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+				}
+
+				return state;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+				}
+
+				return state;
+			}
+
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
 		}
-		return state;
 	}
 
 	/**
-	 * Restores the partition offsets.
+	 * Restores the partition offsets and watermarks.
 	 *
-	 * @param snapshotState The offsets for the partitions
+	 * @param snapshotState The offsets and watermarks for the partitions
 	 */
-	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
-		for (KafkaTopicPartitionState<?> partition : allPartitions) {
-			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				partition.setOffset(offset);
+	public void restoreOffsetsAndWatermarks(Map<KafkaTopicPartition, Tuple2<Long, Long>> snapshotState) {
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+					if (offset != null) {
+						partition.setOffset(offset);
+					}
+				}
+				break;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+					Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+					if (offset != null) {
+						partition.setOffset(offset);
+					}
+
+					Long watermarkTimestamp = snapshotState.get(partition.getKafkaTopicPartition()).f1;
+					if (watermarkTimestamp != null) {
+						partition.setCurrentWatermarkTimestamp(watermarkTimestamp);
+					}
+				}
+				break;
 			}
+
+			case PUNCTUATED_WATERMARKS: {
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+					Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+					if (offset != null) {
+						partition.setOffset(offset);
+					}
+
+					Long watermarkTimestamp = snapshotState.get(partition.getKafkaTopicPartition()).f1;
+					if (watermarkTimestamp != null) {
+						partition.setCurrentWatermarkTimestamp(watermarkTimestamp);
+					}
+				}
+				break;
+			}
+
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
 		}
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
index efdc73fc0f390..5dc52d9c0a977 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
 		return partitionWatermark;
 	}
 
+	void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
+		partitionWatermark = watermarkTimestamp;
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
index edf40ce671e26..1731726851760 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -74,6 +74,10 @@ public long getCurrentPartitionWatermark() {
 		return partitionWatermark;
 	}
 
+	void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
+		partitionWatermark = watermarkTimestamp;
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index c315d31b49544..36d5b8e6877f1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -46,7 +47,7 @@
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
@@ -84,7 +85,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 				Assert.fail("This should never be called");
 				return null;
 			}
-		}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
+		}).when(fetcher).restoreOffsetsAndWatermarks(anyMap());
 
 		doAnswer(new Answer() {
 			@Override
@@ -173,7 +174,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 				Assert.fail("This should never be called");
 				return null;
 			}
-		}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
+		}).when(fetcher).restoreOffsetsAndWatermarks(anyMap());
 
 		doAnswer(new Answer() {
 			@Override
@@ -257,9 +258,9 @@ public void testRestoreFromFlink11() throws Exception {
 		//  prepare fake states
 		// --------------------------------------------------------------------
 
-		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
-		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
-		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+		final HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state1 = new HashMap<>();
+		state1.put(new KafkaTopicPartition("abc", 13), Tuple2.of(16768L, Long.MIN_VALUE));
+		state1.put(new KafkaTopicPartition("def", 7), Tuple2.of(987654321L, Long.MIN_VALUE));
 
 		final OneShotLatch latch = new OneShotLatch();
 		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
@@ -267,13 +268,14 @@ public void testRestoreFromFlink11() throws Exception {
 
 		doAnswer(new Answer() {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				Map<KafkaTopicPartition, Long> map = (HashMap<KafkaTopicPartition, Long>) invocationOnMock.getArguments()[0];
+				Map<KafkaTopicPartition, Tuple2<Long, Long>> map =
+					(HashMap<KafkaTopicPartition, Tuple2<Long, Long>>) invocationOnMock.getArguments()[0];
 
 				latch.trigger();
 				assertEquals(state1, map);
 				return null;
 			}
-		}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
+		}).when(fetcher).restoreOffsetsAndWatermarks(anyMap());
 
 		final List<KafkaTopicPartition> partitions = new ArrayList<>();
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b96ba30e03595..d0d20f1041f2a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -106,7 +106,7 @@ public void ignoreCheckpointWhenNotRunning() throws Exception {
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
+		TestingListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> listState = new TestingListState<>();
 		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
@@ -123,8 +123,8 @@ public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 
 		TestingListState<Serializable> listState = new TestingListState<>();
-		listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L));
-		listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
+		listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), Tuple2.of(16768L, 16700L)));
+		listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), Tuple2.of(987654321L, 987654300L)));
 
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
 
@@ -215,17 +215,17 @@ public void testSnapshotState() throws Exception {
 
 		//  prepare fake states
 		// --------------------------------------------------------------------
 
-		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
-		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
-		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+		final HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state1 = new HashMap<>();
+		state1.put(new KafkaTopicPartition("abc", 13), Tuple2.of(16768L, 16760L));
+		state1.put(new KafkaTopicPartition("def", 7), Tuple2.of(987654321L, 987654320L));
 
-		final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
-		state2.put(new KafkaTopicPartition("abc", 13), 16770L);
-		state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+		final HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state2 = new HashMap<>();
+		state2.put(new KafkaTopicPartition("abc", 13), Tuple2.of(16770L, 16770L));
+		state2.put(new KafkaTopicPartition("def", 7), Tuple2.of(987654329L, 987654320L));
 
-		final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
-		state3.put(new KafkaTopicPartition("abc", 13), 16780L);
-		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+		final HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state3 = new HashMap<>();
+		state3.put(new KafkaTopicPartition("abc", 13), Tuple2.of(16780L, 16780L));
+		state3.put(new KafkaTopicPartition("def", 7), Tuple2.of(987654377L, 987654370L));
 
 		// --------------------------------------------------------------------
 
@@ -253,11 +253,12 @@ public void testSnapshotState() throws Exception {
 
 		// checkpoint 1
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
 
-		HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshot1 = new HashMap<>();
 		for (Serializable serializable : listState.get()) {
-			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
-			snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+			Tuple2<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark =
+				(Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>) serializable;
+			snapshot1.put(kafkaTopicPartitionOffsetAndWatermark.f0, kafkaTopicPartitionOffsetAndWatermark.f1);
 		}
 
 		assertEquals(state1, snapshot1);
@@ -267,11 +268,12 @@ public void testSnapshotState() throws Exception {
 
 		// checkpoint 2
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
 
-		HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshot2 = new HashMap<>();
 		for (Serializable serializable : listState.get()) {
-			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
-			snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+			Tuple2<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark =
+				(Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>) serializable;
+			snapshot2.put(kafkaTopicPartitionOffsetAndWatermark.f0, kafkaTopicPartitionOffsetAndWatermark.f1);
 		}
 
 		assertEquals(state2, snapshot2);
@@ -286,11 +288,12 @@ public void testSnapshotState() throws Exception {
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
 
-		HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshot3 = new HashMap<>();
 		for (Serializable serializable : listState.get()) {
-			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
-			snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+			Tuple2<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark =
+				(Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>) serializable;
+			snapshot3.put(kafkaTopicPartitionOffsetAndWatermark.f0, kafkaTopicPartitionOffsetAndWatermark.f1);
 		}
 
 		assertEquals(state3, snapshot3);
@@ -369,7 +372,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 				Assert.fail("Trying to restore offsets even though there was no restore state.");
 				return null;
 			}
-		}).when(fetcher).restoreOffsets(any(HashMap.class));
+		}).when(fetcher).restoreOffsetsAndWatermarks(any(HashMap.class));
 
 		return fetcher;
 	}