Skip to content

Commit

Permalink
Optimize disk usage for disk-backed compaction (#3493)
Browse files Browse the repository at this point in the history
Delete RocksDB files for every corfu table right after
the checkpointing finishes.
  • Loading branch information
Lujie1996 committed Jan 26, 2023
1 parent 198b0bb commit d65be9b
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class CheckpointWriter<T extends ICorfuTable<?, ?>> {
/** Local ref to the object that we're dumping.
* TODO: generalize to all SMR objects.
*/
@Getter
private final T corfuTable;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ private CheckpointingStatus appendCheckpoint(TableName tableName,
try {
CheckpointWriter<ICorfuTable<?,?>> cpw = checkpointWriterFn.apply(tableName);
cpw.appendCheckpoint(Optional.of(livenessUpdater));
cpw.getCorfuTable().close();
returnStatus = StatusType.COMPLETED;
break;
} catch (RuntimeException re) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface ICorfuTable<K, V> {

void clear();

void close();

boolean isTableCached();

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,18 @@ public static Options getPersistedStreamingMapOptions() {
private final ISerializer serializer;
private final RocksDB rocksDb;

private final String absolutePathString;

private Options options;

public PersistedStreamingMap(@NonNull Path dataPath,
@NonNull Options options,
@NonNull ISerializer serializer,
@NonNull CorfuRuntime corfuRuntime) {
this.absolutePathString = dataPath.toFile().getAbsolutePath();
this.options = options;
try {
RocksDB.destroyDB(dataPath.toFile().getAbsolutePath(), options);
RocksDB.destroyDB(this.absolutePathString, this.options);
this.rocksDb = RocksDB.open(options, dataPath.toFile().getAbsolutePath());
} catch (RocksDBException e) {
throw new UnrecoverableCorfuError(e);
Expand Down Expand Up @@ -311,5 +317,13 @@ public Stream<Entry<K, V>> entryStream() {
@Override
public void close() {
this.rocksDb.close();

// Release disk space
try {
RocksDB.destroyDB(this.absolutePathString, this.options);
log.info("Cleared RocksDB data on {}", absolutePathString);
} catch (RocksDBException e) {
throw new UnrecoverableCorfuError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public void clear() {
proxy.logUpdate("clear", false, null);
}

@Override
public void close() {
// NO OP
}

@Override
public boolean isTableCached() {
return ((MVOCorfuCompileProxy)proxy).isObjectCached();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void setup() {
doNothing().when(txn).putRecord(any(), any(), any(), any());
doNothing().when(txn).delete(anyString(), any(TableName.class));
when(txn.commit()).thenReturn(Timestamp.getDefaultInstance());
when(cpw.getCorfuTable()).thenReturn(mock(CorfuTable.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,21 @@ void verifyCommitAddressInterleavingTxn(@ForAll @Size(SAMPLE_SIZE) Set<String> i
}
}

/**
* Verify RocksDB persisted cache is cleaned up
*/
@Property(tries = NUM_OF_TRIES)
void verifyPersistedCacheCleanUp() {
resetTests();
assertThat(persistedCacheLocation).doesNotExist();
try (final CorfuTable<String, String> table1 = setupTable(alternateMapName)) {
table1.put(defaultNewMapEntry, defaultNewMapEntry);
assertThat(persistedCacheLocation).exists();
table1.close();
assertThat(persistedCacheLocation).doesNotExist();
}
}

/**
* A custom generator for a set of {@link Uuid}.
*/
Expand Down

0 comments on commit d65be9b

Please sign in to comment.