Skip to content

Commit

Permalink
Fix returned wrong hash ranges for the consumer with same consumer na…
Browse files Browse the repository at this point in the history
…me (#12212)

Currently, we are using the consumer name to generate the hash ranges to the admin client.
If there are consumers with the same name, we will get same hash ranges for different consumers,
this will confuse when troubleshooting issue. The following is an example:

```
"consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 46320,
        "msgOutCounter" : 1020,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "5253f",
        "availablePermits" : -20,
        "unackedMessages" : 1000,
        "avgMessagesPerEntry" : 56,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "10:11494",
        "lastAckedTimestamp" : 1632731049993,
        "lastConsumedTimestamp" : 1632731030268,
        "keyHashRanges" : [ "[0, 16384]" ],
        "metadata" : { },
        "address" : "/127.0.0.1:54702",
        "connectedSince" : "2021-09-27T16:23:49.891+08:00",
        "clientVersion" : "2.8.1"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "my-name",
        "availablePermits" : 10,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 1000,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "10:19505",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "keyHashRanges" : [ "[16385, 40960]", "[40961, 65536]" ],
        "metadata" : { },
        "address" : "/127.0.0.1:54708",
        "connectedSince" : "2021-09-27T16:23:59.031+08:00",
        "clientVersion" : "2.8.1"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "my-name",
        "availablePermits" : 10,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 1000,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "10:19514",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "keyHashRanges" : [ "[16385, 40960]", "[40961, 65536]" ],
        "metadata" : { },
        "address" : "/127.0.0.1:54717",
        "connectedSince" : "2021-09-27T16:24:03.927+08:00",
        "clientVersion" : "2.8.1"
      } ],
```

The fix is to use the equals method of the consumer to generate the key hash ranges.
New tests added.
  • Loading branch information
codelipenghui committed Sep 28, 2021
1 parent 1497ddf commit 9abd6d3
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

/**
Expand Down Expand Up @@ -126,15 +127,15 @@ public Consumer select(int hash) {
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new LinkedHashMap<>();
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new LinkedHashMap<>();
rwLock.readLock().lock();
try {
int start = 0;
for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) {
for (Consumer consumer: entry.getValue()) {
result.computeIfAbsent(consumer.consumerName(), key -> new ArrayList<>())
.add("[" + start + ", " + entry.getKey() + "]");
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
}
start = entry.getKey() + 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;

/**
* This is a consumer selector based fixed hash range.
Expand Down Expand Up @@ -112,12 +113,12 @@ public Consumer select(int hash) {
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new HashMap<>();
int start = 0;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
.add("[" + start + ", " + entry.getKey() + "]");
result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
start = entry.getKey() + 1;
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.api.proto.KeySharedMeta;

Expand Down Expand Up @@ -65,16 +66,16 @@ public void removeConsumer(Consumer consumer) {
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new HashMap<>();
Map.Entry<Integer, Consumer> prev = null;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
if (prev == null) {
prev = entry;
} else {
if (prev.getValue().equals(entry.getValue())) {
result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
.add("[" + prev.getKey() + ", " + entry.getKey() + "]");
result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>())
.add(Range.of(prev.getKey(), entry.getKey()));
}
prev = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

public interface StickyKeyConsumerSelector {
Expand Down Expand Up @@ -66,5 +67,5 @@ static int makeStickyKeyHash(byte[] stickyKey) {
* Get key hash ranges handled by each consumer.
* @return A map where key is a consumer name and value is list of hash range it receiving message for.
*/
Map<String, List<String>> getConsumerKeyHashRanges();
Map<Consumer, List<Range>> getConsumerKeyHashRanges();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
Expand Down Expand Up @@ -418,7 +419,7 @@ public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
}

