Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operation stream termination is not an error #18785

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,12 @@ ExecuteResponse start() throws IOException, InterruptedException {
// retrying when received a unauthenticated error, and propagate to refreshIfUnauthenticated
// which will then call retrier again. It will reset the retry time counter so we could
// retry more than --remote_retry times which is not expected.
response =
retrier.execute(
() -> Utils.refreshIfUnauthenticated(this::execute, callCredentialsProvider),
executeBackoff);
if (lastOperation == null) {
response =
retrier.execute(
() -> Utils.refreshIfUnauthenticated(this::execute, callCredentialsProvider),
executeBackoff);
}

// If no response from Execute(), use WaitExecution() in a "loop" which is implemented
// inside the retry block.
Expand Down Expand Up @@ -258,8 +260,8 @@ ExecuteResponse handleOperationStream(Iterator<Operation> operationStream) throw
}
}

// The operation completed successfully but without a result.
throw new IOException("Remote server error: execution terminated with no result");
// The operation stream completed successfully but without a result.
return null;
} finally {
close(operationStream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,258 +17,60 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertThrows;

import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.ExecuteRequest;
import build.bazel.remote.execution.v2.ExecuteResponse;
import build.bazel.remote.execution.v2.ExecutionCapabilities;
import build.bazel.remote.execution.v2.OutputFile;
import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff;
import com.google.devtools.build.lib.remote.common.OperationObserver;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
import com.google.devtools.build.lib.remote.util.TestUtils;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.common.options.Options;
import com.google.longrunning.Operation;
import com.google.rpc.Code;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link ExperimentalGrpcRemoteExecutor}. */
@RunWith(JUnit4.class)
public class ExperimentalGrpcRemoteExecutorTest {

private RemoteActionExecutionContext context;
private FakeExecutionService executionService;
private RemoteOptions remoteOptions;
private Server fakeServer;
public class ExperimentalGrpcRemoteExecutorTest extends GrpcRemoteExecutorTestBase {
private ListeningScheduledExecutorService retryService;
ExperimentalGrpcRemoteExecutor executor;

private static final int MAX_RETRY_ATTEMPTS = 5;

private static final OutputFile DUMMY_OUTPUT =
OutputFile.newBuilder()
.setPath("dummy.txt")
.setDigest(
Digest.newBuilder()
.setHash("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
.setSizeBytes(0)
.build())
.build();

private static final ExecuteRequest DUMMY_REQUEST =
ExecuteRequest.newBuilder()
.setInstanceName("dummy")
.setActionDigest(
Digest.newBuilder()
.setHash("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
.setSizeBytes(0)
.build())
.build();

private static final ExecuteResponse DUMMY_RESPONSE =
ExecuteResponse.newBuilder()
.setResult(ActionResult.newBuilder().addOutputFiles(DUMMY_OUTPUT).build())
.build();

@Before
public final void setUp() throws Exception {
context = RemoteActionExecutionContext.create(RequestMetadata.getDefaultInstance());

executionService = new FakeExecutionService();

String fakeServerName = "fake server for " + getClass();
// Use a mutable service registry for later registering the service impl for each test case.
fakeServer =
InProcessServerBuilder.forName(fakeServerName)
.addService(executionService)
.directExecutor()
.build()
.start();

remoteOptions = Options.getDefaults(RemoteOptions.class);
remoteOptions.remoteMaxRetryAttempts = MAX_RETRY_ATTEMPTS;

retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@Override
protected RemoteExecutionClient createExecutionService(ServerCapabilities caps, ReferenceCountedChannel channel) throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(
() -> new ExponentialBackoff(remoteOptions),
RemoteRetrier.RETRIABLE_GRPC_ERRORS,
retryService);
ReferenceCountedChannel channel =
new ReferenceCountedChannel(
new ChannelConnectionFactory() {
@Override
public Single<? extends ChannelConnection> create() {
ManagedChannel ch =
InProcessChannelBuilder.forName(fakeServerName)
.intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions))
.directExecutor()
.build();
return Single.just(new ChannelConnection(ch));
}

@Override
public int maxConcurrency() {
return 100;
}
});

ServerCapabilities caps =
ServerCapabilities.newBuilder()
.setExecutionCapabilities(
ExecutionCapabilities.newBuilder().setExecEnabled(true).build())
.build();
return new ExperimentalGrpcRemoteExecutor(
caps, remoteOptions, channel, CallCredentialsProvider.NO_CREDENTIALS, retrier);
}

executor =
new ExperimentalGrpcRemoteExecutor(
caps, remoteOptions, channel, CallCredentialsProvider.NO_CREDENTIALS, retrier);
@Override
public void setUp() throws Exception {
retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
super.setUp();
}

@After
@Override
public void tearDown() throws Exception {
retryService.shutdownNow();
retryService.awaitTermination(
com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, SECONDS);

fakeServer.shutdownNow();
fakeServer.awaitTermination();

executor.close();
}

@Test
public void executeRemotely_smoke() throws Exception {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenAck().thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(response).isEqualTo(DUMMY_RESPONSE);
assertThat(executionService.getExecTimes()).isEqualTo(1);
}

@Test
public void executeRemotely_errorInOperation_retryExecute() throws Exception {
executionService.whenExecute(DUMMY_REQUEST).thenError(new RuntimeException("Unavailable"));
executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAVAILABLE);
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(executionService.getExecTimes()).isEqualTo(3);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}

@Test
public void executeRemotely_errorInResponse_retryExecute() throws Exception {
executionService
.whenExecute(DUMMY_REQUEST)
.thenDone(
ExecuteResponse.newBuilder()
.setStatus(com.google.rpc.Status.newBuilder().setCode(Code.UNAVAILABLE_VALUE))
.build());
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(executionService.getExecTimes()).isEqualTo(2);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}

@Test
public void executeRemotely_unretriableErrorInResponse_reportError() {
executionService
.whenExecute(DUMMY_REQUEST)
.thenDone(
ExecuteResponse.newBuilder()
.setStatus(com.google.rpc.Status.newBuilder().setCode(Code.INVALID_ARGUMENT_VALUE))
.build());
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

IOException e =
assertThrows(
IOException.class,
() -> {
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);
});

assertThat(e).hasMessageThat().contains("INVALID_ARGUMENT");
assertThat(executionService.getExecTimes()).isEqualTo(1);
}

@Test
public void executeRemotely_retryExecuteAndFail() {
for (int i = 0; i <= MAX_RETRY_ATTEMPTS * 2; ++i) {
executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAVAILABLE);
}

IOException exception =
assertThrows(
IOException.class,
() -> {
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);
});

