Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix returned wrong hash ranges for the consumer with same consumer name #12212

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

/**
Expand Down Expand Up @@ -126,15 +127,15 @@ public Consumer select(int hash) {
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new LinkedHashMap<>();
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new LinkedHashMap<>();
rwLock.readLock().lock();
try {
int start = 0;
for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) {
for (Consumer consumer: entry.getValue()) {
result.computeIfAbsent(consumer.consumerName(), key -> new ArrayList<>())
.add("[" + start + ", " + entry.getKey() + "]");
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
}
start = entry.getKey() + 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;

/**
* This is a consumer selector based fixed hash range.
Expand Down Expand Up @@ -112,12 +113,12 @@ public Consumer select(int hash) {
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new HashMap<>();
int start = 0;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
.add("[" + start + ", " + entry.getKey() + "]");
result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
start = entry.getKey() + 1;
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.api.proto.KeySharedMeta;

Expand Down Expand Up @@ -65,16 +66,16 @@ public void removeConsumer(Consumer consumer) {
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new HashMap<>();
Map.Entry<Integer, Consumer> prev = null;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
if (prev == null) {
prev = entry;
} else {
if (prev.getValue().equals(entry.getValue())) {
result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
.add("[" + prev.getKey() + ", " + entry.getKey() + "]");
result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>())
.add(Range.of(prev.getKey(), entry.getKey()));
}
prev = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

public interface StickyKeyConsumerSelector {
Expand Down Expand Up @@ -66,5 +67,5 @@ static int makeStickyKeyHash(byte[] stickyKey) {
* Get key hash ranges handled by each consumer.
* @return A map where key is a consumer name and value is list of hash range it receiving message for.
*/
Map<String, List<String>> getConsumerKeyHashRanges();
Map<Consumer, List<Range>> getConsumerKeyHashRanges();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
Expand Down Expand Up @@ -418,7 +419,7 @@ public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
}

public Map<String, List<String>> getConsumerKeyHashRanges() {
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
return selector.getConsumerKeyHashRanges();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -1019,7 +1021,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared
Map<Consumer, List<Range>> consumerKeyHashRanges = getType() == SubType.Key_Shared
? ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getConsumerKeyHashRanges() : null;
dispatcher.getConsumers().forEach(consumer -> {
ConsumerStatsImpl consumerStats = consumer.getStats();
Expand All @@ -1034,8 +1036,10 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.lastConsumedTimestamp =
Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer.consumerName())) {
consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer.consumerName());
if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) {
consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream()
.map(range -> String.format("[%s, %s]", range.getStart(), range.getEnd()))
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

import com.google.common.collect.ImmutableSet;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -146,22 +148,31 @@ public void testConsumerSelect() throws ConsumerAssignException {
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3");
List<Consumer> consumers = new ArrayList<>();
for (String s : consumerName) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(s);
selector.addConsumer(consumer);
consumers.add(consumer);
}

Map<String, Set<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableSet.of("[0, 330121749]", "[330121750, 618146114]", "[1797637922, 1976098885]"));
expectedResult.put("consumer2", ImmutableSet.of("[938427576, 1094135919]", "[1138613629, 1342907082]", "[1342907083, 1797637921]"));
expectedResult.put("consumer3", ImmutableSet.of("[618146115, 772640562]", "[772640563, 938427575]", "[1094135920, 1138613628]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(new HashSet<>(entry.getValue()), expectedResult.get(entry.getKey()));
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Arrays.asList(
Range.of(0, 330121749),
Range.of(330121750, 618146114),
Range.of(1797637922, 1976098885)));
expectedResult.put(consumers.get(1), Arrays.asList(
Range.of(938427576, 1094135919),
Range.of(1138613629, 1342907082),
Range.of(1342907083, 1797637921)));
expectedResult.put(consumers.get(2), Arrays.asList(
Range.of(618146115, 772640562),
Range.of(772640563, 938427575),
Range.of(1094135920, 1138613628)));
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
System.out.println(entry.getValue());
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.service;

import com.google.common.collect.ImmutableList;
import org.apache.pulsar.client.api.Range;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -37,22 +39,46 @@ public class HashRangeAutoSplitStickyKeyConsumerSelectorTest {
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
List<Consumer> consumers = new ArrayList<>();
for (String s : consumerName) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(s);
selector.addConsumer(consumer);
consumers.add(consumer);
}

Map<String, List<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableList.of("[49, 64]"));
expectedResult.put("consumer4", ImmutableList.of("[33, 48]"));
expectedResult.put("consumer2", ImmutableList.of("[17, 32]"));
expectedResult.put("consumer3", ImmutableList.of("[0, 16]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Collections.singletonList(Range.of(49, 64)));
expectedResult.put(consumers.get(3), Collections.singletonList(Range.of(33, 48)));
expectedResult.put(consumers.get(1), Collections.singletonList(Range.of(17, 32)));
expectedResult.put(consumers.get(2), Collections.singletonList(Range.of(0, 16)));
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}

@Test
public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
final String consumerName = "My-consumer";
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(consumerName);
selector.addConsumer(consumer);
consumers.add(consumer);
}

