Skip to content

Commit

Permalink
changes make MessageId Comparable (#1065)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaijack authored and merlimat committed Jan 17, 2018
1 parent 5795569 commit dd1f64c
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 14 deletions.
Expand Up @@ -30,7 +30,7 @@
* *
* *
*/ */
public interface MessageId { public interface MessageId extends Comparable<MessageId>{


/** /**
* Serialize the message ID into a byte array * Serialize the message ID into a byte array
Expand Down
Expand Up @@ -19,10 +19,11 @@
package org.apache.pulsar.client.impl; package org.apache.pulsar.client.impl;


import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import org.apache.pulsar.client.api.MessageId;


/** /**
*/ */
public class BatchMessageIdImpl extends MessageIdImpl implements Comparable<MessageIdImpl> { public class BatchMessageIdImpl extends MessageIdImpl {
private final int batchIndex; private final int batchIndex;


public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex) { public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
Expand All @@ -44,16 +45,19 @@ public int getBatchIndex() {
} }


@Override @Override
public int compareTo(MessageIdImpl o) { public int compareTo(MessageId o) {
if (!(o instanceof BatchMessageIdImpl)) { if (!(o instanceof BatchMessageIdImpl)) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"expected BatchMessageIdImpl object. Got instance of " + o.getClass().getName()); "expected BatchMessageIdImpl object. Got instance of " + o.getClass().getName());
} }


BatchMessageIdImpl other = (BatchMessageIdImpl) o; BatchMessageIdImpl other = (BatchMessageIdImpl) o;
return ComparisonChain.start().compare(this.ledgerId, other.ledgerId).compare(this.entryId, other.entryId) return ComparisonChain.start()
.compare(this.batchIndex, other.batchIndex).compare(this.getPartitionIndex(), other.getPartitionIndex()) .compare(this.ledgerId, other.ledgerId)
.result(); .compare(this.entryId, other.entryId)
.compare(this.batchIndex, other.batchIndex)
.compare(this.getPartitionIndex(), other.getPartitionIndex())
.result();
} }


@Override @Override
Expand Down
Expand Up @@ -34,7 +34,7 @@
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;


public class MessageIdImpl implements MessageId, Comparable<MessageIdImpl> { public class MessageIdImpl implements MessageId {
protected final long ledgerId; protected final long ledgerId;
protected final long entryId; protected final long entryId;
protected final int partitionIndex; protected final int partitionIndex;
Expand Down Expand Up @@ -63,13 +63,6 @@ public int getPartitionIndex() {
return partitionIndex; return partitionIndex;
} }


@Override
public int compareTo(MessageIdImpl other) {

return ComparisonChain.start().compare(this.ledgerId, other.ledgerId).compare(this.entryId, other.entryId)
.compare(this.getPartitionIndex(), other.getPartitionIndex()).result();
}

@Override @Override
public int hashCode() { public int hashCode() {
return (int) (31 * (ledgerId + 31 * entryId) + partitionIndex); return (int) (31 * (ledgerId + 31 * entryId) + partitionIndex);
Expand Down Expand Up @@ -152,4 +145,19 @@ public byte[] toByteArray() {
// there is no message batch so we pass -1 // there is no message batch so we pass -1
return toByteArray(-1); return toByteArray(-1);
} }

@Override
public int compareTo(MessageId o) {
if (!(o instanceof MessageIdImpl)) {
throw new IllegalArgumentException(
"expected MessageIdImpl object. Got instance of " + o.getClass().getName());
}

MessageIdImpl other = (MessageIdImpl) o;
return ComparisonChain.start()
.compare(this.ledgerId, other.ledgerId)
.compare(this.entryId, other.entryId)
.compare(this.getPartitionIndex(), other.getPartitionIndex())
.result();
}
} }
@@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import org.testng.annotations.Test;

/**
* Test compareTo method in MessageIdImpl and BatchMessageIdImpl
*/
public class MessageIdCompareToTest {

@Test
public void testEqual() {
MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567);

BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(234L, 345L, 456, 567);
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(234L, 345L, 456, 567);

assertTrue(messageIdImpl1.compareTo(messageIdImpl2) == 0, "Expected to be equal");
assertTrue(batchMessageId1.compareTo(batchMessageId2) == 0, "Expected to be equal");
}

@Test
public void testGreaterThan() {
MessageIdImpl messageIdImpl1 = new MessageIdImpl(124L, 345L, 567);
MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl3 = new MessageIdImpl(123L, 344L, 567);
MessageIdImpl messageIdImpl4 = new MessageIdImpl(123L, 344L, 566);

BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(235L, 345L, 456, 567);
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(234L, 346L, 456, 567);
BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(234L, 345L, 456, 568);
BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(234L, 345L, 457, 567);
BatchMessageIdImpl batchMessageId5 = new BatchMessageIdImpl(234L, 345L, 456, 567);

assertTrue(messageIdImpl1.compareTo(messageIdImpl2) > 0, "Expected to be greater than");
assertTrue(messageIdImpl1.compareTo(messageIdImpl3) > 0, "Expected to be greater than");
assertTrue(messageIdImpl1.compareTo(messageIdImpl4) > 0, "Expected to be greater than");
assertTrue(messageIdImpl2.compareTo(messageIdImpl3) > 0, "Expected to be greater than");
assertTrue(messageIdImpl2.compareTo(messageIdImpl4) > 0, "Expected to be greater than");
assertTrue(messageIdImpl3.compareTo(messageIdImpl4) > 0, "Expected to be greater than");

assertTrue(batchMessageId1.compareTo(batchMessageId2) > 0, "Expected to be greater than");
assertTrue(batchMessageId1.compareTo(batchMessageId3) > 0, "Expected to be greater than");
assertTrue(batchMessageId1.compareTo(batchMessageId4) > 0, "Expected to be greater than");
assertTrue(batchMessageId1.compareTo(batchMessageId5) > 0, "Expected to be greater than");
assertTrue(batchMessageId2.compareTo(batchMessageId3) > 0, "Expected to be greater than");
assertTrue(batchMessageId2.compareTo(batchMessageId4) > 0, "Expected to be greater than");
assertTrue(batchMessageId2.compareTo(batchMessageId5) > 0, "Expected to be greater than");
assertTrue(batchMessageId3.compareTo(batchMessageId4) > 0, "Expected to be greater than");
assertTrue(batchMessageId3.compareTo(batchMessageId5) > 0, "Expected to be greater than");
assertTrue(batchMessageId4.compareTo(batchMessageId5) > 0, "Expected to be greater than");
}

@Test
public void testLessThan() {
MessageIdImpl messageIdImpl1 = new MessageIdImpl(124L, 345L, 567);
MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl3 = new MessageIdImpl(123L, 344L, 567);
MessageIdImpl messageIdImpl4 = new MessageIdImpl(123L, 344L, 566);

BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(235L, 345L, 456, 567);
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(234L, 346L, 456, 567);
BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(234L, 345L, 456, 568);
BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(234L, 345L, 457, 567);
BatchMessageIdImpl batchMessageId5 = new BatchMessageIdImpl(234L, 345L, 456, 567);

assertTrue(messageIdImpl2.compareTo(messageIdImpl1) < 0, "Expected to be less than");
assertTrue(messageIdImpl3.compareTo(messageIdImpl1) < 0, "Expected to be less than");
assertTrue(messageIdImpl4.compareTo(messageIdImpl1) < 0, "Expected to be less than");
assertTrue(messageIdImpl3.compareTo(messageIdImpl2) < 0, "Expected to be less than");
assertTrue(messageIdImpl4.compareTo(messageIdImpl2) < 0, "Expected to be less than");
assertTrue(messageIdImpl4.compareTo(messageIdImpl3) < 0, "Expected to be less than");

assertTrue(batchMessageId2.compareTo(batchMessageId1) < 0, "Expected to be less than");
assertTrue(batchMessageId3.compareTo(batchMessageId1) < 0, "Expected to be less than");
assertTrue(batchMessageId4.compareTo(batchMessageId1) < 0, "Expected to be less than");
assertTrue(batchMessageId5.compareTo(batchMessageId1) < 0, "Expected to be less than");
assertTrue(batchMessageId3.compareTo(batchMessageId2) < 0, "Expected to be less than");
assertTrue(batchMessageId4.compareTo(batchMessageId2) < 0, "Expected to be less than");
assertTrue(batchMessageId5.compareTo(batchMessageId2) < 0, "Expected to be less than");
assertTrue(batchMessageId4.compareTo(batchMessageId3) < 0, "Expected to be less than");
assertTrue(batchMessageId5.compareTo(batchMessageId3) < 0, "Expected to be less than");
assertTrue(batchMessageId5.compareTo(batchMessageId4) < 0, "Expected to be less than");
}

@Test
public void testCompareDifferentType() {
// Expected throw IllegalArgumentException
MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(123L, 345L, 567, 789);

assertTrue( messageIdImpl.compareTo(batchMessageId) == 0, "Expected to be equal");

try {
batchMessageId.compareTo(messageIdImpl);
fail("Should throw IllegalArgumentException when compare different type");
} catch (IllegalArgumentException e) {
// expected
}
}

}

0 comments on commit dd1f64c

Please sign in to comment.