From 4afc5b118d468c394563f1ead178e58dc8c84b26 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Wed, 22 Mar 2023 14:31:41 -0700 Subject: [PATCH 01/21] checkpoint --- .../healthservice/HealthServiceClient.java | 127 ++++++++++++++++++ .../healthservice/HealthServiceServer.java | 117 ++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java create mode 100644 examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java new file mode 100644 index 00000000000..8a02e284a13 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java @@ -0,0 +1,127 @@ +/* + * Copyright 2015 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.helloworld; + +import io.grpc.Channel; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.StatusRuntimeException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A simple client that requests a greeting from the {@link HelloWorldServer}. + */ +public class HelloWorldClient { + private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName()); + + private final GreeterGrpc.GreeterBlockingStub blockingStub; + + private final HealthGrpc.HealthStub stub; + private final HealthGrpc.HealthBlockingStub blockingStub; + + private final HealthCheckRequest healthRequest; + + /** Construct client for accessing HelloWorld server using the existing channel. */ + public HelloWorldClient(Channel channel) { + stub = HealthGrpc.newStub(channel); + blockingStub = HealthGrpc.newBlockingStub(channel); + + healthRequest = + HealthCheckRequest.newBuilder().setService(HealthStatusManager.SERVICE_NAME_ALL_SERVICES) + .build(); + + checkHealth(); + } + + private ServingStatus checkHealth(String prefix) { + HealthCheckResponse response = + blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request); + logger.info(prefix + ", current health is: " + response.getStatus()); + return response.getStatus(); + } + + /** Say hello to server. */ + public void greet(String name) { + logger.info("Will try to greet " + name + " ..."); + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response; + try { + response = blockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + logger.info("Greeting: " + response.getMessage()); + } + + /** + * Greet server. If provided, the first element of {@code args} is the name to use in the + * greeting. The second argument is the target server. + */ + public static void main(String[] args) throws Exception { + String[] users = {"world", "foo", "I am Grut"}; + // Access a service running on the local machine on port 50051 + String target = "localhost:50051"; + // Allow passing in the user and target strings as command line arguments + if (args.length > 0) { + if ("--help".equals(args[0])) { + System.err.println("Usage: [target [name] [name] ...]"); + System.err.println(""); + System.err.println(" target The server to connect to. Defaults to " + target); + System.err.println(" name The names you wish to be greeted by. Defaults to " + users); + System.exit(1); + } + target = args[0]; + } + if (args.length > 1) { + users = new String[args.length-1]; + for (i=0; i < users.length; i++) { + users[i] = args[i+1]; + } + } + + // Create a communication channel to the server, known as a Channel. Channels are thread-safe + // and reusable. It is common to create channels at the beginning of your application and reuse + // them until the application shuts down. + // + // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To + // use TLS, use TlsChannelCredentials instead. + ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .build(); + try { + // Set a watch + HelloWorldClient client = new HelloWorldClient(channel); + client.checkHealth("Before call"); + client.greet(user[0]); + client.checkHealth("After user " + user[0]); + for (String user : users) { + client.greet(user); + } + client.checkHealth("After all users"); + Thread.sleep(5000); + client.checkHealth("After 5 second wait"); + } finally { + // ManagedChannels use resources like threads and TCP connections. To prevent leaking these + // resources the channel should be shut down when it will no longer be used. If it may be used + // again leave it running. + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } +} diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java new file mode 100644 index 00000000000..24bc86075a9 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java @@ -0,0 +1,117 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.helloworld; + +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * Server that manages startup/shutdown of a {@code Greeter} server. + */ +public class HealthServiceServer { + private static final Logger logger = Logger.getLogger(HealthServiceServer.class.getName()); + + private Server server; + private HealthStatusManager health; + + private void start() throws IOException { + /* The port on which the server should run */ + int port = 50051; + health = new HealthStatusManager(); + server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(new GreeterImpl()) + .addService(health.getHealthService()) + .build() + .start(); + logger.info("Server started, listening on " + port); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + HelloWorldServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + + health.setStatus("Greeter", ServingStatus.SERVING); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + final HealthServiceServer server = new HealthServiceServer(); + server.start(); + server.blockUntilShutdown(); + } + + static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + + updateHealthStatus(req); + } + + private void updateHealthStatus(HelloRequest req) { + if (req.getMessage().length < 5) { + logger.warning("Tiny message received, throwing a temper tantrum"); + } + + health.setStatus("Greeter", ServingStatus.NOT_SERVING); + + // In 2 seconds set it back to serving + Context.current() + .withDeadlineAfter(2, TimeUnit.SECONDS, Executors.newSingleThreadScheduledExecutor()) + .run(new Runnable() { + @Override + public void run() { + health.setStatus("Greeter", ServingStatus.SERVING); + logger.info("tantrum complete"); + } + }); + } + } +} From bb3faec6ddc9d3e3e9b665528e4b82a8a373d362 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 24 Mar 2023 12:48:22 -0700 Subject: [PATCH 02/21] Completed health service example. --- .../healthservice/HealthServiceClient.java | 48 ++++++++++--------- .../healthservice/HealthServiceServer.java | 15 ++++-- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java index 8a02e284a13..0348ad8c764 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java @@ -14,45 +14,47 @@ * limitations under the License. */ -package io.grpc.examples.helloworld; +package io.grpc.examples.healthservice; import io.grpc.Channel; import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; import io.grpc.ManagedChannel; import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** - * A simple client that requests a greeting from the {@link HelloWorldServer}. + * A client that requests a greeting from the {@link HelloWorldServer}. */ -public class HelloWorldClient { - private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName()); +public class HealthServiceClient { + private static final Logger logger = Logger.getLogger(HealthServiceClient.class.getName()); - private final GreeterGrpc.GreeterBlockingStub blockingStub; - - private final HealthGrpc.HealthStub stub; - private final HealthGrpc.HealthBlockingStub blockingStub; + private final GreeterGrpc.GreeterBlockingStub greeterBlockingStub; + private final HealthGrpc.HealthStub healthStub; + private final HealthGrpc.HealthBlockingStub healthBlockingStub; private final HealthCheckRequest healthRequest; /** Construct client for accessing HelloWorld server using the existing channel. */ - public HelloWorldClient(Channel channel) { - stub = HealthGrpc.newStub(channel); - blockingStub = HealthGrpc.newBlockingStub(channel); - - healthRequest = - HealthCheckRequest.newBuilder().setService(HealthStatusManager.SERVICE_NAME_ALL_SERVICES) - .build(); - - checkHealth(); + public HealthServiceClient(Channel channel) { + greeterBlockingStub = GreeterGrpc.newBlockingStub(channel); + healthStub = HealthGrpc.newStub(channel); + healthBlockingStub = HealthGrpc.newBlockingStub(channel); + healthRequest = HealthCheckRequest.getDefaultInstance(); } private ServingStatus checkHealth(String prefix) { HealthCheckResponse response = - blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request); + healthBlockingStub.check(healthRequest); logger.info(prefix + ", current health is: " + response.getStatus()); return response.getStatus(); } @@ -63,7 +65,7 @@ public void greet(String name) { HelloRequest request = HelloRequest.newBuilder().setName(name).build(); HelloReply response; try { - response = blockingStub.sayHello(request); + response = greeterBlockingStub.sayHello(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; @@ -92,7 +94,7 @@ public static void main(String[] args) throws Exception { } if (args.length > 1) { users = new String[args.length-1]; - for (i=0; i < users.length; i++) { + for (int i=0; i < users.length; i++) { users[i] = args[i+1]; } } @@ -107,10 +109,10 @@ public static void main(String[] args) throws Exception { .build(); try { // Set a watch - HelloWorldClient client = new HelloWorldClient(channel); + HealthServiceClient client = new HealthServiceClient(channel); client.checkHealth("Before call"); - client.greet(user[0]); - client.checkHealth("After user " + user[0]); + client.greet(users[0]); + client.checkHealth("After user " + users[0]); for (String user : users) { client.greet(user); } diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java index 24bc86075a9..4994f2666f7 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java @@ -14,13 +14,20 @@ * limitations under the License. */ -package io.grpc.examples.helloworld; +package io.grpc.examples.healthservice.; import io.grpc.Grpc; import io.grpc.InsecureServerCredentials; import io.grpc.Server; +import io.grpc.Context; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -49,7 +56,7 @@ public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); try { - HelloWorldServer.this.stop(); + HealthServiceServer.this.stop(); } catch (InterruptedException e) { e.printStackTrace(System.err); } @@ -84,7 +91,7 @@ public static void main(String[] args) throws IOException, InterruptedException server.blockUntilShutdown(); } - static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + private class GreeterImpl extends GreeterGrpc.GreeterImplBase { @Override public void sayHello(HelloRequest req, StreamObserver responseObserver) { @@ -96,7 +103,7 @@ public void sayHello(HelloRequest req, StreamObserver responseObserv } private void updateHealthStatus(HelloRequest req) { - if (req.getMessage().length < 5) { + if (req.getName().length() < 5) { logger.warning("Tiny message received, throwing a temper tantrum"); } From a0d287a29a9e31be475b13bc3b632ee8ebfeccff Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 24 Mar 2023 22:21:34 +0000 Subject: [PATCH 03/21] Everything ready except for bazel dependency problem --- examples/BUILD.bazel | 22 +++++++ examples/build.gradle | 17 +++++ examples/pom.xml | 4 ++ .../healthservice/HealthServiceClient.java | 10 ++- .../healthservice/HealthServiceServer.java | 65 ++++++++++++------- 5 files changed, 92 insertions(+), 26 deletions(-) diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index 3c63d7a6eb7..c5770cf7294 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -73,8 +73,12 @@ java_library( "@com_google_protobuf//:protobuf_java", "@com_google_protobuf//:protobuf_java_util", "@io_grpc_grpc_java//api", + "@io_grpc_grpc_java//context", "@io_grpc_grpc_java//protobuf", + "@io_grpc_grpc_java//services:health", + "@io_grpc_grpc_java//services:healthlb", "@io_grpc_grpc_java//stub", + "@io_grpc_grpc_proto//:health_proto", "@maven//:com_google_api_grpc_proto_google_common_protos", "@maven//:com_google_code_findbugs_jsr305", "@maven//:com_google_code_gson_gson", @@ -189,3 +193,21 @@ java_binary( ":examples", ], ) + +java_binary( + name = "healthservice-server", + testonly = 1, + main_class = "io.grpc.examples.healthservice.HealthServiceServer", + runtime_deps = [ + ":examples", + ], +) + +java_binary( + name = "healthservice-client", + testonly = 1, + main_class = "io.grpc.examples.healthservice.HealthServiceClient", + runtime_deps = [ + ":examples", + ], +) diff --git a/examples/build.gradle b/examples/build.gradle index bd3fe1ec198..a842b2c026d 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -27,6 +27,7 @@ def protocVersion = protobufVersion dependencies { implementation "io.grpc:grpc-protobuf:${grpcVersion}" + implementation "io.grpc:grpc-services:${grpcVersion}" implementation "io.grpc:grpc-stub:${grpcVersion}" compileOnly "org.apache.tomcat:annotations-api:6.0.53" @@ -195,6 +196,20 @@ task keepAliveClient(type: CreateStartScripts) { classpath = startScripts.classpath } +task healthServiceServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.healthservice.HealthServiceServer' + applicationName = 'health-service-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +task healthServiceClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.healthservice.HealthServiceClient' + applicationName = 'health-service-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into('bin') { from(routeGuideServer) from(routeGuideClient) @@ -215,5 +230,7 @@ applicationDistribution.into('bin') { from(deadlineClient) from(keepAliveServer) from(keepAliveClient) + from(healthServiceServer) + from(healthServiceClient) fileMode = 0755 } diff --git a/examples/pom.xml b/examples/pom.xml index 4482a2b8c8f..13f8164fb42 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -42,6 +42,10 @@ io.grpc grpc-protobuf + + io.grpc + grpc-services + io.grpc grpc-stub diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java index 0348ad8c764..bd7b20bbd90 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 The gRPC Authors + * Copyright 2023 The gRPC Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,6 +78,9 @@ public void greet(String name) { * greeting. The second argument is the target server. */ public static void main(String[] args) throws Exception { + System.setProperty("java.util.logging.SimpleFormatter.format", + "%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n"); + String[] users = {"world", "foo", "I am Grut"}; // Access a service running on the local machine on port 50051 String target = "localhost:50051"; @@ -117,8 +120,9 @@ public static void main(String[] args) throws Exception { client.greet(user); } client.checkHealth("After all users"); - Thread.sleep(5000); - client.checkHealth("After 5 second wait"); + Thread.sleep(10000); + client.checkHealth("After 10 second wait"); + client.greet("Larry"); } finally { // ManagedChannels use resources like threads and TCP connections. To prevent leaking these // resources the channel should be shut down when it will no longer be used. If it may be used diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java index 4994f2666f7..81f5136df12 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java @@ -14,16 +14,17 @@ * limitations under the License. */ -package io.grpc.examples.healthservice.; +package io.grpc.examples.healthservice; import io.grpc.Grpc; +import io.grpc.Context; import io.grpc.InsecureServerCredentials; import io.grpc.Server; -import io.grpc.Context; +import io.grpc.Status; +import io.grpc.examples.helloworld.GreeterGrpc; import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import io.grpc.health.v1.HealthCheckResponse.ServingStatus; -import io.grpc.examples.helloworld.GreeterGrpc; import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.stub.StreamObserver; import java.io.IOException; @@ -64,7 +65,7 @@ public void run() { } }); - health.setStatus("Greeter", ServingStatus.SERVING); + health.setStatus("", ServingStatus.SERVING); } private void stop() throws InterruptedException { @@ -86,39 +87,57 @@ private void blockUntilShutdown() throws InterruptedException { * Main launches the server from the command line. */ public static void main(String[] args) throws IOException, InterruptedException { + System.setProperty("java.util.logging.SimpleFormatter.format", + "%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n"); + final HealthServiceServer server = new HealthServiceServer(); server.start(); server.blockUntilShutdown(); } private class GreeterImpl extends GreeterGrpc.GreeterImplBase { + boolean isServing = true; @Override public void sayHello(HelloRequest req, StreamObserver responseObserver) { - HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - - updateHealthStatus(req); + if (shouldServe(req)) { + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } else { + responseObserver.onError( + Status.INTERNAL.withDescription("Not Serving right now").asRuntimeException()); + } } - private void updateHealthStatus(HelloRequest req) { - if (req.getName().length() < 5) { - logger.warning("Tiny message received, throwing a temper tantrum"); + private boolean shouldServe(HelloRequest req) { + if (!isServing) { + return false; + } + if (req.getName().length() >= 5) { + return true; } - health.setStatus("Greeter", ServingStatus.NOT_SERVING); + logger.warning("Tiny message received, throwing a temper tantrum"); + health.setStatus("", ServingStatus.NOT_SERVING); + isServing = false; - // In 2 seconds set it back to serving - Context.current() - .withDeadlineAfter(2, TimeUnit.SECONDS, Executors.newSingleThreadScheduledExecutor()) - .run(new Runnable() { - @Override - public void run() { - health.setStatus("Greeter", ServingStatus.SERVING); - logger.info("tantrum complete"); - } - }); + // In 10 seconds set it back to serving + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + isServing = true; + health.setStatus("", ServingStatus.SERVING); + logger.info("tantrum complete"); + } + }).start(); + return false; } } } From 5a14ea8d2818194605b4e15ffcdb8810cb7615c2 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Mon, 27 Mar 2023 23:44:45 +0000 Subject: [PATCH 04/21] fix bazel --- examples/BUILD.bazel | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index c5770cf7294..8515875c1a0 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -64,6 +64,7 @@ java_library( "@io_grpc_grpc_java//netty", ], deps = [ + ":_health_java_grpc", ":hello_streaming_java_grpc", ":hello_streaming_java_proto", ":helloworld_java_grpc", @@ -211,3 +212,11 @@ java_binary( ":examples", ], ) + +java_grpc_library( + name = "_health_java_grpc", + srcs = ["@io_grpc_grpc_proto//:health_proto"], + visibility = ["//visibility:private"], + deps = ["@io_grpc_grpc_proto//:health_java_proto"], +) + From 9b25ddc59fa5568899cd494b5af7b3f944f88ed4 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Wed, 29 Mar 2023 14:45:06 -0700 Subject: [PATCH 05/21] Add round robin to the example. --- .../healthservice/HealthServiceClient.java | 83 +++++++++++++------ 1 file changed, 56 insertions(+), 27 deletions(-) diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java index bd7b20bbd90..9330fd951ac 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java @@ -19,6 +19,8 @@ import io.grpc.Channel; import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.StatusRuntimeException; import io.grpc.examples.helloworld.GreeterGrpc; @@ -28,6 +30,8 @@ import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthCheckResponse.ServingStatus; import io.grpc.health.v1.HealthGrpc; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -50,6 +54,9 @@ public HealthServiceClient(Channel channel) { healthStub = HealthGrpc.newStub(channel); healthBlockingStub = HealthGrpc.newBlockingStub(channel); healthRequest = HealthCheckRequest.getDefaultInstance(); + LoadBalancerProvider roundRobin = LoadBalancerRegistry.getDefaultRegistry() + .getProvider("round_robin"); + } private ServingStatus checkHealth(String prefix) { @@ -73,9 +80,45 @@ public void greet(String name) { logger.info("Greeting: " + response.getMessage()); } + + private static void runTest(String[] users, ManagedChannel channel) + throws InterruptedException { + try { + // Set a watch + HealthServiceClient client = new HealthServiceClient(channel); + client.checkHealth("Before call"); + client.greet(users[0]); + client.checkHealth("After user " + users[0]); + for (String user : users) { + client.greet(user); + Thread.sleep(100); // Since the health update is asynchronous give it time to propagate + } + client.checkHealth("After all users"); + Thread.sleep(10000); + client.checkHealth("After 10 second wait"); + client.greet("Larry"); + } finally { + // ManagedChannels use resources like threads and TCP connections. To prevent leaking these + // resources the channel should be shut down when it will no longer be used. If it may be used + // again leave it running. + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } + private static Map generateHealthConfig(String serviceName) { + Map config = new HashMap<>(); + Map serviceMap = new HashMap<>(); + + config.put("healthCheckConfig", serviceMap); + serviceMap.put("serviceName", serviceName); + return config; + } + /** * Greet server. If provided, the first element of {@code args} is the name to use in the - * greeting. The second argument is the target server. + * greeting. The second argument is the target server. The server should also provide the health + * service. This has an example of using the health service directly through the unary call check + * to get the current health and indirectly through the round robin load balancer, which uses the + * streaming rpc (see {@link io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory}). */ public static void main(String[] args) throws Exception { System.setProperty("java.util.logging.SimpleFormatter.format", @@ -102,32 +145,18 @@ public static void main(String[] args) throws Exception { } } - // Create a communication channel to the server, known as a Channel. Channels are thread-safe - // and reusable. It is common to create channels at the beginning of your application and reuse - // them until the application shuts down. - // - // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To - // use TLS, use TlsChannelCredentials instead. - ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + // Will see failures because of server stopping processing + ManagedChannel channelSimple = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) .build(); - try { - // Set a watch - HealthServiceClient client = new HealthServiceClient(channel); - client.checkHealth("Before call"); - client.greet(users[0]); - client.checkHealth("After user " + users[0]); - for (String user : users) { - client.greet(user); - } - client.checkHealth("After all users"); - Thread.sleep(10000); - client.checkHealth("After 10 second wait"); - client.greet("Larry"); - } finally { - // ManagedChannels use resources like threads and TCP connections. To prevent leaking these - // resources the channel should be shut down when it will no longer be used. If it may be used - // again leave it running. - channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); - } + runTest(users, channelSimple); + + // Will block sending requests, so will not see failures since the round robin load balancer + // Uses the health service's watch rpc + ManagedChannel channelRR = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .defaultLoadBalancingPolicy("round_robin") + .defaultServiceConfig(generateHealthConfig("")) + .build(); + runTest(users, channelRR); + } } From aae877eb1a1860133e01fa064b8b614ce42dc43f Mon Sep 17 00:00:00 2001 From: larry-safran Date: Wed, 29 Mar 2023 22:34:54 +0000 Subject: [PATCH 06/21] fixed --- .../healthservice/HealthServiceClient.java | 56 +++++++++++++------ .../healthservice/HealthServiceServer.java | 12 +++- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java index 9330fd951ac..11c9e972b1d 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java @@ -22,6 +22,7 @@ import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.examples.helloworld.GreeterGrpc; import io.grpc.examples.helloworld.HelloReply; @@ -76,26 +77,53 @@ public void greet(String name) { } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; + } catch (Exception e) { + e.printStackTrace(); + return; } logger.info("Greeting: " + response.getMessage()); } - private static void runTest(String[] users, ManagedChannel channel) + private static void runTest(String target, String[] users, boolean useRoundRobin) throws InterruptedException { + ManagedChannelBuilder builder = + Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()); + + if (useRoundRobin) { + builder = builder + .defaultLoadBalancingPolicy("round_robin") + .defaultServiceConfig(generateHealthConfig("")); + } + + ManagedChannel channel = builder.build(); + + System.out.println("\nDoing test with" + (useRoundRobin ? "" : "out") + + " the Round Robin load balancer\n"); + try { // Set a watch HealthServiceClient client = new HealthServiceClient(channel); - client.checkHealth("Before call"); + if (!useRoundRobin) { + client.checkHealth("Before call"); + } client.greet(users[0]); - client.checkHealth("After user " + users[0]); + if (!useRoundRobin) { + client.checkHealth("After user " + users[0]); + } + for (String user : users) { client.greet(user); Thread.sleep(100); // Since the health update is asynchronous give it time to propagate } - client.checkHealth("After all users"); - Thread.sleep(10000); - client.checkHealth("After 10 second wait"); + + if (!useRoundRobin) { + client.checkHealth("After all users"); + Thread.sleep(10000); + client.checkHealth("After 10 second wait"); + } else { + Thread.sleep(10000); + } client.greet("Larry"); } finally { // ManagedChannels use resources like threads and TCP connections. To prevent leaking these @@ -145,18 +173,12 @@ public static void main(String[] args) throws Exception { } } - // Will see failures because of server stopping processing - ManagedChannel channelSimple = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) - .build(); - runTest(users, channelSimple); + // Will see failures of rpc's sent when server stops processing them that come from the server + runTest(target, users, false); - // Will block sending requests, so will not see failures since the round robin load balancer - // Uses the health service's watch rpc - ManagedChannel channelRR = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) - .defaultLoadBalancingPolicy("round_robin") - .defaultServiceConfig(generateHealthConfig("")) - .build(); - runTest(users, channelRR); + // The client will throw an error when sending the rpc to a non-serving service because the + // round robin load balancer uses the health service's watch rpc + runTest(target, users, true); } } diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java index 81f5136df12..25790b7013e 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java @@ -100,17 +100,23 @@ private class GreeterImpl extends GreeterGrpc.GreeterImplBase { @Override public void sayHello(HelloRequest req, StreamObserver responseObserver) { - if (shouldServe(req)) { + if (!isServing) { + responseObserver.onError( + Status.INTERNAL.withDescription("Not Serving right now").asRuntimeException()); + return; + } + + if (validateRequest(req)) { HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); responseObserver.onNext(reply); responseObserver.onCompleted(); } else { responseObserver.onError( - Status.INTERNAL.withDescription("Not Serving right now").asRuntimeException()); + Status.INVALID_ARGUMENT.withDescription("Offended by short name").asRuntimeException()); } } - private boolean shouldServe(HelloRequest req) { + private boolean validateRequest(HelloRequest req) { if (!isServing) { return false; } From 09efd9d00c6559708e8f68f63f35380ea308b684 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Tue, 21 Mar 2023 14:26:25 -0700 Subject: [PATCH 07/21] disable recording real-time metrics using in gcp-o11y --- .../main/java/io/grpc/gcp/observability/GcpObservability.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index 9470de77050..726472ab9f5 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -132,9 +132,9 @@ private void setProducer( } if (config.isEnableCloudMonitoring()) { clientInterceptors.add(getConditionalInterceptor( - InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true))); + InternalCensusStatsAccessor.getClientInterceptor(true, true, false, true))); tracerFactories.add( - InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, true)); + InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, false)); } if (config.isEnableCloudTracing()) { clientInterceptors.add( From caac3f007f586167d3ec2524d76d5c1022638a9a Mon Sep 17 00:00:00 2001 From: DNVindhya Date: Wed, 22 Mar 2023 08:40:05 -0700 Subject: [PATCH 08/21] gcp-o11y: add sleep in Observability close() This commit adds sleep in `close()` for metrics and/or traces to be flushed before closing observability. Currently sleep is set to 2 * [Metrics export interval (30 secs)]. --- .../grpc/gcp/observability/GcpObservability.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index 726472ab9f5..0f501da1fb1 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -53,11 +53,16 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; /** The main class for gRPC Google Cloud Platform Observability features. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869") public final class GcpObservability implements AutoCloseable { + + private static final Logger logger = Logger.getLogger(GcpObservability.class.getName()); private static final int METRICS_EXPORT_INTERVAL = 30; @VisibleForTesting static final ImmutableSet SERVICES_TO_EXCLUDE = ImmutableSet.of( @@ -117,6 +122,16 @@ public void close() { throw new IllegalStateException("GcpObservability already closed!"); } sink.close(); + if (config.isEnableCloudMonitoring() || config.isEnableCloudTracing()) { + try { + // Sleeping before shutdown to ensure all metrics and traces are flushed + Thread.sleep( + TimeUnit.MILLISECONDS.convert(2 * METRICS_EXPORT_INTERVAL, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.log(Level.SEVERE, "Caught exception during sleep", e); + } + } instance = null; } } From 7b6caef7f9c75866c0658a85bb2ea2bf262e255a Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 22 Mar 2023 11:28:16 -0700 Subject: [PATCH 09/21] test/android: fix the import for AndroidJUnit4 Everywhere else is using androidx.test.ext.junit.runners.AndroidJUnit4, and google internally only has that variant. --- .../io/grpc/android/integrationtest/UdsChannelInteropTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/android-interop-testing/src/androidTest/java/io/grpc/android/integrationtest/UdsChannelInteropTest.java b/android-interop-testing/src/androidTest/java/io/grpc/android/integrationtest/UdsChannelInteropTest.java index 40b954c00e2..62206138f6d 100644 --- a/android-interop-testing/src/androidTest/java/io/grpc/android/integrationtest/UdsChannelInteropTest.java +++ b/android-interop-testing/src/androidTest/java/io/grpc/android/integrationtest/UdsChannelInteropTest.java @@ -21,8 +21,8 @@ import android.net.LocalSocketAddress.Namespace; import androidx.test.InstrumentationRegistry; +import androidx.test.ext.junit.runners.AndroidJUnit4; import androidx.test.rule.ActivityTestRule; -import androidx.test.runner.AndroidJUnit4; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Server; import io.grpc.android.UdsChannelBuilder; From c5949da8689db9c71721c6c5c2cc428c4dce3dfa Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 22 Mar 2023 12:11:26 -0700 Subject: [PATCH 10/21] netty: implement GrpcHttp2InboundHeaders.iterator() This will be used to generate more useful debugging information in cases such as headers size exceeding the limit. --- .../io/grpc/netty/GrpcHttp2HeadersUtils.java | 24 ++++++++++--- .../grpc/netty/GrpcHttp2HeadersUtilsTest.java | 34 +++++++++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersUtils.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersUtils.java index 4023fd1218f..c0d60721a1b 100644 --- a/netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersUtils.java +++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2HeadersUtils.java @@ -46,9 +46,12 @@ import io.netty.handler.codec.http2.Http2Headers; import io.netty.util.AsciiString; import io.netty.util.internal.PlatformDependent; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; /** * A headers utils providing custom gRPC implementations of {@link DefaultHttp2HeadersDecoder}. @@ -288,6 +291,11 @@ public int size() { return numHeaders(); } + @Override + public Iterator> iterator() { + return namesAndValuesToImmutableList().iterator(); + } + protected static void appendNameAndValue(StringBuilder builder, CharSequence name, CharSequence value, boolean prependSeparator) { if (prependSeparator) { @@ -296,14 +304,22 @@ protected static void appendNameAndValue(StringBuilder builder, CharSequence nam builder.append(name).append(": ").append(value); } - protected final String namesAndValuesToString() { - StringBuilder builder = new StringBuilder(); - boolean prependSeparator = false; + private List> namesAndValuesToImmutableList() { + ArrayList> list = new ArrayList<>(values.length); for (int i = 0; i < namesAndValuesIdx; i += 2) { String name = new String(namesAndValues[i], US_ASCII); // If binary headers, the value is base64 encoded. AsciiString value = values[i / 2]; - appendNameAndValue(builder, name, value, prependSeparator); + list.add(new AbstractMap.SimpleImmutableEntry(name, value)); + } + return Collections.unmodifiableList(list); + } + + protected final String namesAndValuesToString() { + StringBuilder builder = new StringBuilder(); + boolean prependSeparator = false; + for (Map.Entry entry : namesAndValuesToImmutableList()) { + appendNameAndValue(builder, entry.getKey(), entry.getValue(), prependSeparator); prependSeparator = true; } return builder.toString(); diff --git a/netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersUtilsTest.java b/netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersUtilsTest.java index 48c6320f4c6..653b44a9d2d 100644 --- a/netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersUtilsTest.java +++ b/netty/src/test/java/io/grpc/netty/GrpcHttp2HeadersUtilsTest.java @@ -42,6 +42,8 @@ import io.netty.handler.codec.http2.Http2HeadersEncoder.SensitivityDetector; import io.netty.util.AsciiString; import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; @@ -301,6 +303,38 @@ public void headerGetAll_multiplePresent() { .containsExactly(AsciiString.of("value1"), AsciiString.of("value2")); } + @Test + public void headerIterator_empty() { + Http2Headers http2Headers = new GrpcHttp2RequestHeaders(2); + Iterator> it = http2Headers.iterator(); + assertThat(it.hasNext()).isFalse(); + } + + @Test + public void headerIteartor_nonEmpty() { + Http2Headers http2Headers = new GrpcHttp2RequestHeaders(2); + http2Headers.add(AsciiString.of("notit1"), AsciiString.of("val1")); + http2Headers.add(AsciiString.of("multiple"), AsciiString.of("value1")); + http2Headers.add(AsciiString.of("notit2"), AsciiString.of("val2")); + http2Headers.add(AsciiString.of("multiple"), AsciiString.of("value2")); + http2Headers.add(AsciiString.of("notit3"), AsciiString.of("val3")); + Iterator> it = http2Headers.iterator(); + + assertNextEntry(it, "notit1", AsciiString.of("val1")); + assertNextEntry(it, "multiple", AsciiString.of("value1")); + assertNextEntry(it, "notit2", AsciiString.of("val2")); + assertNextEntry(it, "multiple", AsciiString.of("value2")); + assertNextEntry(it, "notit3", AsciiString.of("val3")); + assertThat(it.hasNext()).isFalse(); + } + + private static void assertNextEntry( + Iterator> it, CharSequence key, CharSequence value) { + Map.Entry entry = it.next(); + assertThat(entry.getKey()).isEqualTo(key); + assertThat(entry.getValue()).isEqualTo(value); + } + @Test public void headerRemove_notPresent() { Http2Headers http2Headers = new GrpcHttp2RequestHeaders(2); From 4002b4ecd67d156b9d1cb47fc2bb4d2ccf18879c Mon Sep 17 00:00:00 2001 From: Larry Safran <107004254+larry-safran@users.noreply.github.com> Date: Wed, 22 Mar 2023 12:14:00 -0700 Subject: [PATCH 11/21] examples: waitForReady example (#9960) Add an example using waitForReady Part of fixit. Fixes b/259286751 --- examples/README.md | 2 + examples/build.gradle | 8 ++ .../waitforready/WaitForReadyClient.java | 132 ++++++++++++++++++ 3 files changed, 142 insertions(+) create mode 100644 examples/src/main/java/io/grpc/examples/waitforready/WaitForReadyClient.java diff --git a/examples/README.md b/examples/README.md index 42964d519b6..f83d04f08d9 100644 --- a/examples/README.md +++ b/examples/README.md @@ -23,6 +23,8 @@ before trying out the examples. - [Flow control](src/main/java/io/grpc/examples/manualflowcontrol) +- [Wait For Ready](src/main/java/io/grpc/examples/waitforready) + - [Json serialization](src/main/java/io/grpc/examples/advanced) -
diff --git a/examples/build.gradle b/examples/build.gradle index a842b2c026d..566699a5eb6 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -140,6 +140,13 @@ task manualFlowControlServer(type: CreateStartScripts) { classpath = startScripts.classpath } +task waitForReadyClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.waitforready.WaitForReadyClient' + applicationName = 'wait-for-ready-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + task loadBalanceServer(type: CreateStartScripts) { mainClass = 'io.grpc.examples.loadbalance.LoadBalanceServer' applicationName = 'load-balance-server' @@ -222,6 +229,7 @@ applicationDistribution.into('bin') { from(compressingHelloWorldClient) from(manualFlowControlClient) from(manualFlowControlServer) + from(waitForReadyClient) from(loadBalanceServer) from(loadBalanceClient) from(nameResolveServer) diff --git a/examples/src/main/java/io/grpc/examples/waitforready/WaitForReadyClient.java b/examples/src/main/java/io/grpc/examples/waitforready/WaitForReadyClient.java new file mode 100644 index 00000000000..98372395c6e --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/waitforready/WaitForReadyClient.java @@ -0,0 +1,132 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.waitforready; + +import io.grpc.Channel; +import io.grpc.Deadline; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.examples.helloworld.HelloReply; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This is an example of using waitForReady. This is a feature which can be used on a stub + * which will cause the rpcs to wait (until optional deadline is exceeded) for the + * server to become available before sending the request. This is useful for batch workflows + * where there is no need to fail fast. + * + * Below is a simple client that requests a greeting from the + * {@link io.grpc.examples.helloworld.HelloWorldServer} and defines waitForReady on the stub. + * To test, + * 1. run this client without a server running - client rpc should hang + * 2. start the server - client rpc should complete + * 3. run this client again - client rpc should complete nearly immediately + */ +public class WaitForReadyClient { + private static final Logger logger = Logger.getLogger(WaitForReadyClient.class.getName()); + + private final GreeterGrpc.GreeterBlockingStub blockingStub; + + /** + * Construct client for accessing HelloWorld server using the existing channel which will + * wait for the server to become ready, however long that may take, before sending the request. + */ + public WaitForReadyClient(Channel channel) { + // This is the only difference from the simple HelloWorld example + blockingStub = GreeterGrpc.newBlockingStub(channel).withWaitForReady(); + } + + /** + * Construct a client for accessing HelloWorld server using the existing channel which will + * wait for the server to become ready, up to the specified deadline, before sending the request. + * if the deadline is exceeded before the server becomes ready, then the rpc call will fail with + * a Status of DEADLINE_EXCEEDED without the request being sent. + */ + public WaitForReadyClient(Channel channel, Deadline deadline) { + blockingStub = GreeterGrpc.newBlockingStub(channel).withWaitForReady().withDeadline(deadline); + } + + + /** Say hello to server. */ + public void greet(String name) { + logger.info("Will try to greet " + name + " ..."); + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response; + try { + response = blockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + logger.info("Greeting: " + response.getMessage()); + } + + /** + * Greet server. If provided, the first element of {@code args} is the name to use in the + * greeting. The second argument is the target server. + */ + public static void main(String[] args) throws Exception { + String user = "world"; + // Access a service running on the local machine on port 50051 + String target = "localhost:50051"; + // Allow passing in the user and target strings as command line arguments + if (args.length > 0) { + if ("--help".equals(args[0])) { + System.err.println("Usage: [name [target]]"); + System.err.println(); + System.err.println(" name The name you wish to be greeted by. Defaults to " + user); + System.err.println(" target The server to connect to. Defaults to " + target); + System.exit(1); + } + user = args[0]; + } + if (args.length > 1) { + target = args[1]; + } + + // Create a communication channel to the server, known as a Channel. Channels are thread-safe + // and reusable. It is common to create channels at the beginning of your application and reuse + // them until the application shuts down. + // + // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To + // use TLS, use TlsChannelCredentials instead. + ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .build(); + try { + // If server isn't running, this will fail after 5 seconds. Will also fail if the server is + // running particularly slowly and takes more than 5 minutes to respond. + WaitForReadyClient clientWithTimeout = + new WaitForReadyClient(channel, Deadline.after(5, TimeUnit.SECONDS)); + clientWithTimeout.greet(user); + + // This will wait forever until the server becomes ready + WaitForReadyClient client = new WaitForReadyClient(channel); + client.greet(user); + } finally { + // ManagedChannels use resources like threads and TCP connections. To prevent leaking these + // resources the channel should be shut down when it will no longer be used. If it may be used + // again leave it running. + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } +} From f618fee65d366b704424a2105196791e4cd11132 Mon Sep 17 00:00:00 2001 From: DNVindhya Date: Wed, 22 Mar 2023 16:44:58 -0700 Subject: [PATCH 12/21] gcp-o11y: add default custom tag for metrics exporter This PR adds a default custom tag for metrics, irrespective of custom tags being present in the observability configuration. OpenCensus by default adds a custom tag [opencenus_task](https://docs.google.com/document/d/1sWC-XD277cM0PXxAhzJKY2X0Uj2W7bVoSv-jvnA0N8Q/edit?resourcekey=0-l-wqh1fctxZXHCUrvZv2BQ#heading=h.xy85j580eik0) for metrics which gets overriden if custom tags are set. The unique custom tag is required to ensure the uniqueness of the Timeseries. The format of the default custom tag is: `java-{PID}@{HOSTNAME}`, if `{PID}` is not available a random number will be used. --- .../gcp/observability/GcpObservability.java | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index 0f501da1fb1..ff5e12be399 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -51,7 +51,12 @@ import io.opencensus.trace.Tracing; import io.opencensus.trace.config.TraceConfig; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.SecureRandom; import java.util.ArrayList; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -64,10 +69,13 @@ public final class GcpObservability implements AutoCloseable { private static final Logger logger = Logger.getLogger(GcpObservability.class.getName()); private static final int METRICS_EXPORT_INTERVAL = 30; + + static final String DEFAULT_METRIC_CUSTOM_TAG_KEY = "opencensus_task"; @VisibleForTesting static final ImmutableSet SERVICES_TO_EXCLUDE = ImmutableSet.of( "google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService", "google.devtools.cloudtrace.v2.TraceService"); + private static GcpObservability instance = null; private final Sink sink; private final ObservabilityConfig config; @@ -199,12 +207,17 @@ void registerStackDriverExporter(String projectId, Map customTag if (projectId != null) { statsConfigurationBuilder.setProjectId(projectId); } + Map constantLabels = new HashMap<>(); + constantLabels.put( + LabelKey.create(DEFAULT_METRIC_CUSTOM_TAG_KEY, DEFAULT_METRIC_CUSTOM_TAG_KEY), + LabelValue.create(generateDefaultMetricTagValue())); if (customTags != null) { - Map constantLabels = customTags.entrySet().stream() - .collect(Collectors.toMap(e -> LabelKey.create(e.getKey(), e.getKey()), - e -> LabelValue.create(e.getValue()))); - statsConfigurationBuilder.setConstantLabels(constantLabels); + for (Map.Entry mapEntry : customTags.entrySet()) { + constantLabels.putIfAbsent(LabelKey.create(mapEntry.getKey(), mapEntry.getKey()), + LabelValue.create(mapEntry.getValue())); + } } + statsConfigurationBuilder.setConstantLabels(constantLabels); statsConfigurationBuilder.setExportInterval(Duration.create(METRICS_EXPORT_INTERVAL, 0)); StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); } @@ -228,6 +241,20 @@ void registerStackDriverExporter(String projectId, Map customTag } } + private static String generateDefaultMetricTagValue() { + final String jvmName = ManagementFactory.getRuntimeMXBean().getName(); + if (jvmName.indexOf('@') < 1) { + String hostname = "localhost"; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + logger.log(Level.INFO, "Unable to get the hostname.", e); + } + return "java-" + new SecureRandom().nextInt() + "@" + hostname; + } + return "java-" + jvmName; + } + private GcpObservability( Sink sink, ObservabilityConfig config) { From 21fe4c2f33833f5e4d467a8dc8491455bfdf533e Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 22 Mar 2023 16:50:09 -0700 Subject: [PATCH 13/21] Remove sleep from Observability Interop Test binary now that its done in close() (#9977) After #9972, the `sleep()` is done inside Observability `close()`, we can remove this `sleep()` in the Observability Interop test binary. --- .../grpc/gcp/observability/interop/TestServiceInterop.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/gcp-observability/interop/src/main/java/io/grpc/gcp/observability/interop/TestServiceInterop.java b/gcp-observability/interop/src/main/java/io/grpc/gcp/observability/interop/TestServiceInterop.java index 402e4981121..f854d20eaaf 100644 --- a/gcp-observability/interop/src/main/java/io/grpc/gcp/observability/interop/TestServiceInterop.java +++ b/gcp-observability/interop/src/main/java/io/grpc/gcp/observability/interop/TestServiceInterop.java @@ -20,7 +20,6 @@ import io.grpc.testing.integration.TestServiceClient; import io.grpc.testing.integration.TestServiceServer; import java.util.Arrays; -import java.util.concurrent.TimeUnit; /** * Combined interop client and server for observability testing. @@ -47,11 +46,6 @@ public static void main(String[] args) throws Exception { } else { TestServiceServer.main(args); } - // TODO(stanleycheung): remove this once the observability exporter plugin is able to - // gracefully flush observability data to cloud at shutdown - final int o11yCloseSleepSeconds = 65; - System.out.println("Sleeping " + o11yCloseSleepSeconds + " seconds before exiting"); - Thread.sleep(TimeUnit.MILLISECONDS.convert(o11yCloseSleepSeconds, TimeUnit.SECONDS)); } } From 7581150cafeab5ad60a03041cd3ae8cc950cc6e2 Mon Sep 17 00:00:00 2001 From: DNVindhya Date: Wed, 22 Mar 2023 17:02:25 -0700 Subject: [PATCH 14/21] examples: add gcp-observability examples (#9967) * add examples for gcp-observability --- examples/example-gcp-observability/README.md | 39 ++++++++ .../example-gcp-observability/build.gradle | 68 +++++++++++++ .../example-gcp-observability/settings.gradle | 1 + .../GcpObservabilityClient.java | 93 ++++++++++++++++++ .../GcpObservabilityServer.java | 97 +++++++++++++++++++ .../main/proto/helloworld/helloworld.proto | 39 ++++++++ .../gcp_observability_client_config.json | 17 ++++ .../gcp_observability_server_config.json | 17 ++++ 8 files changed, 371 insertions(+) create mode 100644 examples/example-gcp-observability/README.md create mode 100644 examples/example-gcp-observability/build.gradle create mode 100644 examples/example-gcp-observability/settings.gradle create mode 100644 examples/example-gcp-observability/src/main/java/io/grpc/examples/gcpobservability/GcpObservabilityClient.java create mode 100644 examples/example-gcp-observability/src/main/java/io/grpc/examples/gcpobservability/GcpObservabilityServer.java create mode 100644 examples/example-gcp-observability/src/main/proto/helloworld/helloworld.proto create mode 100644 examples/example-gcp-observability/src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_client_config.json create mode 100644 examples/example-gcp-observability/src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_server_config.json diff --git a/examples/example-gcp-observability/README.md b/examples/example-gcp-observability/README.md new file mode 100644 index 00000000000..95270fa8140 --- /dev/null +++ b/examples/example-gcp-observability/README.md @@ -0,0 +1,39 @@ +gRPC GCP Observability Example +================ + +The GCP Observability example consists of a Hello World client and a Hello World server instrumented for logs, metrics and tracing. + +__Please refer to Microservices Observability user guide for setup.__ + +### Build the example + +Build the Observability client & server. From the `grpc-java/examples/example-gcp-observability` +directory: +``` +$ ../gradlew installDist +``` + +This creates the scripts `build/install/example-gcp-observability/bin/gcp-observability-client` and +`build/install/example-gcp-observability/bin/gcp-observability-server`. + +### Run the example with configuration + +To use Observability, you should first setup and configure authorization as mentioned in the user guide. + +You need to set the `GRPC_GCP_OBSERVABILITY_CONFIG_FILE` environment variable to point to the gRPC GCP Observability configuration file (preferred) or if that +is not set then `GRPC_GCP_OBSERVABILITY_CONFIG` environment variable to gRPC GCP Observability configuration value. This is needed by both +`build/install/example-gcp-observability/bin/gcp-observability-client` and +`build/install/example-gcp-observability/bin/gcp-observability-server`. + +1. To start the observability-enabled example server on its default port of 50051, run: +``` +$ export GRPC_GCP_OBSERVABILITY_CONFIG_FILE=src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_server_config.json +$ ./build/install/example-gcp-observability/bin/gcp-observability-server +``` + +2. In a different terminal window, run the observability-enabled example client: +``` +$ export GRPC_GCP_OBSERVABILITY_CONFIG_FILE=src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_client_config.json +$ ./build/install/example-gcp-observability/bin/gcp-observability-client +``` + diff --git a/examples/example-gcp-observability/build.gradle b/examples/example-gcp-observability/build.gradle new file mode 100644 index 00000000000..5d8571aefef --- /dev/null +++ b/examples/example-gcp-observability/build.gradle @@ -0,0 +1,68 @@ +plugins { + // Provide convenience executables for trying out the examples. + id 'application' + // ASSUMES GRADLE 5.6 OR HIGHER. Use plugin version 0.8.10 with earlier gradle versions + id 'com.google.protobuf' version '0.8.17' + // Generate IntelliJ IDEA's .idea & .iml project files + id 'idea' + id 'java' +} + +repositories { + maven { // The google mirror is less flaky than mavenCentral() + url "https://maven-central.storage-download.googleapis.com/maven2/" + } + mavenCentral() + mavenLocal() +} + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +// IMPORTANT: You probably want the non-SNAPSHOT version of gRPC. Make sure you +// are looking at a tagged version of the example and not "master"! + +// Feel free to delete the comment at the next line. It is just for safely +// updating the version in our release process. +def grpcVersion = '1.55.0-SNAPSHOT' // CURRENT_GRPC_VERSION +def protocVersion = '3.21.7' + +dependencies { + implementation "io.grpc:grpc-protobuf:${grpcVersion}" + implementation "io.grpc:grpc-stub:${grpcVersion}" + implementation "io.grpc:grpc-gcp-observability:${grpcVersion}" + compileOnly "org.apache.tomcat:annotations-api:6.0.53" + runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}" +} + +protobuf { + protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" } + plugins { + grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } + } + generateProtoTasks { + all()*.plugins { grpc {} } + } +} + +startScripts.enabled = false + +task ObservabilityHelloWorldServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.gcpobservability.GcpObservabilityServer' + applicationName = 'gcp-observability-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +task ObservabilityHelloWorldClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.gcpobservability.GcpObservabilityClient' + applicationName = 'gcp-observability-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +applicationDistribution.into('bin') { + from(ObservabilityHelloWorldServer) + from(ObservabilityHelloWorldClient) + fileMode = 0755 +} diff --git a/examples/example-gcp-observability/settings.gradle b/examples/example-gcp-observability/settings.gradle new file mode 100644 index 00000000000..1e4ba3812eb --- /dev/null +++ b/examples/example-gcp-observability/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'example-gcp-observability' diff --git a/examples/example-gcp-observability/src/main/java/io/grpc/examples/gcpobservability/GcpObservabilityClient.java b/examples/example-gcp-observability/src/main/java/io/grpc/examples/gcpobservability/GcpObservabilityClient.java new file mode 100644 index 00000000000..34112adb108 --- /dev/null +++ b/examples/example-gcp-observability/src/main/java/io/grpc/examples/gcpobservability/GcpObservabilityClient.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.gcpobservability; + +import io.grpc.Channel; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.gcp.observability.GcpObservability; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A simple observability client that requests a greeting from the {@link HelloWorldServer} and + * generates logs, metrics and traces based on the configuration. + */ +public class GcpObservabilityClient { + private static final Logger logger = Logger.getLogger(GcpObservabilityClient.class.getName()); + + private final GreeterGrpc.GreeterBlockingStub blockingStub; + + /** Construct client for accessing HelloWorld server using the existing channel. */ + public GcpObservabilityClient(Channel channel) { + blockingStub = GreeterGrpc.newBlockingStub(channel); + } + + /** Say hello to server. */ + public void greet(String name) { + logger.info("Will try to greet " + name + " ..."); + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response; + try { + response = blockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + logger.info("Greeting: " + response.getMessage()); + } + + /** + * Greet server. If provided, the first element of {@code args} is the name to use in the + * greeting. The second argument is the target server. + */ + public static void main(String[] args) throws Exception { + String user = "world"; + String target = "localhost:50051"; + if (args.length > 0) { + if ("--help".equals(args[0])) { + System.err.println("Usage: [name [target]]"); + System.err.println(""); + System.err.println(" name The name you wish to be greeted by. Defaults to " + user); + System.err.println(" target The server to connect to. Defaults to " + target); + System.exit(1); + } + user = args[0]; + } + if (args.length > 1) { + target = args[1]; + } + + // Initialize observability + try (GcpObservability observability = GcpObservability.grpcInit()) { + ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .build(); + try { + GcpObservabilityClient client = new GcpObservabilityClient(channel); + client.greet(user); + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } // observability.close() called implicitly + } +} diff --git a/examples/example-gcp-observability/src/main/java/io/grpc/examples/gcpobservability/GcpObservabilityServer.java b/examples/example-gcp-observability/src/main/java/io/grpc/examples/gcpobservability/GcpObservabilityServer.java new file mode 100644 index 00000000000..c599e7047c5 --- /dev/null +++ b/examples/example-gcp-observability/src/main/java/io/grpc/examples/gcpobservability/GcpObservabilityServer.java @@ -0,0 +1,97 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.gcpobservability; + +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.gcp.observability.GcpObservability; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * Observability server that manages startup/shutdown of a {@code Greeter} server and generates + * logs, metrics and traces based on the configuration. + */ +public class GcpObservabilityServer { + private static final Logger logger = Logger.getLogger(GcpObservabilityServer.class.getName()); + + private Server server; + + private void start() throws IOException { + int port = 50051; + server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(new GreeterImpl()) + .build() + .start(); + logger.info("Server started, listening on " + port); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + // Initialize observability + GcpObservability observability = GcpObservability.grpcInit(); + final GcpObservabilityServer server = new GcpObservabilityServer(); + server.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + server.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + // Shut down observability + observability.close(); + System.err.println("*** server shut down"); + } + }); + + server.blockUntilShutdown(); + } + + static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } +} diff --git a/examples/example-gcp-observability/src/main/proto/helloworld/helloworld.proto b/examples/example-gcp-observability/src/main/proto/helloworld/helloworld.proto new file mode 100644 index 00000000000..64a8c09ee16 --- /dev/null +++ b/examples/example-gcp-observability/src/main/proto/helloworld/helloworld.proto @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/examples/example-gcp-observability/src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_client_config.json b/examples/example-gcp-observability/src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_client_config.json new file mode 100644 index 00000000000..9c69d55e7f7 --- /dev/null +++ b/examples/example-gcp-observability/src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_client_config.json @@ -0,0 +1,17 @@ +{ + "cloud_monitoring": {}, + "cloud_trace": { + "sampling_rate": 1.0 + }, + "cloud_logging": { + "client_rpc_events": [{ + "methods": ["helloworld.Greeter/*"] + }], + "server_rpc_events": [{ + "methods": ["helloworld.Greeter/*"] + }] + }, + "labels": { + "environment" : "example-client" + } +} diff --git a/examples/example-gcp-observability/src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_server_config.json b/examples/example-gcp-observability/src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_server_config.json new file mode 100644 index 00000000000..78698c05faf --- /dev/null +++ b/examples/example-gcp-observability/src/main/resources/io/grpc/examples/gcpobservability/gcp_observability_server_config.json @@ -0,0 +1,17 @@ +{ + "cloud_monitoring": {}, + "cloud_trace": { + "sampling_rate": 1.0 + }, + "cloud_logging": { + "client_rpc_events": [{ + "methods": ["helloworld.Greeter/*"] + }], + "server_rpc_events": [{ + "methods": ["helloworld.Greeter/*"] + }] + }, + "labels": { + "environment" : "example-server" + } +} From c672d99bf97c85138f0d185aa942e5851b6afdfa Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 22 Mar 2023 18:11:32 -0700 Subject: [PATCH 15/21] examples: Add cancellation example It uses the echo service for both unary and bidi RPCs, to show the various cancellation circumstances and APIs. --- examples/BUILD.bazel | 18 ++ examples/build.gradle | 16 ++ .../cancellation/CancellationClient.java | 204 +++++++++++++++++ .../cancellation/CancellationServer.java | 206 ++++++++++++++++++ .../main/proto/grpc/examples/echo/echo.proto | 48 ++++ 5 files changed, 492 insertions(+) create mode 100644 examples/src/main/java/io/grpc/examples/cancellation/CancellationClient.java create mode 100644 examples/src/main/java/io/grpc/examples/cancellation/CancellationServer.java create mode 100644 examples/src/main/proto/grpc/examples/echo/echo.proto diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index 8515875c1a0..24b668879ce 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -51,6 +51,22 @@ java_grpc_library( deps = [":route_guide_java_proto"], ) +proto_library( + name = "echo_proto", + srcs = ["src/main/proto/grpc/examples/echo/echo.proto"], +) + +java_proto_library( + name = "echo_java_proto", + deps = [":echo_proto"], +) + +java_grpc_library( + name = "echo_java_grpc", + srcs = [":echo_proto"], + deps = [":echo_java_proto"], +) + java_library( name = "examples", testonly = 1, @@ -65,6 +81,8 @@ java_library( ], deps = [ ":_health_java_grpc", + ":echo_java_grpc", + ":echo_java_proto", ":hello_streaming_java_grpc", ":hello_streaming_java_proto", ":helloworld_java_grpc", diff --git a/examples/build.gradle b/examples/build.gradle index 566699a5eb6..56eea09a4ff 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -210,6 +210,13 @@ task healthServiceServer(type: CreateStartScripts) { classpath = startScripts.classpath } +task cancellationClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.cancellation.CancellationClient' + applicationName = 'cancellation-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + task healthServiceClient(type: CreateStartScripts) { mainClass = 'io.grpc.examples.healthservice.HealthServiceClient' applicationName = 'health-service-client' @@ -217,6 +224,13 @@ task healthServiceClient(type: CreateStartScripts) { classpath = startScripts.classpath } +task cancellationServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.cancellation.CancellationServer' + applicationName = 'cancellation-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into('bin') { from(routeGuideServer) from(routeGuideClient) @@ -240,5 +254,7 @@ applicationDistribution.into('bin') { from(keepAliveClient) from(healthServiceServer) from(healthServiceClient) + from(cancellationClient) + from(cancellationServer) fileMode = 0755 } diff --git a/examples/src/main/java/io/grpc/examples/cancellation/CancellationClient.java b/examples/src/main/java/io/grpc/examples/cancellation/CancellationClient.java new file mode 100644 index 00000000000..fa31ee8e6c1 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/cancellation/CancellationClient.java @@ -0,0 +1,204 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.cancellation; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Channel; +import io.grpc.Context; +import io.grpc.Context.CancellableContext; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.echo.EchoGrpc; +import io.grpc.examples.echo.EchoRequest; +import io.grpc.examples.echo.EchoResponse; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.TimeUnit; + +/** + * A client that cancels RPCs to an Echo server. + */ +public class CancellationClient { + private final Channel channel; + + public CancellationClient(Channel channel) { + this.channel = channel; + } + + private void demonstrateCancellation() throws Exception { + echoBlocking("I'M A BLOCKING CLIENT! HEAR ME ROAR!"); + + // io.grpc.Context can be used to cancel RPCs using any of the stubs. It is the only way to + // cancel blocking stub RPCs. io.grpc.Context is a general-purpose alternative to thread + // interruption and can be used outside of gRPC, like to coordinate within your application. + // + // CancellableContext must always be cancelled or closed at the end of its lifetime, otherwise + // it could "leak" memory. + try (CancellableContext context = Context.current().withCancellation()) { + new Thread(() -> { + try { + Thread.sleep(500); // Do some work + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + // Cancellation reasons are never sent to the server. But they are echoed back to the + // client as the RPC failure reason. + context.cancel(new RuntimeException("Oops. Messed that up, let me try again")); + }).start(); + + // context.run() attaches the context to this thread for gRPC to observe. It also restores + // the previous context before returning. + context.run(() -> echoBlocking("RAAWRR haha lol hehe AWWRR GRRR")); + } + + // Futures cancelled with interruption cancel the RPC. + ListenableFuture future = echoFuture("Future clie*cough*nt was here!"); + Thread.sleep(500); // Do some work + // We realize we really don't want to hear that echo. + future.cancel(true); + Thread.sleep(100); // Make logs more obvious. Cancel is async + + ClientCallStreamObserver reqCallObserver = echoAsync("Testing, testing, 1, 2, 3"); + reqCallObserver.onCompleted(); + Thread.sleep(500); // Make logs more obvious. Wait for completion + + // Async's onError() will cancel. But the method can't be called concurrently with other calls + // on the StreamObserver. If you need thread-safety, use CancellableContext as above. + StreamObserver reqObserver = echoAsync("... async client... is the... best..."); + try { + Thread.sleep(500); // Do some work + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + // Since reqObserver.onCompleted() hasn't been called, we can use onError(). + reqObserver.onError(new RuntimeException("That was weak...")); + Thread.sleep(100); // Make logs more obvious. Cancel is async + + // Async's cancel() will cancel. Also may not be called concurrently with other calls on the + // StreamObserver. + reqCallObserver = echoAsync("Async client or bust!"); + reqCallObserver.onCompleted(); + try { + Thread.sleep(250); // Do some work + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + // Since onCompleted() has been called, we can't use onError(). It is safe to use cancel() + // regardless of onCompleted() being called. + reqCallObserver.cancel("That's enough. I'm bored", null); + Thread.sleep(100); // Make logs more obvious. Cancel is async + } + + /** Say hello to server, just like in helloworld example. */ + public void echoBlocking(String text) { + System.out.println("\nYelling: " + text); + EchoRequest request = EchoRequest.newBuilder().setMessage(text).build(); + EchoResponse response; + try { + response = EchoGrpc.newBlockingStub(channel).unaryEcho(request); + } catch (StatusRuntimeException e) { + System.out.println("RPC failed: " + e.getStatus()); + return; + } + System.out.println("Echo: " + response.getMessage()); + } + + /** Say hello to the server, but using future API. */ + public ListenableFuture echoFuture(String text) { + System.out.println("\nYelling: " + text); + EchoRequest request = EchoRequest.newBuilder().setMessage(text).build(); + ListenableFuture future = EchoGrpc.newFutureStub(channel).unaryEcho(request); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(EchoResponse response) { + System.out.println("Echo: " + response.getMessage()); + } + + @Override + public void onFailure(Throwable t) { + System.out.println("RPC failed: " + Status.fromThrowable(t)); + } + }, MoreExecutors.directExecutor()); + return future; + } + + /** Say hello to the server, but using async API and cancelling. */ + public ClientCallStreamObserver echoAsync(String text) { + System.out.println("\nYelling: " + text); + EchoRequest request = EchoRequest.newBuilder().setMessage(text).build(); + + // Client-streaming and bidirectional RPCs can cast the returned StreamObserver to + // ClientCallStreamObserver. + // + // Unary and server-streaming stub methods don't return a StreamObserver. For such RPCs, you can + // use ClientResponseObserver to get the ClientCallStreamObserver. For example: + // EchoGrpc.newStub(channel).unaryEcho(new ClientResponseObserver() {...}); + // Since ClientCallStreamObserver.cancel() is not thread-safe, it isn't safe to call from + // another thread until the RPC stub method (e.g., unaryEcho()) returns. + ClientCallStreamObserver reqObserver = (ClientCallStreamObserver) + EchoGrpc.newStub(channel).bidirectionalStreamingEcho(new StreamObserver() { + @Override + public void onNext(EchoResponse response) { + System.out.println("Echo: " + response.getMessage()); + } + + @Override + public void onCompleted() { + System.out.println("RPC completed"); + } + + @Override + public void onError(Throwable t) { + System.out.println("RPC failed: " + Status.fromThrowable(t)); + } + }); + + reqObserver.onNext(request); + return reqObserver; + } + + /** + * Cancel RPCs to a server. If provided, the first element of {@code args} is the target server. + */ + public static void main(String[] args) throws Exception { + String target = "localhost:50051"; + if (args.length > 0) { + if ("--help".equals(args[0])) { + System.err.println("Usage: [target]"); + System.err.println(""); + System.err.println(" target The server to connect to. Defaults to " + target); + System.exit(1); + } + target = args[0]; + } + + ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .build(); + try { + CancellationClient client = new CancellationClient(channel); + client.demonstrateCancellation(); + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } +} diff --git a/examples/src/main/java/io/grpc/examples/cancellation/CancellationServer.java b/examples/src/main/java/io/grpc/examples/cancellation/CancellationServer.java new file mode 100644 index 00000000000..cf26e974f5f --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/cancellation/CancellationServer.java @@ -0,0 +1,206 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.cancellation; + +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Context; +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.examples.echo.EchoGrpc; +import io.grpc.examples.echo.EchoRequest; +import io.grpc.examples.echo.EchoResponse; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * Server that manages startup/shutdown of a {@code Greeter} server. + * + *

Any abort of an ongoing RPC is considered "cancellation" of that RPC. The common causes of + * cancellation are the client explicitly cancelling, the deadline expires, and I/O failures. The + * service is not informed the reason for the cancellation. + * + *

There are two APIs for services to be notified of RPC cancellation: io.grpc.Context and + * ServerCallStreamObserver. Context listeners are called on a different thread, so need to be + * thread-safe. The ServerCallStreamObserver cancellation callback is called like other + * StreamObserver callbacks, so the application may not need thread-safe handling. Both APIs have + * thread-safe isCancelled() polling methods. + */ +public class CancellationServer { + public static void main(String[] args) throws IOException, InterruptedException { + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + int port = 50051; + Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(new SlowEcho(scheduler)) + .build() + .start(); + System.out.println("Server started, listening on " + port); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + } + }); + server.awaitTermination(); + scheduler.shutdown(); + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } + + static class SlowEcho extends EchoGrpc.EchoImplBase { + private final ScheduledExecutorService scheduler; + + /** {@code scheduler} must be single-threaded. */ + public SlowEcho(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + + /** + * Repeatedly echos each request until the client has no more requests. It performs all work + * asynchronously on a single thread. It uses ServerCallStreamObserver to be notified of RPC + * cancellation. + */ + @Override + public StreamObserver bidirectionalStreamingEcho( + StreamObserver responseObserver) { + // If the service is truly asynchronous, using ServerCallStreamObserver to receive + // cancellation notifications tends to work well. + + // It is safe to cast the provided observer to ServerCallStreamObserver. + ServerCallStreamObserver responseCallObserver = + (ServerCallStreamObserver) responseObserver; + System.out.println("\nBidi RPC started"); + class EchoObserver implements StreamObserver { + private static final int delayMs = 200; + private final List> echos = new ArrayList<>(); + + @Override + public void onNext(EchoRequest request) { + System.out.println("Bidi RPC received request: " + request.getMessage()); + EchoResponse response + = EchoResponse.newBuilder().setMessage(request.getMessage()).build(); + Runnable echo = () -> responseObserver.onNext(response); + echos.add(scheduler.scheduleAtFixedRate(echo, delayMs, delayMs, TimeUnit.MILLISECONDS)); + } + + @Override + public void onCompleted() { + System.out.println("Bidi RPC client finished"); + // Let each echo happen two more times, and then stop. + List> echosCopy = new ArrayList<>(echos); + Runnable complete = () -> { + stopEchos(echosCopy); + responseObserver.onCompleted(); + System.out.println("Bidi RPC completed"); + }; + echos.add(scheduler.schedule(complete, 2*delayMs, TimeUnit.MILLISECONDS)); + } + + @Override + public void onError(Throwable t) { + System.out.println("Bidi RPC failed: " + Status.fromThrowable(t)); + stopEchos(echos); + scheduler.execute(() -> responseObserver.onError(t)); + } + + public void onCancel() { + // If onCompleted() hasn't been called by this point, then this method and onError are + // both called. If onCompleted() has been called, then just this method is called. + System.out.println("Bidi RPC cancelled"); + stopEchos(echos); + } + + private void stopEchos(List> echos) { + for (Future echo : echos) { + echo.cancel(false); + } + } + } + + EchoObserver requestObserver = new EchoObserver(); + // onCancel() can be called even after the service completes or fails the RPC, because + // callbacks are racy and the response still has to be sent to the client. Use + // setOnCloseHandler() to be notified when the RPC completed without cancellation (as best as + // the server is able to tell). + responseCallObserver.setOnCancelHandler(requestObserver::onCancel); + return requestObserver; + } + + /** + * Echos the request after a delay. It processes the request in-line within the callback. It + * uses Context to be notified of RPC cancellation. + */ + @Override + public void unaryEcho(EchoRequest request, StreamObserver responseObserver) { + // ServerCallStreamObserver.setOnCancelHandler(Runnable) is not useful for this method, since + // this method only returns once it has a result. ServerCallStreamObserver guarantees the + // Runnable is not run at the same time as other RPC callback methods (including this method), + // so the cancellation notification would be guaranteed to occur too late. + System.out.println("\nUnary RPC started: " + request.getMessage()); + Context currentContext = Context.current(); + // Let's start a multi-part operation. We can check cancellation periodically. + for (int i = 0; i < 10; i++) { + // ServerCallStreamObserver.isCancelled() returns true only if the RPC is cancelled. + // Context.isCancelled() is similar, but also returns true when the RPC completes normally. + // It doesn't matter which API is used here. + if (currentContext.isCancelled()) { + System.out.println("Unary RPC cancelled"); + responseObserver.onError( + Status.CANCELLED.withDescription("RPC cancelled").asRuntimeException()); + return; + } + + FutureTask task = new FutureTask<>(() -> { + Thread.sleep(100); // Do some work + return null; + }); + // Some Java blocking APIs have a method to cancel an ongoing operation, like closing an + // InputStream or interrupting the thread. We can use a Context listener to call that API + // from another thread if the RPC is cancelled. + Context.CancellationListener listener = (Context context) -> task.cancel(true); + Context.current().addListener(listener, MoreExecutors.directExecutor()); + task.run(); // A cancellable operation + Context.current().removeListener(listener); + + // gRPC stubs observe io.grpc.Context cancellation, so cancellation is automatically + // propagated when performing an RPC. You can use a different Context or use Context.fork() + // to disable the automatic propagation. For example, + // Context.ROOT.call(() -> futureStub.unaryEcho(request)); + // context.fork().call(() -> futureStub.unaryEcho(request)); + } + responseObserver.onNext( + EchoResponse.newBuilder().setMessage(request.getMessage()).build()); + responseObserver.onCompleted(); + System.out.println("Unary RPC completed"); + } + } +} diff --git a/examples/src/main/proto/grpc/examples/echo/echo.proto b/examples/src/main/proto/grpc/examples/echo/echo.proto new file mode 100644 index 00000000000..b18e295a9a5 --- /dev/null +++ b/examples/src/main/proto/grpc/examples/echo/echo.proto @@ -0,0 +1,48 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +option go_package = "google.golang.org/grpc/examples/features/proto/echo"; +option java_multiple_files = true; +option java_package = "io.grpc.examples.echo"; +option java_outer_classname = "EchoProto"; + +package grpc.examples.echo; + +// EchoRequest is the request for echo. +message EchoRequest { + string message = 1; +} + +// EchoResponse is the response for echo. +message EchoResponse { + string message = 1; +} + +// Echo is the echo service. +service Echo { + // UnaryEcho is unary echo. + rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} + // ServerStreamingEcho is server side streaming. + rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} + // ClientStreamingEcho is client side streaming. + rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} + // BidirectionalStreamingEcho is bidi streaming. + rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} +} From 2d38926f1033eaa8651ed09a6e97e1c55b07477d Mon Sep 17 00:00:00 2001 From: Dirk Haubenreisser Date: Thu, 23 Mar 2023 21:21:31 +0100 Subject: [PATCH 16/21] Add support for cross-compiling for s390x platform (#9455) * Added s390x platform support * Adapt to existing platform naming scheme * Updated s390_64 library whitelist * Use g++ compiler version 8.x for s390x * Introduced dedicated Docker container for building s390x artifacts Minor fix --------- Signed-off-by: Dirk Haubenreisser Co-authored-by: Eric Anderson --- buildscripts/build_docker.sh | 4 +++- buildscripts/build_s390x_artifacts_in_docker.sh | 10 ++++++++++ .../grpc-java-artifacts/Dockerfile.multiarch.base | 15 +++++++++++++++ buildscripts/kokoro/linux_artifacts.sh | 8 +++++++- buildscripts/kokoro/unix.sh | 4 +++- buildscripts/kokoro/upload_artifacts.sh | 3 +++ buildscripts/make_dependencies.sh | 2 ++ buildscripts/run_in_docker.sh | 4 +++- compiler/build.gradle | 5 ++++- compiler/check-artifact.sh | 7 +++++++ 10 files changed, 57 insertions(+), 5 deletions(-) create mode 100755 buildscripts/build_s390x_artifacts_in_docker.sh create mode 100644 buildscripts/grpc-java-artifacts/Dockerfile.multiarch.base diff --git a/buildscripts/build_docker.sh b/buildscripts/build_docker.sh index ce89d348ca0..fa75c07c1eb 100755 --- a/buildscripts/build_docker.sh +++ b/buildscripts/build_docker.sh @@ -2,4 +2,6 @@ set -eu -o pipefail readonly buildscripts_dir="$(dirname "$(readlink -f "$0")")" -docker build -t grpc-java-artifacts "$buildscripts_dir"/grpc-java-artifacts +docker build -t grpc-java-artifacts-x86 "$buildscripts_dir"/grpc-java-artifacts +docker build -t grpc-java-artifacts-multiarch -f "$buildscripts_dir"/grpc-java-artifacts/Dockerfile.multiarch.base "$buildscripts_dir"/grpc-java-artifacts + diff --git a/buildscripts/build_s390x_artifacts_in_docker.sh b/buildscripts/build_s390x_artifacts_in_docker.sh new file mode 100755 index 00000000000..1d3c2cffcc7 --- /dev/null +++ b/buildscripts/build_s390x_artifacts_in_docker.sh @@ -0,0 +1,10 @@ +#!/bin/bash +set -exu -o pipefail + +# first we need to install the prerequisites required for s390x cross compilation +apt-get update && apt-get install -y g++-s390x-linux-gnu + +# now kick off the build for the mvn artifacts for s390x +# mvn artifacts are stored in grpc-java/mvn-artifacts/ +SKIP_TESTS=true ARCH=s390_64 "$(dirname $0)"/kokoro/unix.sh + diff --git a/buildscripts/grpc-java-artifacts/Dockerfile.multiarch.base b/buildscripts/grpc-java-artifacts/Dockerfile.multiarch.base new file mode 100644 index 00000000000..eb91e06c936 --- /dev/null +++ b/buildscripts/grpc-java-artifacts/Dockerfile.multiarch.base @@ -0,0 +1,15 @@ +FROM ubuntu:22.04 + +# make sure apt-get works in unattended mode +ENV DEBIAN_FRONTEND=noninteractive + +# install the OS-level prerequisites for building protobuf and running the gradle build +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install -y --no-install-recommends ca-certificates build-essential wget curl openjdk-8-jdk && \ + apt-get autoclean -y && \ + apt-get autoremove -y && \ + rm -rf /var/lib/apt/lists/* + +ENV JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64 + diff --git a/buildscripts/kokoro/linux_artifacts.sh b/buildscripts/kokoro/linux_artifacts.sh index 5bea57dcfe7..18956ee7ce7 100755 --- a/buildscripts/kokoro/linux_artifacts.sh +++ b/buildscripts/kokoro/linux_artifacts.sh @@ -11,7 +11,7 @@ readonly GRPC_JAVA_DIR="$(cd "$(dirname "$0")"/../.. && pwd)" trap spongify_logs EXIT "$GRPC_JAVA_DIR"/buildscripts/build_docker.sh -"$GRPC_JAVA_DIR"/buildscripts/run_in_docker.sh /grpc-java/buildscripts/build_artifacts_in_docker.sh +"$GRPC_JAVA_DIR"/buildscripts/run_in_docker.sh grpc-java-artifacts-x86 /grpc-java/buildscripts/build_artifacts_in_docker.sh # grpc-android, grpc-cronet and grpc-binder require the Android SDK, so build outside of Docker and # use --include-build for its grpc-core dependency @@ -59,3 +59,9 @@ SKIP_TESTS=true ARCH=aarch_64 "$GRPC_JAVA_DIR"/buildscripts/kokoro/unix.sh # for ppc64le platform sudo apt-get install -y g++-powerpc64le-linux-gnu SKIP_TESTS=true ARCH=ppcle_64 "$GRPC_JAVA_DIR"/buildscripts/kokoro/unix.sh + +# for s390x platform +# building these artifacts inside a Docker container as we have specific requirements +# for GCC (version 11.x needed) which in turn requires Ubuntu 22.04 LTS +"$GRPC_JAVA_DIR"/buildscripts/run_in_docker.sh grpc-java-artifacts-multiarch /grpc-java/buildscripts/build_s390x_artifacts_in_docker.sh + diff --git a/buildscripts/kokoro/unix.sh b/buildscripts/kokoro/unix.sh index d5a85c7404a..38566e0bb59 100755 --- a/buildscripts/kokoro/unix.sh +++ b/buildscripts/kokoro/unix.sh @@ -11,6 +11,8 @@ # ARCH=aarch_64 ./buildscripts/kokoro/unix.sh # For ppc64le arch: # ARCH=ppcle_64 ./buildscripts/kokoro/unix.sh +# For s390x arch: +# ARCH=s390_64 ./buildscripts/kokoro/unix.sh # This script assumes `set -e`. Removing it may lead to undefined behavior. set -exu -o pipefail @@ -80,7 +82,7 @@ fi LOCAL_MVN_TEMP=$(mktemp -d) # Note that this disables parallel=true from GRADLE_FLAGS if [[ -z "${ALL_ARTIFACTS:-}" ]]; then - if [[ "$ARCH" = "aarch_64" || "$ARCH" = "ppcle_64" ]]; then + if [[ "$ARCH" = "aarch_64" || "$ARCH" = "ppcle_64" || "$ARCH" = "s390_64" ]]; then GRADLE_FLAGS+=" -x grpc-compiler:generateTestProto -x grpc-compiler:generateTestLiteProto" GRADLE_FLAGS+=" -x grpc-compiler:testGolden -x grpc-compiler:testLiteGolden" GRADLE_FLAGS+=" -x grpc-compiler:testDeprecatedGolden -x grpc-compiler:testDeprecatedLiteGolden" diff --git a/buildscripts/kokoro/upload_artifacts.sh b/buildscripts/kokoro/upload_artifacts.sh index ade37ee89bb..39e27eff522 100644 --- a/buildscripts/kokoro/upload_artifacts.sh +++ b/buildscripts/kokoro/upload_artifacts.sh @@ -37,6 +37,9 @@ LOCAL_OTHER_ARTIFACTS="$KOKORO_GFILE_DIR"/github/grpc-java/artifacts/ # for linux ppc64le platform [[ "$(find "$LOCAL_MVN_ARTIFACTS" -type f -iname 'protoc-gen-grpc-java-*-linux-ppcle_64.exe' | wc -l)" != '0' ]] +# for linux s390x platform +[[ "$(find "$LOCAL_MVN_ARTIFACTS" -type f -iname 'protoc-gen-grpc-java-*-linux-s390_64.exe' | wc -l)" != '0' ]] + # from macos job: [[ "$(find "$LOCAL_MVN_ARTIFACTS" -type f -iname 'protoc-gen-grpc-java-*-osx-x86_64.exe' | wc -l)" != '0' ]] # copy all x86 artifacts to aarch until native artifacts are built diff --git a/buildscripts/make_dependencies.sh b/buildscripts/make_dependencies.sh index 0940132eea2..3d02a72f4eb 100755 --- a/buildscripts/make_dependencies.sh +++ b/buildscripts/make_dependencies.sh @@ -38,6 +38,8 @@ else ./configure --disable-shared --host=aarch64-linux-gnu --prefix="$INSTALL_DIR" elif [[ "$ARCH" == ppc* ]]; then ./configure --disable-shared --host=powerpc64le-linux-gnu --prefix="$INSTALL_DIR" + elif [[ "$ARCH" == s390* ]]; then + ./configure --disable-shared --host=s390x-linux-gnu --prefix="$INSTALL_DIR" elif [[ "$ARCH" == loongarch* ]]; then ./configure --disable-shared --host=loongarch64-unknown-linux-gnu --prefix="$INSTALL_DIR" fi diff --git a/buildscripts/run_in_docker.sh b/buildscripts/run_in_docker.sh index 3963368f895..60af68f3bec 100755 --- a/buildscripts/run_in_docker.sh +++ b/buildscripts/run_in_docker.sh @@ -10,6 +10,8 @@ quote() { done } +readonly docker_image=$1; shift + readonly grpc_java_dir="$(dirname "$(readlink -f "$0")")/.." if [[ -t 0 ]]; then DOCKER_ARGS="-it" @@ -21,5 +23,5 @@ fi # the original exit code. $DOCKER_ARGS can not be quoted, otherwise it becomes a '' which confuses # docker. exec docker run $DOCKER_ARGS --rm=true -v "${grpc_java_dir}":/grpc-java -w /grpc-java \ - grpc-java-artifacts \ + $docker_image \ bash -c "function fixFiles() { chown -R $(id -u):$(id -g) /grpc-java; }; trap fixFiles EXIT; $(quote "$@")" diff --git a/compiler/build.gradle b/compiler/build.gradle index c942442e6de..ef4c2ad7790 100644 --- a/compiler/build.gradle +++ b/compiler/build.gradle @@ -58,7 +58,10 @@ model { cppCompiler.executable = 'aarch64-linux-gnu-g++' linker.executable = 'aarch64-linux-gnu-g++' } - target("s390_64") + target("s390_64") { + cppCompiler.executable = 's390x-linux-gnu-g++' + linker.executable = 's390x-linux-gnu-g++' + } target("loongarch_64") } clang(Clang) { diff --git a/compiler/check-artifact.sh b/compiler/check-artifact.sh index 67f01aa97cd..a5f33a35b1c 100755 --- a/compiler/check-artifact.sh +++ b/compiler/check-artifact.sh @@ -68,6 +68,10 @@ checkArch () format="$(powerpc64le-linux-gnu-objdump -f "$1" | grep -o "file format .*$" | grep -o "[^ ]*$")" echo Format=$format assertEq "$format" "elf64-powerpcle" $LINENO + elif [[ "$ARCH" == s390_64 ]]; then + format="$(s390x-linux-gnu-objdump -f "$1" | grep -o "file format .*$" | grep -o "[^ ]*$")" + echo Format=$format + assertEq "$format" "elf64-s390" $LINENO else fail "Unsupported arch: $ARCH" fi @@ -121,6 +125,9 @@ checkDependencies () elif [[ "$ARCH" == ppcle_64 ]]; then dump_cmd='powerpc64le-linux-gnu-objdump -x '"$1"' |grep "NEEDED"' white_list="linux-vdso64\.so\.1\|libpthread\.so\.0\|libm\.so\.6\|libc\.so\.6\|ld64\.so\.2" + elif [[ "$ARCH" == s390_64 ]]; then + dump_cmd='s390x-linux-gnu-objdump -x '"$1"' |grep "NEEDED"' + white_list="linux-vdso64\.so\.1\|libpthread\.so\.0\|libm\.so\.6\|libc\.so\.6\|ld64\.so\.1" fi elif [[ "$OS" == osx ]]; then dump_cmd='otool -L '"$1"' | fgrep dylib' From 490de36d3f44f8a0c8a04465b1730f51eed5bd7b Mon Sep 17 00:00:00 2001 From: larry-safran Date: Wed, 22 Mar 2023 14:31:41 -0700 Subject: [PATCH 17/21] [Examples] health service example Add round robin to the example. fixed --- examples/build.gradle | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/examples/build.gradle b/examples/build.gradle index 56eea09a4ff..7360b1e2bed 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -231,6 +231,20 @@ task cancellationServer(type: CreateStartScripts) { classpath = startScripts.classpath } +task healthServiceServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.healthservice.HealthServiceServer' + applicationName = 'health-service-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +task healthServiceClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.healthservice.HealthServiceClient' + applicationName = 'health-service-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into('bin') { from(routeGuideServer) from(routeGuideClient) @@ -256,5 +270,7 @@ applicationDistribution.into('bin') { from(healthServiceClient) from(cancellationClient) from(cancellationServer) + from(healthServiceServer) + from(healthServiceClient) fileMode = 0755 } From c43c4fe40ceb08ca2a6a2e9426cf50e1dd3ef857 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Wed, 29 Mar 2023 23:58:47 +0000 Subject: [PATCH 18/21] fixed --- examples/build.gradle | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/examples/build.gradle b/examples/build.gradle index 7360b1e2bed..56eea09a4ff 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -231,20 +231,6 @@ task cancellationServer(type: CreateStartScripts) { classpath = startScripts.classpath } -task healthServiceServer(type: CreateStartScripts) { - mainClass = 'io.grpc.examples.healthservice.HealthServiceServer' - applicationName = 'health-service-server' - outputDir = new File(project.buildDir, 'tmp/scripts/' + name) - classpath = startScripts.classpath -} - -task healthServiceClient(type: CreateStartScripts) { - mainClass = 'io.grpc.examples.healthservice.HealthServiceClient' - applicationName = 'health-service-client' - outputDir = new File(project.buildDir, 'tmp/scripts/' + name) - classpath = startScripts.classpath -} - applicationDistribution.into('bin') { from(routeGuideServer) from(routeGuideClient) @@ -270,7 +256,5 @@ applicationDistribution.into('bin') { from(healthServiceClient) from(cancellationClient) from(cancellationServer) - from(healthServiceServer) - from(healthServiceClient) fileMode = 0755 } From db31ca7b977095608f0fa24dd82a122a716d9396 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Thu, 30 Mar 2023 00:19:56 +0000 Subject: [PATCH 19/21] fix build --- examples/BUILD.bazel | 1 + .../io/grpc/examples/healthservice/HealthServiceClient.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index 24b668879ce..969cec6fa53 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -98,6 +98,7 @@ java_library( "@io_grpc_grpc_java//services:healthlb", "@io_grpc_grpc_java//stub", "@io_grpc_grpc_proto//:health_proto", + "@io_grpc_grpc_proto//:health_java_proto", "@maven//:com_google_api_grpc_proto_google_common_protos", "@maven//:com_google_code_findbugs_jsr305", "@maven//:com_google_code_gson_gson", diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java index 11c9e972b1d..2b30eb7d033 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java @@ -31,6 +31,7 @@ import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthCheckResponse.ServingStatus; import io.grpc.health.v1.HealthGrpc; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -161,7 +162,7 @@ public static void main(String[] args) throws Exception { System.err.println("Usage: [target [name] [name] ...]"); System.err.println(""); System.err.println(" target The server to connect to. Defaults to " + target); - System.err.println(" name The names you wish to be greeted by. Defaults to " + users); + System.err.println(" name The names you wish to be greeted by. Defaults to " + Arrays.toString(users)); System.exit(1); } target = args[0]; From cebe3e87ad4840571027709f60c6f07fcfcce146 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Wed, 29 Mar 2023 17:41:55 -0700 Subject: [PATCH 20/21] Add health service to README.md. --- examples/README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/examples/README.md b/examples/README.md index f83d04f08d9..ae849fa7d6d 100644 --- a/examples/README.md +++ b/examples/README.md @@ -119,6 +119,20 @@ before trying out the examples.

+-
+ Health Service + + The [health service example](src/main/java/io/grpc/examples/healthservice) + provides a HelloWorld gRPC server that doesn't like short names along with a + health service. It also provides a client application which makes HelloWorld + calls and checks the health status. + + The client application also shows how the round robin load balancer can + utilize the health status to avoid making calls to a service that is + not actively serving. +
+ + - [Keep Alive](src/main/java/io/grpc/examples/keepalive) ### To build the examples From 11f8d067408ea0c9910a568014f08a5a28a1a7e7 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Thu, 30 Mar 2023 12:40:57 -0700 Subject: [PATCH 21/21] Address review comments. --- .../healthservice/HealthServiceClient.java | 25 ++++++--- .../healthservice/HealthServiceServer.java | 53 ++++++++----------- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java index 2b30eb7d033..471084feab6 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java @@ -91,6 +91,11 @@ private static void runTest(String target, String[] users, boolean useRoundRobin ManagedChannelBuilder builder = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()); + // Round Robin, when a healthCheckConfig is present in the default service configuration, runs + // a watch on the health service and when picking an endpoint will + // consider a transport to a server whose service is not in SERVING state to be unavailable. + // Since we only have a single server we are connecting to, then the load balancer will + // return an error without sending the RPC. if (useRoundRobin) { builder = builder .defaultLoadBalancingPolicy("round_robin") @@ -103,7 +108,6 @@ private static void runTest(String target, String[] users, boolean useRoundRobin + " the Round Robin load balancer\n"); try { - // Set a watch HealthServiceClient client = new HealthServiceClient(channel); if (!useRoundRobin) { client.checkHealth("Before call"); @@ -143,11 +147,15 @@ private static Map generateHealthConfig(String serviceName) { } /** - * Greet server. If provided, the first element of {@code args} is the name to use in the - * greeting. The second argument is the target server. The server should also provide the health - * service. This has an example of using the health service directly through the unary call check - * to get the current health and indirectly through the round robin load balancer, which uses the - * streaming rpc (see {@link io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory}). + * Uses a server with both a greet service and the health service. + * If provided, the first element of {@code args} is the name to use in the + * greeting. The second argument is the target server. + * This has an example of using the health service directly through the unary call + * check + * to get the current health. It also utilizes the health of the server's greet service + * indirectly through the round robin load balancer, which uses the streaming rpc + * watch (you can see how it is done in + * {@link io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory}). */ public static void main(String[] args) throws Exception { System.setProperty("java.util.logging.SimpleFormatter.format", @@ -174,11 +182,12 @@ public static void main(String[] args) throws Exception { } } - // Will see failures of rpc's sent when server stops processing them that come from the server + // Will see failures of rpc's sent while server service is not serving, where the failures come + // from the server runTest(target, users, false); // The client will throw an error when sending the rpc to a non-serving service because the - // round robin load balancer uses the health service's watch rpc + // round robin load balancer uses the health service's watch rpc. runTest(target, users, true); } diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java index 25790b7013e..f6547c11103 100644 --- a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java @@ -17,7 +17,6 @@ package io.grpc.examples.healthservice; import io.grpc.Grpc; -import io.grpc.Context; import io.grpc.InsecureServerCredentials; import io.grpc.Server; import io.grpc.Status; @@ -28,7 +27,6 @@ import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -106,44 +104,37 @@ public void sayHello(HelloRequest req, StreamObserver responseObserv return; } - if (validateRequest(req)) { + if (isNameLongEnough(req)) { HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); responseObserver.onNext(reply); responseObserver.onCompleted(); } else { + logger.warning("Tiny message received, throwing a temper tantrum"); + health.setStatus("", ServingStatus.NOT_SERVING); + isServing = false; + + // In 10 seconds set it back to serving + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + isServing = true; + health.setStatus("", ServingStatus.SERVING); + logger.info("tantrum complete"); + } + }).start(); responseObserver.onError( Status.INVALID_ARGUMENT.withDescription("Offended by short name").asRuntimeException()); } } - private boolean validateRequest(HelloRequest req) { - if (!isServing) { - return false; - } - if (req.getName().length() >= 5) { - return true; - } - - logger.warning("Tiny message received, throwing a temper tantrum"); - health.setStatus("", ServingStatus.NOT_SERVING); - isServing = false; - - // In 10 seconds set it back to serving - new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - isServing = true; - health.setStatus("", ServingStatus.SERVING); - logger.info("tantrum complete"); - } - }).start(); - return false; + private boolean isNameLongEnough(HelloRequest req) { + return isServing && req.getName().length() >= 5; } } }