List<Range> prev = null;
for (Consumer consumer : consumers) {
List<Range> ranges = selector.getConsumerKeyHashRanges().get(consumer);
Assert.assertEquals(ranges.size(), 1);
if (prev != null) {
Assert.assertNotEquals(prev, ranges);
}
prev = ranges;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
Expand Down Expand Up @@ -117,6 +118,7 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12}, new int[] {15, 20});
List<Consumer> consumers = new ArrayList<>();
for (int index = 0; index < consumerName.size(); index++) {
Consumer consumer = mock(Consumer.class);
KeySharedMeta keySharedMeta = new KeySharedMeta()
Expand All @@ -128,20 +130,52 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
when(consumer.consumerName()).thenReturn(consumerName.get(index));
Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
selector.addConsumer(consumer);
consumers.add(consumer);
}

Map<String, List<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableList.of("[0, 2]"));
expectedResult.put("consumer2", ImmutableList.of("[3, 7]"));
expectedResult.put("consumer3", ImmutableList.of("[9, 12]"));
expectedResult.put("consumer4", ImmutableList.of("[15, 20]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Collections.singletonList(Range.of(0, 2)));
expectedResult.put(consumers.get(1), Collections.singletonList(Range.of(3, 7)));
expectedResult.put(consumers.get(2), Collections.singletonList(Range.of(9, 12)));
expectedResult.put(consumers.get(3), Collections.singletonList(Range.of(15, 20)));
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}

@Test
public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception {
HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
final String consumerName = "My-consumer";
List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12});
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Consumer consumer = mock(Consumer.class);
KeySharedMeta keySharedMeta = new KeySharedMeta()
.setKeySharedMode(KeySharedMode.STICKY);
keySharedMeta.addHashRange()
.setStart(range.get(i)[0])
.setEnd(range.get(i)[1]);
when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta);
when(consumer.consumerName()).thenReturn(consumerName);
Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
selector.addConsumer(consumer);
consumers.add(consumer);
}

List<Range> prev = null;
for (Consumer consumer : consumers) {
List<Range> ranges = selector.getConsumerKeyHashRanges().get(consumer);
Assert.assertEquals(ranges.size(), 1);
if (prev != null) {
Assert.assertNotEquals(prev, ranges);
}
prev = ranges;
}
}

@Test
public void testSingleRangeConflict() throws BrokerServiceException.ConsumerAssignException {
HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

import java.util.Objects;

/**
* Int range.
*/
Expand Down Expand Up @@ -62,6 +64,23 @@ public Range intersect(Range range) {
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Range range = (Range) o;
return start == range.start && end == range.end;
}

@Override
public int hashCode() {
return Objects.hash(start, end);
}

@Override
public String toString() {
return "[" + start + ", " + end + "]";
Expand Down