Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Add time based backlog quota #10093

Merged
merged 8 commits into from Apr 21, 2021
Expand Up @@ -3532,6 +3532,10 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
}

/**
 * Returns the {@link Clock} used by this managed ledger as its time source.
 * Exposed so external components (e.g. time-based backlog quota checks) can
 * evaluate ledger ages against the same clock the ledger itself uses.
 */
public Clock getClock() {
return clock;
}

/**
* check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
*
Expand Down
Expand Up @@ -310,18 +310,39 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Enable backlog quota check. Enforces actions on topic when the quota is reached"
)
private boolean backlogQuotaCheckEnabled = true;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Whether to enable precise time based backlog quota check. "
+ "Enabling precise time based backlog quota check will cause broker to read first entry in backlog "
+ "of the slowest cursor on a ledger which will mostly result in reading entry from BookKeeper's " +
"disk which can have negative impact on overall performance. "
+ "Disabling precise time based backlog quota check will just use the timestamp indicating when a "
+ "ledger was closed, which is of coarser granularity."
)
private boolean preciseTimeBasedBacklogQuotaCheck = false;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "How often to check for topics that have reached the quota."
+ " It only takes effects when `backlogQuotaCheckEnabled` is true"
)
private int backlogQuotaCheckIntervalInSeconds = 60;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Default per-topic backlog quota limit, less than 0 means no limitation. default is -1."
doc = "Default per-topic backlog quota limit by size, less than 0 means no limitation. default is -1."
+ " Increase it if you want to allow larger msg backlog"
)
private long backlogQuotaDefaultLimitGB = -1;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Default per-topic backlog quota limit by time in second, less than 0 means no limitation. " +
"default is -1. Increase it if you want to allow larger msg backlog"
)
private int backlogQuotaDefaultLimitSecond = -1;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Default backlog quota retention policy. Default is producer_request_hold\n\n"
Expand All @@ -331,6 +352,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog"
)
private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy.producer_request_hold;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Default ttl for namespaces if ttl is not already configured at namespace policies. "
Expand Down
Expand Up @@ -414,7 +414,7 @@ protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retent
if (quota == null) {
quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
}
if (quota.getLimit() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
if (quota.getLimitSize() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
return false;
}
return true;
Expand Down
Expand Up @@ -27,8 +27,11 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand All @@ -52,6 +55,7 @@ public BacklogQuotaManager(PulsarService pulsar) {
this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
this.defaultQuota = new BacklogQuota(
pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024,
pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond(),
pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy());
this.zkCache = pulsar.getConfigurationCache().policiesCache();
this.pulsar = pulsar;
Expand Down Expand Up @@ -90,23 +94,37 @@ public BacklogQuota getBacklogQuota(TopicName topicName) {
return getBacklogQuota(topicName.getNamespace(), policyPath);
}

public long getBacklogQuotaLimit(TopicName topicName) {
return getBacklogQuota(topicName).getLimit();
public long getBacklogQuotaLimitInSize(TopicName topicName) {
return getBacklogQuota(topicName).getLimitSize();
}

public int getBacklogQuotaLimitInTime(TopicName topicName) {
return getBacklogQuota(topicName).getLimitTime();
}

/**
* Handle exceeded backlog by using policies set in the zookeeper for given topic.
* Handle exceeded size backlog by using policies set in the zookeeper for given topic.
*
* @param persistentTopic Topic on which backlog has been exceeded
*/
public void handleExceededBacklogQuota(PersistentTopic persistentTopic) {
public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType,
boolean preciseTimeBasedBacklogQuotaCheck) {
TopicName topicName = TopicName.get(persistentTopic.getName());
BacklogQuota quota = getBacklogQuota(topicName);
log.info("Backlog quota exceeded for topic [{}]. Applying [{}] policy", persistentTopic.getName(),
quota.getPolicy());
log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType,
persistentTopic.getName(), quota.getPolicy());
switch (quota.getPolicy()) {
case consumer_backlog_eviction:
dropBacklog(persistentTopic, quota);
switch (backlogQuotaType) {
case destination_storage:
dropBacklogForSizeLimit(persistentTopic, quota);
break;
case message_age:
dropBacklogForTimeLimit(persistentTopic, quota, preciseTimeBasedBacklogQuotaCheck);
break;
default:
break;
}
break;
case producer_exception:
case producer_request_hold:
Expand All @@ -125,10 +143,10 @@ public void handleExceededBacklogQuota(PersistentTopic persistentTopic) {
* @param quota
* Backlog quota set for the topic
*/
private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
private void dropBacklogForSizeLimit(PersistentTopic persistentTopic, BacklogQuota quota) {
// Set the reduction factor to 90%. The aim is to drop down the backlog to 90% of the quota limit.
double reductionFactor = 0.9;
double targetSize = reductionFactor * quota.getLimit();
double targetSize = reductionFactor * quota.getLimitSize();

// Get estimated unconsumed size for the managed ledger associated with this topic. Estimated size is more
// useful than the actual storage size. Actual storage size gets updated only when managed ledger is trimmed.
Expand Down Expand Up @@ -188,7 +206,51 @@ private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
backlogSize, messageSkipFactor);
}
}
}

