Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

9477: Direct memory / data source leak if teacher becomes inaccessible during reconnect #9780

Merged
merged 4 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -461,25 +461,10 @@ public <K extends VirtualKey, V extends VirtualValue> void closeDataSource(
final int tableId = dataSource.getTableId();
assert dataSources.get(tableId) != null;
dataSources.set(tableId, null);
if (!primaryTables.contains(tableId)) {
// Delete data, if the table is secondary
removeTable(tableId);
}
}

/**
* For testing purposes only.
*
* Removes the table. Table config and table data files are deleted.
*
* @param tableId ID of the table to remove
*/
void removeTable(final int tableId) {
final TableMetadata metadata = tableConfigs.get(tableId);
if (metadata == null) {
throw new IllegalArgumentException("Unknown table ID: " + tableId);
}
assert dataSources.get(tableId) == null; // data source must have been already closed
final String label = metadata.tableName();
tableConfigs.set(tableId, null);
DataFileCommon.deleteDirectoryAndContents(getTableDir(label, tableId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,19 +757,6 @@ public boolean loadAndWriteHash(long path, SerializableDataOutputStream out) thr
return true;
}

/**
* Wait for any merges to finish and then close all data stores.
* <p>
* <b>After closing, delete the database directory and all data!</b> For testing purposes only.
* <p>
* The table removal sits in a {@code finally} block, so it is attempted even when
* {@link #close()} throws.
*
* @throws IOException if {@link #close()} fails
*/
public void closeAndDelete() throws IOException {
try {
close();
} finally {
// Runs whether or not close() completed normally
database.removeTable(tableId);
}
}

/** Wait for any merges to finish, then close all data stores and free all resources. */
@Override
public void close() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void createAndCheckLeaves() throws IOException {

@AfterEach
public void afterEach() throws IOException {
dataSource.closeAndDelete();
dataSource.close();
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
// check the database was deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static com.swirlds.virtualmap.datasource.VirtualDataSource.INVALID_PATH;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -136,15 +137,15 @@ void createAndCheckInternalNodeHashes(final TestType testType, final int hashesR
assertEquals("path is less than 0", e.getMessage(), "Detail message should capture the failure");

// close data source
dataSource.closeAndDelete();
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
// check the database was deleted
assertEventuallyFalse(
() -> Files.exists(testDirectory.resolve(tableName)),
Duration.ofSeconds(1),
"Database should have been deleted by closeAndDelete()");
"Database should have been deleted by close()");
}

private static Stream<Arguments> provideParameters() {
Expand Down Expand Up @@ -202,11 +203,11 @@ void testRandomHashUpdates(final TestType testType) throws IOException {
} catch (Exception e) {
e.printStackTrace();
// close data source
dataSource.closeAndDelete();
dataSource.close();
System.exit(1);
} finally {
// close data source
dataSource.closeAndDelete();
dataSource.close();
}
}

Expand Down Expand Up @@ -238,7 +239,7 @@ void createAndCheckLeaves(final TestType testType) throws IOException {
assertEquals("path is less than 0", e.getMessage(), "Detail message should capture the failure");

// close data source
dataSource.closeAndDelete();
dataSource.close();
}

@ParameterizedTest
Expand Down Expand Up @@ -298,7 +299,7 @@ void updateLeaves(final TestType testType) throws IOException, InterruptedExcept
IntStream.range(incFirstLeafPath + 21, exclLastLeafPath)
.forEach(i -> assertLeaf(testType, dataSource, i, i, i + 10_000));
// close data source
dataSource.closeAndDelete();
dataSource.close();

// check db count
assertEventuallyEquals(
Expand Down Expand Up @@ -341,7 +342,7 @@ void moveLeaf(final TestType testType) throws IOException {
"creating/loading same LeafRecord gives different results");
assertLeaf(testType, dataSource, 250, 500);
// close data source
dataSource.closeAndDelete();
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
Expand Down Expand Up @@ -370,7 +371,7 @@ void preservesInterruptStatusWhenInterruptedSavingRecords() throws IOException,
"Thread interrupt status should NOT be cleared (two total interrupts)");
savingThread.join();
// close data source
dataSource.closeAndDelete();
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
Expand Down Expand Up @@ -401,10 +402,10 @@ void createCloseSnapshotCheckDelete(final TestType testType) throws IOException
dataSource.getDatabase().snapshot(snapshotDbPath, dataSource);
// close data source
dataSource.close();
// check directory still exists and temporary snapshot path does not
assertTrue(
// check directory is deleted on close
assertFalse(
Files.exists(originalDb.getTableDir(tableName, dataSource.getTableId())),
"Database dir should still exist");
"Data source dir should be deleted");
final MerkleDb snapshotDb = MerkleDb.getInstance(snapshotDbPath);
assertTrue(
Files.exists(snapshotDb.getTableDir(tableName, dataSource.getTableId())),
Expand All @@ -415,7 +416,7 @@ void createCloseSnapshotCheckDelete(final TestType testType) throws IOException
// check all the leaf data
IntStream.range(count, count * 2).forEach(i -> assertLeaf(testType, dataSource2, i, i));
// close data source
dataSource2.closeAndDelete();
dataSource2.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
Expand Down Expand Up @@ -449,7 +450,7 @@ void preservesInterruptStatusWhenInterruptedClosing() throws IOException, Interr
closingThread.join();
savingThread.join();
// close data source
dataSource.closeAndDelete();
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
Expand Down Expand Up @@ -505,7 +506,7 @@ void testKeyIndexTypes(final TestType testType) throws Exception {
"Data source in expected long key mode.");
} finally {
// close data source
dataSource.closeAndDelete();
dataSource.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ void testVirtualKeySet() throws IOException {
Assertions.assertEquals(tableConfig, dataSource.getTableConfig());
Assertions.assertEquals(tableConfig, instance.getTableConfig(tableId));
dataSource.close();
// Table config is preserved across data source close/reopen
Assertions.assertNotNull(instance.getTableConfig(tableId));
// Table config deleted on data source close
Assertions.assertNull(instance.getTableConfig(tableId));
}

@Test
Expand Down Expand Up @@ -220,12 +220,7 @@ public void testGetDataSourceAfterClose() throws IOException {
instance.createDataSource(tableName, tableConfig, false);
Assertions.assertNotNull(dataSource);
dataSource.close();
final MerkleDbDataSource<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> dataSource2 =
instance.getDataSource(tableName, false);
Assertions.assertNotNull(dataSource2);
Assertions.assertEquals(dataSource2, dataSource);
Assertions.assertNotSame(dataSource2, dataSource);
dataSource2.close();
Assertions.assertThrows(IllegalStateException.class, () -> instance.getDataSource(tableName, false));
}

@Test
Expand Down Expand Up @@ -278,7 +273,6 @@ public void testSnapshot() throws IOException {
instance2.getDataSource(tableName1, false);
Assertions.assertEquals(tableConfig, restored1.getTableConfig());
restored1.close();
Assertions.assertEquals(tableConfig, instance2.getTableConfig(restored1.getTableId()));
final MerkleDbDataSource<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> restored2 =
instance2.getDataSource(tableName2, false);
Assertions.assertEquals(tableConfig, restored2.getTableConfig());
Expand Down Expand Up @@ -464,10 +458,10 @@ void testCopiedDataSourceAutoDeleted() throws IOException {
inactiveCopy1.close();
activeCopy2.close();

Assertions.assertTrue(Files.exists(instance.getTableDir(tableName1, dataSource1.getTableId())));
Assertions.assertFalse(Files.exists(instance.getTableDir(tableName1, dataSource1.getTableId())));
Assertions.assertFalse(Files.exists(instance.getTableDir(tableName1, inactiveCopy1.getTableId())));
Assertions.assertFalse(Files.exists(instance.getTableDir(tableName2, dataSource2.getTableId())));
Assertions.assertTrue(Files.exists(instance.getTableDir(tableName2, activeCopy2.getTableId())));
Assertions.assertFalse(Files.exists(instance.getTableDir(tableName2, activeCopy2.getTableId())));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ public VirtualRootNode<K, V> copy() {
protected void destroyNode() {
if (pipeline != null) {
pipeline.destroyCopy(this);
} else {
logger.info(
OlegMazurov marked this conversation as resolved.
Show resolved Hide resolved
VIRTUAL_MERKLE_STATS.getMarker(),
"Destroying virtual root node at route {}, but its pipeline is null. It may happen during failed reconnect",
getRoute());
closeDataSource();
}
}

Expand Down Expand Up @@ -953,15 +959,17 @@ public V remove(final K key) {
*/
@Override
public void onShutdown(final boolean immediately) {

if (immediately) {
// If immediate shutdown is required then let the hasher know it is being stopped. If shutdown
// is not immediate, the hasher will eventually stop once it finishes all of its work.
hasher.shutdown();
}
// The data source is closed on every shutdown, whether immediate or not
closeDataSource();
}

// We can now try to shut down the data source. If this doesn't shut things down, then there
// isn't much we can do aside from logging the fact. The node may well die before too long.
private void closeDataSource() {
// Shut down the data source. If this doesn't shut things down, then there isn't
// much we can do aside from logging the fact. The node may well die before too long
if (dataSource != null) {
try {
dataSource.close();
Expand Down