Skip to content

Commit

Permalink
Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead (#1365)
Browse files Browse the repository at this point in the history
* delete partitionedConsumer, use topicsConsumer instead

* change following comments

* rebase master, rename TopicsConsumerImpl to MultiTopicsConsumerImpl

* avoid dup calling getPartitionedTopicMetadata

* rebase master, fix test error
  • Loading branch information
zhaijack authored and merlimat committed Apr 4, 2018
1 parent b052669 commit 1dd9c43
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 755 deletions.
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -323,13 +324,11 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
final String subName = "sub1"; final String subName = "sub1";
final int numMsgs = 100; final int numMsgs = 100;
Set<String> uniqueMessages = new HashSet<>(); Set<String> uniqueMessages = new HashSet<>();

admin.persistentTopics().createPartitionedTopic(topicName, numPartitions); admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);


ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover); .subscriptionType(SubscriptionType.Failover);



// 1. two consumers on the same subscription // 1. two consumers on the same subscription
ActiveInactiveListenerEvent listener1 = new ActiveInactiveListenerEvent(); ActiveInactiveListenerEvent listener1 = new ActiveInactiveListenerEvent();
ActiveInactiveListenerEvent listener2 = new ActiveInactiveListenerEvent(); ActiveInactiveListenerEvent listener2 = new ActiveInactiveListenerEvent();
Expand Down Expand Up @@ -374,7 +373,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
} }
totalMessages++; totalMessages++;
consumer1.acknowledge(msg); consumer1.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
receivedPtns.add(msgId.getPartitionIndex()); receivedPtns.add(msgId.getPartitionIndex());
} }


Expand All @@ -391,7 +390,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
} }
totalMessages++; totalMessages++;
consumer2.acknowledge(msg); consumer2.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
receivedPtns.add(msgId.getPartitionIndex()); receivedPtns.add(msgId.getPartitionIndex());
} }
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty()); assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
Expand Down
Expand Up @@ -208,7 +208,8 @@ public void partitionedProducerSendAsync() throws PulsarClientException, PulsarA
Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully"); Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully");


for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
MessageId messageId = consumer.receive().getMessageId(); MessageId topicMessageId = consumer.receive().getMessageId();
MessageId messageId = ((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
log.info("Message ID Received = " + messageId); log.info("Message ID Received = " + messageId);
Assert.assertTrue(messageIds.remove(messageId), "Failed to receive Message"); Assert.assertTrue(messageIds.remove(messageId), "Failed to receive Message");
} }
Expand Down Expand Up @@ -247,7 +248,9 @@ public void partitionedProducerSend() throws PulsarClientException, PulsarAdminE
Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully"); Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully");


for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed to receive Message"); MessageId topicMessageId = consumer.receive().getMessageId();
MessageId messageId = ((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
Assert.assertTrue(messageIds.remove(messageId), "Failed to receive Message");
} }
log.info("Message IDs = " + messageIds); log.info("Message IDs = " + messageIds);
Assert.assertEquals(messageIds.size(), 0, "Not all messages received successfully"); Assert.assertEquals(messageIds.size(), 0, "Not all messages received successfully");
Expand Down
Expand Up @@ -161,21 +161,21 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
.subscribe(); .subscribe();


// 4. verify consumer get methods, to get right number of partitions and topics. // 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics(); List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
List<ConsumerImpl<byte[]>> consumers = ((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers(); List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();


assertEquals(topics.size(), 6); assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6); assertEquals(consumers.size(), 6);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);


topics.forEach(topic -> log.debug("topic: {}", topic)); topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));


IntStream.range(0, topics.size()).forEach(index -> IntStream.range(0, topics.size()).forEach(index ->
assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));


((PatternTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); ((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));


// 5. produce data // 5. produce data
for (int i = 0; i < totalMessages / 3; i++) { for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -235,8 +235,8 @@ public void testTopicsListMinus() throws Exception {
List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6); List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6);


List<String> addedNames = PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames); List<String> addedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
List<String> removedNames = PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames); List<String> removedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);


assertTrue(addedNames.size() == 2 && assertTrue(addedNames.size() == 2 &&
addedNames.contains(topicName5) && addedNames.contains(topicName5) &&
Expand All @@ -246,21 +246,21 @@ public void testTopicsListMinus() throws Exception {
removedNames.contains(topicName2)); removedNames.contains(topicName2));


// totally 2 different list, should return content of first lists. // totally 2 different list, should return content of first lists.
List<String> addedNames2 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames); List<String> addedNames2 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
assertTrue(addedNames2.size() == 2 && assertTrue(addedNames2.size() == 2 &&
addedNames2.contains(topicName5) && addedNames2.contains(topicName5) &&
addedNames2.contains(topicName6)); addedNames2.contains(topicName6));


// 2 same list, should return empty list. // 2 same list, should return empty list.
List<String> addedNames3 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames); List<String> addedNames3 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
assertEquals(addedNames3.size(), 0); assertEquals(addedNames3.size(), 0);


