Skip to content

Commit

Permalink
Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)
Browse files Browse the repository at this point in the history
* change getTopicName in MultiTopicsConsumer

* change following sijie's comments

* keep both topicName and topicPartitonName in consumer to avoid new string
  • Loading branch information
jiazhai authored and sijie committed Aug 27, 2018
1 parent 96bf00f commit 2595941
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 16 deletions.
Expand Up @@ -131,6 +131,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final SubscriptionInitialPosition subscriptionInitialPosition;
private final ConnectionHandler connectionHandler;

private final String topicNameWithoutPartition;

enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
Expand Down Expand Up @@ -203,6 +205,8 @@ enum SubscriptionMode {
NonPersistentAcknowledgmentGroupingTracker.of();
}

topicNameWithoutPartition = topicName.getPartitionedTopicName();

grabCnx();
}

Expand Down Expand Up @@ -1458,6 +1462,10 @@ void grabCnx() {
this.connectionHandler.grabCnx();
}

public String getTopicNameWithoutPartition() {
return topicNameWithoutPartition;
}

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

}
Expand Up @@ -230,7 +230,8 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
checkArgument(message instanceof MessageImpl);
lock.writeLock().lock();
try {
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message);
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
unAckedMessageTracker.add(topicMessage.getMessageId());

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -369,15 +370,15 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
}

if (ackType == AckType.Cumulative) {
Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
if (individualConsumer != null) {
MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.acknowledgeCumulativeAsync(innerId);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName());
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());

MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledge(innerId, ackType, properties)
Expand Down Expand Up @@ -510,7 +511,7 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
}
removeExpiredMessagesFromQueue(messageIds);
messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet()))
.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet()))
.forEach((topicName, messageIds1) ->
consumers.get(topicName)
.redeliverUnacknowledgedMessages(messageIds1.stream()
Expand Down
Expand Up @@ -18,20 +18,39 @@
*/
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;

import java.util.Objects;
import org.apache.pulsar.client.api.MessageId;

public class TopicMessageIdImpl implements MessageId {

/** This topicPartitionName is get from ConsumerImpl, it contains partition part. */
private final String topicPartitionName;
private final String topicName;
private final MessageId messageId;

TopicMessageIdImpl(String topicName, MessageId messageId) {
this.topicName = topicName;
TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
this.messageId = messageId;
this.topicPartitionName = topicPartitionName;
this.topicName = topicName;
}

/**
* Get the topic name without partition part of this message.
* @return the name of the topic on which this message was published
*/
public String getTopicName() {
return topicName;
return this.topicName;
}

/**
* Get the topic name which contains partition part for this message.
* @return the topic name which contains Partition part
*/
public String getTopicPartitionName() {
return this.topicPartitionName;
}

public MessageId getInnerMessageId() {
Expand All @@ -49,7 +68,7 @@ public boolean equals(Object obj) {
return false;
}
TopicMessageIdImpl other = (TopicMessageIdImpl) obj;
return Objects.equals(topicName, other.topicName)
return Objects.equals(topicPartitionName, other.topicPartitionName)
&& Objects.equals(messageId, other.messageId);
}

Expand Down
Expand Up @@ -21,32 +21,44 @@

import java.util.Map;
import java.util.Optional;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;

public class TopicMessageImpl<T> implements Message<T> {

/** This topicPartitionName is get from ConsumerImpl, it contains partition part. */
private final String topicPartitionName;
private final String topicName;
private final Message<T> msg;
private final TopicMessageIdImpl messageId;

TopicMessageImpl(String topicName,
TopicMessageImpl(String topicPartitionName,
String topicName,
Message<T> msg) {
this.topicPartitionName = topicPartitionName;
this.topicName = topicName;

this.msg = msg;
this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId());
this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId());
}

/**
* Get the topic name of this message.
* Get the topic name without partition part of this message.
* @return the name of the topic on which this message was published
*/
public String getTopicName() {
return topicName;
}

/**
* Get the topic name which contains partition part for this message.
* @return the topic name which contains Partition part
*/
public String getTopicPartitionName() {
return topicPartitionName;
}

@Override
public MessageId getMessageId() {
return messageId;
Expand Down
Expand Up @@ -32,12 +32,12 @@ public int removeTopicMessages(String topicName) {
int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
checkState(m instanceof TopicMessageIdImpl,
"message should be of type TopicMessageIdImpl");
return ((TopicMessageIdImpl)m).getTopicName().contains(topicName);
return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
});
int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
checkState(m instanceof TopicMessageIdImpl,
"message should be of type TopicMessageIdImpl");
return ((TopicMessageIdImpl)m).getTopicName().contains(topicName);
return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
});

return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
Expand Down
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import org.testng.annotations.Test;
Expand Down Expand Up @@ -122,12 +121,15 @@ public void testCompareDifferentType() {
public void testMessageIdImplCompareToTopicMessageId() {
MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(123L, 345L, 566, 789));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(123L, 345L, 567, 789));
TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(messageIdImpl));
assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than");
Expand All @@ -144,9 +146,11 @@ public void testBatchMessageIdImplCompareToTopicMessageId() {
BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0);
BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new MessageIdImpl(123L, 345L, 566));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new MessageIdImpl(123L, 345L, 567));
assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than");
Expand Down
Expand Up @@ -245,7 +245,7 @@ private void testSubscriptionInitialPosition(int numTopics) throws Exception {
Message<String> m = consumer.receive();
int topicIdx;
if (numTopics > 1) {
String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicName();
String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicPartitionName();

String[] topicParts = StringUtils.split(topic, '-');
topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]);
Expand Down

0 comments on commit 2595941

Please sign in to comment.