Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent message-replay of already acknowledged messages #38

Merged
merged 1 commit into from Sep 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -369,8 +369,10 @@ public List<Entry> replayEntries(Set<? extends Position> positions)
* callback object returning the list of entries
* @param ctx
* opaque context
* @return skipped positions
* set of positions which are already deleted/acknowledged and skipped while replaying them
*/
public void asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);

/**
* Close the cursor and releases the associated resources.
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
Expand Down Expand Up @@ -66,6 +67,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -802,15 +804,23 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void asyncReplayEntries(final Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
public Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
if (positions.isEmpty()) {
callback.readEntriesComplete(entries, ctx);
}

// filters out messages which are already acknowledged
Set<Position> alreadyAcknowledgedPositions = Sets.newHashSet();
positions.stream().filter(position -> {
return individualDeletedMessages.contains((PositionImpl) position)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any access to the individualDeletedMessages should be from within a r/w lock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I created a PR to fix it.

|| ((PositionImpl) position).compareTo(markDeletePosition) < 0;
}).forEach(pos -> alreadyAcknowledgedPositions.add(pos));

final int totalValidPositions = positions.size() - alreadyAcknowledgedPositions.size();
final AtomicReference<ManagedLedgerException> exception = new AtomicReference<>();
ReadEntryCallback cb = new ReadEntryCallback() {
int pendingCallbacks = positions.size();
int pendingCallbacks = totalValidPositions;

@Override
public synchronized void readEntryComplete(Entry entry, Object ctx) {
Expand Down Expand Up @@ -842,25 +852,11 @@ public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx)
}
};

positions.forEach(p -> {
PositionImpl position = (PositionImpl) p;
if (position.compareTo(markDeletePosition) > 0 && hasMoreEntries(position)) {
ledger.asyncReadEntry(position, cb, ctx);
} else {
String msg = String.format("[%s][%s] No entries to read from position %s for replay.", ledger.getName(),
name, position);
lock.readLock().lock();
try {
log.warn(
"[{}][{}] No entries to read from position {} for replay. - md: {} - rd: {} - last-position: {} - individually-deleted: {}",
ledger.getName(), name, position, markDeletePosition, readPosition,
ledger.lastConfirmedEntry, individualDeletedMessages);
} finally {
lock.readLock().unlock();
}
cb.readEntryFailed(new ManagedLedgerException.InvalidReplayPositionException(msg), ctx);
}
positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position)).forEach(p -> {
ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
});

return alreadyAcknowledgedPositions;
}

private long getNumberOfEntries(Range<PositionImpl> range) {
Expand Down
Expand Up @@ -38,6 +38,7 @@

import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

@Test
public class ManagedCursorContainerTest {
Expand Down Expand Up @@ -184,7 +185,8 @@ public List<Entry> replayEntries(Set<? extends Position> positions)
}

@Override
public void asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
return Sets.newConcurrentHashSet();
}

@Override
Expand Down
Expand Up @@ -2072,10 +2072,10 @@ void testReplayEntries() throws Exception {
c1.markDelete(p2);

try {
c1.replayEntries(positions);
fail("Should fail");
// as mark-delete is at position: p2 it should read entry : p3
assertEquals(1, c1.replayEntries(positions).size());
} catch (ManagedLedgerException e) {
// ok
fail("Should have not failed");
}
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -165,7 +166,10 @@ private void readMoreEntries() {
}

havePendingReplayRead = true;
cursor.asyncReplayEntries(messagesToReplayNow, this, ReadType.Replay);
Set<? extends Position> deletedMessages = cursor.asyncReplayEntries(messagesToReplayNow, this,
ReadType.Replay);
// clear already acked positions from replay bucket
messagesToReplay.removeAll(deletedMessages);
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -38,12 +39,15 @@
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.stats.Metrics;
Expand Down Expand Up @@ -1069,4 +1073,98 @@ public void testPayloadCorruptionDetection() throws Exception {
assertEquals(new String(msg.getData()), "message-1");
}
}

/**
 * Verifies that the broker does not replay already-acknowledged messages and that such positions are cleared
 * from the dispatcher's {@code messagesToReplay} bucket.
 *
 * <ol>
 * <li>Produce messages.</li>
 * <li>Consume all of them and acknowledge every message except the first one.</li>
 * <li>Put some of the already-acknowledged positions into
 * {@code PersistentDispatcherMultipleConsumers.messagesToReplay} (injected through reflection).</li>
 * <li>Trigger a redelivery so that the content of {@code messagesToReplay} is replayed.</li>
 * <li>Verify that (a) the unacknowledged message is redelivered and (b) {@code messagesToReplay} no longer
 * contains the acknowledged positions that were injected.</li>
 * </ol>
 *
 * @throws Exception
 *             if any client or broker operation, or the reflective field access, fails
 */
@Test
public void testMessageReplay() throws Exception {

    final String topicName = "persistent://prop/use/ns-abc/topic2";
    final String subName = "sub2";

    final int totalMessages = 10;
    final int replayIndex = totalMessages / 2;

    ConsumerConfiguration conf = new ConsumerConfiguration();
    conf.setSubscriptionType(SubscriptionType.Shared);

    Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
    Producer producer = pulsarClient.createProducer(topicName);

    PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
    assertNotNull(topicRef);
    PersistentSubscription subRef = topicRef.getPersistentSubscription(subName);
    // Check before dereferencing, so a missing subscription is reported as an assertion failure, not an NPE.
    assertNotNull(subRef);
    PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
            .getDispatcher();

    // The replay bucket is a private field of the dispatcher; it is swapped for a test-owned set further down.
    Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
    replayMap.setAccessible(true);
    TreeSet<PositionImpl> messagesToReplay = Sets.newTreeSet();

    // (1) Produce messages
    for (int i = 0; i < totalMessages; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }

    // (2) Consume and ack messages except first message
    Message unAckedMsg = null;
    for (int i = 0; i < totalMessages; i++) {
        Message msg = consumer.receive();
        if (i == 0) {
            unAckedMsg = msg;
        } else {
            consumer.acknowledge(msg);
            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
            if (i < replayIndex) {
                // (3) accumulate acked messages for replay
                messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()));
            }
        }
    }

    /*
     * 1. Now: ManagedCursorImpl.individualDeletedMessages => (L:0,L9]
     * 2. dispatcher.redeliverUnacknowledgedMessages replays => unackMsg (L:0) + positions present in
     *    messagesToReplay
     * 3. ManagedCursorImpl.asyncReplayEntries will skip the replay of already acked messages and those will be
     *    deleted from messagesToReplay
     * 4. Verify:
     *    A. dispatcher redelivers msg [L:0] and deletes [L:0] from messagesToReplay bucket
     *    B. messagesToReplay must be empty
     */
    // Entries 1 .. replayIndex-1 were acked and collected above, i.e. replayIndex - 1 positions.
    assertEquals(messagesToReplay.size(), replayIndex - 1);
    replayMap.set(dispatcher, messagesToReplay);
    // (4) redeliver : replay-msg + unAcked-msg
    dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));

    // Verify: [4.A] msg [L:0] must be redelivered
    try {
        Message redelivered = consumer.receive(1, TimeUnit.SECONDS);
        // A timed-out receive yields no message; fail with a clear message instead of an NPE on getData().
        assertNotNull(redelivered, "msg should be redelivered ");
        assertEquals(redelivered.getData(), unAckedMsg.getData());
    } catch (Exception e) {
        fail("msg should be redelivered ", e);
    }

    // Verify: [4.B] messagesToReplay must be empty
    assertEquals(messagesToReplay.size(), 0);

    consumer.close();
    producer.close();
}

}