Skip to content

Commit

Permalink
change following sijie's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jiazhai committed Aug 27, 2018
1 parent b745719 commit 400bd07
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,37 @@

public class TopicMessageIdImpl implements MessageId {

/** This topicName is get from ConsumerImpl, it contains partition part. */
/** This topicPartitionName is get from ConsumerImpl, it contains partition part. */
private final String topicPartitionName;
private final String topicName;
private final MessageId messageId;

TopicMessageIdImpl(String topicName, MessageId messageId) {
this.topicName = topicName;
TopicMessageIdImpl(String topicPartitionName, MessageId messageId) {
this.messageId = messageId;
this.topicPartitionName = topicPartitionName;

int position = topicPartitionName.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
if (position != -1) {
this.topicName = topicPartitionName.substring(0, position);
} else {
this.topicName = topicPartitionName;
}
}

/**
* Get the topic name without partition part of this message.
* @return the name of the topic on which this message was published
*/
public String getTopicName() {
int position = topicName.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
checkState(position != -1, "Topic Name not contains partition part. " + topicName);
return topicName.substring(0, position);
return this.topicName;
}

/**
* Get the topic name which contains partition part for this message.
* @return the topic name which contains Partition part
*/
public String getTopicPartitionName() {
return topicName;
return this.topicPartitionName;
}

public MessageId getInnerMessageId() {
Expand All @@ -68,7 +74,7 @@ public boolean equals(Object obj) {
return false;
}
TopicMessageIdImpl other = (TopicMessageIdImpl) obj;
return Objects.equals(topicName, other.topicName)
return Objects.equals(topicPartitionName, other.topicPartitionName)
&& Objects.equals(messageId, other.messageId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,40 @@

public class TopicMessageImpl<T> implements Message<T> {

/** This topicName is get from ConsumerImpl, it contains partition part. */
/** This topicPartitionName is get from ConsumerImpl, it contains partition part. */
private final String topicPartitionName;
private final String topicName;
private final Message<T> msg;
private final TopicMessageIdImpl messageId;

TopicMessageImpl(String topicName,
TopicMessageImpl(String topicPartitionName,
Message<T> msg) {
this.topicName = topicName;
this.topicPartitionName = topicPartitionName;
int position = topicPartitionName.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
if (position != -1) {
this.topicName = topicPartitionName.substring(0, position);
} else {
this.topicName = topicPartitionName;
}

this.msg = msg;
this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId());
this.messageId = new TopicMessageIdImpl(topicPartitionName, msg.getMessageId());
}

/**
* Get the topic name without partition part of this message.
* @return the name of the topic on which this message was published
*/
public String getTopicName() {
int position = topicName.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
checkState(position != -1, "Topic Name not contains partition part. " + topicName);
return topicName.substring(0, position);
return topicName;
}

/**
* Get the topic name which contains partition part for this message.
* @return the topic name which contains Partition part
*/
public String getTopicPartitionName() {
return topicName;
return topicPartitionName;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public int removeTopicMessages(String topicName) {
int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
checkState(m instanceof TopicMessageIdImpl,
"message should be of type TopicMessageIdImpl");
return ((TopicMessageIdImpl)m).getTopicName().contains(topicName);
return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
});
int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
checkState(m instanceof TopicMessageIdImpl,
"message should be of type TopicMessageIdImpl");
return ((TopicMessageIdImpl)m).getTopicName().contains(topicName);
return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
});

return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ public void testCompareDifferentType() {
public void testMessageIdImplCompareToTopicMessageId() {
MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
"test-topic",
"test-topic-partition-0",
new BatchMessageIdImpl(123L, 345L, 566, 789));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
"test-topic",
"test-topic-partition-0",
new BatchMessageIdImpl(123L, 345L, 567, 789));
TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
"test-topic",
"test-topic-partition-0",
new BatchMessageIdImpl(messageIdImpl));
assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than");
assertTrue(messageIdImpl.compareTo(topicMessageId2) == 0, "Expected to be equal");
Expand All @@ -144,10 +144,10 @@ public void testBatchMessageIdImplCompareToTopicMessageId() {
BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0);
BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
"test-topic",
"test-topic-partition-0",
new MessageIdImpl(123L, 345L, 566));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
"test-topic",
"test-topic-partition-0",
new MessageIdImpl(123L, 345L, 567));
assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than");
assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private void testSubscriptionInitialPosition(int numTopics) throws Exception {
Message<String> m = consumer.receive();
int topicIdx;
if (numTopics > 1) {
String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicName();
String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicPartitionName();

String[] topicParts = StringUtils.split(topic, '-');
topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]);
Expand Down

0 comments on commit 400bd07

Please sign in to comment.