Skip to content

Commit

Permalink
FetchAPI Initial Revision
Browse files Browse the repository at this point in the history
Changes all CAS implemented storage to index by hash alone.
  • Loading branch information
werkt committed Dec 11, 2020
1 parent eb41522 commit ec92659
Show file tree
Hide file tree
Showing 19 changed files with 684 additions and 380 deletions.
2 changes: 2 additions & 0 deletions src/main/java/build/buildfarm/BUILD
Expand Up @@ -398,6 +398,8 @@ java_library(
"@maven//:io_grpc_grpc_services",
"@maven//:io_grpc_grpc_stub",
"@maven//:org_threeten_threetenbp",
"@remote_apis//:build_bazel_remote_asset_v1_remote_asset_java_grpc",
"@remote_apis//:build_bazel_remote_asset_v1_remote_asset_java_proto",
"@remote_apis//:build_bazel_remote_execution_v2_remote_execution_java_grpc",
"@remote_apis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
"@remote_apis//:build_bazel_semver_java_proto",
Expand Down
Expand Up @@ -67,9 +67,14 @@ public boolean isEmpty() {
}
}

/** Indicates presence in the CAS for a single digest. */
/**
* Indicates presence in the CAS for a single digest.
*
* <p>If supported, a size_bytes of -1 may be used to look up the size of a digest A size
* mismatch, if partial key selection is supported, may result in correction
*/
@ThreadSafe
boolean contains(Digest digest);
boolean contains(Digest digest, Digest.Builder result);

/** Indicates presence in the CAS for a sequence of digests. */
@ThreadSafe
Expand Down
42 changes: 29 additions & 13 deletions src/main/java/build/buildfarm/cas/ContentAddressableStorages.java
Expand Up @@ -126,20 +126,25 @@ public static ContentAddressableStorage create(ContentAddressableStorageConfig c
}

