Skip to content

Commit

Permalink
[feat][broker] Implementation of PIP-323: Complete Backlog Quota Tele…
Browse files Browse the repository at this point in the history
…metry (apache#21816) (apache#22740)

Co-authored-by: Asaf Mesika <asaf.mesika@gmail.com>
  • Loading branch information
codelipenghui and asafm committed May 18, 2024
1 parent 5d0f1d6 commit ce7a07b
Show file tree
Hide file tree
Showing 37 changed files with 1,413 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntryErrors();

/**
* @return the number of entries read from the managed ledger (from cache or BK)
*/
long getEntriesReadTotalCount();

/**
* @return the number of readEntries requests that succeeded
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,46 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;
import lombok.Value;
import lombok.experimental.UtilityClass;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;

/**
* Contains cursors for a ManagedLedger.
*
* <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
*
* <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* <p>
* The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
* <p>
* This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
* in a single array. More details about heap implementations:
* https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
*
* <p/>The heap is updated and kept sorted when a cursor is updated.
* <a href="https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation">here</a>
* <p>
* The heap is updated and kept sorted when a cursor is updated.
*
*/
public class ManagedCursorContainer implements Iterable<ManagedCursor> {

/**
* This field is incremented everytime the cursor information is updated.
*/
private long version;

@Value
public static class CursorInfo {
ManagedCursor cursor;
PositionImpl position;

/**
* Cursor info's version.
* <p>
* Use {@link DataVersion#compareVersions(long, long)} to compare between two versions,
* since it rolls over to 0 once reaching Long.MAX_VALUE
*/
long version;
}

private static class Item {
final ManagedCursor cursor;
PositionImpl position;
Expand All @@ -56,10 +77,66 @@ private static class Item {
}
}

public ManagedCursorContainer() {
/**
* Utility class to manage a data version, which rolls over to 0 when reaching Long.MAX_VALUE.
*/
@UtilityClass
public class DataVersion {

/**
* Compares two data versions, which either rolls overs to 0 when reaching Long.MAX_VALUE.
* <p>
* Use {@link DataVersion#getNextVersion(long)} to increment the versions. The assumptions
* are that metric versions are compared with close time proximity one to another, hence,
* they are expected not close to each other in terms of distance, hence we don't
* expect the distance ever to exceed Long.MAX_VALUE / 2, otherwise we wouldn't be able
* to know which one is a later version in case the furthest rolls over to beyond 0. We
* assume the shortest distance between them dictates that.
* <p>
* @param v1 First version to compare
* @param v2 Second version to compare
* @return the value {@code 0} if {@code v1 == v2};
* a value less than {@code 0} if {@code v1 < v2}; and
* a value greater than {@code 0} if {@code v1 > v2}
*/
public static int compareVersions(long v1, long v2) {
if (v1 == v2) {
return 0;
}

// 0-------v1--------v2--------MAX_LONG
if (v2 > v1) {
long distance = v2 - v1;
long wrapAroundDistance = (Long.MAX_VALUE - v2) + v1;
if (distance < wrapAroundDistance) {
return -1;
} else {
return 1;
}

// 0-------v2--------v1--------MAX_LONG
} else {
long distance = v1 - v2;
long wrapAroundDistance = (Long.MAX_VALUE - v1) + v2;
if (distance < wrapAroundDistance) {
return 1; // v1 is bigger
} else {
return -1; // v2 is bigger
}
}
}

public static long getNextVersion(long existingVersion) {
if (existingVersion == Long.MAX_VALUE) {
return 0;
} else {
return existingVersion + 1;
}
}
}

public ManagedCursorContainer() {}

// Used to keep track of slowest cursor.
private final ArrayList<Item> heap = new ArrayList<>();

Expand Down Expand Up @@ -94,6 +171,7 @@ public void add(ManagedCursor cursor, Position position) {
if (cursor.isDurable()) {
durableCursorCount++;
}
version = DataVersion.getNextVersion(version);
} finally {
rwLock.unlockWrite(stamp);
}
Expand Down Expand Up @@ -129,6 +207,7 @@ public boolean removeCursor(String name) {
if (item.cursor.isDurable()) {
durableCursorCount--;
}
version = DataVersion.getNextVersion(version);
return true;
} else {
return false;
Expand Down Expand Up @@ -162,6 +241,7 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi

PositionImpl previousSlowestConsumer = heap.get(0).position;
item.position = (PositionImpl) newPosition;
version = DataVersion.getNextVersion(version);

if (heap.size() == 1) {
return Pair.of(previousSlowestConsumer, item.position);
Expand Down Expand Up @@ -204,6 +284,24 @@ public ManagedCursor getSlowestReader() {
}
}

/**
* @return Returns the CursorInfo for the cursor with the oldest position,
* or null if there aren't any tracked cursors
*/
public CursorInfo getCursorWithOldestPosition() {
long stamp = rwLock.readLock();
try {
if (heap.isEmpty()) {
return null;
} else {
Item item = heap.get(0);
return new CursorInfo(item.cursor, item.position, version);
}
} finally {
rwLock.unlockRead(stamp);
}
}

/**
* Check whether there are any cursors.
* @return true is there are no cursors and false if there are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public enum PositionBound {

/**
* This variable is used for testing the tests.
* {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
* ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()
*/
@VisibleForTesting
Map<String, byte[]> createdLedgerCustomMetadata;
Expand Down Expand Up @@ -2127,6 +2127,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
}

protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
mbean.addEntriesRead(1);
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
private final Rate readEntriesOpsFailed = new Rate();
private final Rate readEntriesOpsCacheMisses = new Rate();
private final Rate markDeleteOps = new Rate();
private final Rate entriesRead = new Rate();

private final LongAdder dataLedgerOpenOp = new LongAdder();
private final LongAdder dataLedgerCloseOp = new LongAdder();
Expand Down Expand Up @@ -80,6 +81,7 @@ public void refreshStats(long period, TimeUnit unit) {
ledgerAddEntryLatencyStatsUsec.refresh();
ledgerSwitchLatencyStatsUsec.refresh();
entryStats.refresh();
entriesRead.calculateRate(seconds);
}

public void addAddEntrySample(long size) {
Expand Down Expand Up @@ -120,6 +122,10 @@ public void addReadEntriesSample(int count, long totalSize) {
readEntriesOps.recordMultipleEvents(count, totalSize);
}

public void addEntriesRead(int count) {
entriesRead.recordEvent(count);
}

public void startDataLedgerOpenOp() {
dataLedgerOpenOp.increment();
}
Expand Down Expand Up @@ -189,6 +195,11 @@ public String getName() {
return managedLedger.getName();
}

@Override
public long getEntriesReadTotalCount() {
return entriesRead.getTotalCount();
}

@Override
public double getAddEntryMessagesRate() {
return addEntryOps.getRate();
Expand Down Expand Up @@ -333,5 +344,4 @@ public PendingBookieOpsStats getPendingBookieOpsStats() {
result.cursorLedgerDeleteOp = cursorLedgerDeleteOp.longValue();
return result;
}

}
Loading

0 comments on commit ce7a07b

Please sign in to comment.