Skip to content

Commit

Permalink
keep both topicName and topicPartitonName in consumer to avoid new st…
Browse files Browse the repository at this point in the history
…ring
  • Loading branch information
jiazhai committed Aug 27, 2018
1 parent 400bd07 commit 1384675
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 23 deletions.
Expand Up @@ -131,6 +131,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final SubscriptionInitialPosition subscriptionInitialPosition;
private final ConnectionHandler connectionHandler;

private final String topicNameWithoutPartition;

enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
Expand Down Expand Up @@ -203,6 +205,8 @@ enum SubscriptionMode {
NonPersistentAcknowledgmentGroupingTracker.of();
}

topicNameWithoutPartition = topicName.getPartitionedTopicName();

grabCnx();
}

Expand Down Expand Up @@ -1458,6 +1462,10 @@ void grabCnx() {
this.connectionHandler.grabCnx();
}

public String getTopicNameWithoutPartition() {
return topicNameWithoutPartition;
}

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

}
Expand Up @@ -231,7 +231,8 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
checkArgument(message instanceof MessageImpl);
lock.writeLock().lock();
try {
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message);
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
unAckedMessageTracker.add(topicMessage.getMessageId());

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -370,15 +371,15 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
}

if (ackType == AckType.Cumulative) {
Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
if (individualConsumer != null) {
MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.acknowledgeCumulativeAsync(innerId);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName());
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());

MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledge(innerId, ackType, properties)
Expand Down
Expand Up @@ -31,16 +31,10 @@ public class TopicMessageIdImpl implements MessageId {
private final String topicName;
private final MessageId messageId;

TopicMessageIdImpl(String topicPartitionName, MessageId messageId) {
TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
this.messageId = messageId;
this.topicPartitionName = topicPartitionName;

int position = topicPartitionName.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
if (position != -1) {
this.topicName = topicPartitionName.substring(0, position);
} else {
this.topicName = topicPartitionName;
}
this.topicName = topicName;
}

/**
Expand Down
Expand Up @@ -19,12 +19,8 @@

package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;

import java.util.Map;
import java.util.Optional;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
Expand All @@ -38,17 +34,13 @@ public class TopicMessageImpl<T> implements Message<T> {
private final TopicMessageIdImpl messageId;

TopicMessageImpl(String topicPartitionName,
String topicName,
Message<T> msg) {
this.topicPartitionName = topicPartitionName;
int position = topicPartitionName.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
if (position != -1) {
this.topicName = topicPartitionName.substring(0, position);
} else {
this.topicName = topicPartitionName;
}
this.topicName = topicName;

this.msg = msg;
this.messageId = new TopicMessageIdImpl(topicPartitionName, msg.getMessageId());
this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId());
}

/**
Expand Down
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import org.testng.annotations.Test;
Expand Down Expand Up @@ -123,12 +122,15 @@ public void testMessageIdImplCompareToTopicMessageId() {
MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(123L, 345L, 566, 789));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(123L, 345L, 567, 789));
TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(messageIdImpl));
assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than");
assertTrue(messageIdImpl.compareTo(topicMessageId2) == 0, "Expected to be equal");
Expand All @@ -145,9 +147,11 @@ public void testBatchMessageIdImplCompareToTopicMessageId() {
BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new MessageIdImpl(123L, 345L, 566));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new MessageIdImpl(123L, 345L, 567));
assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than");
assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than");
Expand Down

0 comments on commit 1384675

Please sign in to comment.