Skip to content

Commit

Permalink
Optimize batch read and caching in compactor
Browse files Browse the repository at this point in the history
Read cache does not cache checkpoint entries. This invalidates the
use of checkpoint batch read in stream layer. This PR adds the extra
parameter CHECKPOINT_READ_BATCH_SIZE to compactor_runner.py and set
it to 1 for compactor.

This PR also enables read cache for compactor to allow just one batch
read of non-checkpoint entries to be cached. This optimizes the read
efficiency of checkpointer.
  • Loading branch information
Lujie1996 authored and Maithem committed Jun 15, 2023
1 parent d16178d commit 4b69c44
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,18 @@ public void buildRuntime() {
builder.bulkReadSize(Integer.parseInt(bulkReadSizeStr));
});

getOpt("--maxCacheEntries").ifPresent(maxCacheEntries -> {
builder.maxCacheEntries(Integer.parseInt(maxCacheEntries));
});

builder.maxMvoCacheEntries(0);

builder.clientName(host);
builder.systemDownHandlerTriggerLimit(SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT)
.systemDownHandler(defaultSystemDownHandler);

params = builder
.priorityLevel(PriorityLevel.HIGH)
.cacheDisabled(true)
.build();
}

Expand Down Expand Up @@ -128,6 +133,7 @@ public static class CompactorCmdLineHelper {
"[--persistedCacheRoot=<pathToTempDirForLargeTables>] " +
"[--maxWriteSize=<maxWriteSizeLimit>] " +
"[--bulkReadSize=<bulkReadSize>] " +
"[--maxCacheEntries=<maxCacheEntries>] " +
"[--tlsEnabled=<tls_enabled>]";

public static final String OPTIONS_PARAMS =
Expand All @@ -139,7 +145,8 @@ public static class CompactorCmdLineHelper {
+ "--truststore_password=<truststore_password> Truststore Password\n"
+ "--persistedCacheRoot=<pathToTempDirForLargeTables> Path to Temp Dir\n"
+ "--maxWriteSize=<maxWriteSize> Max write size smaller than 2GB\n"
+ "--bulkReadSize=<bulkReadSize> Read size for chain replication\n"
+ "--bulkReadSize=<bulkReadSize> Number of log entries read in one batch\n"
+ "--maxCacheEntries=<maxCacheEntries> AddressSpaceView read cache size\n"
+ "--tlsEnabled=<tls_enabled>";
}
}
4 changes: 2 additions & 2 deletions runtime/src/main/java/org/corfudb/runtime/CorfuRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public static class CorfuRuntimeParameters extends RuntimeParameters {
* Checkpoint read Batch Size: number of checkpoint addresses to fetch in batch when stream
* address discovery mechanism relies on address maps instead of follow backpointers;
*/
int checkpointReadBatchSize = 5;
int checkpointReadBatchSize = 1;

/*
* Cache Option for local writes.
Expand Down Expand Up @@ -440,7 +440,7 @@ public static class CorfuRuntimeParametersBuilder extends RuntimeParametersBuild
private int checkpointBatchSize = 50;
private int restoreBatchSize = 50;
private int streamBatchSize = 10;
private int checkpointReadBatchSize = 5;
private int checkpointReadBatchSize = 1;
private Duration runtimeGCPeriod = Duration.ofMinutes(20);
private UUID clusterId = null;
private int systemDownHandlerTriggerLimit = 20;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ public AddressSpaceView(@Nonnull final CorfuRuntime runtime) {
cacheBuilder.maximumSize(maxCacheEntries);
log.info("AddressSpaceView readCache size is set to {}", maxCacheEntries);

if (maxCacheEntries == 0) {
log.warn("Since AddressSpaceView readCache size is 0, " +
"overriding CorfuRuntime bulkReadSize and checkpointReadBatchSize to 1.");
runtime.getParameters().setBulkReadSize(1);
runtime.getParameters().setCheckpointReadBatchSize(1);
}

if (concurrencyLevel != 0) {
cacheBuilder.concurrencyLevel(concurrencyLevel);
}
Expand Down
2 changes: 2 additions & 0 deletions scripts/compactor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

COMPACTOR_CONTROLS_CLASS_NAME = "org.corfudb.compactor.CompactorController"
COMPACTOR_CHECKPOINTER_CLASS_NAME = "org.corfudb.compactor.CompactorCheckpointer"
MAX_CACHE_ENTRIES = 20
COMPACTOR_BULK_READ_SIZE = 20
COMPACTOR_JVM_XMX = 1024
FORCE_DISABLE_CHECKPOINTING = "FORCE_DISABLE_CHECKPOINTING"
Expand Down Expand Up @@ -146,6 +147,7 @@ def get_corfu_compactor_cmd(self, compactor_config, class_to_invoke):
cmd.append("--port=" + self._config.corfu_port)
cmd.append("--tlsEnabled=true")
cmd.append("--bulkReadSize=" + str(COMPACTOR_BULK_READ_SIZE))
cmd.append("--maxCacheEntries=" + str(MAX_CACHE_ENTRIES))

Security = compactor_config["Security"]
cmd.append("--keystore=" + Security["Keystore"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ public class CompactorConfigUnitTest {
private static final String truststore = "truststore";
private static final String ks_password = "ks_password";
private static final String truststore_password = "truststore_password";
private static final int bulkReadSize = 50;
private static final int bulkReadSize = 20;
private static final int maxCacheEntries = 20;
private static final String SPACE = " ";
private final String baseCmd = "--hostname=" + hostname + " --port=" + port +
" --tlsEnabled=true --bulkReadSize=" + bulkReadSize + " --keystore=" + keystore + " --ks_password=" +
" --tlsEnabled=true --bulkReadSize=" + bulkReadSize +
" --maxCacheEntries=" + maxCacheEntries +
" --keystore=" + keystore + " --ks_password=" +
ks_password + " --truststore=" + truststore + " --truststore_password=" + truststore_password;

@Test
Expand All @@ -40,7 +43,9 @@ public void testCompactorControlsConfigSuccess() {
.systemDownHandlerTriggerLimit(CompactorBaseConfig.SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT)
.systemDownHandler(corfuCompactorControlsConfig.getDefaultSystemDownHandler())
.clientName(hostname)
.cacheDisabled(true)
.cacheDisabled(false)
.maxCacheEntries(bulkReadSize)
.maxMvoCacheEntries(0)
.build();

assertEquals(Optional.empty(), corfuCompactorControlsConfig.getPersistedCacheRoot());
Expand Down

0 comments on commit 4b69c44

Please sign in to comment.