public Map<String, List<String>> getConsumerKeyHashRanges() {
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
return selector.getConsumerKeyHashRanges();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -1019,7 +1021,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared
Map<Consumer, List<Range>> consumerKeyHashRanges = getType() == SubType.Key_Shared
? ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getConsumerKeyHashRanges() : null;
dispatcher.getConsumers().forEach(consumer -> {
ConsumerStatsImpl consumerStats = consumer.getStats();
Expand All @@ -1034,8 +1036,10 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.lastConsumedTimestamp =
Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer.consumerName())) {
consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer.consumerName());
if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) {
consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream()
.map(Range::toString)
.collect(Collectors.toList());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

import com.google.common.collect.ImmutableSet;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -146,22 +148,31 @@ public void testConsumerSelect() throws ConsumerAssignException {
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3");
List<Consumer> consumers = new ArrayList<>();
for (String s : consumerName) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(s);
selector.addConsumer(consumer);
consumers.add(consumer);
}

Map<String, Set<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableSet.of("[0, 330121749]", "[330121750, 618146114]", "[1797637922, 1976098885]"));
expectedResult.put("consumer2", ImmutableSet.of("[938427576, 1094135919]", "[1138613629, 1342907082]", "[1342907083, 1797637921]"));
expectedResult.put("consumer3", ImmutableSet.of("[618146115, 772640562]", "[772640563, 938427575]", "[1094135920, 1138613628]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(new HashSet<>(entry.getValue()), expectedResult.get(entry.getKey()));
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Arrays.asList(
Range.of(0, 330121749),
Range.of(330121750, 618146114),
Range.of(1797637922, 1976098885)));
expectedResult.put(consumers.get(1), Arrays.asList(
Range.of(938427576, 1094135919),
Range.of(1138613629, 1342907082),
Range.of(1342907083, 1797637921)));
expectedResult.put(consumers.get(2), Arrays.asList(
Range.of(618146115, 772640562),
Range.of(772640563, 938427575),
Range.of(1094135920, 1138613628)));
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
System.out.println(entry.getValue());
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.service;

import com.google.common.collect.ImmutableList;
import org.apache.pulsar.client.api.Range;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -37,22 +39,46 @@ public class HashRangeAutoSplitStickyKeyConsumerSelectorTest {
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
List<Consumer> consumers = new ArrayList<>();
for (String s : consumerName) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(s);
selector.addConsumer(consumer);
consumers.add(consumer);
}

Map<String, List<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableList.of("[49, 64]"));
expectedResult.put("consumer4", ImmutableList.of("[33, 48]"));
expectedResult.put("consumer2", ImmutableList.of("[17, 32]"));
expectedResult.put("consumer3", ImmutableList.of("[0, 16]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Collections.singletonList(Range.of(49, 64)));
expectedResult.put(consumers.get(3), Collections.singletonList(Range.of(33, 48)));
expectedResult.put(consumers.get(1), Collections.singletonList(Range.of(17, 32)));
expectedResult.put(consumers.get(2), Collections.singletonList(Range.of(0, 16)));
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}

@Test
public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
final String consumerName = "My-consumer";
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(consumerName);
selector.addConsumer(consumer);
consumers.add(consumer);
}

List<Range> prev = null;
for (Consumer consumer : consumers) {
List<Range> ranges = selector.getConsumerKeyHashRanges().get(consumer);
Assert.assertEquals(ranges.size(), 1);
if (prev != null) {
Assert.assertNotEquals(prev, ranges);
}
prev = ranges;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
Expand Down Expand Up @@ -117,6 +118,7 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12}, new int[] {15, 20});
List<Consumer> consumers = new ArrayList<>();
for (int index = 0; index < consumerName.size(); index++) {
Consumer consumer = mock(Consumer.class);
KeySharedMeta keySharedMeta = new KeySharedMeta()
Expand All @@ -128,20 +130,52 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
when(consumer.consumerName()).thenReturn(consumerName.get(index));
Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
selector.addConsumer(consumer);
consumers.add(consumer);
}

Map<String, List<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableList.of("[0, 2]"));
expectedResult.put("consumer2", ImmutableList.of("[3, 7]"));
expectedResult.put("consumer3", ImmutableList.of("[9, 12]"));
expectedResult.put("consumer4", ImmutableList.of("[15, 20]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Collections.singletonList(Range.of(0, 2)));
expectedResult.put(consumers.get(1), Collections.singletonList(Range.of(3, 7)));
expectedResult.put(consumers.get(2), Collections.singletonList(Range.of(9, 12)));
expectedResult.put(consumers.get(3), Collections.singletonList(Range.of(15, 20)));
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}

@Test
public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception {
HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
final String consumerName = "My-consumer";
List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12});
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Consumer consumer = mock(Consumer.class);
KeySharedMeta keySharedMeta = new KeySharedMeta()
.setKeySharedMode(KeySharedMode.STICKY);
keySharedMeta.addHashRange()
.setStart(range.get(i)[0])
.setEnd(range.get(i)[1]);
when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta);
when(consumer.consumerName()).thenReturn(consumerName);
Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
selector.addConsumer(consumer);
consumers.add(consumer);
}

List<Range> prev = null;
for (Consumer consumer : consumers) {
List<Range> ranges = selector.getConsumerKeyHashRanges().get(consumer);
Assert.assertEquals(ranges.size(), 1);
if (prev != null) {
Assert.assertNotEquals(prev, ranges);
}
prev = ranges;
}
}

@Test
public void testSingleRangeConflict() throws BrokerServiceException.ConsumerAssignException {
HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

import java.util.Objects;

/**
* Int range.
*/
Expand Down Expand Up @@ -62,6 +64,23 @@ public Range intersect(Range range) {
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Range range = (Range) o;
return start == range.start && end == range.end;
}

@Override
public int hashCode() {
return Objects.hash(start, end);
}

@Override
public String toString() {
return "[" + start + ", " + end + "]";
Expand Down

0 comments on commit 9abd6d3

Please sign in to comment.