[FLINK-9303] [kafka] Add support for dynamically unassigning partitions from the Kafka consumer when they become unavailable

- Check for unavailable partitions recovered from state
- Use a Kafka consumer option to activate this validation
EAlexRojas committed May 22, 2018
1 parent 1b1122f commit a7f23f9
Showing 12 changed files with 179 additions and 19 deletions.
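For context, a minimal sketch of how a job could opt in to the new check, assuming the single-topic FlinkKafkaConsumer09 constructor and the property keys touched in this diff; the bootstrap servers, group id, topic name, and the SimpleStringSchema import path are placeholders/assumptions, not part of the commit:

import java.util.Properties;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "example-group");
// periodic partition discovery drives the loop in which unavailable partitions are detected
props.setProperty("flink.partition-discovery.interval-millis", "10000");
// the option introduced by this commit (FLINK_CHECK_UNAVAILABLE_PARTITIONS, default false)
props.setProperty("flink.check-unavailable-partitions", "true");

FlinkKafkaConsumer09<String> consumer =
new FlinkKafkaConsumer09<>("example-topic", new SimpleStringSchema(), props);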
@@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
!getBoolean(props, KEY_DISABLE_METRICS, false));
!getBoolean(props, KEY_DISABLE_METRICS, false),
getBoolean(props, FLINK_CHECK_UNAVAILABLE_PARTITIONS, false));

this.kafkaProperties = props;

@@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -342,6 +343,11 @@ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition)
return new TopicAndPartition(partition.getTopic(), partition.getPartition());
}

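// Dynamic partition removal is not supported by the legacy 0.8 fetcher.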
@Override
protected void addPartitionsToBeRemoved(Set<KafkaTopicPartition> partitionsToRemove) {
throw new UnsupportedOperationException();
}

// ------------------------------------------------------------------------
// Offset handling
// ------------------------------------------------------------------------
@@ -212,7 +212,8 @@ private FlinkKafkaConsumer09(
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
!getBoolean(props, KEY_DISABLE_METRICS, false));
!getBoolean(props, KEY_DISABLE_METRICS, false),
getBoolean(props, FLINK_CHECK_UNAVAILABLE_PARTITIONS, false));

this.properties = props;
setDeserializer(this.properties);
@@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkState;

@@ -112,7 +113,8 @@ public Kafka09Fetcher(
pollTimeout,
useMetrics,
consumerMetricGroup,
subtaskMetricGroup);
subtaskMetricGroup,
partitionsToBeRemoved);
}

// ------------------------------------------------------------------------
@@ -215,6 +217,16 @@ public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition)
return new TopicPartition(partition.getTopic(), partition.getPartition());
}

@Override
protected void addPartitionsToBeRemoved(Set<KafkaTopicPartition> partitionsToRemove) {
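// convert Flink partition handles to Kafka TopicPartitions and queue them for removal by the consumer thread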
for (KafkaTopicPartition ptr : partitionsToRemove) {
TopicPartition partition = new TopicPartition(ptr.getTopic(), ptr.getPartition());
if (!partitionsToBeRemoved.contains(partition)) {
partitionsToBeRemoved.add(partition);
}
}
}

@Override
protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@@ -42,9 +42,12 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -80,6 +83,9 @@ public class KafkaConsumerThread extends Thread {
/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;

/** The set of partitions to be removed from the Kafka consumer. */
private final Set<TopicPartition> partitionsToBeRemoved;

/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken. */
private final KafkaConsumerCallBridge consumerCallBridge;

@@ -130,7 +136,8 @@ public KafkaConsumerThread(
long pollTimeout,
boolean useMetrics,
MetricGroup consumerMetricGroup,
MetricGroup subtaskMetricGroup) {
MetricGroup subtaskMetricGroup,
Set<TopicPartition> partitionsToBeRemoved) {

super(threadName);
setDaemon(true);
@@ -144,6 +151,8 @@ public KafkaConsumerThread(

this.unassignedPartitionsQueue = checkNotNull(unassignedPartitionsQueue);

this.partitionsToBeRemoved = checkNotNull(partitionsToBeRemoved);

this.pollTimeout = pollTimeout;
this.useMetrics = useMetrics;

@@ -240,7 +249,9 @@ public void run() {
newPartitions = unassignedPartitionsQueue.getBatchBlocking();
}
if (newPartitions != null) {
reassignPartitions(newPartitions);
reassignPartitions(newPartitions, new HashSet<>());
} else if (!partitionsToBeRemoved.isEmpty()) {
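// no new partitions, but some are marked for removal: reassign with an empty add-list so they are dropped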
reassignPartitions(new ArrayList<>(), partitionsToBeRemoved);
}
} catch (AbortedReassignmentException e) {
continue;
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
* <p>This method is exposed for testing purposes.
*/
@VisibleForTesting
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
if (newPartitions.size() == 0) {
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
if (newPartitions.isEmpty() && partitionsToBeRemoved.isEmpty()) {
return;
}
hasAssignedPartitions = true;
@@ -391,14 +402,29 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions
}

final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
final List<TopicPartition> removedPartitions = new ArrayList<>();
try {
for (TopicPartition oldPartition : consumerTmp.assignment()) {
oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition));
}

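// drop partitions marked for removal from the old assignment, remembering them in case this reassignment is aborted and they must be re-marked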
Set<TopicPartition> oldTopicPartitions = oldPartitionAssignmentsToPosition.keySet();
Iterator<TopicPartition> iter = partitionsToBeRemoved.iterator();
while (iter.hasNext()) {
TopicPartition partition = iter.next();
if (oldTopicPartitions.contains(partition)) {
oldTopicPartitions.remove(partition);
removedPartitions.add(partition);
iter.remove();
}
}
if (!removedPartitions.isEmpty()) {
log.info("Removing {} partition(s) from consumer.", removedPartitions.size());
}

final List<TopicPartition> newPartitionAssignments =
new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
new ArrayList<>(newPartitions.size() + oldTopicPartitions.size());
newPartitionAssignments.addAll(oldTopicPartitions);
newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));

// reassign with the new partitions
@@ -460,6 +486,11 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions
unassignedPartitionsQueue.add(newPartition);
}

// re-add all "partitions to be removed" back to the set so they are actually removed on the next reassignment
for (TopicPartition removedPartition : removedPartitions) {
partitionsToBeRemoved.add(removedPartition);
}

// this signals the main fetch loop to continue through the loop
throw new AbortedReassignmentException();
}
@@ -40,10 +40,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -721,7 +723,8 @@ public TestKafkaConsumerThread(
0,
false,
new UnregisteredMetricsGroup(),
new UnregisteredMetricsGroup());
new UnregisteredMetricsGroup(),
new HashSet<>());

this.mockConsumer = mockConsumer;
}
@@ -748,15 +751,15 @@ KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
}

@Override
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
// triggers blocking calls on waitPartitionReassignmentInvoked()
preReassignmentLatch.trigger();

// waits for startPartitionReassignment() to be called
startReassignmentLatch.await();

try {
super.reassignPartitions(newPartitions);
super.reassignPartitions(newPartitions, partitionsToBeRemoved);
} finally {
// triggers blocking calls on waitPartitionReassignmentComplete()
reassignmentCompleteLatch.trigger();
@@ -44,6 +44,7 @@
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.DiscoveryType;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
@@ -59,6 +60,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -105,6 +107,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
/** Configuration key to define the consumer's partition discovery interval, in milliseconds. */
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";

/** Boolean configuration key to define whether or not to unsubscribe from partitions that are no longer available. */
public static final String FLINK_CHECK_UNAVAILABLE_PARTITIONS = "flink.check-unavailable-partitions";

/** State name of the consumer's partition offset states. */
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

@@ -194,6 +199,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
/** Flag indicating whether the consumer is still running. */
private volatile boolean running = true;

/** Flag indicating whether or not to unsubscribe from partitions that are no longer available. */
private final boolean checkUnavailablePartitions;

// ------------------------------------------------------------------------
// internal metrics
// ------------------------------------------------------------------------
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
Pattern topicPattern,
KeyedDeserializationSchema<T> deserializer,
long discoveryIntervalMillis,
boolean useMetrics) {
boolean useMetrics,
boolean checkUnavailablePartitions) {
this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
this.deserializer = checkNotNull(deserializer, "valueDeserializer");

@@ -245,6 +254,8 @@
this.discoveryIntervalMillis = discoveryIntervalMillis;

this.useMetrics = useMetrics;

this.checkUnavailablePartitions = checkUnavailablePartitions;
}

// ------------------------------------------------------------------------
@@ -479,7 +490,19 @@ public void open(Configuration configuration) throws Exception {
}
}

for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
Iterator<Map.Entry<KafkaTopicPartition, Long>> iter = restoredState.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<KafkaTopicPartition, Long> restoredStateEntry = iter.next();

// After adding new partitions, restoredState should contain all partitions,
// but it could also contain partitions that are no longer available.
// In that case, those partitions should be removed from the restored state.
// Compare against allPartitions only when it has elements.
if (this.checkUnavailablePartitions && !allPartitions.isEmpty()
&& !allPartitions.contains(restoredStateEntry.getKey())) {
iter.remove();
continue;
}
if (!restoredFromOldState) {
// seed the partition discoverer with the union state while filtering out
// restored partitions that should not be subscribed by this subtask
@@ -672,7 +695,9 @@ public void run() {
try {
// --------------------- partition discovery loop ---------------------

List<KafkaTopicPartition> discoveredPartitions;
Map<DiscoveryType, List<KafkaTopicPartition>> discoveredPartitions;
List<KafkaTopicPartition> newDiscoveredPartitions;
List<KafkaTopicPartition> partitionsToBeRemoved;

// throughout the loop, we always eagerly check if we are still running before
// performing the next operation, so that we can escape the loop as soon as possible
@@ -683,16 +708,21 @@
}

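// when the unavailable-partition check is enabled, discovery also reports partitions that are no longer available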
try {
discoveredPartitions = partitionDiscoverer.discoverPartitions();
discoveredPartitions = partitionDiscoverer.discoverNewAndUnavailablePartitions(checkUnavailablePartitions);
newDiscoveredPartitions = discoveredPartitions.get(DiscoveryType.DISCOVERED_NEW_PARTITIONS);
partitionsToBeRemoved = discoveredPartitions.get(DiscoveryType.DISCOVERED_UNAVAILABLE_PARTITIONS);
} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
// the partition discoverer may have been closed or woken up before or during the discovery;
// this would only happen if the consumer was canceled; simply escape the loop
break;
}

// no need to add the discovered partitions if we were closed in the meantime
if (running && !discoveredPartitions.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
if (running && !newDiscoveredPartitions.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(newDiscoveredPartitions);
}
if (running && !partitionsToBeRemoved.isEmpty()) {
kafkaFetcher.removePartitions(partitionsToBeRemoved);
}

// do not waste any time sleeping if we're not running anymore
@@ -35,9 +35,12 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITTED_OFFSETS_METRICS_GAUGE;
@@ -78,6 +81,9 @@ public abstract class AbstractFetcher<T, KPH> {
/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;

/** Partitions marked to be removed. */
protected final Set<KPH> partitionsToBeRemoved;

/**
* Queue of partitions that are not yet assigned to any Kafka clients for consuming.
* Kafka version-specific implementations of {@link AbstractFetcher#runFetchLoop()}
@@ -183,6 +189,8 @@ protected AbstractFetcher(
watermarksPunctuated,
userCodeClassLoader);

this.partitionsToBeRemoved = new HashSet<>();

// check that all seed partition states have a defined offset
for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
@@ -245,6 +253,21 @@ public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException {
}
}

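/**
 * Removes the given partitions from this fetcher's subscribed partition states
 * and marks them for removal in the version-specific consumer.
 */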
public void removePartitions(List<KafkaTopicPartition> partitionsToRemove) throws IOException, ClassNotFoundException {
Iterator<KafkaTopicPartitionState<KPH>> iter = subscribedPartitionStates.iterator();
Set<KafkaTopicPartition> fetcherPartitionsToRemove = new HashSet<>();
while (iter.hasNext()) {
KafkaTopicPartitionState<KPH> next = iter.next();
if (partitionsToRemove.contains(next.getKafkaTopicPartition())) {
iter.remove();
fetcherPartitionsToRemove.add(next.getKafkaTopicPartition());
}
}
addPartitionsToBeRemoved(fetcherPartitionsToRemove);
}

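/** Marks the given partitions for removal by the version-specific consumer implementation. */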
protected abstract void addPartitionsToBeRemoved(Set<KafkaTopicPartition> partitionsToRemove);

// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------