diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index a83d15ed2926b..cc538ff90d384 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -369,8 +369,10 @@ public List replayEntries(Set positions) * callback object returning the list of entries * @param ctx * opaque context + * @return skipped positions + * set of positions which are already deleted/acknowledged and skipped while replaying them */ - public void asyncReplayEntries(Set positions, ReadEntriesCallback callback, Object ctx); + public Set asyncReplayEntries(Set positions, ReadEntriesCallback callback, Object ctx); /** * Close the cursor and releases the associated resources. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8c48c8e76fb23..7ab27e95ee962 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; @@ -66,6 +67,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; +import com.google.common.collect.Sets; import com.google.common.collect.TreeRangeSet; import com.google.common.util.concurrent.RateLimiter; import com.google.protobuf.InvalidProtocolBufferException; @@ -802,15 +804,23 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } @Override - public void asyncReplayEntries(final Set positions, ReadEntriesCallback callback, Object ctx) { + public Set asyncReplayEntries(final Set positions, ReadEntriesCallback callback, Object ctx) { List entries = Lists.newArrayListWithExpectedSize(positions.size()); if (positions.isEmpty()) { callback.readEntriesComplete(entries, ctx); } + // filters out messages which are already acknowledged + Set alreadyAcknowledgedPositions = Sets.newHashSet(); + positions.stream().filter(position -> { + return individualDeletedMessages.contains((PositionImpl) position) + || ((PositionImpl) position).compareTo(markDeletePosition) < 0; + }).forEach(pos -> alreadyAcknowledgedPositions.add(pos)); + + final int totalValidPositions = positions.size() - alreadyAcknowledgedPositions.size(); final AtomicReference exception = new AtomicReference<>(); ReadEntryCallback cb = new ReadEntryCallback() { - int pendingCallbacks = positions.size(); + int pendingCallbacks = totalValidPositions; @Override public synchronized void readEntryComplete(Entry entry, Object ctx) { @@ -842,25 +852,11 @@ public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx) } }; - positions.forEach(p -> { - PositionImpl position = (PositionImpl) p; - if (position.compareTo(markDeletePosition) > 0 && hasMoreEntries(position)) { - ledger.asyncReadEntry(position, cb, ctx); - } else { - String msg = String.format("[%s][%s] No entries to read from position %s for replay.", ledger.getName(), - name, position); - lock.readLock().lock(); - try { - log.warn( - "[{}][{}] No entries to read from position {} for replay. - md: {} - rd: {} - last-position: {} - individually-deleted: {}", - ledger.getName(), name, position, markDeletePosition, readPosition, - ledger.lastConfirmedEntry, individualDeletedMessages); - } finally { - lock.readLock().unlock(); - } - cb.readEntryFailed(new ManagedLedgerException.InvalidReplayPositionException(msg), ctx); - } + positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position)).forEach(p -> { + ledger.asyncReadEntry((PositionImpl) p, cb, ctx); }); + + return alreadyAcknowledgedPositions; } private long getNumberOfEntries(Range range) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index d4718c1c6311b..a36e65e26d3ad 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -38,6 +38,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; @Test public class ManagedCursorContainerTest { @@ -184,7 +185,8 @@ public List replayEntries(Set positions) } @Override - public void asyncReplayEntries(Set positions, ReadEntriesCallback callback, Object ctx) { + public Set asyncReplayEntries(Set positions, ReadEntriesCallback callback, Object ctx) { + return Sets.newConcurrentHashSet(); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 9e8b9d36872cf..b3d90c8599e6f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -2072,10 +2072,10 @@ void testReplayEntries() throws Exception { c1.markDelete(p2); try { - c1.replayEntries(positions); - fail("Should fail"); + // as mark-delete is at position: p2 it should read entry : p3 + assertEquals(1, c1.replayEntries(positions).size()); } catch (ManagedLedgerException e) { - // ok + fail("Should have not failed"); } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index cc9ba0cdfa0d2..ab5385055c8da 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -165,7 +166,10 @@ private void readMoreEntries() { } havePendingReplayRead = true; - cursor.asyncReplayEntries(messagesToReplayNow, this, ReadType.Replay); + Set deletedMessages = cursor.asyncReplayEntries(messagesToReplayNow, this, + ReadType.Replay); + // clear already acked positions from replay bucket + messagesToReplay.removeAll(deletedMessages); } else if (!havePendingRead) { if (log.isDebugEnabled()) { log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead, diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java index b504ccc277f25..a3a904ff691f0 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java @@ -25,6 +25,7 @@ import java.lang.reflect.Field; import java.util.List; import java.util.Map; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -38,12 +39,15 @@ import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.google.common.collect.Sets; +import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; import com.yahoo.pulsar.broker.stats.Metrics; @@ -1069,4 +1073,98 @@ public void testPayloadCorruptionDetection() throws Exception { assertEquals(new String(msg.getData()), "message-1"); } } + + /** + * Verify: Broker should not replay already acknowledged messages again and should clear them from messageReplay bucket + * + * 1. produce messages + * 2. consume messages and ack all except 1 msg + * 3. add already acked messages to PersistentDispatcherMultipleConsumers.messagesToReplay bucket + * 4. replay messages present into messagesToReplay bucket + * 5. Verification + * a. should replay only 1 unacked message + * b. messagesToReplay should not have previously added acked messages + * + * + * + * @throws Exception + */ + @Test + public void testMessageReplay() throws Exception { + + final String topicName = "persistent://prop/use/ns-abc/topic2"; + final String subName = "sub2"; + + Message msg; + int totalMessages = 10; + int replayIndex = totalMessages / 2; + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(SubscriptionType.Shared); + + Consumer consumer = pulsarClient.subscribe(topicName, subName, conf); + Producer producer = pulsarClient.createProducer(topicName); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + assertNotNull(topicRef); + PersistentSubscription subRef = topicRef.getPersistentSubscription(subName); + PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef + .getDispatcher(); + Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay"); + replayMap.setAccessible(true); + TreeSet messagesToReplay = Sets.newTreeSet(); + + assertNotNull(subRef); + + // (1) Produce messages + for (int i = 0; i < totalMessages; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + //(2) Consume and ack messages except first message + Message unAckedMsg = null; + for (int i = 0; i < totalMessages; i++) { + msg = consumer.receive(); + if (i == 0) { + unAckedMsg = msg; + } else { + consumer.acknowledge(msg); + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + if (i < replayIndex) { + // (3) accumulate acked messages for replay + messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId())); + } + } + } + + /** + * 1. Now: ManagedCursorImpl.individualDeletedMessages => (L:0,L9] + * 2. dispatcher.redeliverUnacknowledgedMessages replays => unackMsg (L:0) + position presents into messagesToReplay + * 3. ManagedCursorImpl.asyncReplayEntries will skip replays for already acked messages and those will be deleted + * from messagesToReplay + * 4. Verify: + * A. dispatcher redelivers msg [L:0] and deletes [L:0] from messagesToReplay bucket + * B. messagesToReplay must be empty + */ + assertEquals(replayIndex - 1, messagesToReplay.size()); + replayMap.set(dispatcher, messagesToReplay); + //(4) redeliver : replay-msg + unAcked-msg + dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0)); + + // Verify: [4.A] msg [L:0] must be redelivered + try { + msg = consumer.receive(1, TimeUnit.SECONDS); + assertEquals(msg.getData(), unAckedMsg.getData()); + } catch (Exception e) { + fail("msg should be redelivered ", e); + } + + // Verify: [4.B] messagesToReplay must be empty + assertEquals(messagesToReplay.size(), 0); + + consumer.close(); + producer.close(); + } + }