From a52f38fe7ce5051091505c4e26785cfbc7bf778a Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Mon, 16 Mar 2026 20:26:06 -0700 Subject: [PATCH] [fix](test) Fix BDB JE resource leak in BDBEnvironmentTest causing FE UT timeout (#61404) ### What problem does this PR solve? The following resource leak issues in BDBEnvironmentTest cause BDB JE checkpoint threads to enter an infinite VLSN consistency retry loop during JVM shutdown, blocking the Maven Surefire process from exiting and triggering a TeamCity execution timeout (~3 hours): 1. testReadTxnIsNotMatched: reopens the BDB JE master environment and database at the end but never closes them. 2. testRollbackException: has an early 'return' when fe1 is elected as master, which exits the method without closing any of the 3 BDB JE environments. Fix by wrapping the test logic of both multi-node cluster tests (testRollbackException, testReadTxnIsNotMatched) in try/finally blocks that guarantee all BDB JE environments are closed, regardless of whether the test passes, fails an assertion, or returns early. 
--- .../journal/bdbje/BDBEnvironmentTest.java | 324 +++++++++--------- 1 file changed, 167 insertions(+), 157 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java index b93766b3c9da7a..cae2ec0f699c59 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java @@ -501,124 +501,127 @@ public void testRollbackException() throws Exception { followersInfo.add(Pair.of(followerEnvironment, new NodeInfo(nodeName, nodeHostPort, nodeDir))); } - Pair masterPair = findMaster(followersInfo); - String beginDbName = String.valueOf(0L); - Database masterDb = masterPair.first.openDatabase(beginDbName); - DatabaseEntry key = new DatabaseEntry(randomBytes()); - DatabaseEntry value = new DatabaseEntry(randomBytes()); - Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value)); - Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size()); - LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir); - - for (Pair entryPair : followersInfo) { - if (entryPair.second.dir.equals(masterPair.second.dir)) { - LOG.info("skip {}", entryPair.second.name); - return; - } + try { + Pair masterPair = findMaster(followersInfo); + String beginDbName = String.valueOf(0L); + Database masterDb = masterPair.first.openDatabase(beginDbName); + DatabaseEntry key = new DatabaseEntry(randomBytes()); + DatabaseEntry value = new DatabaseEntry(randomBytes()); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value)); + Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size()); + LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir); - Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size()); - Database followerDb = 
entryPair.first.openDatabase(beginDbName); - DatabaseEntry readValue = new DatabaseEntry(); - Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.DEFAULT)); - Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); - followerDb.close(); - } + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + LOG.info("skip {}", entryPair.second.name); + return; + } - masterDb.close(); - masterEnvironment.getEpochDB().close(); + Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size()); + Database followerDb = entryPair.first.openDatabase(beginDbName); + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.DEFAULT)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + followerDb.close(); + } - followersInfo.stream().forEach(entryPair -> { - entryPair.first.close(); - LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); - }); + masterDb.close(); + masterEnvironment.getEpochDB().close(); - // all follower closed - for (Pair entryPair : followersInfo) { - String followerCopyDir = entryPair.second.dir + "_copy"; - LOG.info("Copy from {} to {}", entryPair.second.dir, followerCopyDir); - FileUtils.copyDirectory(new File(entryPair.second.dir), new File(followerCopyDir)); - } + followersInfo.stream().forEach(entryPair -> { + entryPair.first.close(); + LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); + }); - followersInfo.stream().forEach(entryPair -> { - entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir)); - LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir); - }); + // all follower closed + for (Pair entryPair : followersInfo) { + String followerCopyDir = entryPair.second.dir + "_copy"; + LOG.info("Copy from {} to {}", entryPair.second.dir, 
followerCopyDir); + FileUtils.copyDirectory(new File(entryPair.second.dir), new File(followerCopyDir)); + } - masterPair = findMaster(followersInfo); + followersInfo.stream().forEach(entryPair -> { + entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir)); + LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir); + }); - masterDb = masterPair.first.openDatabase(String.valueOf(1L)); - for (int i = 0; i < 2 * Config.txn_rollback_limit + 10; i++) { - // for (int i = 0; i < 10; i++) { - OperationStatus status = masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes())); - Assertions.assertEquals(OperationStatus.SUCCESS, status); - } - Assertions.assertEquals(2, masterPair.first.getDatabaseNames().size()); - Assertions.assertEquals(0, masterPair.first.getDatabaseNames().get(0)); - Assertions.assertEquals(1, masterPair.first.getDatabaseNames().get(1)); + masterPair = findMaster(followersInfo); - followersInfo.stream().forEach(entryPair -> { - entryPair.first.close(); - LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); - }); - - // Restore follower's (not new master) bdbje dir - for (Pair entryPair : followersInfo) { - if (entryPair.second.dir.equals(masterDir)) { - String masterCopyDir = entryPair.second.dir + "_copy"; - FileUtils.deleteDirectory(new File(masterCopyDir)); - continue; + masterDb = masterPair.first.openDatabase(String.valueOf(1L)); + for (int i = 0; i < 2 * Config.txn_rollback_limit + 10; i++) { + // for (int i = 0; i < 10; i++) { + OperationStatus status = masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes())); + Assertions.assertEquals(OperationStatus.SUCCESS, status); } - LOG.info("Delete followerDir {} ", entryPair.second.dir); - FileUtils.deleteDirectory(new File(entryPair.second.dir)); - // FileUtils.moveDirectory(new File(entryPair.second.dir), new File(entryPair.second.dir + "_copy2")); - String followerCopyDir = entryPair.second.dir + 
"_copy"; - LOG.info("Move {} to {}", followerCopyDir, entryPair.second.dir); - FileUtils.moveDirectory(new File(followerCopyDir), new File(entryPair.second.dir)); - } + Assertions.assertEquals(2, masterPair.first.getDatabaseNames().size()); + Assertions.assertEquals(0, masterPair.first.getDatabaseNames().get(0)); + Assertions.assertEquals(1, masterPair.first.getDatabaseNames().get(1)); - Thread.sleep(1000); - for (Pair entryPair : followersInfo) { - if (entryPair.second.dir.equals(masterPair.second.dir)) { - LOG.info("skip open {} | {}", entryPair.second.name, entryPair.second.dir); - continue; + followersInfo.stream().forEach(entryPair -> { + entryPair.first.close(); + LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); + }); + + // Restore follower's (not new master) bdbje dir + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterDir)) { + String masterCopyDir = entryPair.second.dir + "_copy"; + FileUtils.deleteDirectory(new File(masterCopyDir)); + continue; + } + LOG.info("Delete followerDir {} ", entryPair.second.dir); + FileUtils.deleteDirectory(new File(entryPair.second.dir)); + // FileUtils.moveDirectory(new File(entryPair.second.dir), new File(entryPair.second.dir + "_copy2")); + String followerCopyDir = entryPair.second.dir + "_copy"; + LOG.info("Move {} to {}", followerCopyDir, entryPair.second.dir); + FileUtils.moveDirectory(new File(followerCopyDir), new File(entryPair.second.dir)); } - entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir)); - LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir); - } - BDBEnvironment newMasterEnvironment = null; - boolean found = false; - for (int i = 0; i < 300; i++) { + Thread.sleep(1000); for (Pair entryPair : followersInfo) { if (entryPair.second.dir.equals(masterPair.second.dir)) { + LOG.info("skip open {} | {}", entryPair.second.name, entryPair.second.dir); continue; } + entryPair.first.openReplicatedEnvironment(new 
File(entryPair.second.dir)); + LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir); + } - LOG.info("name:{} state:{} dir:{}", entryPair.first.getReplicatedEnvironment().getNodeName(), - entryPair.first.getReplicatedEnvironment().getState(), - entryPair.second.dir); - if (entryPair.first.getReplicatedEnvironment().getState().equals(ReplicatedEnvironment.State.MASTER)) { - newMasterEnvironment = entryPair.first; - found = true; + BDBEnvironment newMasterEnvironment = null; + boolean found = false; + for (int i = 0; i < 300; i++) { + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + continue; + } + + LOG.info("name:{} state:{} dir:{}", entryPair.first.getReplicatedEnvironment().getNodeName(), + entryPair.first.getReplicatedEnvironment().getState(), + entryPair.second.dir); + if (entryPair.first.getReplicatedEnvironment().getState().equals(ReplicatedEnvironment.State.MASTER)) { + newMasterEnvironment = entryPair.first; + found = true; + break; + } + } + if (found) { break; } + Thread.sleep(1000); } - if (found) { - break; - } - Thread.sleep(1000); + Assertions.assertNotNull(newMasterEnvironment); + + masterDb = newMasterEnvironment.openDatabase(beginDbName); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes()))); + Assertions.assertEquals(1, newMasterEnvironment.getDatabaseNames().size()); + // // old master + masterEnvironment.openReplicatedEnvironment(new File(masterDir)); + } finally { + followersInfo.stream().forEach(entryPair -> { + entryPair.first.close(); + LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); + }); } - Assertions.assertNotNull(newMasterEnvironment); - - masterDb = newMasterEnvironment.openDatabase(beginDbName); - Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes()))); - Assertions.assertEquals(1, 
newMasterEnvironment.getDatabaseNames().size()); - // // old master - masterEnvironment.openReplicatedEnvironment(new File(masterDir)); - followersInfo.stream().forEach(entryPair -> { - entryPair.first.close(); - LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); - }); LOG.info("end"); } @@ -670,73 +673,80 @@ public void testReadTxnIsNotMatched() throws Exception { followersInfo.add(Pair.of(followerEnvironment, new NodeInfo(nodeName, nodeHostPort, nodeDir))); } - Pair masterPair = findMaster(followersInfo); - String beginDbName = String.valueOf(0L); - Database masterDb = masterPair.first.openDatabase(beginDbName); - DatabaseEntry key = new DatabaseEntry(randomBytes()); - DatabaseEntry value = new DatabaseEntry(randomBytes()); - Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value)); - Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size()); - LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir); + try { + Pair masterPair = findMaster(followersInfo); + String beginDbName = String.valueOf(0L); + Database masterDb = masterPair.first.openDatabase(beginDbName); + DatabaseEntry key = new DatabaseEntry(randomBytes()); + DatabaseEntry value = new DatabaseEntry(randomBytes()); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value)); + Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size()); + LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir); - for (Pair entryPair : followersInfo) { - if (entryPair.second.dir.equals(masterPair.second.dir)) { - LOG.info("skip {}", entryPair.second.name); - continue; - } - - Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size()); - Database followerDb = entryPair.first.openDatabase(beginDbName); - DatabaseEntry readValue = new DatabaseEntry(); - Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.DEFAULT)); - 
Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); - } + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + LOG.info("skip {}", entryPair.second.name); + continue; + } - Field envImplField = ReplicatedEnvironment.class.getDeclaredField("repEnvironmentImpl"); - envImplField.setAccessible(true); - RepImpl impl = (RepImpl) envImplField.get(masterPair.first.getReplicatedEnvironment()); - Assertions.assertNotNull(impl); - - new Expectations(impl) {{ - // Below method will replicate log item to followers. - impl.registerVLSN(withNotNull()); - // Below method will wait until the logs are replicated. - impl.postLogCommitHook(withNotNull(), withNotNull()); - result = new InsufficientAcksException("mocked"); - }}; - - long count = masterDb.count(); - final Database oldMasterDb = masterDb; - Assertions.assertThrows(InsufficientAcksException.class, () -> { - // Since this key is not replicated to any replicas, it should not be read. 
- DatabaseEntry k = new DatabaseEntry(new byte[]{1, 2, 3}); - DatabaseEntry v = new DatabaseEntry(new byte[]{4, 5, 6}); - oldMasterDb.put(null, k, v); - }); + Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size()); + Database followerDb = entryPair.first.openDatabase(beginDbName); + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.DEFAULT)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + } - LOG.info("close old master {} | {}", masterPair.second.name, masterPair.second.dir); - masterDb.close(); - masterEnvironment.getEpochDB().close(); - masterEnvironment.close(); + Field envImplField = ReplicatedEnvironment.class.getDeclaredField("repEnvironmentImpl"); + envImplField.setAccessible(true); + RepImpl impl = (RepImpl) envImplField.get(masterPair.first.getReplicatedEnvironment()); + Assertions.assertNotNull(impl); + + new Expectations(impl) {{ + // Below method will replicate log item to followers. + impl.registerVLSN(withNotNull()); + // Below method will wait until the logs are replicated. + impl.postLogCommitHook(withNotNull(), withNotNull()); + result = new InsufficientAcksException("mocked"); + }}; + + long count = masterDb.count(); + final Database oldMasterDb = masterDb; + Assertions.assertThrows(InsufficientAcksException.class, () -> { + // Since this key is not replicated to any replicas, it should not be read. 
+ DatabaseEntry k = new DatabaseEntry(new byte[]{1, 2, 3}); + DatabaseEntry v = new DatabaseEntry(new byte[]{4, 5, 6}); + oldMasterDb.put(null, k, v); + }); + + LOG.info("close old master {} | {}", masterPair.second.name, masterPair.second.dir); + masterDb.close(); + masterEnvironment.getEpochDB().close(); + masterEnvironment.close(); - for (Pair entryPair : followersInfo) { - if (entryPair.second.dir.equals(masterPair.second.dir)) { - LOG.info("skip {}", entryPair.second.name); - continue; + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + LOG.info("skip {}", entryPair.second.name); + continue; + } + LOG.info("close follower {} | {}", entryPair.second.name, entryPair.second.dir); + entryPair.first.close(); } - LOG.info("close follower {} | {}", entryPair.second.name, entryPair.second.dir); - entryPair.first.close(); - } - masterPair.first.openReplicatedEnvironment(new File(masterPair.second.dir)); - masterDb = masterPair.first.openDatabase(beginDbName); - LOG.info("open {} | {}", masterPair.second.name, masterPair.second.dir); + masterPair.first.openReplicatedEnvironment(new File(masterPair.second.dir)); + masterDb = masterPair.first.openDatabase(beginDbName); + LOG.info("open {} | {}", masterPair.second.name, masterPair.second.dir); - // The local commit txn is readable!!! - Assertions.assertEquals(count + 1, masterDb.count()); + // The local commit txn is readable!!! 
+ Assertions.assertEquals(count + 1, masterDb.count()); - key = new DatabaseEntry(new byte[]{1, 2, 3}); - DatabaseEntry readValue = new DatabaseEntry(); - Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, key, readValue, LockMode.DEFAULT)); + key = new DatabaseEntry(new byte[]{1, 2, 3}); + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, key, readValue, LockMode.DEFAULT)); + } finally { + followersInfo.stream().forEach(entryPair -> { + entryPair.first.close(); + LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); + }); + } } }