Skip to content

Commit

Permalink
Only allow worker async finishing when sandboxed.
Browse files Browse the repository at this point in the history
All workers under dynamic execution are sandboxed, but on user interrupt non-sandboxed workers might keep working even after another invocation has begun, which could cause issues. Instead, we kill off interrupted workers that are neither sandboxed nor under
dynamic execution.

RELNOTES: None.
PiperOrigin-RevId: 373141238
  • Loading branch information
larsrc-google authored and Copybara-Service committed May 11, 2021
1 parent 05316ca commit 779d660
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 10 deletions.
Expand Up @@ -32,6 +32,11 @@ final class SandboxedWorker extends SingleplexWorker {
workerExecRoot = new WorkerExecRoot(workDir);
}

@Override
public boolean isSandboxed() {
return true;
}

@Override
public void prepareExecution(
SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
Expand Down
Expand Up @@ -92,6 +92,11 @@ Subprocess createProcess() throws IOException {
return processBuilder.start();
}

@Override
public boolean isSandboxed() {
return false;
}

@Override
public void prepareExecution(
SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
Expand Down
Expand Up @@ -66,6 +66,9 @@ SortedMap<PathFragment, HashCode> getWorkerFilesWithHashes() {
return workerKey.getWorkerFilesWithHashes();
}

/** Returns true if this worker is sandboxed. */
public abstract boolean isSandboxed();

/**
* Sets the reporter this {@code Worker} should report anomalous events to, or clears it. We
* expect the reporter to be cleared at end of build.
Expand Down
Expand Up @@ -99,11 +99,9 @@ public PooledObject<Worker> wrap(Worker worker) {
return new DefaultPooledObject<>(worker);
}

/**
* When a worker process is discarded, destroy its process, too.
*/
/** When a worker process is discarded, destroy its process, too. */
@Override
public void destroyObject(WorkerKey key, PooledObject<Worker> p) throws Exception {
public void destroyObject(WorkerKey key, PooledObject<Worker> p) {
if (workerOptions.workerVerbose) {
int workerId = p.getObject().getWorkerId();
reporter.handle(
Expand Down
Expand Up @@ -54,6 +54,11 @@ final class WorkerProxy extends Worker {
Runtime.getRuntime().addShutdownHook(shutdownHook);
}

@Override
public boolean isSandboxed() {
return false;
}

@Override
void setReporter(EventHandler reporter) {
// We might have created this multiplexer after setting the reporter for existing multiplexers
Expand Down
Expand Up @@ -457,12 +457,27 @@ WorkResponse execInWorker(
try {
response = worker.getResponse(request.getRequestId());
} catch (InterruptedException e) {
finishWorkAsync(
key,
worker,
request,
workerOptions.workerCancellation && Spawns.supportsWorkerCancellation(spawn));
worker = null;
if (worker.isSandboxed()) {
// Sandboxed workers can safely finish their work async.
finishWorkAsync(
key,
worker,
request,
workerOptions.workerCancellation && Spawns.supportsWorkerCancellation(spawn));
worker = null;
} else if (!key.isSpeculative()) {
// Non-sandboxed workers interrupted outside of dynamic execution can only mean that
// the user interrupted the build, and we don't want to delay finishing. Instead we
// kill the worker.
// Technically, workers are always sandboxed under dynamic execution, at least for now.
try {
workers.invalidateObject(key, worker);
} catch (IOException e1) {
// Nothing useful we can do here, in fact it may not be possible to get here.
} finally {
worker = null;
}
}
throw e;
} catch (IOException e) {
restoreInterrupt(e);
Expand Down
Expand Up @@ -179,8 +179,10 @@ public void testExecInWorker_sendsCancelMessageOnInterrupt()
throws ExecException, InterruptedException, IOException {
WorkerOptions workerOptions = new WorkerOptions();
workerOptions.workerCancellation = true;
workerOptions.workerSandboxing = true;
when(spawn.getExecutionInfo())
.thenReturn(ImmutableMap.of(ExecutionRequirements.SUPPORTS_WORKER_CANCELLATION, "1"));
when(worker.isSandboxed()).thenReturn(true);
WorkerSpawnRunner runner =
new WorkerSpawnRunner(
new SandboxHelpers(false),
Expand All @@ -196,6 +198,7 @@ public void testExecInWorker_sendsCancelMessageOnInterrupt()
WorkerKey key = createWorkerKey(fs, "mnem", false);
Path logFile = fs.getPath("/worker.log");
Semaphore secondResponseRequested = new Semaphore(0);
// Fake that the getting the regular response gets interrupted and we then answer the cancel.
when(worker.getResponse(anyInt()))
.thenThrow(new InterruptedException())
.thenAnswer(
Expand Down Expand Up @@ -229,6 +232,53 @@ public void testExecInWorker_sendsCancelMessageOnInterrupt()
.isEqualTo(WorkRequest.newBuilder().setRequestId(0).setCancel(true).build());
}

@Test
public void testExecInWorker_unsandboxedDiesOnInterrupt()
throws InterruptedException, IOException {
WorkerOptions workerOptions = new WorkerOptions();
workerOptions.workerCancellation = true;
workerOptions.workerSandboxing = false;
when(spawn.getExecutionInfo())
.thenReturn(ImmutableMap.of(ExecutionRequirements.SUPPORTS_WORKER_CANCELLATION, "1"));
WorkerSpawnRunner runner =
new WorkerSpawnRunner(
new SandboxHelpers(false),
fs.getPath("/execRoot"),
createWorkerPool(),
/* multiplex */ false,
reporter,
localEnvProvider,
/* binTools */ null,
resourceManager,
/* runfilesTreeUpdater=*/ null,
workerOptions);
WorkerKey key = createWorkerKey(fs, "mnem", false);
Path logFile = fs.getPath("/worker.log");
when(worker.getResponse(anyInt())).thenThrow(new InterruptedException());

// Since this worker is not sandboxed, it will just get killed on interrupt.
assertThrows(
InterruptedException.class,
() ->
runner.execInWorker(
spawn,
key,
context,
new SandboxInputs(ImmutableMap.of(), ImmutableSet.of(), ImmutableMap.of()),
SandboxOutputs.create(ImmutableSet.of(), ImmutableSet.of()),
ImmutableList.of(),
inputFileCache,
spawnMetrics));

assertThat(logFile.exists()).isFalse();
verify(context, times(1)).report(ProgressStatus.EXECUTING, "worker");
ArgumentCaptor<WorkRequest> argumentCaptor = ArgumentCaptor.forClass(WorkRequest.class);
verify(worker, times(1)).putRequest(argumentCaptor.capture());
assertThat(argumentCaptor.getAllValues().get(0))
.isEqualTo(WorkRequest.newBuilder().setRequestId(0).build());
verify(worker, times(1)).destroy();
}

@Test
public void testExecInWorker_noMultiplexWithDynamic()
throws ExecException, InterruptedException, IOException {
Expand Down

0 comments on commit 779d660

Please sign in to comment.