/**
* Drop the backlog on the topic.
*
* @param persistentTopic
* The topic from which backlog should be dropped
* @param quota
* Backlog quota set for the topic
*/
/**
 * Drop backlog from the topic until it is within the time-based quota.
 *
 * @param persistentTopic the topic from which backlog should be dropped
 * @param quota backlog quota set for the topic; {@code getLimitTime()} is expressed in seconds
 * @param preciseTimeBasedBacklogQuotaCheck when true, expire messages per-entry through the
 *        subscriptions' expiry monitors; when false, skip whole closed ledgers for the slowest cursor
 */
private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuota quota,
                                     boolean preciseTimeBasedBacklogQuotaCheck) {
    // If enabled precise time based backlog quota check, will expire message based on the timeBaseQuota
    if (preciseTimeBasedBacklogQuotaCheck) {
        // Set the reduction factor to 90%. The aim is to drop down the backlog to 90% of the quota limit.
        double reductionFactor = 0.9;
        int target = (int) (reductionFactor * quota.getLimitTime());
        if (log.isDebugEnabled()) {
            log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target);
        }
        for (PersistentSubscription subscription : persistentTopic.getSubscriptions().values()) {
            subscription.getExpiryMonitor().expireMessages(target);
        }
    } else {
        // If disabled precise time based backlog quota check, will try to remove whole ledger from cursor's backlog
        ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
        long currentMillis = mLedger.getClock().millis();
        try {
            long ledgerId = mLedger.getCursors().getSlowestReaderPosition().getLedgerId();
            MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(ledgerId).get();
            // A ledger's timestamp is only > 0 once the ledger has been closed. getLimitTime() is in
            // seconds while the ledger age is in milliseconds, so scale the limit by 1000L (long math
            // also avoids int overflow for large limits).
            while (ledgerInfo.getTimestamp() > 0
                    && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000L) {
                ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
                // skip whole ledger for the slowest cursor
                slowestConsumer.resetCursor(mLedger.getNextValidPosition(
                        PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1)));
                ledgerId = mLedger.getCursors().getSlowestReaderPosition().getLedgerId();
                ledgerInfo = mLedger.getLedgerInfo(ledgerId).get();
            }
        } catch (Exception e) {
            // Pass the throwable as the trailing argument (no matching placeholder) so SLF4J
            // logs the full stack trace instead of swallowing it into the message.
            log.error("Error resetting cursor for slowest consumer [{}]",
                    mLedger.getSlowestConsumer().getName(), e);
        }
    }
}

/**
Expand Down
Expand Up @@ -146,6 +146,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
Expand Down Expand Up @@ -1595,8 +1596,13 @@ public synchronized void monitorBacklogQuota() {
forEachTopic(topic -> {
if (topic instanceof PersistentTopic) {
PersistentTopic persistentTopic = (PersistentTopic) topic;
if (persistentTopic.isBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic);
if (persistentTopic.isSizeBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic,
BacklogQuota.BacklogQuotaType.destination_storage, false);
} else if (persistentTopic.isTimeBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic,
BacklogQuota.BacklogQuotaType.message_age,
pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
} else {
if (log.isDebugEnabled()) {
log.debug("quota not exceeded for [{}]", topic.getName());
Expand Down
Expand Up @@ -918,6 +918,10 @@ public double getExpiredMessageRate() {
return expiryMonitor.getMessageExpiryRate();
}

/**
 * Exposes this subscription's message expiry monitor so external components
 * (e.g. time-based backlog quota enforcement) can expire messages by age.
 */
