Skip to content

Commit

Permalink
Add a metric to get earliest time in the backlog (#12523)
Browse files Browse the repository at this point in the history
### Motivation

Currently, there is no metric to show the publish time of the earliest message in the backlog.

### Modifications

Add a metric to show the timestamp from `the publish time of the earliest message in the backlog` to `the current time`.

Usage:
```bash
pulsar-admin topics stats <topic-name> -etb
# The command will only compute the metric when the flag `-etb` is `true`.
```

### Verifying this change

This change added tests and can be verified as follows:
  - Add unit test `AdminApiTest#testGetStats`
  • Loading branch information
LeBW committed Dec 8, 2021
1 parent 01d73aa commit ebedc30
Show file tree
Hide file tree
Showing 39 changed files with 337 additions and 96 deletions.
Expand Up @@ -388,6 +388,13 @@ public interface ManagedLedger {
*/
long getEstimatedBacklogSize();

/**
* Get the publishing time of the oldest message in the backlog.
*
* @return the publishing time of the oldest message
*/
CompletableFuture<Long> getEarliestMessagePublishTimeInBacklog();

/**
* Return the size of all ledgers offloaded to 2nd tier storage
*/
Expand Down
Expand Up @@ -35,6 +35,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -131,13 +132,15 @@
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private static final long MegaByte = 1024 * 1024;

Expand Down Expand Up @@ -1149,6 +1152,43 @@ public long getEstimatedBacklogSize() {
}
}

@Override
public CompletableFuture<Long> getEarliestMessagePublishTimeInBacklog() {
PositionImpl pos = getMarkDeletePositionOfSlowestConsumer();

return getEarliestMessagePublishTimeOfPos(pos);
}

public CompletableFuture<Long> getEarliestMessagePublishTimeOfPos(PositionImpl pos) {
CompletableFuture<Long> future = new CompletableFuture<>();
if (pos == null) {
future.complete(0L);
return future;
}
PositionImpl nextPos = getNextValidPosition(pos);

asyncReadEntry(nextPos, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
future.complete(entryTimestamp);
} catch (IOException e) {
log.error("Error deserializing message for message position {}", nextPos, e);
future.completeExceptionally(e);
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
log.error("Error read entry for position {}", nextPos, exception);
future.completeExceptionally(exception);
}
}, null);

return future;
}

/**
* Get estimated backlog size from a specific position.
*/
Expand Down
Expand Up @@ -1149,15 +1149,21 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR
}

protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog,
boolean subscriptionBacklogSize) {
boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.GET_STATS);

Topic topic = getTopicReference(topicName);
return topic.getStats(getPreciseBacklog, subscriptionBacklogSize);
try {
return topic.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[{}] Failed to get stats for {}", clientAppId(), topicName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR,
(e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
}
}

protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative, boolean metadata) {
Expand Down Expand Up @@ -1280,8 +1286,9 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}

protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition,
boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
Expand All @@ -1303,8 +1310,8 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
try {
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync(
(topicName.getPartition(i).toString()), getPreciseBacklog,
subscriptionBacklogSize));
(topicName.getPartition(i).toString()), getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down Expand Up @@ -2410,7 +2417,7 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long time
ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger();
return ledger.asyncFindPosition(entry -> {
try {
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}] Error deserializing message for message position find", topicName, e);
Expand Down
Expand Up @@ -106,7 +106,7 @@ public NonPersistentTopicStats getStats(@PathParam("property") String property,
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.GET_STATS);
Topic topic = getTopicReference(topicName);
return ((NonPersistentTopic) topic).getStats(getPreciseBacklog, false);
return ((NonPersistentTopic) topic).getStats(getPreciseBacklog, false, false);
}

@GET
Expand Down
Expand Up @@ -342,7 +342,7 @@ public TopicStats getStats(@PathParam("property") String property, @PathParam("c
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
validateTopicName(property, cluster, namespace, encodedTopic);
return internalGetStats(authoritative, getPreciseBacklog, false);
return internalGetStats(authoritative, getPreciseBacklog, false, false);
}

@GET
Expand Down Expand Up @@ -390,7 +390,7 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false, false);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Expand Up @@ -125,13 +125,16 @@ public NonPersistentTopicStats getStats(
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return time of the earliest message in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.GET_STATS);

Topic topic = getTopicReference(topicName);
return ((NonPersistentTopic) topic).getStats(getPreciseBacklog, subscriptionBacklogSize);
return ((NonPersistentTopic) topic).getStats(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog);
}

@GET
Expand Down Expand Up @@ -237,7 +240,9 @@ public void getPartitionedStats(
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
if (topicName.isGlobal()) {
Expand All @@ -263,7 +268,7 @@ public void getPartitionedStats(
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync(
(topicName.getPartition(i).toString()), getPreciseBacklog,
subscriptionBacklogSize));
subscriptionBacklogSize, getEarliestTimeInBacklog));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down
Expand Up @@ -950,9 +950,11 @@ public TopicStats getStats(
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return time of the earliest message in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetStats(authoritative, getPreciseBacklog, subscriptionBacklogSize);
return internalGetStats(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
}

@GET
Expand Down Expand Up @@ -1032,11 +1034,13 @@ public void getPartitionedStats(
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog,
subscriptionBacklogSize);
subscriptionBacklogSize, getEarliestTimeInBacklog);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Expand Up @@ -904,11 +904,11 @@ public long getBytesInCounter() {
}

public long getMsgOutCounter() {
return getStats(false, false).msgOutCounter;
return getStats(false, false, false).msgOutCounter;
}

public long getBytesOutCounter() {
return getStats(false, false).bytesOutCounter;
return getStats(false, false, false).bytesOutCounter;
}

public boolean isDeleteWhileInactive() {
Expand Down
Expand Up @@ -2031,7 +2031,7 @@ public String generateUniqueProducerName() {
public Map<String, TopicStatsImpl> getTopicStats() {
HashMap<String, TopicStatsImpl> stats = new HashMap<>();

forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, false)));
forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, false, false)));

return stats;
}
Expand Down
Expand Up @@ -220,7 +220,12 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();

TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize);
TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);

CompletableFuture<? extends TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);

CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata);

Expand Down
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -807,8 +808,21 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
}

@Override
public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
try {
return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getPreciseBacklog).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[{}] Fail to get stats", topic, e);
return null;
}
}

@Override
public CompletableFuture<NonPersistentTopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
CompletableFuture<NonPersistentTopicStatsImpl> future = new CompletableFuture<>();
NonPersistentTopicStatsImpl stats = new NonPersistentTopicStatsImpl();

ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats = new ObjectObjectHashMap<>();
Expand Down Expand Up @@ -861,7 +875,8 @@ public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean s
});

stats.topicEpoch = topicEpoch.orElse(null);
return stats;
future.complete(stats);
return future;
}

@Override
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,7 +75,7 @@ public boolean expireMessages(int messageTTLInSeconds) {

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
try {
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
try {
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
Expand Down

0 comments on commit ebedc30

Please sign in to comment.