diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index 3c63d7a6eb7..b148941a0d4 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -51,6 +51,22 @@ java_grpc_library( deps = [":route_guide_java_proto"], ) +proto_library( + name = "echo_proto", + srcs = ["src/main/proto/grpc/examples/echo/echo.proto"], +) + +java_proto_library( + name = "echo_java_proto", + deps = [":echo_proto"], +) + +java_grpc_library( + name = "echo_java_grpc", + srcs = [":echo_proto"], + deps = [":echo_java_proto"], +) + java_library( name = "examples", testonly = 1, @@ -64,6 +80,8 @@ java_library( "@io_grpc_grpc_java//netty", ], deps = [ + ":echo_java_grpc", + ":echo_java_proto", ":hello_streaming_java_grpc", ":hello_streaming_java_proto", ":helloworld_java_grpc", @@ -73,6 +91,7 @@ java_library( "@com_google_protobuf//:protobuf_java", "@com_google_protobuf//:protobuf_java_util", "@io_grpc_grpc_java//api", + "@io_grpc_grpc_java//context", "@io_grpc_grpc_java//protobuf", "@io_grpc_grpc_java//stub", "@maven//:com_google_api_grpc_proto_google_common_protos", diff --git a/examples/build.gradle b/examples/build.gradle index bd3fe1ec198..781d62cac70 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -195,6 +195,20 @@ task keepAliveClient(type: CreateStartScripts) { classpath = startScripts.classpath } +task cancellationClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.cancellation.CancellationClient' + applicationName = 'cancellation-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +task cancellationServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.cancellation.CancellationServer' + applicationName = 'cancellation-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into('bin') { from(routeGuideServer) from(routeGuideClient) @@ -215,5 +229,7 @@ applicationDistribution.into('bin') { from(deadlineClient) from(keepAliveServer) from(keepAliveClient) + from(cancellationClient) + from(cancellationServer) fileMode = 0755 } diff --git a/examples/src/main/java/io/grpc/examples/cancellation/CancellationClient.java b/examples/src/main/java/io/grpc/examples/cancellation/CancellationClient.java new file mode 100644 index 00000000000..fa31ee8e6c1 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/cancellation/CancellationClient.java @@ -0,0 +1,204 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.cancellation; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Channel; +import io.grpc.Context; +import io.grpc.Context.CancellableContext; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.echo.EchoGrpc; +import io.grpc.examples.echo.EchoRequest; +import io.grpc.examples.echo.EchoResponse; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.TimeUnit; + +/** + * A client that cancels RPCs to an Echo server. + */ +public class CancellationClient { + private final Channel channel; + + public CancellationClient(Channel channel) { + this.channel = channel; + } + + private void demonstrateCancellation() throws Exception { + echoBlocking("I'M A BLOCKING CLIENT! HEAR ME ROAR!"); + + // io.grpc.Context can be used to cancel RPCs using any of the stubs. It is the only way to + // cancel blocking stub RPCs. io.grpc.Context is a general-purpose alternative to thread + // interruption and can be used outside of gRPC, like to coordinate within your application. + // + // CancellableContext must always be cancelled or closed at the end of its lifetime, otherwise + // it could "leak" memory. + try (CancellableContext context = Context.current().withCancellation()) { + new Thread(() -> { + try { + Thread.sleep(500); // Do some work + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + // Cancellation reasons are never sent to the server. But they are echoed back to the + // client as the RPC failure reason. + context.cancel(new RuntimeException("Oops. Messed that up, let me try again")); + }).start(); + + // context.run() attaches the context to this thread for gRPC to observe. It also restores + // the previous context before returning. + context.run(() -> echoBlocking("RAAWRR haha lol hehe AWWRR GRRR")); + } + + // Futures cancelled with interruption cancel the RPC. + ListenableFuture future = echoFuture("Future clie*cough*nt was here!"); + Thread.sleep(500); // Do some work + // We realize we really don't want to hear that echo. + future.cancel(true); + Thread.sleep(100); // Make logs more obvious. Cancel is async + + ClientCallStreamObserver reqCallObserver = echoAsync("Testing, testing, 1, 2, 3"); + reqCallObserver.onCompleted(); + Thread.sleep(500); // Make logs more obvious. Wait for completion + + // Async's onError() will cancel. But the method can't be called concurrently with other calls + // on the StreamObserver. If you need thread-safety, use CancellableContext as above. + StreamObserver reqObserver = echoAsync("... async client... is the... best..."); + try { + Thread.sleep(500); // Do some work + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + // Since reqObserver.onCompleted() hasn't been called, we can use onError(). + reqObserver.onError(new RuntimeException("That was weak...")); + Thread.sleep(100); // Make logs more obvious. Cancel is async + + // Async's cancel() will cancel. Also may not be called concurrently with other calls on the + // StreamObserver. + reqCallObserver = echoAsync("Async client or bust!"); + reqCallObserver.onCompleted(); + try { + Thread.sleep(250); // Do some work + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + // Since onCompleted() has been called, we can't use onError(). It is safe to use cancel() + // regardless of onCompleted() being called. + reqCallObserver.cancel("That's enough. I'm bored", null); + Thread.sleep(100); // Make logs more obvious. Cancel is async + } + + /** Say hello to server, just like in helloworld example. */ + public void echoBlocking(String text) { + System.out.println("\nYelling: " + text); + EchoRequest request = EchoRequest.newBuilder().setMessage(text).build(); + EchoResponse response; + try { + response = EchoGrpc.newBlockingStub(channel).unaryEcho(request); + } catch (StatusRuntimeException e) { + System.out.println("RPC failed: " + e.getStatus()); + return; + } + System.out.println("Echo: " + response.getMessage()); + } + + /** Say hello to the server, but using future API. */ + public ListenableFuture echoFuture(String text) { + System.out.println("\nYelling: " + text); + EchoRequest request = EchoRequest.newBuilder().setMessage(text).build(); + ListenableFuture future = EchoGrpc.newFutureStub(channel).unaryEcho(request); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(EchoResponse response) { + System.out.println("Echo: " + response.getMessage()); + } + + @Override + public void onFailure(Throwable t) { + System.out.println("RPC failed: " + Status.fromThrowable(t)); + } + }, MoreExecutors.directExecutor()); + return future; + } + + /** Say hello to the server, but using async API and cancelling. */ + public ClientCallStreamObserver echoAsync(String text) { + System.out.println("\nYelling: " + text); + EchoRequest request = EchoRequest.newBuilder().setMessage(text).build(); + + // Client-streaming and bidirectional RPCs can cast the returned StreamObserver to + // ClientCallStreamObserver. + // + // Unary and server-streaming stub methods don't return a StreamObserver. For such RPCs, you can + // use ClientResponseObserver to get the ClientCallStreamObserver. For example: + // EchoGrpc.newStub(channel).unaryEcho(new ClientResponseObserver() {...}); + // Since ClientCallStreamObserver.cancel() is not thread-safe, it isn't safe to call from + // another thread until the RPC stub method (e.g., unaryEcho()) returns. + ClientCallStreamObserver reqObserver = (ClientCallStreamObserver) + EchoGrpc.newStub(channel).bidirectionalStreamingEcho(new StreamObserver() { + @Override + public void onNext(EchoResponse response) { + System.out.println("Echo: " + response.getMessage()); + } + + @Override + public void onCompleted() { + System.out.println("RPC completed"); + } + + @Override + public void onError(Throwable t) { + System.out.println("RPC failed: " + Status.fromThrowable(t)); + } + }); + + reqObserver.onNext(request); + return reqObserver; + } + + /** + * Cancel RPCs to a server. If provided, the first element of {@code args} is the target server. + */ + public static void main(String[] args) throws Exception { + String target = "localhost:50051"; + if (args.length > 0) { + if ("--help".equals(args[0])) { + System.err.println("Usage: [target]"); + System.err.println(""); + System.err.println(" target The server to connect to. Defaults to " + target); + System.exit(1); + } + target = args[0]; + } + + ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .build(); + try { + CancellationClient client = new CancellationClient(channel); + client.demonstrateCancellation(); + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } +} diff --git a/examples/src/main/java/io/grpc/examples/cancellation/CancellationServer.java b/examples/src/main/java/io/grpc/examples/cancellation/CancellationServer.java new file mode 100644 index 00000000000..cf26e974f5f --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/cancellation/CancellationServer.java @@ -0,0 +1,206 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.cancellation; + +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Context; +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.examples.echo.EchoGrpc; +import io.grpc.examples.echo.EchoRequest; +import io.grpc.examples.echo.EchoResponse; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * Server that manages startup/shutdown of a {@code Greeter} server. + * + *

Any abort of an ongoing RPC is considered "cancellation" of that RPC. The common causes of + * cancellation are the client explicitly cancelling, the deadline expires, and I/O failures. The + * service is not informed the reason for the cancellation. + * + *

There are two APIs for services to be notified of RPC cancellation: io.grpc.Context and + * ServerCallStreamObserver. Context listeners are called on a different thread, so need to be + * thread-safe. The ServerCallStreamObserver cancellation callback is called like other + * StreamObserver callbacks, so the application may not need thread-safe handling. Both APIs have + * thread-safe isCancelled() polling methods. + */ +public class CancellationServer { + public static void main(String[] args) throws IOException, InterruptedException { + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + int port = 50051; + Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(new SlowEcho(scheduler)) + .build() + .start(); + System.out.println("Server started, listening on " + port); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + } + }); + server.awaitTermination(); + scheduler.shutdown(); + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } + + static class SlowEcho extends EchoGrpc.EchoImplBase { + private final ScheduledExecutorService scheduler; + + /** {@code scheduler} must be single-threaded. */ + public SlowEcho(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + + /** + * Repeatedly echos each request until the client has no more requests. It performs all work + * asynchronously on a single thread. It uses ServerCallStreamObserver to be notified of RPC + * cancellation. + */ + @Override + public StreamObserver bidirectionalStreamingEcho( + StreamObserver responseObserver) { + // If the service is truly asynchronous, using ServerCallStreamObserver to receive + // cancellation notifications tends to work well. + + // It is safe to cast the provided observer to ServerCallStreamObserver. + ServerCallStreamObserver responseCallObserver = + (ServerCallStreamObserver) responseObserver; + System.out.println("\nBidi RPC started"); + class EchoObserver implements StreamObserver { + private static final int delayMs = 200; + private final List> echos = new ArrayList<>(); + + @Override + public void onNext(EchoRequest request) { + System.out.println("Bidi RPC received request: " + request.getMessage()); + EchoResponse response + = EchoResponse.newBuilder().setMessage(request.getMessage()).build(); + Runnable echo = () -> responseObserver.onNext(response); + echos.add(scheduler.scheduleAtFixedRate(echo, delayMs, delayMs, TimeUnit.MILLISECONDS)); + } + + @Override + public void onCompleted() { + System.out.println("Bidi RPC client finished"); + // Let each echo happen two more times, and then stop. + List> echosCopy = new ArrayList<>(echos); + Runnable complete = () -> { + stopEchos(echosCopy); + responseObserver.onCompleted(); + System.out.println("Bidi RPC completed"); + }; + echos.add(scheduler.schedule(complete, 2*delayMs, TimeUnit.MILLISECONDS)); + } + + @Override + public void onError(Throwable t) { + System.out.println("Bidi RPC failed: " + Status.fromThrowable(t)); + stopEchos(echos); + scheduler.execute(() -> responseObserver.onError(t)); + } + + public void onCancel() { + // If onCompleted() hasn't been called by this point, then this method and onError are + // both called. If onCompleted() has been called, then just this method is called. + System.out.println("Bidi RPC cancelled"); + stopEchos(echos); + } + + private void stopEchos(List> echos) { + for (Future echo : echos) { + echo.cancel(false); + } + } + } + + EchoObserver requestObserver = new EchoObserver(); + // onCancel() can be called even after the service completes or fails the RPC, because + // callbacks are racy and the response still has to be sent to the client. Use + // setOnCloseHandler() to be notified when the RPC completed without cancellation (as best as + // the server is able to tell). + responseCallObserver.setOnCancelHandler(requestObserver::onCancel); + return requestObserver; + } + + /** + * Echos the request after a delay. It processes the request in-line within the callback. It + * uses Context to be notified of RPC cancellation. + */ + @Override + public void unaryEcho(EchoRequest request, StreamObserver responseObserver) { + // ServerCallStreamObserver.setOnCancelHandler(Runnable) is not useful for this method, since + // this method only returns once it has a result. ServerCallStreamObserver guarantees the + // Runnable is not run at the same time as other RPC callback methods (including this method), + // so the cancellation notification would be guaranteed to occur too late. + System.out.println("\nUnary RPC started: " + request.getMessage()); + Context currentContext = Context.current(); + // Let's start a multi-part operation. We can check cancellation periodically. + for (int i = 0; i < 10; i++) { + // ServerCallStreamObserver.isCancelled() returns true only if the RPC is cancelled. + // Context.isCancelled() is similar, but also returns true when the RPC completes normally. + // It doesn't matter which API is used here. + if (currentContext.isCancelled()) { + System.out.println("Unary RPC cancelled"); + responseObserver.onError( + Status.CANCELLED.withDescription("RPC cancelled").asRuntimeException()); + return; + } + + FutureTask task = new FutureTask<>(() -> { + Thread.sleep(100); // Do some work + return null; + }); + // Some Java blocking APIs have a method to cancel an ongoing operation, like closing an + // InputStream or interrupting the thread. We can use a Context listener to call that API + // from another thread if the RPC is cancelled. + Context.CancellationListener listener = (Context context) -> task.cancel(true); + Context.current().addListener(listener, MoreExecutors.directExecutor()); + task.run(); // A cancellable operation + Context.current().removeListener(listener); + + // gRPC stubs observe io.grpc.Context cancellation, so cancellation is automatically + // propagated when performing an RPC. You can use a different Context or use Context.fork() + // to disable the automatic propagation. For example, + // Context.ROOT.call(() -> futureStub.unaryEcho(request)); + // context.fork().call(() -> futureStub.unaryEcho(request)); + } + responseObserver.onNext( + EchoResponse.newBuilder().setMessage(request.getMessage()).build()); + responseObserver.onCompleted(); + System.out.println("Unary RPC completed"); + } + } +} diff --git a/examples/src/main/proto/grpc/examples/echo/echo.proto b/examples/src/main/proto/grpc/examples/echo/echo.proto new file mode 100644 index 00000000000..b18e295a9a5 --- /dev/null +++ b/examples/src/main/proto/grpc/examples/echo/echo.proto @@ -0,0 +1,48 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +option go_package = "google.golang.org/grpc/examples/features/proto/echo"; +option java_multiple_files = true; +option java_package = "io.grpc.examples.echo"; +option java_outer_classname = "EchoProto"; + +package grpc.examples.echo; + +// EchoRequest is the request for echo. +message EchoRequest { + string message = 1; +} + +// EchoResponse is the response for echo. +message EchoResponse { + string message = 1; +} + +// Echo is the echo service. +service Echo { + // UnaryEcho is unary echo. + rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} + // ServerStreamingEcho is server side streaming. + rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} + // ClientStreamingEcho is client side streaming. + rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} + // BidirectionalStreamingEcho is bidi streaming. + rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} +}