// empty list minus: addedNames2.size = 2, addedNames3.size = 0 // empty list minus: addedNames2.size = 2, addedNames3.size = 0
List<String> addedNames4 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3); List<String> addedNames4 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
assertTrue(addedNames4.size() == addedNames2.size()); assertTrue(addedNames4.size() == addedNames2.size());
addedNames4.forEach(name -> assertTrue(addedNames2.contains(name))); addedNames4.forEach(name -> assertTrue(addedNames2.contains(name)));


List<String> addedNames5 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2); List<String> addedNames5 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
assertEquals(addedNames5.size(), 0); assertEquals(addedNames5.size(), 0);
} }


Expand Down Expand Up @@ -290,10 +290,10 @@ public void testStartEmptyPatternConsumer() throws Exception {
.subscribe(); .subscribe();


// 3. verify consumer get methods, to get 0 number of partitions and topics. // 3. verify consumer get methods, to get 0 number of partitions and topics.
assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 0); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 0);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 0); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 0);


// 4. create producer // 4. create producer
String messagePredicate = "my-message-" + key + "-"; String messagePredicate = "my-message-" + key + "-";
Expand All @@ -310,15 +310,15 @@ public void testStartEmptyPatternConsumer() throws Exception {


// 5. call recheckTopics to subscribe each added topics above // 5. call recheckTopics to subscribe each added topics above
log.debug("recheck topics change"); log.debug("recheck topics change");
PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout()); consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100); Thread.sleep(100);


// 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3. // 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3.
assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);




// 7. produce data // 7. produce data
Expand Down Expand Up @@ -384,13 +384,13 @@ public void testAutoSubscribePatternConsumer() throws Exception {
.receiverQueueSize(4) .receiverQueueSize(4)
.subscribe(); .subscribe();


assertTrue(consumer instanceof PatternTopicsConsumerImpl); assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);


// 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3 // 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);


// 5. produce data to topic 1,2,3; verify should receive all the message // 5. produce data to topic 1,2,3; verify should receive all the message
for (int i = 0; i < totalMessages / 3; i++) { for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -419,12 +419,12 @@ public void testAutoSubscribePatternConsumer() throws Exception {


// 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4 // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4
log.debug("recheck topics change"); log.debug("recheck topics change");
PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout()); consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100); Thread.sleep(100);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 10); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 10);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 4); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 4);


// 8. produce data to topic3 and topic4, verify should receive all the message // 8. produce data to topic3 and topic4, verify should receive all the message
for (int i = 0; i < totalMessages / 2; i++) { for (int i = 0; i < totalMessages / 2; i++) {
Expand Down Expand Up @@ -487,13 +487,13 @@ public void testAutoUnbubscribePatternConsumer() throws Exception {
.receiverQueueSize(4) .receiverQueueSize(4)
.subscribe(); .subscribe();


assertTrue(consumer instanceof PatternTopicsConsumerImpl); assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);


// 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3 // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);


// 5. produce data to topic 1,2,3; verify should receive all the message // 5. produce data to topic 1,2,3; verify should receive all the message
for (int i = 0; i < totalMessages / 3; i++) { for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -521,12 +521,12 @@ public void testAutoUnbubscribePatternConsumer() throws Exception {


// 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3 // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3
log.debug("recheck topics change"); log.debug("recheck topics change");
PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout()); consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100); Thread.sleep(100);
assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2); assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2);
assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2); assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2);
assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 1); assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 1);


// 8. produce data to topic2, verify should receive all the message // 8. produce data to topic2, verify should receive all the message
for (int i = 0; i < totalMessages; i++) { for (int i = 0; i < totalMessages; i++) {
Expand Down
Expand Up @@ -338,7 +338,7 @@ public void testFailoverAckedNormalTopic() throws Exception {
} }


private static long getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) { private static long getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) {
PartitionedConsumerImpl<byte[]> pc = (PartitionedConsumerImpl<byte[]>) c; MultiTopicsConsumerImpl<byte[]> pc = (MultiTopicsConsumerImpl<byte[]>) c;
return pc.getUnAckedMessageTracker().size() return pc.getUnAckedMessageTracker().size()
+ pc.getConsumers().stream().mapToLong(consumer -> consumer.getUnAckedMessageTracker().size()).sum(); + pc.getConsumers().stream().mapToLong(consumer -> consumer.getUnAckedMessageTracker().size()).sum();
} }
Expand Down Expand Up @@ -405,8 +405,8 @@ public void testSharedAckedPartitionedTopic() throws Exception {
assertEquals(received, 5); assertEquals(received, 5);


// 7. Simulate ackTimeout // 7. Simulate ackTimeout
((PartitionedConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
((PartitionedConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());


// 8. producer publish more messages // 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) { for (int i = 0; i < totalMessages / 3; i++) {
Expand Down

0 comments on commit 1dd9c43

Please sign in to comment.