Skip to content

Commit

Permalink
Optimize batch read and caching in compactor
Browse files Browse the repository at this point in the history
Read cache does not cache checkpoint entries. This invalidates the
use of checkpoint batch read in stream layer. This PR adds the extra
parameter CHECKPOINT_READ_BATCH_SIZE to compactor_runner.py and set
it to 1 for compactor.

This PR also enables read cache for compactor to allow just one batch
read of non-checkpoint entries to be cached. This optimizes the read
efficiency of checkpointer.
  • Loading branch information
Lujie1996 committed Jun 8, 2023
1 parent f2ec6e1 commit db6d7eb
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,20 @@ public void buildRuntime() {
builder.bulkReadSize(Integer.parseInt(bulkReadSizeStr));
});

getOpt("--checkpointReadBatchSize").ifPresent(checkpointReadBatchSize -> {
builder.checkpointReadBatchSize(Integer.parseInt(checkpointReadBatchSize));
});

getOpt("--maxCacheEntries").ifPresent(maxCacheEntries -> {
builder.maxCacheEntries(Integer.parseInt(maxCacheEntries));
});

builder.clientName(host);
builder.systemDownHandlerTriggerLimit(SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT)
.systemDownHandler(defaultSystemDownHandler);

params = builder
.priorityLevel(PriorityLevel.HIGH)
.cacheDisabled(true)
.build();
}

Expand Down Expand Up @@ -128,6 +135,8 @@ public static class CompactorCmdLineHelper {
"[--persistedCacheRoot=<pathToTempDirForLargeTables>] " +
"[--maxWriteSize=<maxWriteSizeLimit>] " +
"[--bulkReadSize=<bulkReadSize>] " +
"[--checkpointReadBatchSize=<checkpointReadBatchSize>] " +
"[--maxCacheEntries=<maxCacheEntries>] " +
"[--tlsEnabled=<tls_enabled>]";

public static final String OPTIONS_PARAMS =
Expand All @@ -139,7 +148,9 @@ public static class CompactorCmdLineHelper {
+ "--truststore_password=<truststore_password> Truststore Password\n"
+ "--persistedCacheRoot=<pathToTempDirForLargeTables> Path to Temp Dir\n"
+ "--maxWriteSize=<maxWriteSize> Max write size smaller than 2GB\n"
+ "--bulkReadSize=<bulkReadSize> Read size for chain replication\n"
+ "--bulkReadSize=<bulkReadSize> Number of log entries read in one batch\n"
+ "--checkpointReadBatchSize=<checkpointReadBatchSize> Number of CP entries scanned in one batch\n"
+ "--maxCacheEntries=<maxCacheEntries> AddressSpaceView read cache size\n"
+ "--tlsEnabled=<tls_enabled>";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ public AddressSpaceView(@Nonnull final CorfuRuntime runtime) {
cacheBuilder.maximumSize(maxCacheEntries);
log.info("AddressSpaceView readCache size is set to {}", maxCacheEntries);

if (maxCacheEntries == 0) {
log.warn("Since AddressSpaceView readCache size is 0, " +
"overriding CorfuRuntime bulkReadSize and checkpointReadBatchSize to 1.");
runtime.getParameters().setBulkReadSize(1);
runtime.getParameters().setCheckpointReadBatchSize(1);
}

if (concurrencyLevel != 0) {
cacheBuilder.concurrencyLevel(concurrencyLevel);
}
Expand Down
4 changes: 4 additions & 0 deletions scripts/compactor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@

COMPACTOR_CONTROLS_CLASS_NAME = "org.corfudb.compactor.CompactorController"
COMPACTOR_CHECKPOINTER_CLASS_NAME = "org.corfudb.compactor.CompactorCheckpointer"
MAX_CACHE_ENTRIES = 20
COMPACTOR_BULK_READ_SIZE = 20
CHECKPOINT_READ_BATCH_SIZE = 1
COMPACTOR_JVM_XMX = 1024
FORCE_DISABLE_CHECKPOINTING = "FORCE_DISABLE_CHECKPOINTING"
POD_NAME = "POD_NAME"
Expand Down Expand Up @@ -187,6 +189,8 @@ def get_corfu_compactor_cmd(self, compactor_config, class_to_invoke):
cmd.append("--port=" + self._config.corfu_port)
cmd.append("--tlsEnabled=true")
cmd.append("--bulkReadSize=" + str(COMPACTOR_BULK_READ_SIZE))
cmd.append("--checkpointReadBatchSize=" + str(CHECKPOINT_READ_BATCH_SIZE))
cmd.append("--maxCacheEntries=" + str(MAX_CACHE_ENTRIES))

Security = compactor_config["Security"]
cmd.append("--keystore=" + Security["Keystore"])
Expand Down

0 comments on commit db6d7eb

Please sign in to comment.