Skip to content

Commit 814781d

Browse files
lhotari authored and srinath-ctds committed
[fix][ml] Fix issues in estimateEntryCountBySize (apache#24089)
(cherry picked from commit a44b2cf) (cherry picked from commit 098d040)
1 parent dc0176e commit 814781d

File tree

6 files changed

+504
-73
lines changed

6 files changed

+504
-73
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl;
20+
21+
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
22+
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.DEFAULT_ESTIMATED_ENTRY_SIZE;
23+
import java.util.Collection;
24+
import java.util.Map;
25+
import java.util.NavigableMap;
26+
import org.apache.bookkeeper.client.LedgerHandle;
27+
import org.apache.bookkeeper.mledger.Position;
28+
import org.apache.bookkeeper.mledger.PositionFactory;
29+
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
30+
31+
class EntryCountEstimator {
32+
// Prevent instantiation, this is a utility class with only static methods
33+
private EntryCountEstimator() {
34+
}
35+
36+
/**
37+
* Estimates the number of entries that can be read within the specified byte size starting from the given position
38+
* in the ledger.
39+
*
40+
* @param maxEntries stop further estimation if the number of estimated entries exceeds this value
41+
* @param maxSizeBytes the maximum size in bytes for the entries to be estimated
42+
* @param readPosition the position in the ledger from where to start reading
43+
* @param ml the {@link ManagedLedgerImpl} instance to use for accessing ledger information
44+
* @return the estimated number of entries that can be read
45+
*/
46+
static int estimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
47+
ManagedLedgerImpl ml) {
48+
LedgerHandle currentLedger = ml.getCurrentLedger();
49+
// currentLedger is null in ReadOnlyManagedLedgerImpl
50+
Long lastLedgerId = currentLedger != null ? currentLedger.getId() : null;
51+
long lastLedgerTotalSize = ml.getCurrentLedgerSize();
52+
long lastLedgerTotalEntries = ml.getCurrentLedgerEntries();
53+
return internalEstimateEntryCountByBytesSize(maxEntries, maxSizeBytes, readPosition, ml.getLedgersInfo(),
54+
lastLedgerId, lastLedgerTotalEntries, lastLedgerTotalSize);
55+
}
56+
57+
/**
58+
* Internal method to estimate the number of entries that can be read within the specified byte size.
59+
* This method is used for unit testing to validate the logic without directly accessing {@link ManagedLedgerImpl}.
60+
*
61+
* @param maxEntries stop further estimation if the number of estimated entries exceeds this value
62+
* @param maxSizeBytes the maximum size in bytes for the entries to be estimated
63+
* @param readPosition the position in the ledger from where to start reading
64+
* @param ledgersInfo a map of ledger ID to {@link MLDataFormats.ManagedLedgerInfo.LedgerInfo} containing
65+
* metadata for ledgers
66+
* @param lastLedgerId the ID of the last active ledger in the managed ledger
67+
* @param lastLedgerTotalEntries the total number of entries in the last active ledger
68+
* @param lastLedgerTotalSize the total size in bytes of the last active ledger
69+
* @return the estimated number of entries that can be read
70+
*/
71+
static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
72+
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
73+
ledgersInfo,
74+
Long lastLedgerId, long lastLedgerTotalEntries,
75+
long lastLedgerTotalSize) {
76+
if (maxSizeBytes <= 0) {
77+
// If the specified maximum size is invalid (e.g., non-positive), return 0
78+
return 0;
79+
}
80+
81+
// If the maximum size is Long.MAX_VALUE, return the maximum number of entries
82+
if (maxSizeBytes == Long.MAX_VALUE) {
83+
return maxEntries;
84+
}
85+
86+
// Adjust the read position to ensure it falls within the valid range of available ledgers.
87+
// This handles special cases such as EARLIEST and LATEST positions by resetting them
88+
// to the first available ledger or the last active ledger, respectively.
89+
if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
90+
readPosition = PositionFactory.create(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
91+
} else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) {
92+
Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
93+
readPosition =
94+
PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
95+
} else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
96+
readPosition = PositionFactory.create(ledgersInfo.firstKey(), 0);
97+
}
98+
99+
long estimatedEntryCount = 0;
100+
long remainingBytesSize = maxSizeBytes;
101+
// Start with a default estimated average size per entry, including any overhead
102+
long currentAvgSize = DEFAULT_ESTIMATED_ENTRY_SIZE + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
103+
// Get a collection of ledger info starting from the read position
104+
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
105+
ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();
106+
107+
// calculate the estimated entry count based on the remaining bytes and ledger metadata
108+
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
109+
if (remainingBytesSize <= 0 || estimatedEntryCount >= maxEntries) {
110+
// Stop processing if there are no more bytes remaining to allocate for entries
111+
// or if the estimated entry count exceeds the maximum allowed entries
112+
break;
113+
}
114+
long ledgerId = ledgerInfo.getLedgerId();
115+
long ledgerTotalSize = ledgerInfo.getSize();
116+
long ledgerTotalEntries = ledgerInfo.getEntries();
117+
118+
// Adjust ledger size and total entry count if this is the last active ledger since the
119+
// ledger metadata doesn't include the current ledger's size and entry count
120+
// the lastLedgerId is null in ReadOnlyManagedLedgerImpl
121+
if (lastLedgerId != null && ledgerId == lastLedgerId.longValue()
122+
&& lastLedgerTotalSize > 0 && lastLedgerTotalEntries > 0) {
123+
ledgerTotalSize = lastLedgerTotalSize;
124+
ledgerTotalEntries = lastLedgerTotalEntries;
125+
}
126+
127+
// Skip processing ledgers that have no entries or size
128+
if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
129+
continue;
130+
}
131+
132+
// Update the average entry size based on the current ledger's size and entry count
133+
currentAvgSize = Math.max(1, ledgerTotalSize / ledgerTotalEntries)
134+
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
135+
136+
// Calculate the total size of this ledger, inclusive of bookkeeping overhead per entry
137+
long ledgerTotalSizeWithBkOverhead =
138+
ledgerTotalSize + ledgerTotalEntries * BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
139+
140+
// If the remaining bytes are insufficient to read the full ledger, estimate the readable entries
141+
// or when the read position is beyond the first entry in the ledger
142+
if (remainingBytesSize < ledgerTotalSizeWithBkOverhead
143+
|| readPosition.getLedgerId() == ledgerId && readPosition.getEntryId() > 0) {
144+
long entryCount;
145+
if (readPosition.getLedgerId() == ledgerId && readPosition.getEntryId() > 0) {
146+
entryCount = Math.max(ledgerTotalEntries - readPosition.getEntryId(), 1);
147+
} else {
148+
entryCount = ledgerTotalEntries;
149+
}
150+
// Estimate how many entries can fit within the remaining bytes
151+
long entriesToRead = Math.min(Math.max(1, remainingBytesSize / currentAvgSize), entryCount);
152+
estimatedEntryCount += entriesToRead;
153+
remainingBytesSize -= entriesToRead * currentAvgSize;
154+
} else {
155+
// If the full ledger can be read, add all its entries to the count and reduce its size
156+
estimatedEntryCount += ledgerTotalEntries;
157+
remainingBytesSize -= ledgerTotalSizeWithBkOverhead;
158+
}
159+
}
160+
161+
// Add any remaining bytes to the estimated entry count considering the current average entry size
162+
if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
163+
estimatedEntryCount += remainingBytesSize / currentAvgSize;
164+
}
165+
166+
// Ensure at least one entry is always returned as the result
167+
return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
168+
}
169+
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static java.util.Objects.requireNonNull;
2323
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
24+
import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
2425
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
2526
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
2627
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
27-
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
2828
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
2929
import com.google.common.annotations.VisibleForTesting;
3030
import com.google.common.base.MoreObjects;
@@ -3811,53 +3811,8 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
38113811
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
38123812
return maxEntries;
38133813
}
3814-
long estimatedEntryCount = estimateEntryCountBySize(maxSizeBytes, readPosition, ledger);
3815-
if (estimatedEntryCount > Integer.MAX_VALUE) {
3816-
return maxEntries;
3817-
}
3818-
return Math.min((int) estimatedEntryCount, maxEntries);
3819-
}
3820-
3821-
static long estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) {
3822-
Position posToRead = readPosition;
3823-
if (!ml.isValidPosition(readPosition)) {
3824-
posToRead = ml.getNextValidPosition(readPosition);
3825-
}
3826-
long result = 0;
3827-
long remainingBytesSize = bytesSize;
3828-
3829-
while (remainingBytesSize > 0) {
3830-
// Last ledger.
3831-
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
3832-
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
3833-
// Only read 1 entry if no entries to read.
3834-
return 1;
3835-
}
3836-
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
3837-
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3838-
result += remainingBytesSize / avg;
3839-
break;
3840-
}
3841-
// Skip empty ledger.
3842-
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
3843-
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
3844-
posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
3845-
continue;
3846-
}
3847-
// Calculate entries by average of ledgers.
3848-
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3849-
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
3850-
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
3851-
result += remainingBytesSize / avg;
3852-
break;
3853-
} else {
3854-
// Calculate for the next ledger.
3855-
result += remainEntriesOfLedger;
3856-
remainingBytesSize -= remainEntriesOfLedger * avg;
3857-
posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
3858-
}
3859-
}
3860-
return Math.max(result, 1);
3814+
int estimatedEntryCount = estimateEntryCountByBytesSize(maxEntries, maxSizeBytes, readPosition, ledger);
3815+
return Math.min(estimatedEntryCount, maxEntries);
38613816
}
38623817

38633818
@Override

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class RangeEntryCacheImpl implements EntryCache {
5959
* Overhead per-entry to take into account the envelope.
6060
*/
6161
public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
62-
private static final int DEFAULT_ESTIMATED_ENTRY_SIZE = 10 * 1024;
62+
public static final int DEFAULT_ESTIMATED_ENTRY_SIZE = 10 * 1024;
6363
private static final boolean DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY = false;
6464

6565
private final RangeEntryCacheManagerImpl manager;

0 commit comments

Comments
 (0)