Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import static java.util.Collections.emptyList;
import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithNonPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition;
import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;

/**
Expand Down Expand Up @@ -75,7 +75,7 @@ public TopicMetadataListener(List<String> topics) {
List<String> partitions = new ArrayList<>(topics.size());
Map<String, Integer> metadata = new HashMap<>(topics.size());
for (String topic : topics) {
if (isPartitioned(topic)) {
if (isPartition(topic)) {
partitions.add(topic);
} else {
// This would be updated when open writing.
Expand Down Expand Up @@ -120,7 +120,7 @@ public List<String> availableTopics() {
int partitionNums = entry.getValue();
// Get all topics from partitioned and non-partitioned topic names
if (partitionNums == NON_PARTITIONED) {
results.add(topicNameWithNonPartition(entry.getKey()));
results.add(topicNameWithoutPartition(entry.getKey()));
} else {
for (int i = 0; i < partitionNums; i++) {
results.add(topicNameWithPartition(entry.getKey(), i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -63,18 +63,15 @@ protected List<TopicPartition> toTopicPartitions(
.map(range -> new TopicPartition(metadata.getName(), -1, range))
.collect(toList());
} else {
return IntStream.range(0, metadata.getPartitionSize())
.boxed()
.flatMap(
partitionId ->
ranges.stream()
.map(
range ->
new TopicPartition(
metadata.getName(),
partitionId,
range)))
.collect(toList());
List<TopicPartition> partitions = new ArrayList<>();
for (int i = 0; i < metadata.getPartitionSize(); i++) {
for (TopicRange range : ranges) {
TopicPartition partition = new TopicPartition(metadata.getName(), i, range);
partitions.add(partition);
}
}

return partitions;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,68 @@

package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;

import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.TopicName;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static java.util.stream.Collectors.toSet;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;

/** The implementation for consuming multiple topics. */
public class TopicListSubscriber extends BasePulsarSubscriber {
private static final long serialVersionUID = 6473918213832993116L;

private final List<String> topics;
private final List<String> partitions;
private final List<String> fullTopicNames;

public TopicListSubscriber(List<String> topics) {
this.topics = topics;
public TopicListSubscriber(List<String> fullTopicNameOrPartitions) {
this.partitions = new ArrayList<>();
this.fullTopicNames = new ArrayList<>();

for (String fullTopicNameOrPartition : fullTopicNameOrPartitions) {
if (isPartition(fullTopicNameOrPartition)) {
this.partitions.add(fullTopicNameOrPartition);
} else {
this.fullTopicNames.add(fullTopicNameOrPartition);
}
}
}

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(
PulsarAdmin pulsarAdmin, RangeGenerator rangeGenerator, int parallelism) {
Set<TopicPartition> results = new HashSet<>();

// Query topics from Pulsar.
for (String topic : fullTopicNames) {
TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);
List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism);
List<TopicPartition> list = toTopicPartitions(metadata, ranges);

results.addAll(list);
}

for (String partition : partitions) {
TopicName topicName = TopicName.get(partition);
String name = topicName.getPartitionedTopicName();
int index = topicName.getPartitionIndex();

TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, name);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a partition, should the TopicMetadata be created manually or queried from the admin API?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Querying from the admin API is OK.

List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism);

for (TopicRange range : ranges) {
results.add(new TopicPartition(name, index, range));
}
}

return topics.parallelStream()
.map(topic -> queryTopicMetadata(pulsarAdmin, topic))
.filter(Objects::nonNull)
.flatMap(
metadata -> {
List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism);
return toTopicPartitions(metadata, ranges).stream();
})
.collect(toSet());
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public static String topicNameWithPartition(String topic, int partitionId) {
}

