@@ -101,7 +101,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* The assigner is kept in serialized form, to deserialize it into multiple copies */
private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
Contributor:
I think we should switch to a specific checkpointed state object instead of continuing to "extend" the original Tuple. This will also be helpful for compatibility with any future changes to the checkpointed state.
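As a rough illustration (not part of this PR), such a dedicated state class might look something like the sketch below. The name KafkaPartitionCheckpointState and its accessors are hypothetical; the point is that offset and watermark become named fields, and the type can evolve later without nesting more tuples.

import java.io.Serializable;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

/**
 * Hypothetical replacement for Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>;
 * names and layout are illustrative only.
 */
public class KafkaPartitionCheckpointState implements Serializable {

	private static final long serialVersionUID = 1L;

	private final KafkaTopicPartition partition;
	private final long offset;
	private final long watermark;

	public KafkaPartitionCheckpointState(KafkaTopicPartition partition, long offset, long watermark) {
		this.partition = partition;
		this.offset = offset;
		this.watermark = watermark;
	}

	public KafkaTopicPartition getPartition() {
		return partition;
	}

	public long getOffset() {
		return offset;
	}

	public long getWatermark() {
		return watermark;
	}
}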


// ------------------------------------------------------------------------
// runtime state (used individually by each parallel subtask)
@@ -113,8 +113,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
/** The fetcher implements the connections to the Kafka brokers */
private transient volatile AbstractFetcher<T, ?> kafkaFetcher;

/** The offsets to restore to, if the consumer restores state from a checkpoint */
private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
/** The offsets and watermarks to restore to, if the consumer restores state from a checkpoint */
private transient volatile HashMap<KafkaTopicPartition, Tuple2<Long, Long>> restoreToOffsetAndWatermark;

/** Flag indicating whether the consumer is still running **/
private volatile boolean running = true;
@@ -238,8 +238,8 @@ public void run(SourceContext<T> sourceContext) throws Exception {
(StreamingRuntimeContext) getRuntimeContext());

// (2) set the fetcher to the restored checkpoint offsets
if (restoreToOffset != null) {
fetcher.restoreOffsets(restoreToOffset);
if (restoreToOffsetAndWatermark != null) {
fetcher.restoreOffsetsAndWatermarks(restoreToOffsetAndWatermark);
}

// publish the reference, for snapshot-, commit-, and cancel calls
@@ -318,21 +318,21 @@ public void close() throws Exception {
public void initializeState(FunctionInitializationContext context) throws Exception {

OperatorStateStore stateStore = context.getOperatorStateStore();
offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
offsetsAndWatermarksStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

if (context.isRestored()) {
if (restoreToOffset == null) {
restoreToOffset = new HashMap<>();
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
if (restoreToOffsetAndWatermark == null) {
restoreToOffsetAndWatermark = new HashMap<>();
for (Tuple2<KafkaTopicPartition, Tuple2<Long, Long>> kafkaOffset : offsetsAndWatermarksStateForCheckpoint.get()) {
restoreToOffsetAndWatermark.put(kafkaOffset.f0, kafkaOffset.f1);
}

LOG.info("Setting restore state in the FlinkKafkaConsumer.");
if (LOG.isDebugEnabled()) {
LOG.debug("Using the following offsets: {}", restoreToOffset);
LOG.debug("Using the following offsets and watermarks: {}", restoreToOffsetAndWatermark);
}
} else if (restoreToOffset.isEmpty()) {
restoreToOffset = null;
} else if (restoreToOffsetAndWatermark.isEmpty()) {
restoreToOffsetAndWatermark = null;
}
} else {
LOG.info("No restore state for FlinkKafkaConsumer.");
@@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
LOG.debug("snapshotState() called on closed source");
} else {

offsetsStateForCheckpoint.clear();
offsetsAndWatermarksStateForCheckpoint.clear();

final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
// originally restored offsets and watermarks or the assigned partitions

if (restoreToOffset != null) {
if (restoreToOffsetAndWatermark != null) {

for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
offsetsStateForCheckpoint.add(
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
offsetsAndWatermarksStateForCheckpoint.add(
Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
Contributor:
Having a specific checkpoint state object will also help code readability in situations like this one: it's quite tricky to quickly work out what the key/value refers to, as well as some of the f0/f1 calls in other parts of the PR. I know the previous code used f0 and f1 as well, but I think this is a good opportunity to improve that.
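Following on from the sketch in the earlier comment, the loop above could then read roughly like this. Purely illustrative: KafkaPartitionCheckpointState and a ListState<KafkaPartitionCheckpointState> named stateForCheckpoint are hypothetical and not part of this PR.

// Illustrative only: named constructor arguments instead of nested Tuple2s.
for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> entry : restoreToOffsetAndWatermark.entrySet()) {
	stateForCheckpoint.add(new KafkaPartitionCheckpointState(
			entry.getKey(),        // partition
			entry.getValue().f0,   // offset
			entry.getValue().f1)); // watermark (Long.MIN_VALUE if none emitted yet)
}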

}
} else if (subscribedPartitions != null) {
for (KafkaTopicPartition subscribedPartition : subscribedPartitions) {
offsetsStateForCheckpoint.add(
Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
offsetsAndWatermarksStateForCheckpoint.add(
Tuple2.of(subscribedPartition, Tuple2.of(KafkaTopicPartitionState.OFFSET_NOT_SET, Long.MIN_VALUE)));
}
}

// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffsetAndWatermark);
} else {
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
HashMap<KafkaTopicPartition, Tuple2<Long, Long>> currentOffsetsAndWatermarks = fetcher.snapshotCurrentState();

// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsetsAndWatermarks);

for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
offsetsStateForCheckpoint.add(
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : currentOffsetsAndWatermarks.entrySet()) {
offsetsAndWatermarksStateForCheckpoint.add(
Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
}
}

@@ -393,11 +393,14 @@ public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());

restoreToOffset = restoredOffsets;
restoreToOffsetAndWatermark = new HashMap<>();
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionOffset : restoredOffsets.entrySet()) {
restoreToOffsetAndWatermark.put(kafkaTopicPartitionOffset.getKey(), Tuple2.of(kafkaTopicPartitionOffset.getValue(), Long.MIN_VALUE));
}

if (LOG.isDebugEnabled()) {
LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredOffsets);
}
}

@@ -427,18 +430,24 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
}

@SuppressWarnings("unchecked")
HashMap<KafkaTopicPartition, Long> offsets =
(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
HashMap<KafkaTopicPartition, Tuple2<Long, Long>> offsetsAndWatermarks =
(HashMap<KafkaTopicPartition, Tuple2<Long, Long>>) pendingOffsetsToCommit.remove(posInMap);

// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
pendingOffsetsToCommit.remove(0);
}

if (offsets == null || offsets.size() == 0) {
if (offsetsAndWatermarks == null || offsetsAndWatermarks.size() == 0) {
LOG.debug("Checkpoint state was empty.");
return;
}
// separate the offsets from the watermarks
HashMap<KafkaTopicPartition, Long> offsets = new HashMap<>();
for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> offsetAndWatermark : offsetsAndWatermarks.entrySet()) {
offsets.put(offsetAndWatermark.getKey(), offsetAndWatermark.getValue().f0);
}

fetcher.commitInternalOffsetsToKafka(offsets);
}
catch (Exception e) {
@@ -492,9 +501,9 @@ public TypeInformation<T> getProducedType() {
private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
subscribedPartitions = new ArrayList<>();

if (restoreToOffset != null) {
if (restoreToOffsetAndWatermark != null) {
for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
if (restoreToOffset.containsKey(kafkaTopicPartition)) {
if (restoreToOffsetAndWatermark.containsKey(kafkaTopicPartition)) {
subscribedPartitions.add(kafkaTopicPartition);
}
}
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka.internals;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -175,34 +176,115 @@ protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
// ------------------------------------------------------------------------

/**
* Takes a snapshot of the partition offsets.
* Takes a snapshot of the partition offsets and watermarks.
*
* <p>Important: This method must be called under the checkpoint lock.
*
* @return A map from partition to current offset.
* @return A map from partition to current offset and watermark.
*/
public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);

HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
state.put(partition.getKafkaTopicPartition(), partition.getOffset());
HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);

switch (timestampWatermarkMode) {

case NO_TIMESTAMPS_WATERMARKS: {

for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
Contributor:
Excessive empty line above this line.

state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
}

return state;
}

case PERIODIC_WATERMARKS: {
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> [] partitions =
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> []) allPartitions;

for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
}

return state;
}

case PUNCTUATED_WATERMARKS: {
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> [] partitions =
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> []) allPartitions;

for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
}

return state;
}

default:
// cannot happen, add this as a guard for the future
throw new RuntimeException();
Contributor:
Would be good to have a reason message here.
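For example, something along these lines (the message wording is just a suggestion; timestampWatermarkMode is the field the switch is already checking):

throw new RuntimeException(
		"Unrecognized timestamp/watermark mode: " + timestampWatermarkMode);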

}
return state;
}

/**
* Restores the partition offsets.
* Restores the partition offsets and watermarks.
*
* @param snapshotState The offsets for the partitions
* @param snapshotState The offsets and watermarks for the partitions
*/
public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
for (KafkaTopicPartitionState<?> partition : allPartitions) {
Long offset = snapshotState.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setOffset(offset);
public void restoreOffsetsAndWatermarks(Map<KafkaTopicPartition, Tuple2<Long, Long>> snapshotState) {

switch (timestampWatermarkMode) {

case NO_TIMESTAMPS_WATERMARKS: {
for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
if (offset != null) {
partition.setOffset(offset);
}
}
break;
}

case PERIODIC_WATERMARKS: {
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> [] partitions =
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> []) allPartitions;

for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
if (offset != null) {
partition.setOffset(offset);
}

Long watermarkTimestamp = snapshotState.get(partition.getKafkaTopicPartition()).f1;
if (watermarkTimestamp != null) {
partition.setCurrentWatermarkTimestamp(watermarkTimestamp);
}
}
break;
}

case PUNCTUATED_WATERMARKS: {
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> [] partitions =
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> []) allPartitions;

for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
if (offset != null) {
partition.setOffset(offset);
}

Long watermarkTimestamp = snapshotState.get(partition.getKafkaTopicPartition()).f1;
if (watermarkTimestamp != null) {
partition.setCurrentWatermarkTimestamp(watermarkTimestamp);
}
}
break;
}

default:
// cannot happen, add this as a guard for the future
throw new RuntimeException();
Contributor:
Would be good to have a reason message here.

}
}

@@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
return partitionWatermark;
}

void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
Contributor:
The other methods seem to be public (although they can actually be package-private). Should we stay consistent with that here?

partitionWatermark = watermarkTimestamp;
}

// ------------------------------------------------------------------------

@Override
@@ -74,6 +74,10 @@ public long getCurrentPartitionWatermark() {
return partitionWatermark;
}

void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
partitionWatermark = watermarkTimestamp;
}

// ------------------------------------------------------------------------

@Override
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -46,7 +47,7 @@
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyMapOf;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

@@ -84,7 +85,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Assert.fail("This should never be called");
return null;
}
}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
}).when(fetcher).restoreOffsetsAndWatermarks(anyMap());

doAnswer(new Answer<Void>() {
@Override
@@ -173,7 +174,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Assert.fail("This should never be called");
return null;
}
}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
}).when(fetcher).restoreOffsetsAndWatermarks(anyMap());

doAnswer(new Answer<Void>() {
@Override
@@ -257,23 +258,24 @@ public void testRestoreFromFlink11() throws Exception {
// prepare fake states
// --------------------------------------------------------------------

final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
state1.put(new KafkaTopicPartition("abc", 13), 16768L);
state1.put(new KafkaTopicPartition("def", 7), 987654321L);
final HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state1 = new HashMap<>();
state1.put(new KafkaTopicPartition("abc", 13), Tuple2.of(16768L, Long.MIN_VALUE));
state1.put(new KafkaTopicPartition("def", 7), Tuple2.of(987654321L, Long.MIN_VALUE));

final OneShotLatch latch = new OneShotLatch();
final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);

doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Map<KafkaTopicPartition, Long> map = (HashMap<KafkaTopicPartition, Long>) invocationOnMock.getArguments()[0];
Map<KafkaTopicPartition, Tuple2<Long, Long>> map =
(HashMap<KafkaTopicPartition, Tuple2<Long, Long>>) invocationOnMock.getArguments()[0];

latch.trigger();
assertEquals(state1, map);
return null;
}
}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
}).when(fetcher).restoreOffsetsAndWatermarks(anyMap());


final List<KafkaTopicPartition> partitions = new ArrayList<>();