From 6386cda3b5aa2592ab24f973d3d61aba7b2901c3 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Mon, 20 Mar 2023 17:14:09 -0700 Subject: [PATCH 1/6] examples:Client and server sharing example --- examples/BUILD.bazel | 16 ++ examples/build.gradle | 16 ++ .../grpc/examples/multiplex/EchoService.java | 110 ++++++++ .../multiplex/MultiplexingServer.java | 109 ++++++++ .../examples/multiplex/SharingClient.java | 246 ++++++++++++++++++ 5 files changed, 497 insertions(+) create mode 100644 examples/src/main/java/io/grpc/examples/multiplex/EchoService.java create mode 100644 examples/src/main/java/io/grpc/examples/multiplex/MultiplexingServer.java create mode 100644 examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index b148941a0d4..49b993ae855 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -1,6 +1,22 @@ load("@rules_proto//proto:defs.bzl", "proto_library") load("@io_grpc_grpc_java//:java_grpc_library.bzl", "java_grpc_library") +proto_library( + name = "echo_proto", + srcs = ["src/main/proto/echo.proto"], +) + +java_proto_library( + name = "echo_java_proto", + deps = [":echo_proto"], +) + +java_grpc_library( + name = "echo_java_grpc", + srcs = [":echo_proto"], + deps = [":echo_java_proto"], +) + proto_library( name = "helloworld_proto", srcs = ["src/main/proto/helloworld.proto"], diff --git a/examples/build.gradle b/examples/build.gradle index 5455cdd9eff..5bcbbb6d330 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -216,6 +216,20 @@ task cancellationServer(type: CreateStartScripts) { classpath = startScripts.classpath } +task multiplexingServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.multiplex.MultiplexingServer' + applicationName = 'multiplexing-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +task sharingClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.multiplex.SharingClient' + applicationName = 'sharing-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into('bin') { from(routeGuideServer) from(routeGuideClient) @@ -239,5 +253,7 @@ applicationDistribution.into('bin') { from(keepAliveClient) from(cancellationClient) from(cancellationServer) + from(multiplexingServer) + from(sharingClient) fileMode = 0755 } diff --git a/examples/src/main/java/io/grpc/examples/multiplex/EchoService.java b/examples/src/main/java/io/grpc/examples/multiplex/EchoService.java new file mode 100644 index 00000000000..69603f9567e --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/multiplex/EchoService.java @@ -0,0 +1,110 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.multiplex; + +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.examples.echo.EchoGrpc; +import io.grpc.examples.echo.EchoRequest; +import io.grpc.examples.echo.EchoResponse; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Service that echoes back whatever is sent to it. + */ +public class EchoService extends EchoGrpc.EchoImplBase { + private static final Logger logger = Logger.getLogger(EchoService.class.getName()); + + @Override + public void unaryEcho(EchoRequest request, + StreamObserver responseObserver) { + logger.info("Received echo request: " + request.getMessage()); + EchoResponse response = EchoResponse.newBuilder().setMessage(request.getMessage()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void serverStreamingEcho(EchoRequest request, + StreamObserver responseObserver) { + logger.info("Received server streaming echo request: " + request.getMessage()); + EchoResponse response = EchoResponse.newBuilder().setMessage(request.getMessage()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public StreamObserver clientStreamingEcho( + final StreamObserver responseObserver) { + return new StreamObserver() { + List requestList = new ArrayList<>(); + + @Override + public void onNext(EchoRequest request) { + logger.info("Received client streaming echo request: " + request.getMessage()); + requestList.add(request.getMessage()); + } + + @Override + public void onError(Throwable t) { + logger.log(Level.WARNING, "echo stream cancelled or had a problem and is no longer usable " + t.getMessage()); + responseObserver.onError(t); + } + + @Override + public void onCompleted() { + logger.info("Client streaming complete"); + String reply = requestList.stream().collect(Collectors.joining(", ")); + EchoResponse response = EchoResponse.newBuilder().setMessage(reply).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver bidirectionalStreamingEcho( + final StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(EchoRequest request) { + logger.info("Received bidirection streaming echo request: " + request.getMessage()); + EchoResponse response = EchoResponse.newBuilder().setMessage(request.getMessage()).build(); + responseObserver.onNext(response); + } + + @Override + public void onError(Throwable t) { + logger.log(Level.WARNING, + "echo stream cancelled or had a problem and is no longer usable " + t.getMessage()); + } + + @Override + public void onCompleted() { + logger.info("Bidirectional stream completed from client side"); + responseObserver.onCompleted(); + } + }; + } +} + diff --git a/examples/src/main/java/io/grpc/examples/multiplex/MultiplexingServer.java b/examples/src/main/java/io/grpc/examples/multiplex/MultiplexingServer.java new file mode 100644 index 00000000000..a52aba06599 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/multiplex/MultiplexingServer.java @@ -0,0 +1,109 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.multiplex; + +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.examples.echo.EchoGrpc; +import io.grpc.examples.echo.EchoRequest; +import io.grpc.examples.echo.EchoResponse; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * A sample gRPC server that serves both the Greeting and Echo services. + */ +public class MultiplexingServer { + + private static final Logger logger = Logger.getLogger(MultiplexingServer.class.getName()); + + private final int port; + private Server server; + + public MultiplexingServer(int port) throws IOException { + this.port = port; + } + + private void start() throws IOException { + /* The port on which the server should run */ + server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(new MultiplexingServer.GreeterImpl()) + .addService(new EchoService()) + .build() + .start(); + logger.info("Server started, listening on " + port); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + MultiplexingServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + System.setProperty("java.util.logging.SimpleFormatter.format", + "%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n"); + + final MultiplexingServer server = new MultiplexingServer(50051); + server.start(); + server.blockUntilShutdown(); + } + + static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + logger.info("Received sayHello request: " + req.getName()); + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } +} diff --git a/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java b/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java new file mode 100644 index 00000000000..1e57a862c37 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java @@ -0,0 +1,246 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.multiplex; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Channel; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.examples.echo.EchoGrpc; +import io.grpc.examples.echo.EchoRequest; +import io.grpc.examples.echo.EchoResponse; +import io.grpc.examples.helloworld.HelloWorldClient; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + + +/** + * A client that shares a channel across multiple stubs to a single service and across services + * being provided by one server process. + */ +public class SharingClient { + private static final Logger logger = Logger.getLogger( + HelloWorldClient.class.getName()); + + private final GreeterGrpc.GreeterBlockingStub greeterStub1; + private final GreeterGrpc.GreeterBlockingStub greeterStub2; + private final EchoGrpc.EchoStub echoStub; + + private Random random = new Random(); + + /** Construct client for accessing HelloWorld server using the existing channel. */ + public SharingClient(Channel channel) { + // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to + // shut it down. + + // Passing Channels to code makes code easier to test and makes it easier to reuse Channels. + greeterStub1 = GreeterGrpc.newBlockingStub(channel); + greeterStub2 = GreeterGrpc.newBlockingStub(channel); + echoStub = EchoGrpc.newStub(channel); + } + + /** Say hello to server. */ + private void greet(String name, GreeterGrpc.GreeterBlockingStub stub, String stubName) + throws InterruptedException { + System.out.println("Will try to greet " + name + " using " + stubName); + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response; + try { + response = stub.sayHello(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + System.out.println("Greeting: " + response.getMessage()); + // pause to allow interleaving + Thread.sleep(1000); + } + + public void greet1(String name) throws InterruptedException { + greet(name, greeterStub1, "greeter #1"); + } + + public void greet2(String name) throws InterruptedException { + greet(name, greeterStub2, "greeter #2"); + } + + public StreamingFuture> initiateEchos(List valuesToSend) { + StreamingFuture> future = new StreamingFuture> (); + List valuesReceived = new ArrayList<>(); + + // The logic that gets called by the framework during the RPC's lifecycle + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(EchoResponse response) { + valuesReceived.add(response.getMessage()); + } + + @Override + public void onError(Throwable t) { + logger.warning("Echo Failed: {0}" + Status.fromThrowable(t)); + future.setException(t); + } + + @Override + public void onCompleted() { + System.out.println("Server acknowledged end of echo stream."); + future.set(valuesReceived); + } + }; + + future.setObserver(responseObserver); + + new Thread(new Runnable() { + public void run() { + StreamObserver requestObserver = + echoStub.bidirectionalStreamingEcho(responseObserver); + + try { + for (String curValue : valuesToSend) { + EchoRequest req = EchoRequest.newBuilder().setMessage(curValue).build(); + requestObserver.onNext(req); + + // Sleep for a bit before sending the next one. + Thread.sleep(random.nextInt(1000) + 500); + } + } catch (RuntimeException e) { + // Cancel RPC + requestObserver.onError(e); + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + requestObserver.onError(e); + } + + + // Mark the end of requests + requestObserver.onCompleted(); + } + }).start(); + + return future; + } + + /** + * Greet server. If provided, the first element of {@code args} is the name to use in the + * greeting. The second argument is the target server. + * You can see the multiplexing in the server logs. + */ + public static void main(String[] args) throws Exception { + String user = "world"; + // Access a service running on the local machine on port 50051 + String target = "localhost:50051"; + // Allow passing in the user and target strings as command line arguments + if (args.length > 0) { + if ("--help".equals(args[0])) { + System.err.println("Usage: [name [target]]"); + System.err.println(""); + System.err.println(" name The name you wish to be greeted by. Defaults to " + user); + System.err.println(" target The server to connect to. Defaults to " + target); + System.exit(1); + } + user = args[0]; + } + if (args.length > 1) { + target = args[1]; + } + + // Create a communication channel to the server, known as a Channel. Channels are thread-safe + // and reusable. It is common to create channels at the beginning of your application and reuse + // them until the application shuts down. + // + // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To + // use TLS, use TlsChannelCredentials instead. + ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .build(); + List echoInput = ImmutableList.of("some", "thing", "wicked", "this", "way", "comes"); + try { + SharingClient client = new SharingClient(channel); + + StreamingFuture> future = client.initiateEchos(echoInput); + client.greet1(user + " the great"); + client.greet2(user + " the lesser"); + client.greet1(user + " the humble"); + // Receiving happens asynchronously + + String resultStr = future.get(1, TimeUnit.MINUTES).toString(); + System.out.println("The echo requests and results were:"); + System.out.println(echoInput.toString()); + System.out.println(resultStr); + + if (!future.isDone()) { + System.err.println("Streaming rpc failed to complete in 1 minute"); + } + } finally { + // ManagedChannels use resources like threads and TCP connections. To prevent leaking these + // resources the channel should be shut down when it will no longer be used. If it may be used + // again leave it running. + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } + + private class StreamingFuture extends AbstractFuture { + private StreamObserver requestObserver = null; + + private void setObserver(StreamObserver requestObserver) { + this.requestObserver = requestObserver; + } + + @Override + protected void interruptTask() { + if (requestObserver != null) { + requestObserver.onError(new RuntimeException("Stream was cancelled through a future")); + } + } + + @Override + protected boolean set(@Nullable RespT resp) { + return super.set(resp); + } + + @Override + protected boolean setException(Throwable throwable) { + return super.setException(throwable); + } + + @Override + protected String pendingToString() { + if (isCancelled()) { + return "Cancelled"; + } + + return super.pendingToString(); + } + } + +} From ecd51dc92f232a22115f093bd6d3c7b4607c6c82 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Thu, 23 Mar 2023 23:57:54 +0000 Subject: [PATCH 2/6] improved client output --- .../src/main/java/io/grpc/examples/multiplex/SharingClient.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java b/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java index 1e57a862c37..6b8ee189753 100644 --- a/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java +++ b/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java @@ -102,6 +102,7 @@ public StreamingFuture> initiateEchos(List valuesToSend) { StreamObserver responseObserver = new StreamObserver() { @Override public void onNext(EchoResponse response) { + System.out.println("Received an echo: " + response.getMessage()); valuesReceived.add(response.getMessage()); } @@ -127,6 +128,7 @@ public void run() { try { for (String curValue : valuesToSend) { + System.out.println("Sending an echo request for: " + curValue); EchoRequest req = EchoRequest.newBuilder().setMessage(curValue).build(); requestObserver.onNext(req); From eab66a5d7f21202d558c872b7b5167258ff3edfd Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 24 Mar 2023 19:02:18 +0000 Subject: [PATCH 3/6] removed duplicate echo.proto rules from bazel file --- examples/BUILD.bazel | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index 49b993ae855..b148941a0d4 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -1,22 +1,6 @@ load("@rules_proto//proto:defs.bzl", "proto_library") load("@io_grpc_grpc_java//:java_grpc_library.bzl", "java_grpc_library") -proto_library( - name = "echo_proto", - srcs = ["src/main/proto/echo.proto"], -) - -java_proto_library( - name = "echo_java_proto", - deps = [":echo_proto"], -) - -java_grpc_library( - name = "echo_java_grpc", - srcs = [":echo_proto"], - deps = [":echo_java_proto"], -) - proto_library( name = "helloworld_proto", srcs = ["src/main/proto/helloworld.proto"], From 1b74623a174b07ab4a9875a0cc033e3fffcb20c3 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 24 Mar 2023 16:07:45 -0700 Subject: [PATCH 4/6] Addressed review comments. --- .../examples/multiplex/SharingClient.java | 36 ++++--------------- 1 file changed, 7 insertions(+), 29 deletions(-) diff --git a/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java b/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java index 6b8ee189753..b1caba5b394 100644 --- a/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java +++ b/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java @@ -18,7 +18,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.MoreExecutors; import io.grpc.Channel; import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; @@ -34,10 +33,8 @@ import io.grpc.examples.helloworld.HelloWorldClient; import io.grpc.stub.StreamObserver; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Random; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -142,9 +139,9 @@ public void run() { } catch (InterruptedException e) { Thread.currentThread().interrupt(); requestObserver.onError(e); + return; } - // Mark the end of requests requestObserver.onCompleted(); } @@ -212,37 +209,18 @@ public static void main(String[] args) throws Exception { } private class StreamingFuture extends AbstractFuture { - private StreamObserver requestObserver = null; - - private void setObserver(StreamObserver requestObserver) { - this.requestObserver = requestObserver; - } - - @Override - protected void interruptTask() { - if (requestObserver != null) { - requestObserver.onError(new RuntimeException("Stream was cancelled through a future")); - } - } - @Override - protected boolean set(@Nullable RespT resp) { - return super.set(resp); - } + private StreamObserver responseObserver = null; - @Override - protected boolean setException(Throwable throwable) { - return super.setException(throwable); + private void setObserver(StreamObserver responseObserver) { + this.responseObserver = responseObserver; } @Override - protected String pendingToString() { - if (isCancelled()) { - return "Cancelled"; + protected void interruptTask() { + if (responseObserver != null) { + responseObserver.onError(Status.ABORTED.asException()); } - - return super.pendingToString(); } } - } From ba21ba94de0561d34ec5702b239c3459e2aa785f Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 24 Mar 2023 23:28:53 +0000 Subject: [PATCH 5/6] Fixed syntax errors --- .../io/grpc/examples/multiplex/SharingClient.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java b/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java index b1caba5b394..d4128b11258 100644 --- a/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java +++ b/examples/src/main/java/io/grpc/examples/multiplex/SharingClient.java @@ -221,6 +221,19 @@ protected void interruptTask() { if (responseObserver != null) { responseObserver.onError(Status.ABORTED.asException()); } + + } + + // These are needed for visibility from the parent object + @Override + protected boolean set(@Nullable RespT resp) { + return super.set(resp); } + + @Override + protected boolean setException(Throwable throwable) { + return super.setException(throwable); + } + } } From cf70d550efca5ae5e01bafeaab7358d529c46e9d Mon Sep 17 00:00:00 2001 From: Larry Safran <107004254+larry-safran@users.noreply.github.com> Date: Mon, 27 Mar 2023 13:07:01 -0700 Subject: [PATCH 6/6] Add missing call to responseObserver.onError(t) --- .../src/main/java/io/grpc/examples/multiplex/EchoService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/src/main/java/io/grpc/examples/multiplex/EchoService.java b/examples/src/main/java/io/grpc/examples/multiplex/EchoService.java index 69603f9567e..f7410f34ce4 100644 --- a/examples/src/main/java/io/grpc/examples/multiplex/EchoService.java +++ b/examples/src/main/java/io/grpc/examples/multiplex/EchoService.java @@ -97,6 +97,7 @@ public void onNext(EchoRequest request) { public void onError(Throwable t) { logger.log(Level.WARNING, "echo stream cancelled or had a problem and is no longer usable " + t.getMessage()); + responseObserver.onError(t); } @Override