Skip to content

Commit

Permalink
Support per message unacknowledged redelivery on RedeliverUnAcknowled…
Browse files Browse the repository at this point in the history
…gedMessages (#70)
  • Loading branch information
merlimat committed Oct 17, 2016
1 parent f946a6d commit e274a24
Show file tree
Hide file tree
Showing 16 changed files with 754 additions and 14 deletions.
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -455,4 +456,13 @@ public void redeliverUnacknowledgedMessages() {
}

}

public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
List<PositionImpl> pendingPositions = messageIds.stream()
.map(messageIdData -> PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId()))
.filter(position -> pendingAcks.remove(position) != null)
.collect(Collectors.toList());

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
}
}
Expand Up @@ -15,10 +15,13 @@
*/
package com.yahoo.pulsar.broker.service;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.yahoo.pulsar.common.api.proto.PulsarApi;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.utils.CopyOnWriteArrayList;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

public interface Dispatcher {
void addConsumer(Consumer consumer) throws BrokerServiceException;
Expand All @@ -43,4 +46,6 @@ public interface Dispatcher {
SubType getType();

void redeliverUnacknowledgedMessages(Consumer consumer);

void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);
}
Expand Up @@ -455,7 +455,12 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumerFuture.getNow(null).redeliverUnacknowledgedMessages();
Consumer consumer = consumerFuture.getNow(null);
if (redeliver.getMessageIdsCount() > 0 && consumer.subType() == SubType.Shared) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
consumer.redeliverUnacknowledgedMessages();
}
}
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.yahoo.pulsar.common.api.proto.PulsarApi;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

Expand Down Expand Up @@ -59,4 +60,6 @@ public interface Subscription {
void expireMessages(int messageTTLInSeconds);

void redeliverUnacknowledgedMessages(Consumer consumer);

void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);
}
Expand Up @@ -22,13 +22,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.yahoo.pulsar.common.api.proto.PulsarApi;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -403,5 +405,14 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
readMoreEntries();
}

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
messagesToReplay.addAll(positions);
if (log.isDebugEnabled()) {
log.debug("[{}] Redelivering unacknowledged messages for consumer ", consumer);
}
readMoreEntries();
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
Expand Up @@ -30,6 +30,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -262,6 +263,12 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {

}

@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
// We cannot redeliver single messages to single consumers to preserve ordering.
redeliverUnacknowledgedMessages(consumer);
}

private void readMoreEntries(Consumer consumer) {
int availablePermits = consumer.getAvailablePermits();

Expand Down
Expand Up @@ -15,10 +15,12 @@
*/
package com.yahoo.pulsar.broker.service.persistent;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import com.yahoo.pulsar.common.api.proto.PulsarApi;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand Down Expand Up @@ -586,5 +588,10 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
dispatcher.redeliverUnacknowledgedMessages(consumer);
}

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}

0 comments on commit e274a24

Please sign in to comment.