Skip to content

Commit

Permalink
Add optional key/value metadata to consumers. (#1031)
Browse files Browse the repository at this point in the history
  • Loading branch information
cckellogg authored and merlimat committed Jan 7, 2018
1 parent 5263e64 commit 6bd1138
Show file tree
Hide file tree
Showing 16 changed files with 312 additions and 51 deletions.
Expand Up @@ -96,8 +96,10 @@ public class Consumer {
private volatile int unackedMessages = 0; private volatile int unackedMessages = 0;
private volatile boolean blockedConsumerOnUnackedMsgs = false; private volatile boolean blockedConsumerOnUnackedMsgs = false;


private final Map<String, String> metadata;

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException { int maxUnackedMessages, ServerCnx cnx, String appId, Map<String, String> metadata) throws BrokerServiceException {


this.subscription = subscription; this.subscription = subscription;
this.subType = subType; this.subType = subType;
Expand All @@ -114,11 +116,14 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
MESSAGE_PERMITS_UPDATER.set(this, 0); MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0); UNACKED_MESSAGES_UPDATER.set(this, 0);


this.metadata = metadata != null ? metadata : Collections.emptyMap();

stats = new ConsumerStats(); stats = new ConsumerStats();
stats.address = cnx.clientAddress().toString(); stats.address = cnx.clientAddress().toString();
stats.consumerName = consumerName; stats.consumerName = consumerName;
stats.connectedSince = DateFormatter.now(); stats.connectedSince = DateFormatter.now();
stats.clientVersion = cnx.getClientVersion(); stats.clientVersion = cnx.getClientVersion();
stats.metadata = this.metadata;


if (subType == SubType.Shared) { if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1); this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
Expand Down
Expand Up @@ -367,6 +367,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
: null; : null;


final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0; final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);


authorizationFuture.thenApply(isAuthorized -> { authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) { if (isAuthorized) {
Expand All @@ -376,6 +377,14 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {


log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);


try {
Metadata.validateMetadata(metadata);
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
return null;
}

CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>(); CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture); CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture);


Expand All @@ -400,7 +409,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
} }


