Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Fix broker dispatch byte rate limiter. #11135

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -47,8 +48,11 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {

protected final Subscription subscription;

protected AbstractBaseDispatcher(Subscription subscription) {
protected final ServiceConfiguration serviceConfig;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
}

/**
Expand Down Expand Up @@ -234,6 +238,19 @@ public void resetCloseFuture() {
// noop
}

protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg,
long bytesToRead, long availablePermitsOnByte) {
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
}

if (availablePermitsOnByte > 0) {
bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
}

return Pair.of(messagesToRead, bytesToRead);
}

protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.slf4j.Logger;
Expand All @@ -46,8 +47,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi

private Random random = new Random(42);

protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
super(subscription);
protected AbstractDispatcherMultipleConsumers(Subscription subscription, ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
}

public boolean isConsumerConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -55,8 +56,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
private volatile int isClosed = FALSE;

public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName, Subscription subscription) {
super(subscription);
String topicName, Subscription subscription,
ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
@SuppressWarnings("unused")
private volatile int totalAvailablePermits = 0;

private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;

public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
super(subscription);
super(subscription, topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.name = topic.getName() + " / " + subscription.getName();
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
Expand All @@ -40,16 +39,15 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
private final NonPersistentTopic topic;
private final Rate msgDrop;
private final Subscription subscription;
private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;

public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription);
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ public long getAvailableDispatchRateLimitOnMsg() {
return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits();
}

/**
* returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableDispatchRateLimitOnByte() {
return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits();
}

/**
* It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -106,7 +106,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"blockedDispatcherOnUnackedMsgs");
protected final ServiceConfiguration serviceConfig;
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();

protected enum ReadType {
Expand All @@ -115,8 +114,7 @@ protected enum ReadType {

public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription) {
super(subscription);
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
super(subscription, topic.getBrokerService().pulsar().getConfiguration());
this.cursor = cursor;
this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
Expand Down Expand Up @@ -224,9 +222,11 @@ public synchronized void readMoreEntries() {
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
Pair<Integer, Long> calculateResult = calculateToRead(currentTotalAvailablePermits);
int messagesToRead = calculateResult.getLeft();
long bytesToRead = calculateResult.getRight();

if (-1 == messagesToRead) {
if (messagesToRead == -1 || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
return;
}
Expand Down Expand Up @@ -263,8 +263,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
this,
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bytesToRead can potentially be -1 now (like at line 315 when new MutablePair<>(-1, -1); is returned) when dispatch limit is not set. This changes the default behaviour because earlier it was capped by serviceConfig.getDispatcherMaxReadSizeBytes()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in line 230, when bytesToRead or messageToRead is -1, it will not process read operation.

ReadType.Normal, topic.getMaxReadPosition());
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
Expand All @@ -276,8 +275,10 @@ public synchronized void readMoreEntries() {
}
}

protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
// left pair is messagesToRead, right pair is bytesToRead
protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits) {
int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();

Consumer c = getRandomConsumer();
// if turn on precise dispatcher flow control, adjust the record to read
Expand Down Expand Up @@ -310,13 +311,15 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
}
topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
return -1;
return Pair.of(-1, -1L);
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
(int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();

}
}

Expand All @@ -331,13 +334,14 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
}
topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
return -1;
return Pair.of(-1, -1L);
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
(int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();
}
}

Expand All @@ -347,11 +351,11 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
}
return -1;
return Pair.of(-1, -1L);
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
return Math.max(messagesToRead, 1);
return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
}

protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
Expand Down
Loading