Skip to content

Commit

Permalink
Merge pull request #4 from amishra-u/hot_keys_2
Browse files Browse the repository at this point in the history
Hot keys 2
  • Loading branch information
amishra-u committed Nov 8, 2023
2 parents 28c89a1 + 350b807 commit 1244187
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 3 deletions.
10 changes: 8 additions & 2 deletions src/main/java/build/buildfarm/cas/MemoryCAS.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
public class MemoryCAS implements ContentAddressableStorage {
private final long maxSizeInBytes;
private final Consumer<Digest> onPut;
private final Consumer<Digest> onReadComplete;

@GuardedBy("this")
private final Map<String, Entry> storage;
Expand All @@ -63,13 +64,17 @@ public class MemoryCAS implements ContentAddressableStorage {
private final Writes writes = new Writes(this);

public MemoryCAS(long maxSizeInBytes) {
this(maxSizeInBytes, (digest) -> {}, /* delegate=*/ null);
this(maxSizeInBytes, (digest) -> {}, /* onReadComplete=*/ (digest) -> {}, /* delegate=*/ null);
}

public MemoryCAS(
long maxSizeInBytes, Consumer<Digest> onPut, ContentAddressableStorage delegate) {
long maxSizeInBytes,
Consumer<Digest> onPut,
Consumer<Digest> onReadComplete,
ContentAddressableStorage delegate) {
this.maxSizeInBytes = maxSizeInBytes;
this.onPut = onPut;
this.onReadComplete = onReadComplete;
this.delegate = delegate;
sizeInBytes = 0;
header.before = header.after = header;
Expand Down Expand Up @@ -223,6 +228,7 @@ private synchronized Entry getEntry(Digest digest) {
return null;
}
e.recordAccess(header);
onReadComplete.accept(digest);
return e;
}

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/build/buildfarm/cas/cfc/CASFileCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public abstract class CASFileCache implements ContentAddressableStorage {
private final DigestUtil digestUtil;
private final ConcurrentMap<String, Entry> storage;
private final Consumer<Digest> onPut;
private final Consumer<Digest> onReadComplete;
private final Consumer<Iterable<Digest>> onExpire;
private final Executor accessRecorder;
private final ExecutorService expireService;
Expand Down Expand Up @@ -324,6 +325,7 @@ public CASFileCache(
/* storage=*/ Maps.newConcurrentMap(),
/* directoriesIndexDbName=*/ DEFAULT_DIRECTORIES_INDEX_NAME,
/* onPut=*/ (digest) -> {},
/* onReadComplete=*/ (digest) -> {},
/* onExpire=*/ (digests) -> {},
/* delegate=*/ null,
/* delegateSkipLoad=*/ false);
Expand All @@ -342,6 +344,7 @@ public CASFileCache(
ConcurrentMap<String, Entry> storage,
String directoriesIndexDbName,
Consumer<Digest> onPut,
Consumer<Digest> onReadComplete,
Consumer<Iterable<Digest>> onExpire,
@Nullable ContentAddressableStorage delegate,
boolean delegateSkipLoad) {
Expand All @@ -354,6 +357,7 @@ public CASFileCache(
this.accessRecorder = accessRecorder;
this.storage = storage;
this.onPut = onPut;
this.onReadComplete = onReadComplete;
this.onExpire = onExpire;
this.delegate = delegate;
this.delegateSkipLoad = delegateSkipLoad;
Expand Down Expand Up @@ -679,6 +683,7 @@ void sendBuffer() throws IOException {
if (len < 0) {
in.close();
blobObserver.onCompleted();
onReadComplete.accept(digest);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/build/buildfarm/common/config/Backplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public enum BACKPLANE_TYPE {
private String casPrefix = "ContentAddressableStorage";
private int casExpire = 604800; // 1 Week
private String casReadCountSetName = "CasReadCount";
private boolean enableCasAccessMetrics = false;
private int casReadCountWindow = 14400; // 4 hours
private int casReadCountUpdateInterval = 900; // 15 mins

@Getter(AccessLevel.NONE)
private boolean subscribeToBackplane = true; // deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ShardCASFileCache extends CASFileCache {
ExecutorService expireService,
Executor accessRecorder,
Consumer<Digest> onPut,
Consumer<Digest> onReadComplete,
Consumer<Iterable<Digest>> onExpire,
ContentAddressableStorage delegate,
boolean delegateSkipLoad) {
Expand All @@ -59,6 +60,7 @@ class ShardCASFileCache extends CASFileCache {
/* storage=*/ Maps.newConcurrentMap(),
DEFAULT_DIRECTORIES_INDEX_NAME,
onPut,
onReadComplete,
onExpire,
delegate,
delegateSkipLoad);
Expand Down
27 changes: 26 additions & 1 deletion src/main/java/build/buildfarm/worker/shard/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import build.buildfarm.cas.ContentAddressableStorage;
import build.buildfarm.cas.ContentAddressableStorage.Blob;
import build.buildfarm.cas.MemoryCAS;
import build.buildfarm.cas.cfc.CASAccessMetricsRecorder;
import build.buildfarm.cas.cfc.CASFileCache;
import build.buildfarm.common.BuildfarmExecutors;
import build.buildfarm.common.DigestUtil;
Expand Down Expand Up @@ -135,6 +136,7 @@ public final class Worker extends LoggingMain {
private Backplane backplane;
private LoadingCache<String, Instance> workerStubs;
private AtomicBoolean released = new AtomicBoolean(true);
@Nullable private CASAccessMetricsRecorder casAccessMetricsRecorder;

private Worker() {
super("BuildFarmShardWorker");
Expand Down Expand Up @@ -300,7 +302,8 @@ private ContentAddressableStorage createStorage(
throw new IllegalArgumentException("Invalid cas type specified");
case MEMORY:
case FUSE: // FIXME have FUSE refer to a name for storage backing, and topo
return new MemoryCAS(cas.getMaxSizeBytes(), this::onStoragePut, delegate);
return new MemoryCAS(
cas.getMaxSizeBytes(), this::onStoragePut, this::onReadComplete, delegate);
case GRPC:
checkState(delegate == null, "grpc cas cannot delegate");
return createGrpcCAS(cas);
Expand All @@ -318,6 +321,7 @@ private ContentAddressableStorage createStorage(
removeDirectoryService,
accessRecorder,
this::onStoragePut,
this::onReadComplete,
delegate == null ? this::onStorageExpire : (digests) -> {},
delegate,
delegateSkipLoad);
Expand Down Expand Up @@ -347,6 +351,9 @@ private void onStoragePut(Digest digest) {
if (configs.getWorker().getCapabilities().isCas()) {
backplane.addBlobLocation(digest, configs.getWorker().getPublicName());
}
if (configs.getBackplane().isEnableCasAccessMetrics()) {
casAccessMetricsRecorder.recordWrite(digest);
}
} catch (IOException e) {
throw Status.fromThrowable(e).asRuntimeException();
}
Expand All @@ -363,6 +370,12 @@ private void onStorageExpire(Iterable<Digest> digests) {
}
}

private void onReadComplete(Digest digest) {
if (configs.getBackplane().isEnableCasAccessMetrics()) {
casAccessMetricsRecorder.recordRead(digest);
}
}

private void removeWorker(String name) {
try {
backplane.removeWorker(name, "removing self prior to initialization");
Expand Down Expand Up @@ -575,6 +588,15 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep
execFileSystem.start(
(digests) -> addBlobsLocation(digests, configs.getWorker().getPublicName()), skipLoad);

if (configs.getBackplane().isEnableCasAccessMetrics()) {
casAccessMetricsRecorder =
new CASAccessMetricsRecorder(
backplane,
java.time.Duration.ofSeconds(configs.getBackplane().getCasReadCountWindow()),
java.time.Duration.ofSeconds(configs.getBackplane().getCasReadCountUpdateInterval()));
casAccessMetricsRecorder.start();
}

server.start();
healthStatusManager.setStatus(
HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING);
Expand Down Expand Up @@ -645,6 +667,9 @@ private void shutdown() throws InterruptedException {
execFileSystem.stop();
execFileSystem = null;
}
if (casAccessMetricsRecorder != null) {
casAccessMetricsRecorder.stop();
}
if (server != null) {
log.info("Shutting down the server");
server.shutdown();
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void setUp() throws IOException, InterruptedException {
storage,
/* directoriesIndexDbName=*/ ":memory:",
onPut,
/* onReadComplete=*/ digest -> {},
onExpire,
delegate,
/* delegateSkipLoad=*/ false) {
Expand Down Expand Up @@ -1118,6 +1119,7 @@ public void copyExternalInputRetries() throws Exception {
storage,
/* directoriesIndexDbName=*/ ":memory:",
/* onPut=*/ digest -> {},
/* onReadComplete=*/ digest -> {},
/* onExpire=*/ digests -> {},
/* delegate=*/ null,
/* delegateSkipLoad=*/ false) {
Expand Down Expand Up @@ -1181,6 +1183,7 @@ public void newInputThrowsNoSuchFileExceptionWithoutDelegate() throws Exception
storage,
/* directoriesIndexDbName=*/ ":memory:",
/* onPut=*/ digest -> {},
/* onReadComplete=*/ digest -> {},
/* onExpire=*/ digests -> {},
/* delegate=*/ null,
/* delegateSkipLoad=*/ false) {
Expand Down

0 comments on commit 1244187

Please sign in to comment.