service.getTopic(topicName).thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, service.getTopic(topicName).thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName,
consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId)) consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
.thenAccept(consumer -> { .thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) { if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/ */
package org.apache.pulsar.broker.service; package org.apache.pulsar.broker.service;


import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
Expand Down Expand Up @@ -77,7 +78,8 @@ default long getOriginalSequenceId() {
void removeProducer(Producer producer); void removeProducer(Producer producer);


CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType, CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId); int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata);


CompletableFuture<Subscription> createSubscription(String subscriptionName); CompletableFuture<Subscription> createSubscription(String subscriptionName);


Expand Down
Expand Up @@ -24,6 +24,7 @@


import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -295,7 +296,8 @@ public void removeProducer(Producer producer) {


@Override @Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId) { SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata) {


final CompletableFuture<Consumer> future = new CompletableFuture<>(); final CompletableFuture<Consumer> future = new CompletableFuture<>();


Expand Down Expand Up @@ -334,7 +336,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri


try { try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx, Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole()); cnx.getRole(), metadata);
subscription.addConsumer(consumer); subscription.addConsumer(consumer);
if (!cnx.isActive()) { if (!cnx.isActive()) {
consumer.close(); consumer.close();
Expand Down
Expand Up @@ -25,10 +25,7 @@
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Collections; import java.util.*;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -385,7 +382,8 @@ public void removeProducer(Producer producer) {


@Override @Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId) { SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata) {


final CompletableFuture<Consumer> future = new CompletableFuture<>(); final CompletableFuture<Consumer> future = new CompletableFuture<>();


Expand Down Expand Up @@ -436,7 +434,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
subscriptionFuture.thenAccept(subscription -> { subscriptionFuture.thenAccept(subscription -> {
try { try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole()); maxUnackedMessages, cnx, cnx.getRole(), metadata);
subscription.addConsumer(consumer); subscription.addConsumer(consumer);
if (!cnx.isActive()) { if (!cnx.isActive()) {
consumer.close(); consumer.close();
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -208,7 +209,7 @@ public void testAddRemoveConsumer() throws Exception {


// 2. Add consumer // 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1"); "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap());
pdfc.addConsumer(consumer1); pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers(); List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName()); assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
Expand All @@ -225,15 +226,15 @@ public void testAddRemoveConsumer() throws Exception {


// 5. Add another consumer which does not change active consumer // 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1"); 50000, serverCnx, "myrole-1", Collections.emptyMap());
pdfc.addConsumer(consumer2); pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers(); consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(3, consumers.size()); assertEquals(3, consumers.size());


// 6. Add a consumer which changes active consumer // 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, Consumer consumer0 = new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1"); "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap());
pdfc.addConsumer(consumer0); pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers(); consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName()); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
Expand Down Expand Up @@ -440,7 +441,9 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche
} }


private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception { private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer = new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000, serverCnx, "appId"); Consumer consumer =
new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000,
serverCnx, "appId", Collections.emptyMap());
try { try {
consumer.flowPermits(permit); consumer.flowPermits(permit);
} catch (Exception e) { } catch (Exception e) {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;


import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();


Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -177,7 +178,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();


Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -239,7 +240,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();


Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -297,7 +298,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();


Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down
Expand Up @@ -42,6 +42,7 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -362,7 +363,7 @@ public void testSubscribeFail() throws Exception {
.setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build(); .setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();


Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
try { try {
f1.get(); f1.get();
fail("should fail with exception"); fail("should fail with exception");
Expand All @@ -381,12 +382,12 @@ public void testSubscribeUnsubscribe() throws Exception {


// 1. simple subscribe // 1. simple subscribe
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


// 2. duplicate subscribe // 2. duplicate subscribe
Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());


try { try {
f2.get(); f2.get();
Expand All @@ -410,7 +411,7 @@ public void testAddRemoveConsumer() throws Exception {


// 1. simple add consumer // 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1"); 50000, serverCnx, "myrole-1", Collections.emptyMap());
sub.addConsumer(consumer); sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected()); assertTrue(sub.getDispatcher().isConsumerConnected());


Expand Down Expand Up @@ -440,7 +441,7 @@ public void testUbsubscribeRaceConditions() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1"); 50000, serverCnx, "myrole-1", Collections.emptyMap());
sub.addConsumer(consumer1); sub.addConsumer(consumer1);


doAnswer(new Answer<Object>() { doAnswer(new Answer<Object>() {
Expand All @@ -462,7 +463,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
try { try {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */ Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1"); 50000, serverCnx, "myrole-1", Collections.emptyMap());
} catch (BrokerServiceException e) { } catch (BrokerServiceException e) {
assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException); assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
} }
Expand Down Expand Up @@ -492,7 +493,7 @@ public void testDeleteTopic() throws Exception {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();


Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


assertTrue(topic.delete().isCompletedExceptionally()); assertTrue(topic.delete().isCompletedExceptionally());
Expand All @@ -507,7 +508,7 @@ public void testDeleteAndUnsubscribeTopic() throws Exception {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();


Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -561,7 +562,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();


Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -648,7 +649,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();


Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null); 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());


try { try {
f.get(); f.get();
Expand Down Expand Up @@ -759,7 +760,7 @@ public void testFailoverSubscription() throws Exception {


// 1. Subscribe with non partition topic // 1. Subscribe with non partition topic
Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(), Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null); cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap());
f1.get(); f1.get();


// 2. Subscribe with partition topic // 2. Subscribe with partition topic
Expand All @@ -770,7 +771,7 @@ public void testFailoverSubscription() throws Exception {
.setSubType(SubType.Failover).build(); .setSubType(SubType.Failover).build();


Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(), Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null); cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap());
f2.get(); f2.get();


// 3. Subscribe and create second consumer // 3. Subscribe and create second consumer
Expand All @@ -779,7 +780,7 @@ public void testFailoverSubscription() throws Exception {
.setSubType(SubType.Failover).build(); .setSubType(SubType.Failover).build();


Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(), Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null); cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap());
f3.get(); f3.get();


assertEquals( assertEquals(
Expand All @@ -799,7 +800,7 @@ public void testFailoverSubscription() throws Exception {
.setSubType(SubType.Failover).build(); .setSubType(SubType.Failover).build();


Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(), Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null); cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap());
f4.get(); f4.get();


assertEquals( assertEquals(
Expand All @@ -824,7 +825,7 @@ public void testFailoverSubscription() throws Exception {
.setSubType(SubType.Exclusive).build(); .setSubType(SubType.Exclusive).build();


Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(), Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null); cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap());


try { try {
f5.get(); f5.get();
Expand All @@ -840,7 +841,7 @@ public void testFailoverSubscription() throws Exception {
.setSubType(SubType.Exclusive).build(); .setSubType(SubType.Exclusive).build();


Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(), Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null); cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap());
f6.get(); f6.get();


// 7. unsubscribe exclusive sub // 7. unsubscribe exclusive sub
Expand Down

0 comments on commit 6bd1138

Please sign in to comment.