/** decorates a map with a CAS interface, does not react to removals with expirations */
public static ContentAddressableStorage casMapDecorator(Map<Digest, ByteString> map) {
public static ContentAddressableStorage casMapDecorator(Map<String, ByteString> map) {
return new ContentAddressableStorage() {
final Writes writes = new Writes(this);

@Override
public boolean contains(Digest digest) {
return map.containsKey(digest);
public boolean contains(Digest digest, Digest.Builder result) {
ByteString data = getData(digest);
if (data != null) {
result.mergeFrom(digest).setSizeBytes(data.size());
return true;
}
return false;
}

@Override
public Iterable<Digest> findMissingBlobs(Iterable<Digest> digests) {
ImmutableList.Builder<Digest> missing = ImmutableList.builder();
for (Digest digest : digests) {
if (digest.getSizeBytes() != 0 && !map.containsKey(digest)) {
if (getData(digest) == null) {
missing.add(digest);
}
}
Expand All @@ -153,7 +158,7 @@ public Write getWrite(Digest digest, UUID uuid, RequestMetadata requestMetadata)

@Override
public InputStream newInput(Digest digest, long offset) throws IOException {
ByteString data = map.get(digest);
ByteString data = getData(digest);
if (data == null) {
throw new NoSuchFileException(digest.getHash());
}
Expand All @@ -169,23 +174,34 @@ public void get(
long count,
ServerCallStreamObserver<ByteString> responseObserver,
RequestMetadata requestMetadata) {
ByteString data = map.get(digest);
if (data == null) {
responseObserver.onError(Status.NOT_FOUND.asException());
} else {
responseObserver.onNext(map.get(digest));
ByteString data = getData(digest);
if (data != null) {
responseObserver.onNext(data);
responseObserver.onCompleted();
} else {
responseObserver.onError(Status.NOT_FOUND.asException());
}
}

private ByteString getData(Digest digest) {
if (digest.getSizeBytes() == 0) {
return ByteString.EMPTY;
}
ByteString data = map.get(digest.getHash());
if (data == null || (digest.getSizeBytes() > 0 && digest.getSizeBytes() != data.size())) {
return null;
}
return data;
}

@Override
public ListenableFuture<Iterable<Response>> getAllFuture(Iterable<Digest> digests) {
return immediateFuture(MemoryCAS.getAll(digests, map::get));
return immediateFuture(MemoryCAS.getAll(digests, this::getData));
}

@Override
public Blob get(Digest digest) {
ByteString data = map.get(digest);
ByteString data = getData(digest);
if (data == null) {
return null;
}
Expand All @@ -194,7 +210,7 @@ public Blob get(Digest digest) {

@Override
public void put(Blob blob) {
map.put(blob.getDigest(), blob.getData());
map.put(blob.getDigest().getHash(), blob.getData());

writes.getFuture(blob.getDigest()).set(blob.getData());
}
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/build/buildfarm/cas/GrpcCAS.java
Expand Up @@ -117,8 +117,12 @@ private String getBlobName(Digest digest) {
}

@Override
public boolean contains(Digest digest) {
public boolean contains(Digest digest, Digest.Builder result) {
// QueryWriteStatusRequest?
if (digest.getSizeBytes() < 0) {
throw new UnsupportedOperationException("cannot lookup hash without size via grpc cas");
}
result.mergeFrom(digest);
return Iterables.isEmpty(findMissingBlobs(ImmutableList.of(digest)));
}

Expand Down
44 changes: 29 additions & 15 deletions src/main/java/build/buildfarm/cas/MemoryCAS.java
Expand Up @@ -55,7 +55,7 @@ public class MemoryCAS implements ContentAddressableStorage {
private final Consumer<Digest> onPut;

@GuardedBy("this")
private final Map<Digest, Entry> storage;
private final Map<String, Entry> storage;

@GuardedBy("this")
private final Entry header = new SentinelEntry();
Expand All @@ -81,8 +81,13 @@ public MemoryCAS(
}

@Override
public synchronized boolean contains(Digest digest) {
return get(digest) != null || (delegate != null && delegate.contains(digest));
public synchronized boolean contains(Digest digest, Digest.Builder result) {
Entry entry = getEntry(digest);
if (entry != null) {
result.setHash(entry.key).setSizeBytes(entry.value.size());
return true;
}
return delegate != null && delegate.contains(digest, result);
}

@Override
Expand All @@ -91,7 +96,7 @@ public Iterable<Digest> findMissingBlobs(Iterable<Digest> digests) throws Interr
synchronized (this) {
// incur access use of the digest
for (Digest digest : digests) {
if (digest.getSizeBytes() != 0 && !contains(digest)) {
if (digest.getSizeBytes() != 0 && !contains(digest, Digest.newBuilder())) {
builder.add(digest);
}
}
Expand Down Expand Up @@ -196,22 +201,30 @@ public static Response getResponse(Digest digest, Function<Digest, ByteString> b
}

@Override
public synchronized Blob get(Digest digest) {
public Blob get(Digest digest) {
if (digest.getSizeBytes() == 0) {
throw new IllegalArgumentException("Cannot fetch empty blob");
}

Entry e = storage.get(digest);
Entry e = getEntry(digest);
if (e == null) {
if (delegate != null) {
return delegate.get(digest);
}
return null;
}
e.recordAccess(header);
return e.value;
}

private synchronized Entry getEntry(Digest digest) {
Entry e = storage.get(digest.getHash());
if (e == null) {
return null;
}
e.recordAccess(header);
return e;
}

@GuardedBy("this")
private long size() {
Entry e = header.before;
Expand Down Expand Up @@ -245,7 +258,7 @@ public void put(Blob blob, Runnable onExpiration) {
}

private synchronized boolean add(Blob blob, Runnable onExpiration) {
Entry e = storage.get(blob.getDigest());
Entry e = storage.get(blob.getDigest().getHash());
if (e != null) {
if (onExpiration != null) {
e.addOnExpiration(onExpiration);
Expand All @@ -270,7 +283,7 @@ private synchronized boolean add(Blob blob, Runnable onExpiration) {

createEntry(blob, onExpiration);

storage.put(blob.getDigest(), header.before);
storage.put(blob.getDigest().getHash(), header.before);

return true;
}
Expand All @@ -291,27 +304,28 @@ private void createEntry(Blob blob, Runnable onExpiration) {

@GuardedBy("this")
private void expireEntry(Entry e) {
logger.log(Level.INFO, "MemoryLRUCAS: expiring " + DigestUtil.toString(e.key));
Digest digest = DigestUtil.buildDigest(e.key, e.value.size());
logger.log(Level.INFO, "MemoryLRUCAS: expiring " + DigestUtil.toString(digest));
if (delegate != null) {
try {
Write write =
delegate.getWrite(e.key, UUID.randomUUID(), RequestMetadata.getDefaultInstance());
delegate.getWrite(digest, UUID.randomUUID(), RequestMetadata.getDefaultInstance());
try (OutputStream out = write.getOutput(1, MINUTES, () -> {})) {
e.value.getData().writeTo(out);
}
} catch (IOException ioEx) {
logger.log(
Level.SEVERE, String.format("error delegating %s", DigestUtil.toString(e.key)), ioEx);
Level.SEVERE, String.format("error delegating %s", DigestUtil.toString(digest)), ioEx);
}
}
storage.remove(e.key);
e.expire();
sizeInBytes -= e.value.size();
sizeInBytes -= digest.getSizeBytes();
}

private static class Entry {
Entry before, after;
final Digest key;
final String key;
final Blob value;
private List<Runnable> onExpirations;

Expand All @@ -323,7 +337,7 @@ private Entry() {
}

public Entry(Blob blob) {
key = blob.getDigest();
key = blob.getDigest().getHash();
value = blob;
onExpirations = null;
}
Expand Down

0 comments on commit ec92659

Please sign in to comment.