assertThat(executionService.getExecTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1);
assertThat(exception).hasMessageThat().contains("UNAVAILABLE");
}

@Test
public void executeRemotely_executeAndWait() throws Exception {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(executionService.getExecTimes()).isEqualTo(1);
assertThat(executionService.getWaitTimes()).isEqualTo(1);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}

@Test
public void executeRemotely_executeAndRetryWait() throws Exception {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(executionService.getExecTimes()).isEqualTo(1);
assertThat(executionService.getWaitTimes()).isEqualTo(1);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
super.tearDown();
}

@Test
public void executeRemotely_executeAndRetryWait_forever() throws Exception {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
int errorTimes = MAX_RETRY_ATTEMPTS * 2;
for (int i = 0; i < errorTimes; ++i) {
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.DEADLINE_EXCEEDED);
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Status.DEADLINE_EXCEEDED.asRuntimeException());
}
executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);

Expand All @@ -282,7 +84,7 @@ public void executeRemotely_executeAndRetryWait_forever() throws Exception {

@Test
public void executeRemotely_executeAndRetryWait_failForConsecutiveErrors() {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
for (int i = 0; i < MAX_RETRY_ATTEMPTS * 2; ++i) {
executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.UNAVAILABLE);
}
Expand Down Expand Up @@ -340,23 +142,10 @@ public void executeRemotely_responseWithoutResult_shouldNotCrash() {
assertThat(executionService.getExecTimes()).isEqualTo(1);
}

@Test
public void executeRemotely_retryExecuteWhenUnauthenticated()
throws IOException, InterruptedException {
executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAUTHENTICATED);
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(executionService.getExecTimes()).isEqualTo(2);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}

@Test
public void executeRemotely_retryWaitExecutionWhenUnauthenticated()
throws IOException, InterruptedException {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.UNAUTHENTICATED);
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

Expand All @@ -367,53 +156,4 @@ public void executeRemotely_retryWaitExecutionWhenUnauthenticated()
assertThat(executionService.getWaitTimes()).isEqualTo(2);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}

@Test
public void executeRemotely_retryExecuteIfNotFound() throws IOException, InterruptedException {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.NOT_FOUND);
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(executionService.getExecTimes()).isEqualTo(2);
assertThat(executionService.getWaitTimes()).isEqualTo(2);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}

@Test
public void executeRemotely_notFoundLoop_reportError() {
for (int i = 0; i <= MAX_RETRY_ATTEMPTS * 2; ++i) {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.NOT_FOUND);
}

IOException e =
assertThrows(
IOException.class,
() -> {
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);
});

assertThat(e).hasCauseThat().isInstanceOf(ExecutionStatusException.class);
ExecutionStatusException executionStatusException = (ExecutionStatusException) e.getCause();
assertThat(executionStatusException.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
assertThat(executionService.getExecTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1);
assertThat(executionService.getWaitTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1);
}

@Test
public void executeRemotely_notifyObserver() throws IOException, InterruptedException {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

List<Operation> notified = new ArrayList<>();
executor.executeRemotely(context, DUMMY_REQUEST, notified::add);

assertThat(notified)
.containsExactly(
FakeExecutionService.ackOperation(DUMMY_REQUEST),
FakeExecutionService.doneOperation(DUMMY_REQUEST, DUMMY_RESPONSE));
}
}
Loading
Loading