Skip to content

Commit

Permalink
Prevent message-replay of already acknowledged messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Sep 24, 2016
1 parent 2d75e88 commit 2739ce7
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 26 deletions.
Expand Up @@ -369,8 +369,10 @@ public List<Entry> replayEntries(Set<? extends Position> positions)
* callback object returning the list of entries
* @param ctx
* opaque context
* @return skipped positions
* set of positions which are already deleted/acknowledged and skipped while replaying them
*/
public void asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);

/**
* Close the cursor and releases the associated resources.
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
Expand Down Expand Up @@ -66,6 +67,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -802,15 +804,23 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void asyncReplayEntries(final Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
public Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
if (positions.isEmpty()) {
callback.readEntriesComplete(entries, ctx);
}

// filters out messages which are already acknowledged
Set<Position> alreadyAcknowledgedPositions = Sets.newHashSet();
positions.stream().filter(position -> {
return individualDeletedMessages.contains((PositionImpl) position)
|| ((PositionImpl) position).compareTo(markDeletePosition) < 0;
}).forEach(pos -> alreadyAcknowledgedPositions.add(pos));

final int totalValidPositions = positions.size() - alreadyAcknowledgedPositions.size();
final AtomicReference<ManagedLedgerException> exception = new AtomicReference<>();
ReadEntryCallback cb = new ReadEntryCallback() {
int pendingCallbacks = positions.size();
int pendingCallbacks = totalValidPositions;

@Override
public synchronized void readEntryComplete(Entry entry, Object ctx) {
Expand Down Expand Up @@ -842,25 +852,11 @@ public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx)
}
};

positions.forEach(p -> {
PositionImpl position = (PositionImpl) p;
if (position.compareTo(markDeletePosition) > 0 && hasMoreEntries(position)) {
ledger.asyncReadEntry(position, cb, ctx);
} else {
String msg = String.format("[%s][%s] No entries to read from position %s for replay.", ledger.getName(),
name, position);
lock.readLock().lock();
try {
log.warn(
"[{}][{}] No entries to read from position {} for replay. - md: {} - rd: {} - last-position: {} - individually-deleted: {}",
ledger.getName(), name, position, markDeletePosition, readPosition,
ledger.lastConfirmedEntry, individualDeletedMessages);
} finally {
lock.readLock().unlock();
}
cb.readEntryFailed(new ManagedLedgerException.InvalidReplayPositionException(msg), ctx);
}
positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position)).forEach(p -> {
ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
});

return alreadyAcknowledgedPositions;
}

private long getNumberOfEntries(Range<PositionImpl> range) {
Expand Down
Expand Up @@ -38,6 +38,7 @@

import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

@Test
public class ManagedCursorContainerTest {
Expand Down Expand Up @@ -184,7 +185,8 @@ public List<Entry> replayEntries(Set<? extends Position> positions)
}

@Override
public void asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
return Sets.newConcurrentHashSet();
}

@Override
Expand Down
Expand Up @@ -2072,10 +2072,10 @@ void testReplayEntries() throws Exception {
c1.markDelete(p2);

try {
c1.replayEntries(positions);
fail("Should fail");
// as mark-delete is at position: p2 it should read entry : p3
assertEquals(1, c1.replayEntries(positions).size());
} catch (ManagedLedgerException e) {
// ok
fail("Should have not failed");
}
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -165,7 +166,10 @@ private void readMoreEntries() {
}

havePendingReplayRead = true;
cursor.asyncReplayEntries(messagesToReplayNow, this, ReadType.Replay);
Set<? extends Position> deletedMessages = cursor.asyncReplayEntries(messagesToReplayNow, this,
ReadType.Replay);
// clear already acked positions from replay bucket
messagesToReplay.removeAll(deletedMessages);
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -38,12 +39,15 @@
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.stats.Metrics;
Expand Down Expand Up @@ -1069,4 +1073,98 @@ public void testPayloadCorruptionDetection() throws Exception {
assertEquals(new String(msg.getData()), "message-1");
}
}

/**
* Verify: Broker should not replay already acknowledged messages again and should clear them from messageReplay bucket
*
* 1. produce messages
* 2. consume messages and ack all except 1 msg
* 3. add already acked messages to PersistentDispatcherMultipleConsumers.messagesToReplay bucket
* 4. replay messages present into messagesToReplay bucket
* 5. Verification
* a. should replay only 1 unacked message
* b. messagesToReplay should not have previously added acked messages
*
*
*
* @throws Exception
*/
@Test
public void testMessageReplay() throws Exception {

final String topicName = "persistent://prop/use/ns-abc/topic2";
final String subName = "sub2";

Message msg;
int totalMessages = 10;
int replayIndex = totalMessages / 2;

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);

Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Producer producer = pulsarClient.createProducer(topicName);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getPersistentSubscription(subName);
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
.getDispatcher();
Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
replayMap.setAccessible(true);
TreeSet<PositionImpl> messagesToReplay = Sets.newTreeSet();

assertNotNull(subRef);

// (1) Produce messages
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

//(2) Consume and ack messages except first message
Message unAckedMsg = null;
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive();
if (i == 0) {
unAckedMsg = msg;
} else {
consumer.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
if (i < replayIndex) {
// (3) accumulate acked messages for replay
messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()));
}
}
}

/**
* 1. Now: ManagedCursorImpl.individualDeletedMessages => (L:0,L9]
* 2. dispatcher.redeliverUnacknowledgedMessages replays => unackMsg (L:0) + position presents into messagesToReplay
* 3. ManagedCursorImpl.asyncReplayEntries will skip replays for already acked messages and those will be deleted
* from messagesToReplay
* 4. Verify:
* A. dispatcher redelivers msg [L:0] and deletes [L:0] from messagesToReplay bucket
* B. messagesToReplay must be empty
*/
assertEquals(replayIndex - 1, messagesToReplay.size());
replayMap.set(dispatcher, messagesToReplay);
//(4) redeliver : replay-msg + unAcked-msg
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));

// Verify: [4.A] msg [L:0] must be redelivered
try {
msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(msg.getData(), unAckedMsg.getData());
} catch (Exception e) {
fail("msg should be redelivered ", e);
}

// Verify: [4.B] messagesToReplay must be empty
assertEquals(messagesToReplay.size(), 0);

consumer.close();
producer.close();
}

}

0 comments on commit 2739ce7

Please sign in to comment.