Skip to content

Commit

Permalink
MemoryInstance now revalidates action on requeueing
Browse files Browse the repository at this point in the history
  • Loading branch information
th0br0 committed Oct 11, 2018
1 parent c8c08eb commit 43af2e4
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 48 deletions.
29 changes: 15 additions & 14 deletions src/main/java/build/buildfarm/BUILD
Expand Up @@ -73,6 +73,7 @@ java_library(
"//3rdparty/jvm/io/grpc:grpc_stub",
"//src/main/protobuf:build_buildfarm_v1test_buildfarm_java_proto",
"@googleapis//:google_longrunning_operations_java_proto",
"@googleapis//:google_rpc_code_java_proto",
"@googleapis//:google_rpc_status_java_proto",
"@remote_apis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
Expand Down Expand Up @@ -110,27 +111,27 @@ java_library(
java_binary(
name = "buildfarm-server",
main_class = "build.buildfarm.server.BuildFarmServer",
visibility = ["//visibility:public"],
runtime_deps = [
":buildfarm",
],
visibility = ["//visibility:public"],
)

container_image(
name = "server.container",
base = "@java_base//image",
cmd = [
"buildfarm-server_deploy.jar",
"/config/server.config",
"--port",
"8980",
],
# leverage the implicit target of the buildfarm-server to get a fat jar.
# this is simply a workaround for the fact that we have so many dependencies,
# so we'd want some wrappy script. This seemed more straightforward.
# https://docs.bazel.build/versions/master/be/java.html#java_binary_implicit_outputs
files = [
":buildfarm-server_deploy.jar"
],
cmd = [
"buildfarm-server_deploy.jar",
"/config/server.config",
"--port",
"8980"
":buildfarm-server_deploy.jar",
],
visibility = ["//visibility:public"],
)
Expand Down Expand Up @@ -184,10 +185,10 @@ java_library(
name = "operationqueue-worker",
srcs = glob(["worker/operationqueue/*.java"]),
deps = [
":worker",
":common",
":instance",
":stub-instance",
":worker",
"//3rdparty/jvm/com/github/pcj:google_options",
"//3rdparty/jvm/com/google/guava",
"//3rdparty/jvm/com/google/protobuf:protobuf_java",
Expand Down Expand Up @@ -233,25 +234,25 @@ java_library(
java_binary(
name = "buildfarm-worker",
main_class = "build.buildfarm.worker.operationqueue.Worker",
visibility = ["//visibility:public"],
runtime_deps = [
":operationqueue-worker",
],
visibility = ["//visibility:public"],
)

container_image(
name = "worker.container",
base = "@java_base//image",
cmd = [
"buildfarm-worker_deploy.jar",
"/config/worker.config",
],
# leverage the implicit target of the buildfarm-server to get a fat jar.
# this is simply a workaround for the fact that we have so many dependencies,
# so we'd want some wrappy script. This seemed more straightforward.
# https://docs.bazel.build/versions/master/be/java.html#java_binary_implicit_outputs
files = [
":buildfarm-worker_deploy.jar",
],
cmd = [
"buildfarm-worker_deploy.jar",
"/config/worker.config",
],
visibility = ["//visibility:public"],
)
80 changes: 65 additions & 15 deletions src/main/java/build/buildfarm/instance/AbstractServerInstance.java
Expand Up @@ -47,6 +47,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Code;
import io.grpc.Status;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -186,7 +187,7 @@ public Digest putBlob(ByteString content)
public Iterable<Digest> putAllBlobs(Iterable<ByteString> blobs)
throws InterruptedException {
ImmutableList.Builder<Digest> blobDigestsBuilder =
new ImmutableList.Builder<Digest>();
new ImmutableList.Builder<Digest>();
for (ByteString blob : blobs) {
blobDigestsBuilder.add(putBlob(blob));
}
Expand All @@ -206,7 +207,9 @@ public Iterable<Digest> findMissingBlobs(Iterable<Digest> digests) {
}

protected abstract int getTreeDefaultPageSize();

protected abstract int getTreeMaxPageSize();

protected abstract TokenizableIterator<Directory> createTreeIterator(
Digest rootDigest, String pageToken);

Expand All @@ -222,7 +225,7 @@ public String getTree(
}

TokenizableIterator<Directory> iter =
createTreeIterator(rootDigest, pageToken);
createTreeIterator(rootDigest, pageToken);

while (iter.hasNext() && pageSize != 0) {
Directory directory = iter.next();
Expand Down Expand Up @@ -478,8 +481,8 @@ public static void validateOutputs(
// An output file cannot be a parent of another output file, be a child of a listed output directory, or have the same path as any of the listed output directories.
for (String outputFile : outputFiles) {
Preconditions.checkState(
!inputDirectories.contains(outputFile),
"output file is input directory");
!inputDirectories.contains(outputFile),
"output file is input directory");
String currentPath = outputFile;
while (currentPath != "") {
final String dirname;
Expand All @@ -496,8 +499,8 @@ public static void validateOutputs(
// An output directory cannot be a parent of another output directory, be a parent of a listed output file, or have the same path as any of the listed output files.
for (String outputDir : outputDirectories) {
Preconditions.checkState(
!inputFiles.contains(outputDir),
"output directory is input file");
!inputFiles.contains(outputDir),
"output directory is input file");
String currentPath = outputDir;
while (currentPath != "") {
final String dirname;
Expand Down Expand Up @@ -536,7 +539,7 @@ public void execute(
ActionKey actionKey = DigestUtil.asActionKey(actionDigest);
Operation operation = createOperation(actionKey);
ExecuteOperationMetadata metadata =
expectExecuteOperationMetadata(operation);
expectExecuteOperationMetadata(operation);

putOperation(operation);

Expand Down Expand Up @@ -591,7 +594,7 @@ protected ExecuteOperationMetadata expectExecuteOperationMetadata(
operation.getMetadata().is(ExecuteOperationMetadata.class));
try {
return operation.getMetadata().unpack(ExecuteOperationMetadata.class);
} catch(InvalidProtocolBufferException ex) {
} catch (InvalidProtocolBufferException ex) {
return null;
}
}
Expand All @@ -600,7 +603,7 @@ protected Action expectAction(Operation operation) {
try {
return Action.parseFrom(getBlob(
expectExecuteOperationMetadata(operation).getActionDigest()));
} catch(InvalidProtocolBufferException ex) {
} catch (InvalidProtocolBufferException ex) {
return null;
}
}
Expand All @@ -612,7 +615,7 @@ protected Command expectCommand(Operation operation) {
}
try {
return Command.parseFrom(getBlob(action.getCommandDigest()));
} catch(InvalidProtocolBufferException ex) {
} catch (InvalidProtocolBufferException ex) {
return null;
}
}
Expand All @@ -623,7 +626,7 @@ protected Directory expectDirectory(Digest directoryBlobDigest) {
if (directoryBlob != null) {
return Directory.parseFrom(directoryBlob);
}
} catch(InvalidProtocolBufferException ex) {
} catch (InvalidProtocolBufferException ex) {
}
return null;
}
Expand All @@ -650,6 +653,7 @@ protected boolean isComplete(Operation operation) {
}

abstract protected boolean matchOperation(Operation operation);

abstract protected void enqueueOperation(Operation operation);

@Override
Expand Down Expand Up @@ -677,15 +681,15 @@ public boolean putOperation(Operation operation) throws InterruptedException {

/**
* per-operation lock factory/indexer method
*
* <p>
* the lock retrieved for an operation will guard against races
* during transfers/retrievals/removals
*/
protected abstract Object operationLock(String operationName);

protected void updateOperationWatchers(Operation operation) throws InterruptedException {
if (operation.getDone()) {
synchronized(operationLock(operation.getName())) {
synchronized (operationLock(operation.getName())) {
completedOperations.put(operation.getName(), operation);
outstandingOperations.remove(operation.getName());
}
Expand All @@ -696,7 +700,7 @@ protected void updateOperationWatchers(Operation operation) throws InterruptedEx

@Override
public Operation getOperation(String name) {
synchronized(operationLock(name)) {
synchronized (operationLock(name)) {
Operation operation = completedOperations.get(name);
if (operation == null) {
operation = outstandingOperations.get(name);
Expand All @@ -706,7 +710,9 @@ public Operation getOperation(String name) {
}

protected abstract int getListOperationsDefaultPageSize();

protected abstract int getListOperationsMaxPageSize();

protected abstract TokenizableIterator<Operation> createOperationsIterator(String pageToken);

@Override
Expand Down Expand Up @@ -735,7 +741,7 @@ pageSize > getListOperationsMaxPageSize()) {

@Override
public void deleteOperation(String name) {
synchronized(operationLock(name)) {
synchronized (operationLock(name)) {
Operation deletedOperation = completedOperations.remove(name);
if (deletedOperation == null &&
outstandingOperations.contains(name)) {
Expand All @@ -755,6 +761,50 @@ public void cancelOperation(String name) throws InterruptedException {
.build());
}

public boolean requeueOperation(Operation operation) throws InterruptedException {
if(!isQueued(operation)) {
throw new IllegalStateException("Operation stage is not QUEUED");
}

Action action = expectAction(operation);

try {
validateAction(action);
} catch (IllegalStateException e) {
errorOperation(operation.getName(), com.google.rpc.Status.newBuilder()
.setCode(com.google.rpc.Code.FAILED_PRECONDITION.getNumber())
.build());
return false;
}

return putOperation(operation);
}


protected void errorOperation(String name, com.google.rpc.Status status) throws InterruptedException {
Operation operation = getOperation(name);
if (operation == null) {
// throw new IllegalStateException("Trying to error nonexistent operation [" + name + "]");
operation = Operation.newBuilder()
.setName(name)
.build();
}
if (operation.getDone()) {
throw new IllegalStateException("Trying to error already completed operation [" + name + "]");
}
ExecuteOperationMetadata metadata = expectExecuteOperationMetadata(operation);
if (metadata == null) {
metadata = ExecuteOperationMetadata.getDefaultInstance();
}
putOperation(operation.toBuilder()
.setDone(true)
.setMetadata(Any.pack(metadata.toBuilder()
.setStage(ExecuteOperationMetadata.Stage.COMPLETED)
.build()))
.setError(status)
.build());
}

protected void expireOperation(Operation operation) throws InterruptedException {
ActionResult actionResult = ActionResult.newBuilder()
.setExitCode(-1)
Expand Down
Expand Up @@ -59,6 +59,7 @@
import io.grpc.Channel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
Expand Down Expand Up @@ -331,7 +332,7 @@ protected void validateAction(Action action) {
Duration maximum = config.getMaximumActionTimeout();
Preconditions.checkState(
timeout.getSeconds() < maximum.getSeconds() ||
(timeout.getSeconds() == maximum.getSeconds() && timeout.getNanos() < maximum.getNanos()));
(timeout.getSeconds() == maximum.getSeconds() && timeout.getNanos() < maximum.getNanos()));
}

super.validateAction(action);
Expand Down Expand Up @@ -403,7 +404,7 @@ public boolean putOperation(Operation operation) throws InterruptedException {

private void onDispatched(Operation operation) {
Duration timeout = config.getOperationPollTimeout();
Watchdog requeuer = new Watchdog(timeout, () -> putOperation(operation));
Watchdog requeuer = new Watchdog(timeout, () -> requeueOperation(operation));
requeuers.put(operation.getName(), requeuer);
new Thread(requeuer).start();
}
Expand Down Expand Up @@ -472,7 +473,7 @@ private boolean satisfiesRequirements(Platform platform, Command command) {
// string compare only
// no duplicate names
ImmutableMap.Builder<String, String> provisionsBuilder =
new ImmutableMap.Builder<String, String>();
new ImmutableMap.Builder<String, String>();
for (Platform.Property property : platform.getPropertiesList()) {
provisionsBuilder.put(property.getName(), property.getValue());
}
Expand Down

0 comments on commit 43af2e4

Please sign in to comment.