Skip to content

Commit

Permalink
Interface and flag specification for worker cancellation.
Browse files Browse the repository at this point in the history
See design doc https://docs.google.com/document/d/1-h4gcBV8Jn6DK9G_e23kZQ159jmX__uckhub1Gv9dzc/edit

Also fixes the JSON protocol processor to ignore unknown fields, as per spec.

RELNOTES: Updates worker protocol with cancellation fields, and adds experimental_worker_cancellation flag to control cancellation.
PiperOrigin-RevId: 367600226
  • Loading branch information
larsrc-google committed Jul 30, 2021
1 parent 04cbf71 commit 186deca
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 11 deletions.
Expand Up @@ -160,6 +160,8 @@ public String parseIfMatches(String tag) throws ValidationException {
/** Specify the type of worker protocol the worker uses. */
public static final String REQUIRES_WORKER_PROTOCOL = "requires-worker-protocol";

public static final String SUPPORTS_WORKER_CANCELLATION = "supports-worker-cancellation";

/** Denotes what the type of worker protocol the worker uses. */
public enum WorkerProtocolFormat {
JSON,
Expand Down
Expand Up @@ -92,6 +92,11 @@ public static boolean supportsMultiplexWorkers(Spawn spawn) {
.equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_MULTIPLEX_WORKERS));
}

public static boolean supportsWorkerCancellation(Spawn spawn) {
return "1"
.equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKER_CANCELLATION));
}

/**
* Returns which worker protocol format a Spawn claims a persistent worker uses. Defaults to proto
* if the protocol format is not specified.
Expand Down
Expand Up @@ -92,10 +92,12 @@ private WorkResponse parseResponse() throws IOException {
requestId = reader.nextInt();
break;
default:
throw new IOException(name + " is an incorrect field in work response");
// As per https://docs.bazel.build/versions/master/creating-workers.html#work-responses,
// unknown fields are ignored.
reader.skipValue();
}
}
reader.endObject();
reader.endObject();
} catch (MalformedJsonException | EOFException | IllegalStateException e) {
throw new IOException("Could not parse json work request correctly", e);
}
Expand Down
Expand Up @@ -178,4 +178,12 @@ public String getTypeDescription() {
"Currently a no-op. Future: If enabled, workers that support the experimental"
+ " multiplexing feature will use that feature.")
public boolean workerMultiplex;

@Option(
name = "experimental_worker_cancellation",
defaultValue = "false",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
effectTags = {OptionEffectTag.EXECUTION},
help = "If enabled, Bazel may send cancellation requests to workers that support them.")
public boolean workerCancellation;
}
30 changes: 24 additions & 6 deletions src/main/protobuf/worker_protocol.proto
Expand Up @@ -39,9 +39,19 @@ message WorkRequest {
// request.
repeated Input inputs = 2;

// To support multiplex worker, each WorkRequest must have an unique ID. This
// ID should be attached unchanged to the WorkResponse.
// Each WorkRequest must have either a unique
// request_id or request_id = 0. If request_id is 0, this WorkRequest must be
// processed alone, otherwise the worker may process multiple WorkRequests in
// parallel (multiplexing). As an exception to the above, if the cancel field
// is true, the request_id must be the same as a previously sent WorkRequest.
// The request_id must be attached unchanged to the corresponding
// WorkResponse.
int32 request_id = 3;

// EXPERIMENTAL: When true, this is a cancel request, indicating that a
// previously sent WorkRequest with the same request_id should be cancelled.
// The arguments and inputs fields must be empty and should be ignored.
bool cancel = 4;
}

// The worker sends this message to Blaze when it finished its work on the
Expand All @@ -54,9 +64,17 @@ message WorkResponse {
// string type here, which gives us UTF-8 encoding.
string output = 2;

// To support multiplex worker, each WorkResponse must have an unique ID.
// Since worker processes which support multiplex worker will handle multiple
// WorkRequests in parallel, this ID will be used to determined which
// WorkerProxy does this WorkResponse belong to.
// This field must be set to the same request_id as the WorkRequest it is a
// response to. Since worker processes which support multiplex worker will
// handle multiple WorkRequests in parallel, this ID will be used to
// determined which WorkerProxy does this WorkResponse belong to.
int32 request_id = 3;

// EXPERIMENTAL When true, indicates that this response was sent due to
// receiving a cancel request. The exit_code and output fields should be empty
// and will be ignored. Exactly one WorkResponse must be sent for each
// non-cancelling WorkRequest received by the worker, but if the worker
// received a cancel request, it doesn't matter if it replies with a regular
// WorkResponse or with one where was_cancelled = true.
bool was_cancelled = 4;
}
14 changes: 11 additions & 3 deletions src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java
Expand Up @@ -199,8 +199,16 @@ public void testGetResponse_json_multipleRequestId_fails() throws IOException {
}

@Test
public void testGetResponse_json_incorrectFields_fails() throws IOException {
verifyGetResponseFailure(
"{\"testField\":0}", "testField is an incorrect field in work response");
public void testGetResponse_json_unknownFieldsIgnored() throws IOException, InterruptedException {
TestWorker testWorker =
createTestWorker(
"{\"exitCode\":1,\"output\":\"test output\",\"requestId\":1,\"unknown\":{1:['a']}}"
.getBytes(UTF_8),
JSON);
WorkResponse readResponse = testWorker.getResponse(1);
WorkResponse response =
WorkResponse.newBuilder().setExitCode(1).setOutput("test output").setRequestId(1).build();

assertThat(readResponse).isEqualTo(response);
}
}

0 comments on commit 186deca

Please sign in to comment.