Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setter for the reader subscription name #8801

Merged
merged 4 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -31,8 +30,10 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -397,4 +398,70 @@ public void testKeyHashRangeReader() throws IOException {
}

}

@Test
public void testReaderSubName() throws Exception {
doTestReaderSubName(true);
doTestReaderSubName(false);
}

public void doTestReaderSubName(boolean setPrefix) throws Exception {
final String topic = "persistent://my-property/my-ns/testReaderSubName" + System.currentTimeMillis();
final String subName = "my-sub-name";

ReaderBuilder<String> builder = pulsarClient.newReader(Schema.STRING)
.subscriptionName(subName)
.topic(topic)
.startMessageId(MessageId.earliest);
if (setPrefix) {
builder = builder.subscriptionRolePrefix(subName + System.currentTimeMillis());
}
Reader<String> reader = builder.create();
ReaderImpl<String> readerImpl = (ReaderImpl<String>) reader;
assertEquals(readerImpl.getConsumer().getSubscription(), subName);
reader.close();

final String topic2 = "persistent://my-property/my-ns/testReaderSubName2" + System.currentTimeMillis();
admin.topics().createPartitionedTopic(topic2, 3);
builder = pulsarClient.newReader(Schema.STRING)
.subscriptionName(subName)
.topic(topic2)
.startMessageId(MessageId.earliest);
if (setPrefix) {
builder = builder.subscriptionRolePrefix(subName + System.currentTimeMillis());
}
reader = builder.create();
MultiTopicsReaderImpl<String> multiTopicsReader = (MultiTopicsReaderImpl<String>) reader;
multiTopicsReader.getMultiTopicsConsumer().getConsumers()
.forEach(consumerImpl -> assertEquals(consumerImpl.getSubscription(), subName));
multiTopicsReader.close();
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}
315157973 marked this conversation as resolved.
Show resolved Hide resolved

@Test
public void testSameSubName() throws Exception {
final String topic = "persistent://my-property/my-ns/testSameSubName";
final String subName = "my-sub-name";

Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.subscriptionName(subName)
.topic(topic)
.startMessageId(MessageId.earliest).create();
Reader<String> reader2 = null;
try {
315157973 marked this conversation as resolved.
Show resolved Hide resolved
reader2 = pulsarClient.newReader(Schema.STRING)
.subscriptionName(subName)
.topic(topic)
.startMessageId(MessageId.earliest).create();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e instanceof PulsarClientException.ConsumerBusyException);
assertTrue(e.getMessage().contains("Exclusive consumer is already connected"));
}

reader.close();
if (reader2 != null) {
reader2.close();
}
}
315157973 marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,15 @@ public interface ReaderBuilder<T> extends Cloneable {
*/
ReaderBuilder<T> subscriptionRolePrefix(String subscriptionRolePrefix);

/**
* Set the subscription name.
* <p>If subscriptionRolePrefix is set at the same time, this configuration will prevail
*
* @param subscriptionName
* @return the reader builder instance
*/
ReaderBuilder<T> subscriptionName(String subscriptionName);

/**
* If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
* of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public MultiTopicsReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T>
if (StringUtils.isNotBlank(readerConfiguration.getSubscriptionRolePrefix())) {
subscription = readerConfiguration.getSubscriptionRolePrefix() + "-" + subscription;
}
if (StringUtils.isNotBlank(readerConfiguration.getSubscriptionName())) {
subscription = readerConfiguration.getSubscriptionName();
}
ConsumerConfigurationData<T> consumerConfiguration = new ConsumerConfigurationData<>();
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
consumerConfiguration.setSubscriptionName(subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ public ReaderBuilder<T> subscriptionRolePrefix(String subscriptionRolePrefix) {
return this;
}

@Override
public ReaderBuilder<T> subscriptionName(String subscriptionName) {
conf.setSubscriptionName(subscriptionName);
return this;
}

@Override
public ReaderBuilder<T> readCompacted(boolean readCompacted) {
conf.setReadCompacted(readCompacted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConf
if (StringUtils.isNotBlank(readerConfiguration.getSubscriptionRolePrefix())) {
subscription = readerConfiguration.getSubscriptionRolePrefix() + "-" + subscription;
}
if (StringUtils.isNotBlank(readerConfiguration.getSubscriptionName())) {
subscription = readerConfiguration.getSubscriptionName();
}

ConsumerConfigurationData<T> consumerConfiguration = new ConsumerConfigurationData<>();
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {

private String readerName = null;
private String subscriptionRolePrefix = null;
private String subscriptionName = null;

private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
Expand Down