Skip to content

Commit

Permalink
[FLINK-20194] Fix Kafka offset commit to coorectly handle the followi…
Browse files Browse the repository at this point in the history
…ng cases:

1. The SplitFetcher has exited.
2. The offsets to be committed is empty.
3. The offsets commit for finished splits.
  • Loading branch information
becketqin committed Nov 19, 2020
1 parent 221d331 commit 92b26cf
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.base.source.reader.fetcher;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
Expand Down Expand Up @@ -185,4 +186,11 @@ public void checkErrors() {
uncaughtFetcherException.get());
}
}

// -----------------------

@VisibleForTesting
public int getNumAliveFetchers() {
return fetchers.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
Expand All @@ -48,7 +51,10 @@
public class KafkaSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<Tuple3<T, Long, Long>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class);
// These maps need to be concurrent because it will be accessed by both the main thread
// and the split fetcher thread in the callback.
private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit;
private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;

public KafkaSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
Expand All @@ -62,37 +68,55 @@ public KafkaSourceReader(
recordEmitter,
config,
context);
this.offsetsToCommit = new TreeMap<>();
this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
}

@Override
protected void onSplitFinished(Map<String, KafkaPartitionSplitState> finishedSplitIds) {

finishedSplitIds.forEach((ignored, splitState) -> {
offsetsOfFinishedSplits.put(
splitState.getTopicPartition(),
new OffsetAndMetadata(splitState.getCurrentOffset()));
});
}

@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
for (KafkaPartitionSplit split : splits) {
offsetsToCommit
.compute(checkpointId, (ignoredKey, ignoredValue) -> new HashMap<>())
.put(
split.getTopicPartition(),
new OffsetAndMetadata(split.getStartingOffset(), null));
if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
offsetsToCommit.put(checkpointId, Collections.emptyMap());
} else {
Map<TopicPartition, OffsetAndMetadata> offsetsMap =
offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the offsets of the active splits.
for (KafkaPartitionSplit split : splits) {
offsetsMap.put(
split.getTopicPartition(),
new OffsetAndMetadata(split.getStartingOffset(), null));
}
// Put offsets of all the finished splits.
offsetsMap.putAll(offsetsOfFinishedSplits);
}
return splits;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.info("Committing offsets for checkpoint {}", checkpointId);
((KafkaSourceFetcherManager<T>) splitFetcherManager).commitOffsets(
offsetsToCommit.get(checkpointId),
(ignored, e) -> {
if (e != null) {
LOG.warn(
"Failed to commit consumer offsets for checkpoint {}",
checkpointId);
LOG.warn("Failed to commit consumer offsets for checkpoint {}", checkpointId, e);
} else {
LOG.debug("Successfully committed offsets for checkpoint {}", checkpointId);
// If the finished topic partition has been committed, we remove it
// from the offsets of finsihed splits map.
Map<TopicPartition, OffsetAndMetadata> committedPartitions =
offsetsToCommit.get(checkpointId);
offsetsOfFinishedSplits.entrySet().removeIf(
entry -> committedPartitions.containsKey(entry.getKey()));
while (!offsetsToCommit.isEmpty() && offsetsToCommit.firstKey() <= checkpointId) {
offsetsToCommit.remove(offsetsToCommit.firstKey());
}
Expand All @@ -116,4 +140,9 @@ protected KafkaPartitionSplit toSplitType(String splitId, KafkaPartitionSplitSta
SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> getOffsetsToCommit() {
return offsetsToCommit;
}

@VisibleForTesting
int getNumAliveFetchers() {
return splitFetcherManager.getNumAliveFetchers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
Expand All @@ -44,6 +46,7 @@
*/
public class KafkaSourceFetcherManager<T>
extends SingleThreadFetcherManager<Tuple3<T, Long, Long>, KafkaPartitionSplit> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceFetcherManager.class);

/**
* Creates a new SplitFetcherManager with a single I/O threads.
Expand All @@ -62,7 +65,25 @@ public KafkaSourceFetcherManager(
public void commitOffsets(
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
OffsetCommitCallback callback) {
SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = getRunningFetcher();
LOG.debug("Committing offsets {}", offsetsToCommit);
if (offsetsToCommit.isEmpty()) {
return;
}
SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = fetchers.get(0);
if (splitFetcher != null) {
// The fetcher thread is still running. This should be the majority of the cases.
enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
} else {
splitFetcher = createSplitFetcher();
enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
startFetcher(splitFetcher);
}
}

private void enqueueOffsetsCommitTask(
SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher,
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
OffsetCommitCallback callback) {
KafkaPartitionSplitReader<T> kafkaReader =
(KafkaPartitionSplitReader<T>) splitFetcher.getSplitReader();

Expand All @@ -74,7 +95,8 @@ public boolean run() throws IOException {
}

@Override
public void wakeUp() {}
public void wakeUp() {
}
});
}
}

0 comments on commit 92b26cf

Please sign in to comment.