/** Get a non-partitioned topic name that does not belong to any partitioned topic. */
public static String topicNameWithNonPartition(String topic) {
public static String topicNameWithoutPartition(String topic) {
return TopicName.get(topic).toString();
}

public static boolean isPartitioned(String topic) {
public static boolean isPartition(String topic) {
return TopicName.get(topic).isPartitioned();
}

/** Merge the same topics into one topics. */
/** Merge the same topics into one topic. */
public static List<String> distinctTopics(List<String> topics) {
Set<String> fullTopics = new HashSet<>();
Map<String, List<Integer>> partitionedTopics = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithNonPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -131,7 +131,7 @@ void fetchNonPartitionTopic() {
String topic = randomAlphabetic(10);
operator().createTopic(topic, 0);
List<String> nonPartitionTopic =
Collections.singletonList(topicNameWithNonPartition(topic));
Collections.singletonList(topicNameWithoutPartition(topic));

TopicMetadataListener listener = new TopicMetadataListener(nonPartitionTopic);
long interval = Duration.ofMinutes(15).toMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,62 +18,121 @@

package org.apache.flink.connector.pulsar.source.enumerator.subscriber;

import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicListSubscriber;
import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
import static org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

/** Unit tests for {@link PulsarSubscriber}. */
class PulsarSubscriberTest extends PulsarTestSuiteBase {

private static final String TOPIC1 = TopicNameUtils.topicName("topic1");
private static final String TOPIC2 = TopicNameUtils.topicName("pattern-topic");
private static final String TOPIC3 = TopicNameUtils.topicName("topic2");
private final String topic1 = topicName("topic-" + randomAlphanumeric(4));
private final String topic2 = topicName("pattern-topic-" + randomAlphanumeric(4));
private final String topic3 = topicName("topic2-" + randomAlphanumeric(4));
private final String topic4 = topicName("non-partitioned-topic-" + randomAlphanumeric(4));
private final String topic5 = topicName("non-partitioned-topic2-" + randomAlphanumeric(4));

private static final int NUM_PARTITIONS_PER_TOPIC = 5;
private static final int NUM_PARALLELISM = 10;

@BeforeAll
void setUp() {
operator().createTopic(topic1, NUM_PARTITIONS_PER_TOPIC);
operator().createTopic(topic2, NUM_PARTITIONS_PER_TOPIC);
operator().createTopic(topic3, NUM_PARTITIONS_PER_TOPIC);
operator().createTopic(topic4, 0);
operator().createTopic(topic5, 0);
}

@AfterAll
void tearDown() {
operator().deleteTopic(topic1);
operator().deleteTopic(topic2);
operator().deleteTopic(topic3);
operator().deleteTopic(topic4);
operator().deleteTopic(topic5);
}

@Test
void topicListSubscriber() {
operator().createTopic(TOPIC1, NUM_PARTITIONS_PER_TOPIC);
operator().createTopic(TOPIC2, NUM_PARTITIONS_PER_TOPIC);

PulsarSubscriber subscriber = getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
PulsarSubscriber subscriber = getTopicListSubscriber(Arrays.asList(topic1, topic2));
Set<TopicPartition> topicPartitions =
subscriber.getSubscribedTopicPartitions(
operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM);
Set<TopicPartition> expectedPartitions = new HashSet<>();

for (int i = 0; i < NUM_PARTITIONS_PER_TOPIC; i++) {
expectedPartitions.add(new TopicPartition(TOPIC1, i, createFullRange()));
expectedPartitions.add(new TopicPartition(TOPIC2, i, createFullRange()));
expectedPartitions.add(new TopicPartition(topic1, i, createFullRange()));
expectedPartitions.add(new TopicPartition(topic2, i, createFullRange()));
}

assertEquals(expectedPartitions, topicPartitions);
}

@Test
void subscribeOnePartitionOfMultiplePartitionTopic() {
String partition = topicNameWithPartition(topic1, 2);

operator().deleteTopic(TOPIC1);
operator().deleteTopic(TOPIC2);
PulsarSubscriber subscriber = getTopicListSubscriber(singletonList(partition));
Set<TopicPartition> partitions =
subscriber.getSubscribedTopicPartitions(
operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM);

TopicPartition desiredPartition = new TopicPartition(topic1, 2, createFullRange());
assertThat(partitions).hasSize(1).containsExactly(desiredPartition);
}

@Test
void topicPatternSubscriber() {
operator().createTopic(TOPIC1, NUM_PARTITIONS_PER_TOPIC);
operator().createTopic(TOPIC2, NUM_PARTITIONS_PER_TOPIC);
operator().createTopic(TOPIC3, NUM_PARTITIONS_PER_TOPIC);
void subscribeNonPartitionedTopicList() {
PulsarSubscriber subscriber = getTopicListSubscriber(singletonList(topic4));
Set<TopicPartition> partitions =
subscriber.getSubscribedTopicPartitions(
operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM);

TopicPartition desiredPartition = new TopicPartition(topic4, -1, createFullRange());
assertThat(partitions).hasSize(1).containsExactly(desiredPartition);
}

@Test
void subscribeNonPartitionedTopicPattern() {
PulsarSubscriber subscriber =
getTopicPatternSubscriber(
Pattern.compile("persistent://public/default/non-partitioned-topic*?"),
AllTopics);

Set<TopicPartition> topicPartitions =
subscriber.getSubscribedTopicPartitions(
operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM);

Set<TopicPartition> expectedPartitions = new HashSet<>();

expectedPartitions.add(new TopicPartition(topic4, -1, createFullRange()));
expectedPartitions.add(new TopicPartition(topic5, -1, createFullRange()));

assertEquals(expectedPartitions, topicPartitions);
}

@Test
void topicPatternSubscriber() {
PulsarSubscriber subscriber =
getTopicPatternSubscriber(
Pattern.compile("persistent://public/default/topic*?"), AllTopics);
Expand All @@ -85,14 +144,10 @@ void topicPatternSubscriber() {
Set<TopicPartition> expectedPartitions = new HashSet<>();

for (int i = 0; i < NUM_PARTITIONS_PER_TOPIC; i++) {
expectedPartitions.add(new TopicPartition(TOPIC1, i, createFullRange()));
expectedPartitions.add(new TopicPartition(TOPIC3, i, createFullRange()));
expectedPartitions.add(new TopicPartition(topic1, i, createFullRange()));
expectedPartitions.add(new TopicPartition(topic3, i, createFullRange()));
}

assertEquals(expectedPartitions, topicPartitions);

operator().deleteTopic(TOPIC1);
operator().deleteTopic(TOPIC2);
operator().deleteTopic(TOPIC3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public <T> void setupTopic(
*
* @param topic The name of the topic.
* @param numberOfPartitions The number of partitions. We would create a non-partitioned topic
* if this number if zero.
* if this number is zero.
*/
public void createTopic(String topic, int numberOfPartitions) {
checkArgument(numberOfPartitions >= 0);
Expand Down