Server endpoints config #153
Conversation
Provide configuration for the CAS and ActionCache for memory instances. Support a grpc endpoint for both, with mixed mode support for delegation and reorganization of CAS and AC into their own packages.
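A mixed-mode memory instance configuration along these lines might look something like the following sketch. The field names (`memory_instance_config`, `cas_config`, `action_cache_config`, `delegate_cas`, `grpc`, `target`) are illustrative approximations of what such a config could contain, not a verbatim copy of the schema this PR introduces:

```textproto
# Hypothetical sketch: a memory instance whose CAS is delegated to an
# external gRPC endpoint while the ActionCache is layered on that CAS.
instances {
  name: "default_memory_instance"
  memory_instance_config {
    cas_config {
      grpc {
        target: "cas.example.com:8980"  # illustrative endpoint
      }
    }
    action_cache_config {
      delegate_cas {}  # store ActionResults in the configured CAS
    }
  }
}
```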
9f2ada1 to c48a715
First pass comments -- I feel I need a little more context for the overall plan and use cases for this to make it easier to review. Thank you!
}

@Override
public boolean contains(Digest digest) {
Why do you use queryWriteStatus for contains instead of getMissingBlobs? I feel that this API method is more for ongoing write operations than for keeping persistent metadata, and I think getMissing is more appropriate. But if you are using queryWriteStatus (and will implement it to essentially encompass getMissing), then why are you expiring the item on false? What if there's an ongoing write that's just not finished yet?
I don't know what getMissing is; it doesn't appear in the bytestream API. Do you mean findMissingBlobs from the cas? My presumption was that we could use a bytestream-only implementation for this cas, instead of requiring the (less common) ContentAddressableStorage implementation from remote_execution.
I'm expiring it so that on any access (since I don't have a callback mechanism for bytestream, though this might be a good case for a watcher), either get or contains, we trigger the onExpiration for a key: a poor man's event system.
Note the comparison to the committed size: only a completed upload whose committed size matches the digest size is considered 'contained'. We cannot return true for contains when the committed size has not reached the digest-indicated size. This check was actually missing from get, so I will add it.
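The committed-size comparison described above can be sketched as follows. This is a self-contained model, not the actual GrpcCAS code: `WriteStatus` is a hypothetical stand-in for the bytestream `QueryWriteStatusResponse` (which carries `committed_size` and `complete`):

```java
// Hypothetical stand-in for the bytestream QueryWriteStatusResponse.
class WriteStatus {
    final long committedSize;
    final boolean complete;

    WriteStatus(long committedSize, boolean complete) {
        this.committedSize = committedSize;
        this.complete = complete;
    }
}

class ContainsCheck {
    // A blob counts as 'contained' only when the upload is complete AND the
    // committed size matches the digest-indicated size; an in-progress write
    // with a partial committed size must not be reported as contained.
    static boolean contains(WriteStatus status, long digestSizeBytes) {
        return status.complete && status.committedSize == digestSizeBytes;
    }
}
```

The same predicate would guard get: a download whose content size falls short of the digest size should not be treated as a hit.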
  return contains;
}

private synchronized void addOnExpiration(Digest digest, Runnable onExpiration) {
Say, how are onExpiration runnables used? I couldn't find where you use them. It's a bit weird to have them on a proxy cache that doesn't manage the actual items -- seems like you can't really guarantee to run them as needed.
I use them to back a map of CompletedOperations and the ActionCache; build.buildfarm.instance.DelegateCASMap does the heavy lifting there. I've found them to be fairly reliable in the memory instance, allowing the backing store to be contributed to by ActionResult and Operation messages under a single overall byte limit (if a bit naive). As noted above, something like Watcher would be required to do this reliably with GrpcCAS.
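The onExpiration mechanism can be illustrated with a minimal registry. This is an illustrative sketch of the idea only (the digest is reduced to a String key), not buildfarm's DelegateCASMap:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ExpirationRegistry {
    private final Map<String, List<Runnable>> onExpirations = new HashMap<>();

    // Register a callback to run when the keyed entry expires.
    synchronized void addOnExpiration(String key, Runnable onExpiration) {
        onExpirations.computeIfAbsent(key, k -> new ArrayList<>()).add(onExpiration);
    }

    // Called when an access (get or contains) discovers the entry is gone:
    // run and drop every callback registered for that key -- the
    // "poor man's event system" described above.
    synchronized void expire(String key) {
        List<Runnable> runnables = onExpirations.remove(key);
        if (runnables != null) {
            runnables.forEach(Runnable::run);
        }
    }
}
```

A dependent map can register a callback per digest to evict its own entry when the backing blob disappears.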
  }
});

public InputStream newStreamInput(String name) {
What if it throws a NOT_FOUND?
For streaming responses, grpc throws on hasNext() for the iterated replies, the first time the iterator is accessed. ByteStringIteratorInputStream handles this by transforming NOT_FOUND into an IOException with it as the cause.
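That translation can be sketched with a stream wrapper. This is an illustrative model, not the actual ByteStringIteratorInputStream: the real one wraps gRPC's server-streaming response iterator, whose hasNext() throws a StatusRuntimeException when the server reports NOT_FOUND; here a plain RuntimeException stands in for that status:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;

class ChunkIteratorInputStream extends InputStream {
    private final Iterator<byte[]> chunks;
    private byte[] current = new byte[0];
    private int offset = 0;

    ChunkIteratorInputStream(Iterator<byte[]> chunks) {
        this.chunks = chunks;
    }

    @Override
    public int read() throws IOException {
        while (offset >= current.length) {
            boolean hasNext;
            try {
                // With a gRPC streaming iterator, a NOT_FOUND status surfaces
                // here on the first access, as a thrown runtime exception.
                hasNext = chunks.hasNext();
            } catch (RuntimeException e) {
                // Surface the failed status as an IOException with it as the cause.
                throw new IOException(e);
            }
            if (!hasNext) {
                return -1; // end of stream
            }
            current = chunks.next();
            offset = 0;
        }
        return current[offset++] & 0xff;
    }
}
```

Callers then see an ordinary IOException from the InputStream contract rather than an unchecked gRPC exception.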
@Override
public ActionResult get(ActionKey actionKey) {
  return actionCacheBlockingStub.get().getActionResult(GetActionResultRequest.newBuilder()
Don't you need to transform NOT_FOUND status into null here?
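The suggested translation can be sketched generically. This is an illustrative model, not the PR's code: `NotFoundException` stands in for a gRPC StatusRuntimeException carrying Status.NOT_FOUND, and `Lookup` for the blocking stub call:

```java
// Stand-in for a StatusRuntimeException with code NOT_FOUND.
class NotFoundException extends RuntimeException {}

// Stand-in for the blocking stub's lookup; may throw NotFoundException.
interface Lookup<K, V> {
    V get(K key);
}

class NullOnNotFound {
    // An ActionCache miss is an expected outcome, not an error, so the
    // NOT_FOUND failure is converted into a null result for the caller.
    static <K, V> V getOrNull(Lookup<K, V> lookup, K key) {
        try {
            return lookup.get(key);
        } catch (NotFoundException e) {
            return null;
        }
    }
}
```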
}

@Override
public void put(ActionKey actionKey, ActionResult actionResult) {
I'm guessing retries for all these methods are coming later? Because now only the upload has them, IIUC.
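A generic retry wrapper for these stub calls might look like the following hedged sketch (not buildfarm's actual retrier). In a real gRPC client, only retriable status codes such as UNAVAILABLE should trigger a retry, ideally with backoff between attempts; here any RuntimeException is retried for simplicity:

```java
import java.util.concurrent.Callable;

class Retrier {
    private final int maxAttempts;

    Retrier(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    // Retry the call on RuntimeException up to maxAttempts times,
    // rethrowing the last failure if every attempt fails.
    <T> T execute(Callable<T> call) throws Exception {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }
}
```

Wrapping get/put/contains in such a helper would give them the same resilience the upload path already has.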
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;

class GrpcCAS implements ContentAddressableStorage {
Note, not for this PR, but for the future: you might want to push the get/putAllBlobs interface down to ContentAddressableStorage, to benefit from batching to reduce latency. Alexis did some performance analysis (results to be published soon) which discovered that the BatchUpdateBlobs method is extremely beneficial for average builds (and, by extension, BatchDownloadBlobs method for the worker). And this was with just using the most trivial bin-packing algorithm.
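The "most trivial bin-packing" mentioned can be sketched as a greedy, in-order fill: keep adding blobs to the current BatchUpdateBlobs request until the next one would push it over a byte limit. This is an illustrative sketch only, with plain sizes standing in for digest.size_bytes:

```java
import java.util.ArrayList;
import java.util.List;

class BlobBatcher {
    // Greedy in-order packing of blob sizes into batches whose totals stay
    // within maxBatchBytes. An oversized blob lands in a batch of its own
    // (in practice it would fall back to a bytestream write instead).
    static List<List<Long>> pack(List<Long> blobSizes, long maxBatchBytes) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : blobSizes) {
            if (!current.isEmpty() && currentBytes + size > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Even this naive strategy amortizes per-request latency across many small blobs, which is where the batching win comes from.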
super(
    name,
    digestUtil,
    contentAddressableStorage,
-   /*actionCache=*/ new DelegateCASMap<ActionKey, ActionResult>(contentAddressableStorage, ActionResult.parser(), digestUtil),
+   /*actionCache=*/ MemoryInstance.createActionCache(config.getActionCacheConfig(), contentAddressableStorage, digestUtil),
Now the code is self-explanatory; no need for comments on parameter names, I think.
@@ -14,11 +14,17 @@

package build.buildfarm.instance.memory;
Wait, so now the MemoryInstance can potentially use a gRPC backend? What scenarios do you want to use this in? Do you want to enable only CAS to be gRPC-based, and AC to be memory-based? (The other way around won't work, I think -- you can't proxy the AC but store the blobs in memory -- will that be a problem?)
Maybe this is a different kind of instance, then? I mean, do I understand correctly that a StubInstance is something that gRPC proxies everything, while MemoryInstance after this change could gRPC proxy some things, but not others? Sorry, it's just the distinction between the instance types became more blurry to me at this change.
Yes, this does blur the lines a bit. I fought with this logically and came up with the following:
The memory instance is responsible for retaining all ephemeral state in memory for the purpose of execution. It uses well-known interfaces for access to the ContentAddressableStorage and ActionCache, which were quite literally set apart in the remote_execution service definition. It is incapable of continuing a watch for a currently executing Operation through a termination, and workers which attempt to report back via the OperationQueueService through a termination will be met with failure. Since the instance does not assume responsibility for the CAS or AC that it is passed, and does not drop those other memory-based limitations, this is not a break from convention.
Updated summary to include a reference to the issue that I wanted to add this for.
The size of the download content must be exactly the digest size to be consistent with contains
Addresses limitations indicated in #142