Skip to content

Commit

Permalink
Reduce Dispatcher-totalPermits by number of messages delivered in bat…
Browse files Browse the repository at this point in the history
…ch (#67)
  • Loading branch information
rdhabalia authored and merlimat committed Oct 17, 2016
1 parent bd49d69 commit 3e4e127
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 16 deletions.
Expand Up @@ -27,6 +27,8 @@
import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.Rate; import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -115,19 +117,22 @@ public String consumerName() {
* *
* @return a promise that can be use to track when all the data has been written into the socket * @return a promise that can be use to track when all the data has been written into the socket
*/ */
public ChannelPromise sendMessages(final List<Entry> entries) { public Pair<ChannelPromise, Integer> sendMessages(final List<Entry> entries) {
final ChannelHandlerContext ctx = cnx.ctx(); final ChannelHandlerContext ctx = cnx.ctx();
final MutablePair<ChannelPromise, Integer> sentMessages = new MutablePair<ChannelPromise, Integer>();
final ChannelPromise writePromise = ctx.newPromise(); final ChannelPromise writePromise = ctx.newPromise();
sentMessages.setLeft(writePromise);
if (entries.isEmpty()) { if (entries.isEmpty()) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] List of messages is empty, triggering write future immediately for consumerId {}", log.debug("[{}] List of messages is empty, triggering write future immediately for consumerId {}",
subscription, consumerId); subscription, consumerId);
} }
writePromise.setSuccess(); writePromise.setSuccess();
return writePromise; sentMessages.setRight(0);
return sentMessages;
} }


updatePermitsAndPendingAcks(entries); sentMessages.setRight(updatePermitsAndPendingAcks(entries));


ctx.channel().eventLoop().execute(() -> { ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) { for (int i = 0; i < entries.size(); i++) {
Expand Down Expand Up @@ -165,7 +170,7 @@ public ChannelPromise sendMessages(final List<Entry> entries) {
ctx.flush(); ctx.flush();
}); });


return writePromise; return sentMessages;
} }


private void incrementUnackedMessages(int ackedMessages) { private void incrementUnackedMessages(int ackedMessages) {
Expand All @@ -192,7 +197,7 @@ int getBatchSizeforEntry(ByteBuf metadataAndPayload) {
return -1; return -1;
} }


void updatePermitsAndPendingAcks(final List<Entry> entries) { int updatePermitsAndPendingAcks(final List<Entry> entries) {
int permitsToReduce = 0; int permitsToReduce = 0;
Iterator<Entry> iter = entries.iterator(); Iterator<Entry> iter = entries.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
Expand Down Expand Up @@ -221,6 +226,7 @@ void updatePermitsAndPendingAcks(final List<Entry> entries) {
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits); log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
} }
} }
return permitsToReduce;
} }


public boolean isWritable() { public boolean isWritable() {
Expand Down
Expand Up @@ -149,7 +149,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
} }


private void readMoreEntries() { private void readMoreEntries() {
if (totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) { if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize); int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);


if (!messagesToReplay.isEmpty()) { if (!messagesToReplay.isEmpty()) {
Expand Down Expand Up @@ -258,7 +258,7 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
} }


while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) { while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
Consumer c = getNextConsumer(); Consumer c = getNextConsumer();
if (c == null) { if (c == null) {
// Do nothing, cursor will be rewind at reconnection // Do nothing, cursor will be rewind at reconnection
Expand All @@ -271,7 +271,7 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()), MaxRoundRobinBatchSize); int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()), MaxRoundRobinBatchSize);


if (messagesForC > 0) { if (messagesForC > 0) {
c.sendMessages(entries.subList(start, start + messagesForC)); int msgSent = c.sendMessages(entries.subList(start, start + messagesForC)).getRight();


if (readType == ReadType.Replay) { if (readType == ReadType.Replay) {
entries.subList(start, start + messagesForC).forEach(entry -> { entries.subList(start, start + messagesForC).forEach(entry -> {
Expand All @@ -280,7 +280,7 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
} }
start += messagesForC; start += messagesForC;
entriesToDispatch -= messagesForC; entriesToDispatch -= messagesForC;
totalAvailablePermits -= messagesForC; totalAvailablePermits -= msgSent;
} }
} }


Expand Down Expand Up @@ -357,7 +357,7 @@ private Consumer getNextConsumer() {
// find next available unblocked consumer // find next available unblocked consumer
int unblockedConsumerIndex = consumerIndex; int unblockedConsumerIndex = consumerIndex;
do { do {
if (!consumerList.get(unblockedConsumerIndex).isBlocked()) { if (isConsumerAvailable(consumerList.get(unblockedConsumerIndex))) {
consumerIndex = unblockedConsumerIndex; consumerIndex = unblockedConsumerIndex;
return consumerList.get(consumerIndex++); return consumerList.get(consumerIndex++);
} }
Expand All @@ -371,23 +371,26 @@ private Consumer getNextConsumer() {
} }


/** /**
* returns true only if {@link consumerList} has atleast one unblocked consumer * returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits
* *
* @return * @return
*/ */
private boolean isUnblockedConsumerAvailable() { private boolean isAtleastOneConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) { if (consumerList.isEmpty() || closeFuture != null) {
// abort read if no consumers are connected or if disconnect is initiated // abort read if no consumers are connected or if disconnect is initiated
return false; return false;
} }
Iterator<Consumer> consumerIterator = consumerList.iterator(); for(Consumer consumer : consumerList) {
while (consumerIterator.hasNext()) { if (isConsumerAvailable(consumer)) {
if (!consumerIterator.next().isBlocked()) {
return true; return true;
} }
} }
return false; return false;
} }

private boolean isConsumerAvailable(Consumer consumer) {
return consumer != null && !consumer.isBlocked() && consumer.getAvailablePermits() > 0;
}


@Override @Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
Expand Down
Expand Up @@ -199,7 +199,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
readMoreEntries(currentConsumer); readMoreEntries(currentConsumer);
} }
} else { } else {
currentConsumer.sendMessages(entries).addListener(future -> { currentConsumer.sendMessages(entries).getLeft().addListener(future -> {
if (future.isSuccess()) { if (future.isSuccess()) {
// Schedule a new read batch operation only after the previous batch has been written to the socket // Schedule a new read batch operation only after the previous batch has been written to the socket
synchronized (PersistentDispatcherSingleActiveConsumer.this) { synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Expand Down

0 comments on commit 3e4e127

Please sign in to comment.