Specify CAS and AC endpoints on worker (#143)
Provide configuration to specify an endpoint for CAS and ActionCache
interactions.
werkt committed Jun 5, 2018
1 parent 24a5d7e commit e8047f6
Showing 7 changed files with 137 additions and 62 deletions.
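The heart of the change is the worker config: where a single operation_queue target once served every API call, each of the three services now gets its own InstanceEndpoint (a gRPC target plus an instance name, per the getTarget/getInstanceName accessors used in Worker.java below). A hypothetical split deployment, with invented host names, could look like:

    operation_queue: {
      target: "queue.internal:8980"
      instance_name: "default_memory_instance"
    }

    content_addressable_storage: {
      target: "storage.internal:8980"
      instance_name: "default_memory_instance"
    }

    action_cache: {
      target: "storage.internal:8980"
      instance_name: "default_memory_instance"
    }

The example file itself keeps all three on localhost:8980, so the default single-server behavior is unchanged.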
32 changes: 26 additions & 6 deletions examples/worker.config.example
@@ -1,15 +1,35 @@
-# the instance domain that this worker will execute work in
-# all requests will be tagged with this instance name
-instance_name: "default_memory_instance"
-
 # the hash function for this worker, required
 # to match out of band between the client and server
 # since resource names must be determined on the client
 # for a valid upload
 hash_function: SHA256
 
-# the endpoint used for all api requests
-operation_queue: "localhost:8980"
+# the endpoint used to execute operations
+operation_queue: {
+  target: "localhost:8980"
+
+  # the instance domain that this worker will execute work in
+  # all requests will be tagged with this instance name
+  instance_name: "default_memory_instance"
+}
+
+# the endpoint used for cas interactions
+content_addressable_storage: {
+  target: "localhost:8980"
+
+  # the instance domain that this worker will make resource requests in
+  # all requests will be tagged with this instance name
+  instance_name: "default_memory_instance"
+}
+
+# the endpoint used for action cache interactions
+action_cache: {
+  target: "localhost:8980"
+
+  # the instance domain that this worker will make resource requests in
+  # all requests will be tagged with this instance name
+  instance_name: "default_memory_instance"
+}
 
 # all content for the operations will be stored under this path
 root: "/tmp/worker"
6 changes: 3 additions & 3 deletions src/main/java/build/buildfarm/worker/Executor.java
@@ -63,7 +63,7 @@ private void runInterruptible() throws InterruptedException {
         .setMetadata(Any.pack(executingMetadata))
         .build();
 
-    if (!workerContext.getInstance().putOperation(operation)) {
+    if (!workerContext.putOperation(operation)) {
      owner.error().put(operationContext);
      owner.release();
      return;
@@ -158,12 +158,12 @@ private Code executeCommand(
    OutputStream stdoutSink = null, stderrSink = null;
 
    if (stdoutStreamName != null && !stdoutStreamName.isEmpty() && workerContext.getStreamStdout()) {
-      stdoutSink = workerContext.getInstance().getStreamOutput(stdoutStreamName);
+      stdoutSink = workerContext.getStreamOutput(stdoutStreamName);
    } else {
      stdoutSink = nullOutputStream;
    }
    if (stderrStreamName != null && !stderrStreamName.isEmpty() && workerContext.getStreamStderr()) {
-      stderrSink = workerContext.getInstance().getStreamOutput(stderrStreamName);
+      stderrSink = workerContext.getStreamOutput(stderrStreamName);
    } else {
      stderrSink = nullOutputStream;
    }
6 changes: 3 additions & 3 deletions src/main/java/build/buildfarm/worker/ReportResultStage.java
@@ -198,15 +198,15 @@ protected OperationContext tick(OperationContext operationContext) throws Interr
    inlineContentBytes += updateActionResultStdOutputs(resultBuilder, contents, inlineContentBytes);
 
    try {
-      workerContext.getInstance().putAllBlobs(contents.build());
+      workerContext.putAllBlobs(contents.build());
    } catch (IOException ex) {
      poller.stop();
      return null;
    }
 
    ActionResult result = resultBuilder.build();
    if (!operationContext.action.getDoNotCache() && resultBuilder.getExitCode() == 0) {
-      workerContext.getInstance().putActionResult(DigestUtil.asActionKey(operationContext.metadata.getActionDigest()), result);
+      workerContext.putActionResult(DigestUtil.asActionKey(operationContext.metadata.getActionDigest()), result);
    }
 
    ExecuteOperationMetadata metadata = operationContext.metadata.toBuilder()
@@ -224,7 +224,7 @@ protected OperationContext tick(OperationContext operationContext) throws Interr
 
    poller.stop();
 
-    if (!workerContext.getInstance().putOperation(operation)) {
+    if (!workerContext.putOperation(operation)) {
      return null;
    }
 
8 changes: 7 additions & 1 deletion src/main/java/build/buildfarm/worker/WorkerContext.java
@@ -15,15 +15,18 @@
 package build.buildfarm.worker;
 
 import build.buildfarm.common.DigestUtil;
+import build.buildfarm.common.DigestUtil.ActionKey;
 import build.buildfarm.instance.Instance;
 import build.buildfarm.v1test.CASInsertionPolicy;
 import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
 import com.google.devtools.remoteexecution.v1test.Digest;
 import com.google.devtools.remoteexecution.v1test.ExecuteOperationMetadata.Stage;
 import com.google.longrunning.Operation;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Duration;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.Path;
 import java.util.function.Predicate;
 
@@ -45,10 +48,13 @@ public interface WorkerContext {
   boolean getStreamStderr();
   Duration getDefaultActionTimeout();
   Duration getMaximumActionTimeout();
-  Instance getInstance();
   ByteString getBlob(Digest digest);
   void createActionRoot(Path root, Action action) throws IOException, InterruptedException;
   void destroyActionRoot(Path root) throws IOException;
   Path getRoot();
   void removeDirectory(Path path) throws IOException;
+  boolean putOperation(Operation operation);
+  OutputStream getStreamOutput(String name);
+  Iterable<Digest> putAllBlobs(Iterable<ByteString> blobs) throws IOException, IllegalArgumentException, InterruptedException;
+  void putActionResult(ActionKey actionKey, ActionResult actionResult);
 }
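With getInstance() gone, stages can no longer reach around the context to a single backend; the context itself routes each call to the service that owns it. A self-contained sketch of that routing idea — every type and method here is a simplified stand-in, not buildfarm's API:

    // Stand-in for buildfarm's Instance, which is a far richer interface.
    interface Instance {
      void send(String payload);
    }

    final class RoutingContext {
      private final Instance cas;
      private final Instance actionCache;
      private final Instance operationQueue;

      RoutingContext(Instance cas, Instance actionCache, Instance operationQueue) {
        this.cas = cas;
        this.actionCache = actionCache;
        this.operationQueue = operationQueue;
      }

      // Blob uploads go to the CAS endpoint.
      void putAllBlobs(String blobs) { cas.send(blobs); }

      // Completed action results go to the action cache endpoint.
      void putActionResult(String result) { actionCache.send(result); }

      // Operation updates go to the operation queue endpoint.
      void putOperation(String operation) { operationQueue.send(operation); }
    }

    public class RoutingDemo {
      public static void main(String[] args) {
        Instance print = payload -> System.out.println(payload);
        RoutingContext context = new RoutingContext(print, print, print);
        context.putAllBlobs("stdout contents");          // -> CAS
        context.putActionResult("action result, exit 0"); // -> AC
        context.putOperation("operations/example");       // -> OQ
      }
    }

The payoff is visible in the Executor and ReportResultStage diffs above: callers shrink from workerContext.getInstance().putOperation(...) to workerContext.putOperation(...), and only Worker.java knows which endpoint serves which call.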
80 changes: 58 additions & 22 deletions src/main/java/build/buildfarm/worker/operationqueue/Worker.java
@@ -15,13 +15,15 @@
 package build.buildfarm.worker.operationqueue;
 
 import build.buildfarm.common.DigestUtil;
+import build.buildfarm.common.DigestUtil.ActionKey;
 import build.buildfarm.common.DigestUtil.HashFunction;
 import build.buildfarm.instance.Instance;
 import build.buildfarm.instance.stub.ByteStreamUploader;
 import build.buildfarm.instance.stub.Retrier;
 import build.buildfarm.instance.stub.Retrier.Backoff;
 import build.buildfarm.instance.stub.StubInstance;
 import build.buildfarm.v1test.CASInsertionPolicy;
+import build.buildfarm.v1test.InstanceEndpoint;
 import build.buildfarm.v1test.WorkerConfig;
 import build.buildfarm.worker.CASFileCache;
 import build.buildfarm.worker.ExecuteActionStage;
@@ -41,6 +43,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.common.options.OptionsParser;
 import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
 import com.google.devtools.remoteexecution.v1test.Digest;
 import com.google.devtools.remoteexecution.v1test.Directory;
 import com.google.devtools.remoteexecution.v1test.DirectoryNode;
@@ -58,6 +61,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -76,13 +80,17 @@
 
 public class Worker {
   public static final Logger logger = Logger.getLogger(Worker.class.getName());
-  private final Instance instance;
+  private final DigestUtil digestUtil;
+  private final Instance casInstance;
+  private final Instance acInstance;
+  private final Instance operationQueueInstance;
   private final WorkerConfig config;
   private final Path root;
   private final CASFileCache fileCache;
   private final Map<Path, Iterable<Path>> rootInputFiles = new ConcurrentHashMap<>();
   private final Map<Path, Iterable<Digest>> rootInputDirectories = new ConcurrentHashMap<>();
-  private final ListeningScheduledExecutorService retryScheduler =
+
+  private static final ListeningScheduledExecutorService retryScheduler =
       MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
 
   private static Channel createChannel(String target) {
@@ -127,10 +135,20 @@ private static Retrier createStubRetrier() {
         Retrier.DEFAULT_IS_RETRIABLE);
   }
 
-  private ByteStreamUploader createStubUploader(Channel channel) {
+  private static ByteStreamUploader createStubUploader(Channel channel) {
     return new ByteStreamUploader("", channel, null, 300, createStubRetrier(), retryScheduler);
   }
 
+  private static Instance createInstance(InstanceEndpoint instanceEndpoint,
+      DigestUtil digestUtil) {
+    Channel channel = createChannel(instanceEndpoint.getTarget());
+    return new StubInstance(
+        instanceEndpoint.getInstanceName(),
+        digestUtil,
+        channel,
+        createStubUploader(channel));
+  }
+
   public Worker(WorkerConfig config) throws ConfigurationException {
     this.config = config;
 
@@ -140,23 +158,21 @@ public Worker(WorkerConfig config) throws ConfigurationException {
     HashFunction hashFunction = getValidHashFunction(config);
 
     /* initialization */
-    Channel channel = createChannel(config.getOperationQueue());
-    instance = new StubInstance(
-        config.getInstanceName(),
-        new DigestUtil(hashFunction),
-        channel,
-        createStubUploader(channel));
+    digestUtil = new DigestUtil(hashFunction);
+    casInstance = createInstance(config.getContentAddressableStorage(), digestUtil);
+    acInstance = createInstance(config.getActionCache(), digestUtil);
+    operationQueueInstance = createInstance(config.getOperationQueue(), digestUtil);
     InputStreamFactory inputStreamFactory = new InputStreamFactory() {
       @Override
       public InputStream apply(Digest digest) {
-        return instance.newStreamInput(instance.getBlobName(digest));
+        return casInstance.newStreamInput(casInstance.getBlobName(digest));
      }
    };
    fileCache = new CASFileCache(
        inputStreamFactory,
        root.resolve(casCacheDirectory),
        config.getCasCacheMaxSizeBytes(),
-        instance.getDigestUtil());
+        casInstance.getDigestUtil());
   }
 
   class OutputDirectory {
@@ -187,13 +203,13 @@ private void fetchInputs(
     String pageToken = "";
 
     do {
-      pageToken = instance.getTree(inputRoot, config.getTreePageSize(), pageToken, directories);
+      pageToken = casInstance.getTree(inputRoot, config.getTreePageSize(), pageToken, directories);
     } while (!pageToken.isEmpty());
 
     Set<Digest> directoryDigests = new HashSet<>();
     ImmutableMap.Builder<Digest, Directory> directoriesIndex = new ImmutableMap.Builder<>();
     for (Directory directory : directories.build()) {
-      Digest directoryDigest = instance.getDigestUtil().compute(directory);
+      Digest directoryDigest = casInstance.getDigestUtil().compute(directory);
      if (!directoryDigests.add(directoryDigest)) {
        continue;
      }
@@ -305,7 +321,7 @@ public Poller createPoller(String name, String operationName, Stage stage) {
      @Override
      public Poller createPoller(String name, String operationName, Stage stage, Runnable onFailure) {
        Poller poller = new Poller(config.getOperationPollPeriod(), () -> {
-          boolean success = instance.pollOperation(operationName, stage);
+          boolean success = operationQueueInstance.pollOperation(operationName, stage);
          if (!success) {
            onFailure.run();
          }
@@ -317,12 +333,15 @@
 
      @Override
      public DigestUtil getDigestUtil() {
-        return instance.getDigestUtil();
+        return digestUtil;
      }
 
      @Override
      public void match(Predicate<Operation> onMatch) throws InterruptedException {
-        instance.match(config.getPlatform(), config.getRequeueOnFailure(), onMatch);
+        operationQueueInstance.match(
+            config.getPlatform(),
+            config.getRequeueOnFailure(),
+            onMatch);
      }
 
      @Override
@@ -390,14 +409,9 @@ public Duration getMaximumActionTimeout() {
        return config.getMaximumActionTimeout();
      }
 
-      @Override
-      public Instance getInstance() {
-        return instance;
-      }
-
      @Override
      public ByteString getBlob(Digest digest) {
-        return instance.getBlob(digest);
+        return casInstance.getBlob(digest);
      }
 
      @Override
@@ -445,6 +459,28 @@ public Path getRoot() {
      public void removeDirectory(Path path) throws IOException {
        CASFileCache.removeDirectory(path);
      }
+
+      @Override
+      public boolean putOperation(Operation operation) {
+        return operationQueueInstance.putOperation(operation);
+      }
+
+      // doesn't belong in CAS or AC, must be in OQ
+      @Override
+      public OutputStream getStreamOutput(String name) {
+        return operationQueueInstance.getStreamOutput(name);
+      }
+
+      @Override
+      public Iterable<Digest> putAllBlobs(Iterable<ByteString> blobs)
+          throws IOException, IllegalArgumentException, InterruptedException {
+        return casInstance.putAllBlobs(blobs);
+      }
+
+      @Override
+      public void putActionResult(ActionKey actionKey, ActionResult actionResult) {
+        acInstance.putActionResult(actionKey, actionResult);
+      }
    };
 
    PipelineStage errorStage = new ReportResultStage.NullStage(); /* ErrorStage(); */
(diffs for the remaining 2 changed files did not load)
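Not shown in the truncated diff is how the config file reaches the Worker constructor. A minimal loading sketch, assuming the file is protobuf text format (which examples/worker.config.example appears to be) and relying only on WorkerConfig accessors that appear in the diff above — the class name and file path here are illustrative:

    import build.buildfarm.v1test.WorkerConfig;
    import com.google.protobuf.TextFormat;
    import java.io.FileReader;
    import java.io.IOException;

    class WorkerConfigLoader {
      static WorkerConfig load(String path) throws IOException {
        WorkerConfig.Builder builder = WorkerConfig.newBuilder();
        try (FileReader reader = new FileReader(path)) {
          // Merge the text-format config into the generated message builder.
          TextFormat.merge(reader, builder);
        }
        return builder.build();
      }

      public static void main(String[] args) throws IOException {
        WorkerConfig config = load("examples/worker.config.example");
        // Each service now carries its own target and instance name.
        System.out.println(config.getOperationQueue().getTarget());
        System.out.println(config.getContentAddressableStorage().getTarget());
        System.out.println(config.getActionCache().getInstanceName());
      }
    }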
