Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] DO NOT MERGE #285

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
27e5360
[fix][broker] Make ExtensibleLoadManagerImpl.getOwnedServiceUnits asy…
heesung-sn May 17, 2024
431ef3d
[improve] [broker] [break change] Do not create partitioned DLQ/Retry…
poorbarcode May 15, 2024
687a253
[fix][schema] Error checking schema compatibility on a schema-less to…
shibd May 16, 2024
24432eb
[fix][offload] Break the fillbuffer loop when met EOF (#22722)
zymap May 17, 2024
63eb45e
[feat][broker] Implementation of PIP-323: Complete Backlog Quota Tele…
codelipenghui May 18, 2024
5e5f69f
[improve][broker] checkTopicExists supports checking partitioned topi…
nodece Dec 19, 2023
6a51b81
[improve] [test] Add a test to guarantee the TNX topics will not be r…
poorbarcode May 16, 2024
d5e5ed8
[fix] [broker] fix deadlock when disable topic level Geo-Replication …
poorbarcode May 19, 2024
0307c65
[fix][ml]: subscription props could be lost in case of missing ledger…
nicoloboschi May 20, 2024
d37b340
[fix] [ml] Add entry fail due to race condition about add entry faile…
poorbarcode May 21, 2024
9290050
[fix][admin][part-1]Clearly define REST API on Open API (#22774)
shibd May 26, 2024
5e66fe9
[fix][admin] Clearly define REST API on Open API for Namesaces@v2 (#2…
shibd May 27, 2024
9d2f817
[fix][admin] Clearly define REST API on Open API for Topics (#22782)
shibd May 27, 2024
5e883e0
[fix][admin] Clearly define REST API on Open API (#22783)
shibd May 27, 2024
5c02572
[fix] [broker] fix topic partitions was expanded even if disabled top…
poorbarcode May 28, 2024
d7d048e
[improve][cli][branch-3.0] PIP-353: Improve transaction message visib…
shibd May 28, 2024
71c4210
[improve][broker] avoid creating new objects when intercepting (#22790)
mattisonchao May 28, 2024
974ad16
[fix][broker] EntryFilters fix NoClassDefFoundError due to closed cla…
eolivelli May 29, 2024
c5edb86
[improve][broker] Clear thread local BrokerEntryMetadata instance bef…
lhotari May 22, 2024
b17d429
[improve][broker] Remove ClassLoaderSwitcher to avoid objects allocat…
dao-jun May 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntryErrors();

/**
* @return the number of entries read from the managed ledger (from cache or BK)
*/
long getEntriesReadTotalCount();

/**
* @return the number of readEntries requests that succeeded
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,46 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;
import lombok.Value;
import lombok.experimental.UtilityClass;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;

/**
* Contains cursors for a ManagedLedger.
*
* <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
*
* <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* <p>
* The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
* <p>
* This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
* in a single array. More details about heap implementations:
* https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
*
* <p/>The heap is updated and kept sorted when a cursor is updated.
* <a href="https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation">here</a>
* <p>
* The heap is updated and kept sorted when a cursor is updated.
*
*/
public class ManagedCursorContainer implements Iterable<ManagedCursor> {

/**
* This field is incremented everytime the cursor information is updated.
*/
private long version;

@Value
public static class CursorInfo {
ManagedCursor cursor;
PositionImpl position;

/**
* Cursor info's version.
* <p>
* Use {@link DataVersion#compareVersions(long, long)} to compare between two versions,
* since it rolls over to 0 once reaching Long.MAX_VALUE
*/
long version;
}

private static class Item {
final ManagedCursor cursor;
PositionImpl position;
Expand All @@ -56,10 +77,66 @@ private static class Item {
}
}

public ManagedCursorContainer() {
/**
* Utility class to manage a data version, which rolls over to 0 when reaching Long.MAX_VALUE.
*/
@UtilityClass
public class DataVersion {

/**
* Compares two data versions, which either rolls overs to 0 when reaching Long.MAX_VALUE.
* <p>
* Use {@link DataVersion#getNextVersion(long)} to increment the versions. The assumptions
* are that metric versions are compared with close time proximity one to another, hence,
* they are expected not close to each other in terms of distance, hence we don't
* expect the distance ever to exceed Long.MAX_VALUE / 2, otherwise we wouldn't be able
* to know which one is a later version in case the furthest rolls over to beyond 0. We
* assume the shortest distance between them dictates that.
* <p>
* @param v1 First version to compare
* @param v2 Second version to compare
* @return the value {@code 0} if {@code v1 == v2};
* a value less than {@code 0} if {@code v1 < v2}; and
* a value greater than {@code 0} if {@code v1 > v2}
*/
public static int compareVersions(long v1, long v2) {
if (v1 == v2) {
return 0;
}

// 0-------v1--------v2--------MAX_LONG
if (v2 > v1) {
long distance = v2 - v1;
long wrapAroundDistance = (Long.MAX_VALUE - v2) + v1;
if (distance < wrapAroundDistance) {
return -1;
} else {
return 1;
}

// 0-------v2--------v1--------MAX_LONG
} else {
long distance = v1 - v2;
long wrapAroundDistance = (Long.MAX_VALUE - v1) + v2;
if (distance < wrapAroundDistance) {
return 1; // v1 is bigger
} else {
return -1; // v2 is bigger
}
}
}

public static long getNextVersion(long existingVersion) {
if (existingVersion == Long.MAX_VALUE) {
return 0;
} else {
return existingVersion + 1;
}
}
}

public ManagedCursorContainer() {}

// Used to keep track of slowest cursor.
private final ArrayList<Item> heap = new ArrayList<>();

Expand Down Expand Up @@ -94,6 +171,7 @@ public void add(ManagedCursor cursor, Position position) {
if (cursor.isDurable()) {
durableCursorCount++;
}
version = DataVersion.getNextVersion(version);
} finally {
rwLock.unlockWrite(stamp);
}
Expand Down Expand Up @@ -129,6 +207,7 @@ public boolean removeCursor(String name) {
if (item.cursor.isDurable()) {
durableCursorCount--;
}
version = DataVersion.getNextVersion(version);
return true;
} else {
return false;
Expand Down Expand Up @@ -162,6 +241,7 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi

PositionImpl previousSlowestConsumer = heap.get(0).position;
item.position = (PositionImpl) newPosition;
version = DataVersion.getNextVersion(version);

if (heap.size() == 1) {
return Pair.of(previousSlowestConsumer, item.position);
Expand Down Expand Up @@ -204,6 +284,24 @@ public ManagedCursor getSlowestReader() {
}
}

/**
* @return Returns the CursorInfo for the cursor with the oldest position,
* or null if there aren't any tracked cursors
*/
public CursorInfo getCursorWithOldestPosition() {
long stamp = rwLock.readLock();
try {
if (heap.isEmpty()) {
return null;
} else {
Item item = heap.get(0);
return new CursorInfo(item.cursor, item.position, version);
}
} finally {
rwLock.unlockRead(stamp);
}
}

/**
* Check whether there are any cursors.
* @return true is there are no cursors and false if there are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc != BKException.Code.OK) {
log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -242,6 +243,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected volatile long lastAddEntryTimeMs = 0;
private long inactiveLedgerRollOverTimeMs = 0;

/** A signal that may trigger all the subsequent OpAddEntry of current ledger to be failed due to timeout. **/
protected volatile AtomicBoolean currentLedgerTimeoutTriggered;

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
private static final String MIGRATION_STATE_PROPERTY = "migrated";
Expand Down Expand Up @@ -325,7 +329,7 @@ public enum PositionBound {

/**
* This variable is used for testing the tests.
* {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
* ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()
*/
@VisibleForTesting
Map<String, byte[]> createdLedgerCustomMetadata;
Expand Down Expand Up @@ -534,6 +538,7 @@ public void operationFailed(MetaStoreException e) {
STATE_UPDATER.set(this, State.LedgerOpened);
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;
currentLedgerTimeoutTriggered = new AtomicBoolean();

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
// bypass empty ledgers, find last ledger with Message if possible.
Expand Down Expand Up @@ -776,7 +781,8 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}
Expand All @@ -792,7 +798,8 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}
Expand Down Expand Up @@ -844,6 +851,7 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {

// Write into lastLedger
addOperation.setLedger(currentLedger);
addOperation.setTimeoutTriggered(currentLedgerTimeoutTriggered);

++currentLedgerEntries;
currentLedgerSize += addOperation.data.readableBytes();
Expand Down Expand Up @@ -1587,6 +1595,7 @@ public void operationComplete(Void v, Stat stat) {
LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
currentLedger = lh;
currentLedgerTimeoutTriggered = new AtomicBoolean();
currentLedgerEntries = 0;
currentLedgerSize = 0;
updateLedgersIdsComplete(originalCurrentLedger);
Expand Down Expand Up @@ -1670,9 +1679,11 @@ void createNewOpAddEntryForNewLedger() {
if (existsOp != null) {
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
existsOp.close();
existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data,
existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
existsOp = existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
} else {
// This scenario should not happen.
log.warn("[{}] An OpAddEntry's ledger is empty.", name);
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
Expand Down Expand Up @@ -2135,6 +2146,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
}

protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
mbean.addEntriesRead(1);
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
Expand Down Expand Up @@ -4210,13 +4222,14 @@ private void checkAddTimeout() {
}
OpAddEntry opAddEntry = pendingAddEntries.peek();
if (opAddEntry != null) {
final long finalAddOpCount = opAddEntry.addOpCount;
boolean isTimedOut = opAddEntry.lastInitTime != -1
&& TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec;
if (isTimedOut) {
log.error("Failed to add entry for ledger {} in time-out {} sec",
(opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, finalAddOpCount);
log.warn("[{}] Failed to add entry {}:{} in time-out {} sec", this.name,
opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1,
opAddEntry.entryId, timeoutSec);
currentLedgerTimeoutTriggered.set(true);
opAddEntry.handleAddFailure(opAddEntry.ledger);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
private final Rate readEntriesOpsFailed = new Rate();
private final Rate readEntriesOpsCacheMisses = new Rate();
private final Rate markDeleteOps = new Rate();
private final Rate entriesRead = new Rate();

private final LongAdder dataLedgerOpenOp = new LongAdder();
private final LongAdder dataLedgerCloseOp = new LongAdder();
Expand Down Expand Up @@ -80,6 +81,7 @@ public void refreshStats(long period, TimeUnit unit) {
ledgerAddEntryLatencyStatsUsec.refresh();
ledgerSwitchLatencyStatsUsec.refresh();
entryStats.refresh();
entriesRead.calculateRate(seconds);
}

public void addAddEntrySample(long size) {
Expand Down Expand Up @@ -120,6 +122,10 @@ public void addReadEntriesSample(int count, long totalSize) {
readEntriesOps.recordMultipleEvents(count, totalSize);
}

public void addEntriesRead(int count) {
entriesRead.recordEvent(count);
}

public void startDataLedgerOpenOp() {
dataLedgerOpenOp.increment();
}
Expand Down Expand Up @@ -189,6 +195,11 @@ public String getName() {
return managedLedger.getName();
}

@Override
public long getEntriesReadTotalCount() {
return entriesRead.getTotalCount();
}

@Override
public double getAddEntryMessagesRate() {
return addEntryOps.getRate();
Expand Down Expand Up @@ -333,5 +344,4 @@ public PendingBookieOpsStats getPendingBookieOpsStats() {
result.cursorLedgerDeleteOp = cursorLedgerDeleteOp.longValue();
return result;
}

}
Loading
Loading