Skip to content

Commit ec5dca1

Browse files
poorbarcodeganesh-ctds
authored andcommitted
[fix][broker]Fix memory leak when using a customized ManagedLedger implementation (apache#25016)
(cherry picked from commit 3937788) (cherry picked from commit 7adf72f)
1 parent e2e64bc commit ec5dca1

File tree

5 files changed

+1237
-2
lines changed

5 files changed

+1237
-2
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,8 @@ public interface VoidCallback {
337337
void operationFailed(ManagedLedgerException exception);
338338
}
339339

340-
ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName) {
340+
@VisibleForTesting
341+
protected ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName) {
341342
this.bookkeeper = bookkeeper;
342343
this.cursorProperties = Collections.emptyMap();
343344
this.ledger = ledger;

pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
5454
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
5555
private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper";
5656
private BookkeeperManagedLedgerStorageClass defaultStorageClass;
57-
private ManagedLedgerFactory managedLedgerFactory;
57+
@VisibleForTesting
58+
protected ManagedLedgerFactory managedLedgerFactory;
5859
private BookKeeper defaultBkClient;
5960
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
6061
bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().recordStats().buildAsync();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4067,6 +4067,10 @@ public void readEntryComplete(Entry entry, Object ctx) {
40674067
} catch (IOException e) {
40684068
log.warn("[{}] [{}] Error while getting the oldest message", topic, cursor.toString(), e);
40694069
res.complete(false);
4070+
} finally {
4071+
if (entry != null) {
4072+
entry.release();
4073+
}
40704074
}
40714075

40724076
}

0 commit comments

Comments
 (0)