Skip to content

Commit

Permalink
9477: Direct memory / data source leak if teacher becomes inaccessibl…
Browse files Browse the repository at this point in the history
…e during reconnect (#9780)

Fixes: #9477
Reviewed-by: Oleg Mazurov <oleg.mazurov@swirldslabs.com>
Signed-off-by: Artem Ananev <artem.ananev@swirldslabs.com>
Signed-off-by: Nick Poorman <nick@swirldslabs.com>
  • Loading branch information
artemananiev authored and nickpoorman committed Nov 22, 2023
1 parent e8450a8 commit 48561de
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -461,25 +461,10 @@ public <K extends VirtualKey, V extends VirtualValue> void closeDataSource(
final int tableId = dataSource.getTableId();
assert dataSources.get(tableId) != null;
dataSources.set(tableId, null);
if (!primaryTables.contains(tableId)) {
// Delete data, if the table is secondary
removeTable(tableId);
}
}

/**
* For testing purpose only.
*
* Removes the table. Table config and table data files are deleted.
*
* @param tableId ID of the table to remove
*/
void removeTable(final int tableId) {
final TableMetadata metadata = tableConfigs.get(tableId);
if (metadata == null) {
throw new IllegalArgumentException("Unknown table ID: " + tableId);
}
assert dataSources.get(tableId) == null; // data source must have been already closed
final String label = metadata.tableName();
tableConfigs.set(tableId, null);
DataFileCommon.deleteDirectoryAndContents(getTableDir(label, tableId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,19 +757,6 @@ public boolean loadAndWriteHash(long path, SerializableDataOutputStream out) thr
return true;
}

/**
* Wait for any merges to finish and then close all data stores.
* <p>
* <b>After closing delete the database directory and all data!</b> For testing purpose only.
*/
public void closeAndDelete() throws IOException {
try {
close();
} finally {
database.removeTable(tableId);
}
}

/** Wait for any merges to finish, then close all data stores and free all resources. */
@Override
public void close() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void createAndCheckLeaves() throws IOException {

@AfterEach
public void afterEach() throws IOException {
dataSource.closeAndDelete();
dataSource.close();
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
// check the database was deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static com.swirlds.virtualmap.datasource.VirtualDataSource.INVALID_PATH;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -136,15 +137,15 @@ void createAndCheckInternalNodeHashes(final TestType testType, final int hashesR
assertEquals("path is less than 0", e.getMessage(), "Detail message should capture the failure");

// close data source
dataSource.closeAndDelete();
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
// check the database was deleted
assertEventuallyFalse(
() -> Files.exists(testDirectory.resolve(tableName)),
Duration.ofSeconds(1),
"Database should have been deleted by closeAndDelete()");
"Database should have been deleted by close()");
}

private static Stream<Arguments> provideParameters() {
Expand Down Expand Up @@ -202,11 +203,11 @@ void testRandomHashUpdates(final TestType testType) throws IOException {
} catch (Exception e) {
e.printStackTrace();
// close data source
dataSource.closeAndDelete();
dataSource.close();
System.exit(1);
} finally {
// close data source
dataSource.closeAndDelete();
dataSource.close();
}
}

Expand Down Expand Up @@ -238,7 +239,7 @@ void createAndCheckLeaves(final TestType testType) throws IOException {
assertEquals("path is less than 0", e.getMessage(), "Detail message should capture the failure");

// close data source
dataSource.closeAndDelete();
dataSource.close();
}

@ParameterizedTest
Expand Down Expand Up @@ -298,7 +299,7 @@ void updateLeaves(final TestType testType) throws IOException, InterruptedExcept
IntStream.range(incFirstLeafPath + 21, exclLastLeafPath)
.forEach(i -> assertLeaf(testType, dataSource, i, i, i + 10_000));
// close data source
dataSource.closeAndDelete();
dataSource.close();

// check db count
assertEventuallyEquals(
Expand Down Expand Up @@ -341,7 +342,7 @@ void moveLeaf(final TestType testType) throws IOException {
"creating/loading same LeafRecord gives different results");
assertLeaf(testType, dataSource, 250, 500);
// close data source
dataSource.closeAndDelete();
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
Expand Down Expand Up @@ -370,7 +371,7 @@ void preservesInterruptStatusWhenInterruptedSavingRecords() throws IOException,
"Thread interrupt status should NOT be cleared (two total interrupts)");
savingThread.join();
// close data source
dataSource.closeAndDelete();
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
Expand Down Expand Up @@ -401,10 +402,10 @@ void createCloseSnapshotCheckDelete(final TestType testType) throws IOException
dataSource.getDatabase().snapshot(snapshotDbPath, dataSource);
// close data source
dataSource.close();
// check directory still exists and temporary snapshot path does not
assertTrue(
// check directory is deleted on close
assertFalse(
Files.exists(originalDb.getTableDir(tableName, dataSource.getTableId())),
"Database dir should still exist");
"Data source dir should be deleted");
final MerkleDb snapshotDb = MerkleDb.getInstance(snapshotDbPath);
assertTrue(
Files.exists(snapshotDb.getTableDir(tableName, dataSource.getTableId())),
Expand All @@ -415,7 +416,7 @@ void createCloseSnapshotCheckDelete(final TestType testType) throws IOException
// check all the leaf data
IntStream.range(count, count * 2).forEach(i -> assertLeaf(testType, dataSource2, i, i));
// close data source
dataSource2.closeAndDelete();
dataSource2.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
Expand Down Expand Up @@ -449,7 +450,7 @@ void preservesInterruptStatusWhenInterruptedClosing() throws IOException, Interr
closingThread.join();
savingThread.join();
// close data source
dataSource.closeAndDelete();
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
Expand Down Expand Up @@ -505,7 +506,7 @@ void testKeyIndexTypes(final TestType testType) throws Exception {
"Data source in expected long key mode.");
} finally {
// close data source
dataSource.closeAndDelete();
dataSource.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ void testVirtualKeySet() throws IOException {
Assertions.assertEquals(tableConfig, dataSource.getTableConfig());
Assertions.assertEquals(tableConfig, instance.getTableConfig(tableId));
dataSource.close();
// Table config is preserved across data source close/reopen
Assertions.assertNotNull(instance.getTableConfig(tableId));
// Table config deleted on data source close
Assertions.assertNull(instance.getTableConfig(tableId));
}

@Test
Expand Down Expand Up @@ -220,12 +220,7 @@ public void testGetDataSourceAfterClose() throws IOException {
instance.createDataSource(tableName, tableConfig, false);
Assertions.assertNotNull(dataSource);
dataSource.close();
final MerkleDbDataSource<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> dataSource2 =
instance.getDataSource(tableName, false);
Assertions.assertNotNull(dataSource2);
Assertions.assertEquals(dataSource2, dataSource);
Assertions.assertNotSame(dataSource2, dataSource);
dataSource2.close();
Assertions.assertThrows(IllegalStateException.class, () -> instance.getDataSource(tableName, false));
}

@Test
Expand Down Expand Up @@ -278,7 +273,6 @@ public void testSnapshot() throws IOException {
instance2.getDataSource(tableName1, false);
Assertions.assertEquals(tableConfig, restored1.getTableConfig());
restored1.close();
Assertions.assertEquals(tableConfig, instance2.getTableConfig(restored1.getTableId()));
final MerkleDbDataSource<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> restored2 =
instance2.getDataSource(tableName2, false);
Assertions.assertEquals(tableConfig, restored2.getTableConfig());
Expand Down Expand Up @@ -464,10 +458,10 @@ void testCopiedDataSourceAutoDeleted() throws IOException {
inactiveCopy1.close();
activeCopy2.close();

Assertions.assertTrue(Files.exists(instance.getTableDir(tableName1, dataSource1.getTableId())));
Assertions.assertFalse(Files.exists(instance.getTableDir(tableName1, dataSource1.getTableId())));
Assertions.assertFalse(Files.exists(instance.getTableDir(tableName1, inactiveCopy1.getTableId())));
Assertions.assertFalse(Files.exists(instance.getTableDir(tableName2, dataSource2.getTableId())));
Assertions.assertTrue(Files.exists(instance.getTableDir(tableName2, activeCopy2.getTableId())));
Assertions.assertFalse(Files.exists(instance.getTableDir(tableName2, activeCopy2.getTableId())));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ public VirtualRootNode<K, V> copy() {
protected void destroyNode() {
if (pipeline != null) {
pipeline.destroyCopy(this);
} else {
logger.info(
VIRTUAL_MERKLE_STATS.getMarker(),
"Destroying virtual root node at route {}, but its pipeline is null. It may happen during failed reconnect",
getRoute());
closeDataSource();
}
}

Expand Down Expand Up @@ -953,15 +959,17 @@ public V remove(final K key) {
*/
@Override
public void onShutdown(final boolean immediately) {

if (immediately) {
// If immediate shutdown is required then let the hasher know it is being stopped. If shutdown
// is not immediate, the hasher will eventually stop once it finishes all of its work.
hasher.shutdown();
}
closeDataSource();
}

// We can now try to shut down the data source. If this doesn't shut things down, then there
// isn't much we can do aside from logging the fact. The node may well die before too long.
private void closeDataSource() {
// Shut down the data source. If this doesn't shut things down, then there isn't
// much we can do aside from logging the fact. The node may well die before too long
if (dataSource != null) {
try {
dataSource.close();
Expand Down

0 comments on commit 48561de

Please sign in to comment.