public PersistentMessageExpiryMonitor getExpiryMonitor() {
return expiryMonitor;
}

/**
 * Estimates this subscription's unconsumed backlog, measured from the
 * cursor's mark-delete position onwards. This is an estimate, not an exact
 * storage figure; presumably in bytes — confirm against the cursor API.
 */
public long estimateBacklogSize() {
return cursor.getEstimatedSizeSinceMarkDeletePosition();
}
Expand Down
Expand Up @@ -61,6 +61,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -2244,7 +2245,7 @@ public boolean isBacklogQuotaExceeded(String producerName) {

if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
|| retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)
&& isBacklogExceeded()) {
&& (isSizeBacklogExceeded() || isTimeBacklogExceeded())) {
log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
return true;
} else {
Expand All @@ -2255,11 +2256,11 @@ && isBacklogExceeded()) {
}

/**
* @return determine if quota enforcement needs to be done for topic
* @return determine if backlog quota enforcement needs to be done for topic based on size limit
*/
public boolean isBacklogExceeded() {
public boolean isSizeBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimit(topicName);
long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInSize(topicName);
if (backlogQuotaLimitInBytes < 0) {
return false;
}
Expand All @@ -2276,6 +2277,88 @@ public boolean isBacklogExceeded() {
return (storageSize >= backlogQuotaLimitInBytes);
}

/**
* @return determine if backlog quota enforcement needs to be done for topic based on time limit
*/
public boolean isTimeBacklogExceeded() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more efficient way to achieve this is based on the Ledger metadata, we can evaluated a time based on the Ledger create time, Ledger create time(next ledger create time) and the max entry Id of the ledger. So that we don't need to read an entry. Of course this will affect the accuracy of the limit, but the way to read the entry every time will make the bookkeeper's burden heavier in a large number of topic cluster

Copy link
Contributor Author

@MarvinCai MarvinCai Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui Yes, I agree, reading first entry in backlog of slowest cursor will mostly causing read from BK disk which could affect overall performance, but using metadata could also affect accuracy as you mentioned, how about we have both and make it configurable so user can make their own decision? We can have the metadata one as default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MarvinCai LGTM!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopicName topicName = TopicName.get(getName());
CompletableFuture<Boolean> future = new CompletableFuture<>();
int backlogQuotaLimitInSecond = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInTime(topicName);

// If backlog quota by time is not set and we have no durable cursor.
if (backlogQuotaLimitInSecond <= 0
|| ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition() == null) {
return false;
}

if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
// Check if first unconsumed message(first message after mark delete position)
// for slowest cursor's has expired.
PositionImpl position = ((ManagedLedgerImpl) ledger).getNextValidPosition(((ManagedCursorContainer)
ledger.getCursors()).getSlowestReaderPosition());
((ManagedLedgerImpl) ledger).asyncReadEntry(position,
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
boolean expired = msg.isExpired(backlogQuotaLimitInSecond);
if (expired && log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
+ "exceeded quota {}", ((ManagedLedgerImpl) ledger).getSlowestConsumer().getName(),
backlogQuotaLimitInSecond);
}
future.complete(expired);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for backlog check", e);
future.complete(false);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Error reading entry for precise time based backlog check",
topicName, exception);
future.complete(false);
}
}, null);

try {
return future.get();
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
return false;
}
} else {
Long ledgerId = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
try {
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
&& ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
> backlogQuotaLimitInSecond * 1000) {
if (log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
+ "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
}
return true;
} else {
return false;
}
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
return false;
}
}
}

@Override
public boolean isReplicated() {
return !replicators.isEmpty();
Expand Down
Expand Up @@ -31,7 +31,12 @@ public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerServi
}

@Override
public boolean isBacklogExceeded() {
public boolean isSizeBacklogExceeded() {
return false;
}

@Override
public boolean isTimeBacklogExceeded() {
return false;
}

Expand Down