Skip to content

Commit

Permalink
[FLINK-7407] [kafka] Adapt AbstractPartitionDiscoverer to handle non-…
Browse files Browse the repository at this point in the history
…contiguous partition metadata

Previously, the AbstractPartitionDiscoverer tracked discovered
partitions by keeping only the largest discovered partition id. All
fetched partition metadata with ids smaller than this id would be
considered as discovered. This assumption of contiguous partition ids is
too naive for corner cases where there may be undiscovered partitions
that were temporarily unavailable before and were shadowed by
discovered partitions with larger partition ids.

This commit switches to using a set to track seen partitions, which also
removes the need to pre-sort fetched partitions.
  • Loading branch information
tzulitai committed Sep 7, 2017
1 parent d0636c8 commit 93369e7
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 16 deletions.
Expand Up @@ -17,10 +17,10 @@

package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -68,7 +68,7 @@ public abstract class AbstractPartitionDiscoverer {
* to keep track of only the largest partition id because Kafka partition numbers are only
* allowed to be increased and has incremental ids.
*/
private Map<String, Integer> topicsToLargestDiscoveredPartitionId;
private Set<KafkaTopicPartition> discoveredPartitions;

public AbstractPartitionDiscoverer(
KafkaTopicsDescriptor topicsDescriptor,
Expand All @@ -78,7 +78,7 @@ public AbstractPartitionDiscoverer(
this.topicsDescriptor = checkNotNull(topicsDescriptor);
this.indexOfThisSubtask = indexOfThisSubtask;
this.numParallelSubtasks = numParallelSubtasks;
this.topicsToLargestDiscoveredPartitionId = new HashMap<>();
this.discoveredPartitions = new HashSet<>();
}

/**
Expand Down Expand Up @@ -149,10 +149,6 @@ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, Cl
if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
} else {
// sort so that we make sure the topicsToLargestDiscoveredPartitionId state is updated
// with incremental partition ids of the same topics (otherwise some partition ids may be skipped)
KafkaTopicPartition.sort(newDiscoveredPartitions);

Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
KafkaTopicPartition nextPartition;
while (iter.hasNext()) {
Expand Down Expand Up @@ -196,7 +192,7 @@ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, Cl
*/
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
if (isUndiscoveredPartition(partition)) {
topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
discoveredPartitions.add(partition);

return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
}
Expand Down Expand Up @@ -246,8 +242,7 @@ public static final class ClosedException extends Exception {
}

/**
 * Checks whether the given partition has not yet been recorded as discovered.
 *
 * <p>NOTE(review): this hunk is rendered without +/- markers, so two return statements
 * appear back to back. The map-based check looks like the pre-commit version and the
 * set-based check like its replacement -- confirm against the raw diff.
 *
 * @param partition the partition to check
 * @return {@code true} if the partition counts as undiscovered
 */
private boolean isUndiscoveredPartition(KafkaTopicPartition partition) {
// map-based check: topic never seen, or partition id above the largest id recorded for the topic
return !topicsToLargestDiscoveredPartitionId.containsKey(partition.getTopic())
|| partition.getPartition() > topicsToLargestDiscoveredPartitionId.get(partition.getTopic());
// set-based check: partition is not contained in the set of discovered partitions
return !discoveredPartitions.contains(partition);
}

public static boolean shouldAssignToThisSubtask(KafkaTopicPartition partition, int indexOfThisSubtask, int numParallelSubtasks) {
Expand Down
Expand Up @@ -19,7 +19,6 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -131,8 +130,4 @@ public int compare(KafkaTopicPartition p1, KafkaTopicPartition p2) {
}
}
}

/**
 * Sorts the given list of partitions in place, ordering them with a new
 * {@code Comparator} instance.
 *
 * @param partitions the list to sort; it is modified by this call
 */
public static void sort(List<KafkaTopicPartition> partitions) {
	final Comparator ordering = new Comparator();
	Collections.sort(partitions, ordering);
}
}
Expand Up @@ -394,6 +394,41 @@ public void testDeterministicAssignmentWithDifferentFetchedPartitionOrdering() t
}
}

@Test
public void testNonContiguousPartitionIdDiscovery() throws Exception {
	final String topic = "test-topic";

	// metadata returned by the first fetch: only partitions 1 and 4 are visible
	List<KafkaTopicPartition> partialFetch = Arrays.asList(
		new KafkaTopicPartition(topic, 1),
		new KafkaTopicPartition(topic, 4));

	// metadata returned by the second fetch: partitions 0 through 4 are all visible
	List<KafkaTopicPartition> completeFetch = new ArrayList<>();
	for (int id = 0; id <= 4; id++) {
		completeFetch.add(new KafkaTopicPartition(topic, id));
	}

	// Each fetch result is handed over as its own modifiable copy, because the
	// discoverer is expected to remove entries from the fetched list while iterating over it.
	TestPartitionDiscoverer discoverer = new TestPartitionDiscoverer(
		topicsDescriptor,
		0,
		1,
		TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(topic)),
		Arrays.asList(new ArrayList<>(partialFetch), new ArrayList<>(completeFetch)));
	discoverer.open();

	// first round: exactly the two partitions that were visible are reported
	List<KafkaTopicPartition> firstRound = discoverer.discoverPartitions();
	assertEquals(2, firstRound.size());
	for (int id : new int[] {1, 4}) {
		assertTrue(firstRound.contains(new KafkaTopicPartition(topic, id)));
	}

	// second round: only the previously missing partitions are reported, even though
	// their ids are smaller than the largest id seen in the first round
	List<KafkaTopicPartition> secondRound = discoverer.discoverPartitions();
	assertEquals(3, secondRound.size());
	for (int id : new int[] {0, 2, 3}) {
		assertTrue(secondRound.contains(new KafkaTopicPartition(topic, id)));
	}
}

private boolean contains(List<KafkaTopicPartition> partitions, int partition) {
for (KafkaTopicPartition ktp : partitions) {
if (ktp.getPartition() == partition) {
Expand Down

0 comments on commit 93369e7

Please sign in to comment.