
[pulsar-broker] Optimize message replay for large backlog consumer #3732

Merged (1 commit) on Mar 8, 2019
PersistentDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;

-import static java.util.stream.Collectors.toSet;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
@@ -53,7 +52,8 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +72,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
private final ManagedCursor cursor;

private CompletableFuture<Void> closeFuture = null;
-private ConcurrentLongPairSet messagesToReplay;
+LongPairSet messagesToReplay = new ConcurrentSortedLongPairSet(128, 2);
private final RedeliveryTracker redeliveryTracker;

private boolean havePendingRead = false;
@@ -102,7 +102,6 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.cursor = cursor;
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
this.topic = topic;
-this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
? new InMemoryRedeliveryTracker()
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
@@ -288,8 +287,8 @@ public void readMoreEntries() {
return;
}

-Set<PositionImpl> messagesToReplayNow = messagesToReplay.items(messagesToRead).stream()
-        .map(pair -> new PositionImpl(pair.first, pair.second)).collect(toSet());
+Set<PositionImpl> messagesToReplayNow = messagesToReplay.items(messagesToRead,
Contributor:

Instead of keeping messagesToReplay sorted, wouldn't it be easier to just make messagesToReplayNow a SortedSet?

In that case we don't need messagesToReplay to be always sorted; we only care about ordering when there is a redelivery.
Contributor Author:

> wouldn't it be easier to just make messagesToReplayNow a SortedSet?

No, because messagesToReplayNow is a very small subset (100 msgs) of messagesToReplay (> 1M msgs). The main issue is that messagesToReplay can hold more than a million random messages, and we want to read messages from the same ledger. Making only messagesToReplayNow a sorted set will not help, because the 100 messages read from messagesToReplay may still be random positions from different ledgers, and the broker would still perform random reads across multiple ledgers.

Contributor:

Oh, that's true.
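The trade-off settled in this thread can be illustrated with a toy sketch (all names here are hypothetical; the real ConcurrentSortedLongPairSet keeps pairs grouped and sorted by the first long): when the replay set is ordered by (ledgerId, entryId), a small batch taken from its head spans as few ledgers as possible, turning scattered random reads into near-sequential ones.

```java
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.TreeSet;

public class SortedReplaySketch {
    // A (ledgerId, entryId) pair, mirroring the shape of PositionImpl.
    record Position(long ledgerId, long entryId) {}

    /**
     * Builds a backlog of 500 positions spread over 50 ledgers, then
     * returns how many distinct ledgers the first {@code batch} positions
     * of the sorted set span.
     */
    static long distinctLedgersInHead(int batch) {
        Comparator<Position> byLedgerThenEntry =
                Comparator.comparingLong(Position::ledgerId)
                          .thenComparingLong(Position::entryId);
        NavigableSet<Position> replay = new TreeSet<>(byLedgerThenEntry);

        // Insert in descending ledger order to simulate arbitrary arrival.
        for (long ledger = 50; ledger >= 1; ledger--) {
            for (long entry = 0; entry < 10; entry++) {
                replay.add(new Position(ledger, entry));
            }
        }

        // Take a small batch from the head, like messagesToRead vs. a >1M backlog.
        return replay.stream()
                .limit(batch)
                .map(Position::ledgerId)
                .distinct()
                .count();
    }

    public static void main(String[] args) {
        // A batch of 10 comes entirely from the lowest ledger: one
        // sequential read instead of up to ten random ones.
        System.out.println(distinctLedgersInHead(10)); // prints 1
    }
}
```

With an unsorted set the same batch of 10 could touch up to 10 different ledgers, which is exactly the random-read pattern the PR avoids.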

+(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));

if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
@@ -329,6 +328,7 @@ public void readMoreEntries() {
}
}


@Override
public boolean isConsumerConnected() {
return !consumerList.isEmpty();
@@ -503,6 +503,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
havePendingReplayRead = false;
if (exception instanceof ManagedLedgerException.InvalidReplayPositionException) {
PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();

messagesToReplay.removeIf((ledgerId, entryId) -> {
return ComparisonChain.start().compare(ledgerId, markDeletePosition.getLedgerId())
.compare(entryId, markDeletePosition.getEntryId()).result() <= 0;
@@ -555,7 +556,8 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
messagesToReplay.add(ledgerId, entryId);
});
if (log.isDebugEnabled()) {
-log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, messagesToReplay);
+log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer,
+        messagesToReplay);
}
readMoreEntries();
}
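One detail from the readEntriesFailed hunk above is worth spelling out: the removeIf filter drops replay positions that are at or before the cursor's mark-delete position, since those messages are already acknowledged. A standalone sketch of the same comparison, with plain long comparisons standing in for Guava's ComparisonChain (names here are illustrative, not from the PR):

```java
public class MarkDeleteFilterSketch {

    /**
     * Returns true when (ledgerId, entryId) is at or before the mark-delete
     * position (mdLedgerId, mdEntryId), i.e. the message is already
     * acknowledged and must be removed from the replay set.
     */
    static boolean atOrBeforeMarkDelete(long ledgerId, long entryId,
                                        long mdLedgerId, long mdEntryId) {
        // Compare ledger ids first; only on a tie fall through to entry ids,
        // matching ComparisonChain's short-circuit behavior.
        int cmp = Long.compare(ledgerId, mdLedgerId);
        if (cmp == 0) {
            cmp = Long.compare(entryId, mdEntryId);
        }
        return cmp <= 0;
    }

    public static void main(String[] args) {
        // Mark-delete at (5, 7): everything up to and including it is filtered.
        System.out.println(atOrBeforeMarkDelete(5, 7, 5, 7)); // true
        System.out.println(atOrBeforeMarkDelete(4, 9, 5, 7)); // true
        System.out.println(atOrBeforeMarkDelete(5, 8, 5, 7)); // false
        System.out.println(atOrBeforeMarkDelete(6, 0, 5, 7)); // false
    }
}
```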
ConsumerRedeliveryTest.java (new file)
@@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;

public class ConsumerRedeliveryTest extends ProducerConsumerBase {
@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

/**
* It verifies that redelivered messages are sorted based on the ledger-ids.
* <pre>
* 1. client publishes 100 messages across 50 ledgers
* 2. broker delivers 100 messages to consumer
* 3. consumer acks every other message and doesn't ack 50 messages
* 4. broker sorts replay messages based on ledger and redelivers messages ledger by ledger
* </pre>
* @throws Exception
*/
@Test
public void testOrderedRedelivery() throws Exception {
String topic = "persistent://my-property/my-ns/redelivery";

conf.setManagedLedgerMaxEntriesPerLedger(2);
conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
.producerName("my-producer-name");
Producer<byte[]> producer = producerBuilder.create();
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topic).subscriptionName("s1")
.subscriptionType(SubscriptionType.Shared);
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe();

final int totalMsgs = 100;

for (int i = 0; i < totalMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}


int consumedCount = 0;
Set<MessageId> messageIds = Sets.newHashSet();
for (int i = 0; i < totalMsgs; i++) {
Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
if (message != null && (consumedCount % 2) == 0) {
consumer1.acknowledge(message);
} else {
messageIds.add(message.getMessageId());
}
consumedCount += 1;
}
assertEquals(consumedCount, totalMsgs);

// redeliver all unack messages
consumer1.redeliverUnacknowledgedMessages(messageIds);

MessageIdImpl lastMsgId = null;
for (int i = 0; i < totalMsgs / 2; i++) {
Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
if (lastMsgId != null) {
assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId());
}
lastMsgId = msgId;
}

// close consumer so, this consumer's unack messages will be redelivered to new consumer
consumer1.close();

Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
lastMsgId = null;
for (int i = 0; i < totalMsgs / 2; i++) {
Message<byte[]> message = consumer2.receive(5, TimeUnit.SECONDS);
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
if (lastMsgId != null) {
assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId());
}
lastMsgId = msgId;
}

producer.close();
consumer2.close();
}

}
ConcurrentLongPairSet.java
@@ -26,6 +26,9 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.StampedLock;
+import java.util.function.BiFunction;
+
+import org.apache.pulsar.common.util.collections.LongPairSet.LongPairFunction;

/**
* Concurrent hash set where values are composed of pairs of longs.
@@ -38,7 +41,7 @@
* <p>
* Values <strong>MUST</strong> be >= 0.
*/
-public class ConcurrentLongPairSet {
+public class ConcurrentLongPairSet implements LongPairSet {

private static final long EmptyItem = -1L;
private static final long DeletedItem = -2L;
@@ -54,10 +57,6 @@ public static interface ConsumerLong {
void accept(LongPair item);
}

-public interface LongPairPredicate {
-boolean test(long v1, long v2);
-}

public static interface LongPairConsumer {
void accept(long v1, long v2);
}
@@ -190,11 +189,16 @@ public Set<LongPair> items() {
* @return a new list of keys with max provided numberOfItems (makes a copy)
*/
public Set<LongPair> items(int numberOfItems) {
-Set<LongPair> items = new HashSet<>();
+return items(numberOfItems, (item1, item2) -> new LongPair(item1, item2));
+}
+
+@Override
+public <T> Set<T> items(int numberOfItems, LongPairFunction<T> longPairConverter) {
+Set<T> items = new HashSet<>();
Contributor:

Is it necessary to return a set? Typically the iteration methods are used to go through the items inline, without an intermediary collection.

Contributor Author:

Yes. This method is used by PersistentDispatcherMultipleConsumers to prepare the subset of replay messages, and it then passes this set to cursor.asyncReplayEntries to read the entries. So this method is not used for inline iteration.

for (Section s : sections) {
s.forEach((item1, item2) -> {
if (items.size() < numberOfItems) {
-items.add(new LongPair(item1, item2));
+items.add(longPairConverter.apply(item1, item2));
}
});
if (items.size() >= numberOfItems) {
@@ -203,7 +207,7 @@ public Set<LongPair> items(int numberOfItems) {
}
return items;
}

// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
@@ -552,4 +556,5 @@ public String toString() {
sb.append('}');
return sb.toString();
}

}
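The converter-based items overload discussed in the thread above lets the caller build its target type (PositionImpl, in the dispatcher) directly from each pair, avoiding an intermediate LongPair allocation plus a second mapping pass. A self-contained sketch of the pattern, with LongPairFunction simplified here (the real interface is declared on LongPairSet) and a plain array standing in for the concurrent sections:

```java
import java.util.HashSet;
import java.util.Set;

public class LongPairItemsSketch {
    // Simplified stand-in for LongPairSet.LongPairFunction<T>: converts a
    // raw (long, long) pair into the caller's type without boxing a LongPair.
    interface LongPairFunction<T> {
        T apply(long first, long second);
    }

    // Stand-in for PositionImpl: a (ledgerId, entryId) pair.
    record Position(long ledgerId, long entryId) {}

    /**
     * Collects up to numberOfItems pairs, converting each one directly into
     * the caller's type, mirroring the shape of the new items(int, fn) overload.
     */
    static <T> Set<T> items(long[][] pairs, int numberOfItems,
                            LongPairFunction<T> converter) {
        Set<T> items = new HashSet<>();
        for (long[] p : pairs) {
            if (items.size() >= numberOfItems) {
                break; // stop once the requested batch size is reached
            }
            items.add(converter.apply(p[0], p[1]));
        }
        return items;
    }

    public static void main(String[] args) {
        long[][] backlog = {{1, 0}, {1, 1}, {2, 0}, {2, 1}};
        // Build Position objects directly, as the dispatcher builds PositionImpl.
        Set<Position> batch = items(backlog, 3, Position::new);
        System.out.println(batch.size()); // prints 3
    }
}
```

This is also why the method still returns a Set rather than iterating inline: the dispatcher hands the whole batch to cursor.asyncReplayEntries in one call.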