From c065b809b544f71d56d353e19d531020fb263b24 Mon Sep 17 00:00:00 2001 From: Smityz Date: Sat, 19 Nov 2022 03:18:28 +0800 Subject: [PATCH 1/4] add examples of name resolve and load balance --- examples/BUILD.bazel | 19 ++++ examples/build.gradle | 16 +++ examples/pom.xml | 6 +- .../loadbalance/ExampleNameResolver.java | 94 ++++++++++++++++++ .../ExampleNameResolverProvider.java | 47 +++++++++ .../loadbalance/LoadBalanceClient.java | 88 +++++++++++++++++ .../loadbalance/LoadBalanceServer.java | 97 +++++++++++++++++++ 7 files changed, 364 insertions(+), 3 deletions(-) create mode 100644 examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java create mode 100644 examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolverProvider.java create mode 100644 examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java create mode 100644 examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceServer.java diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index e7f00381ad1..c8b0452f135 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -135,3 +135,22 @@ java_binary( ":examples", ], ) + +java_binary( + name = "load-balance-client", + testonly = 1, + main_class = "io.grpc.examples.loadbalance.LoadBalanceClient", + runtime_deps = [ + ":examples", + ], +) + +java_binary( + name = "load-balance-server", + testonly = 1, + main_class = "io.grpc.examples.loadbalance.LoadBalanceServer", + runtime_deps = [ + ":examples", + ], +) + diff --git a/examples/build.gradle b/examples/build.gradle index db21f6edd2e..30eb70ea3b4 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -140,6 +140,20 @@ task manualFlowControlServer(type: CreateStartScripts) { classpath = startScripts.classpath } +task loadBalanceServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.loadbalance.LoadBalanceServer' + applicationName = 'load-balance-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +task loadBalanceClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.loadbalance.LoadBalanceClient' + applicationName = 'load-balance-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into('bin') { from(routeGuideServer) from(routeGuideClient) @@ -152,5 +166,7 @@ applicationDistribution.into('bin') { from(compressingHelloWorldClient) from(manualFlowControlClient) from(manualFlowControlServer) + from(loadBalanceServer) + from(loadBalanceClient) fileMode = 0755 } diff --git a/examples/pom.xml b/examples/pom.xml index c4ecdfe4f93..9e57a781612 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -15,9 +15,9 @@ 1.52.0-SNAPSHOT 3.21.7 3.21.7 - - 1.7 - 1.7 + + 1.8 + 1.8 diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java new file mode 100644 index 00000000000..51e998170c8 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java @@ -0,0 +1,94 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.loadbalance; + +import io.grpc.EquivalentAddressGroup; +import io.grpc.NameResolver; +import io.grpc.Status; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class ExampleNameResolver extends NameResolver { + + private Listener2 listener; + + private final String authority; + + // authority is the string from the target URI passed to gRPC + public ExampleNameResolver(String authority) { + this.authority = authority; + } + + @Override + public String getServiceAuthority() { + return authority; + } + + @Override + public void shutdown() { + } + + @Override + public void start(Listener2 listener) { + this.listener = listener; + this.resolve(); + } + + @Override + public void refresh() { + this.resolve(); + } + + private void resolve() { + List addresses = new ArrayList<>(); + for (int i = 0; i < LoadBalanceServer.serverCount; i++) { + addresses.add(new InetSocketAddress("localhost", LoadBalanceServer.startPort + i)); + } + try { + List equivalentAddressGroup = addresses.stream() + // convert to socket address + .map(this::toSocketAddress) + // every socket address is a single EquivalentAddressGroup, so they can be accessed randomly + .map(address -> Arrays.asList(address)) + .map(this::addrToEquivalentAddressGroup) + .collect(Collectors.toList()); + + ResolutionResult resolutionResult = ResolutionResult.newBuilder() + .setAddresses(equivalentAddressGroup) + .build(); + + this.listener.onResult(resolutionResult); + + } catch (Exception e){ + // when error occurs, notify listener + this.listener.onError(Status.UNAVAILABLE.withDescription("Unable to resolve host ").withCause(e)); + } + } + + private SocketAddress toSocketAddress(InetSocketAddress address) { + return new InetSocketAddress(address.getHostName(), address.getPort()); + } + + private EquivalentAddressGroup addrToEquivalentAddressGroup(List addrList) { + return new EquivalentAddressGroup(addrList); + } +} \ No newline at end of file diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolverProvider.java b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolverProvider.java new file mode 100644 index 00000000000..6c9af404723 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolverProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.loadbalance; + +import io.grpc.NameResolver; +import io.grpc.NameResolverProvider; + +import java.net.URI; + +public class ExampleNameResolverProvider extends NameResolverProvider { + public static final String exampleScheme = "example"; + + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + return new ExampleNameResolver(targetUri.toString()); + } + + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; + } + + @Override + // gRPC choose the first NameResolverProvider that supports the target URI scheme. + public String getDefaultScheme() { + return exampleScheme; + } +} diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java new file mode 100644 index 00000000000..c7e2c389b8a --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java @@ -0,0 +1,88 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.loadbalance; + +import io.grpc.*; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; + + +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static io.grpc.examples.loadbalance.ExampleNameResolverProvider.exampleScheme; + +public class LoadBalanceClient { + private static final Logger logger = Logger.getLogger(LoadBalanceClient.class.getName()); + + private final GreeterGrpc.GreeterBlockingStub blockingStub; + + public LoadBalanceClient(Channel channel) { + blockingStub = GreeterGrpc.newBlockingStub(channel); + } + + public void greet(String name) { + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response; + try { + response = blockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + logger.info("Greeting: " + response.getMessage()); + } + + + public static void main(String[] args) throws Exception { + NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider()); + + logger.info("Use default DNS resolver"); + ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:50051") + .defaultLoadBalancingPolicy("round_robin") + .usePlaintext() + .build(); + try { + LoadBalanceClient client = new LoadBalanceClient(channel); + for (int i = 0; i < 5; i++) { + client.greet("request" + i); + } + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + + logger.info("Change to use example name resolver"); + /** + * Dial to "example:///resolver.example.grpc.io", use {@link ExampleNameResolver} to create connection + */ + channel = ManagedChannelBuilder.forTarget( + String.format("%s:///%s", exampleScheme, "resolver.example.grpc.io")) + .defaultLoadBalancingPolicy("round_robin") + .usePlaintext() + .build(); + try { + LoadBalanceClient client = new LoadBalanceClient(channel); + for (int i = 0; i < 5; i++) { + client.greet("request" + i); + } + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } +} diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceServer.java b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceServer.java new file mode 100644 index 00000000000..3d3311a3d6e --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceServer.java @@ -0,0 +1,97 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.loadbalance; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.stub.StreamObserver; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class LoadBalanceServer { + private static final Logger logger = Logger.getLogger(LoadBalanceServer.class.getName()); + static public final int serverCount = 3; + static public final int startPort = 50051; + private Server[] servers; + + private void start() throws IOException { + servers = new Server[serverCount]; + for (int i = 0; i < serverCount; i++) { + int port = startPort + i; + servers[i] = ServerBuilder.forPort(port) + .addService(new GreeterImpl(port)) + .build() + .start(); + logger.info("Server started, listening on " + port); + } + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + LoadBalanceServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + } + + private void stop() throws InterruptedException { + for (int i = 0; i < serverCount; i++) { + if (servers[i] != null) { + servers[i].shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + } + + private void blockUntilShutdown() throws InterruptedException { + for (int i = 0; i < serverCount; i++) { + if (servers[i] != null) { + servers[i].awaitTermination(); + } + } + } + + public static void main(String[] args) throws IOException, InterruptedException { + final LoadBalanceServer server = new LoadBalanceServer(); + server.start(); + server.blockUntilShutdown(); + } + + static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + + int port; + + public GreeterImpl(int port) { + this.port = port; + } + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName() + " from server<" + this.port + ">").build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } +} From 52277a3a1fd3691f779911bef43207a9630994dc Mon Sep 17 00:00:00 2001 From: Smityz Date: Sat, 19 Nov 2022 03:25:09 +0800 Subject: [PATCH 2/4] add a newline --- .../java/io/grpc/examples/loadbalance/ExampleNameResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java index 51e998170c8..cba7df495f6 100644 --- a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java +++ b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java @@ -91,4 +91,4 @@ private SocketAddress toSocketAddress(InetSocketAddress address) { private EquivalentAddressGroup addrToEquivalentAddressGroup(List addrList) { return new EquivalentAddressGroup(addrList); } -} \ No newline at end of file +} From a4dde5ad4353c88dc62d17ed9368bcc1ae5a492c Mon Sep 17 00:00:00 2001 From: Smityz Date: Thu, 24 Nov 2022 19:31:45 +0800 Subject: [PATCH 3/4] be consistent with behavior in grpc-go --- .../loadbalance/ExampleNameResolver.java | 38 ++++++++++++++----- .../ExampleNameResolverProvider.java | 6 +-- .../loadbalance/LoadBalanceClient.java | 8 ++-- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java index cba7df495f6..8ddab205257 100644 --- a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java +++ b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java @@ -16,31 +16,52 @@ package io.grpc.examples.loadbalance; +import com.google.common.collect.ImmutableMap; import io.grpc.EquivalentAddressGroup; import io.grpc.NameResolver; import io.grpc.Status; +import jdk.nashorn.internal.ir.annotations.Immutable; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.grpc.examples.loadbalance.LoadBalanceClient.exampleServiceName; public class ExampleNameResolver extends NameResolver { private Listener2 listener; - private final String authority; - - // authority is the string from the target URI passed to gRPC - public ExampleNameResolver(String authority) { - this.authority = authority; + private final URI uri; + + private final Map> addrStore; + + public ExampleNameResolver(URI targetUri) { + this.uri = targetUri; + // This is a fake name resolver, so we just hard code the address here. + addrStore = ImmutableMap.>builder() + .put(exampleServiceName, + Stream.iterate(LoadBalanceServer.startPort,p->p+1) + .limit(LoadBalanceServer.serverCount) + .map(port->new InetSocketAddress("localhost",port)) + .collect(Collectors.toList()) + ) + .build(); } @Override public String getServiceAuthority() { - return authority; + // Be consistent with behavior in grpc-go, authority is saved in Host field of URI. + if (uri.getHost() != null) { + return uri.getHost(); + } + return "no host"; } @Override @@ -59,10 +80,7 @@ public void refresh() { } private void resolve() { - List addresses = new ArrayList<>(); - for (int i = 0; i < LoadBalanceServer.serverCount; i++) { - addresses.add(new InetSocketAddress("localhost", LoadBalanceServer.startPort + i)); - } + List addresses = addrStore.get(uri.getPath().substring(1)); try { List equivalentAddressGroup = addresses.stream() // convert to socket address diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolverProvider.java b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolverProvider.java index 6c9af404723..ee966fd044c 100644 --- a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolverProvider.java +++ b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolverProvider.java @@ -21,12 +21,12 @@ import java.net.URI; -public class ExampleNameResolverProvider extends NameResolverProvider { - public static final String exampleScheme = "example"; +import static io.grpc.examples.loadbalance.LoadBalanceClient.exampleScheme; +public class ExampleNameResolverProvider extends NameResolverProvider { @Override public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { - return new ExampleNameResolver(targetUri.toString()); + return new ExampleNameResolver(targetUri); } @Override diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java index c7e2c389b8a..82b07d4879a 100644 --- a/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java +++ b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java @@ -26,11 +26,12 @@ import java.util.logging.Level; import java.util.logging.Logger; -import static io.grpc.examples.loadbalance.ExampleNameResolverProvider.exampleScheme; - public class LoadBalanceClient { private static final Logger logger = Logger.getLogger(LoadBalanceClient.class.getName()); + public static final String exampleScheme = "example"; + public static final String exampleServiceName = "lb.example.grpc.io"; + private final GreeterGrpc.GreeterBlockingStub blockingStub; public LoadBalanceClient(Channel channel) { @@ -70,9 +71,10 @@ public static void main(String[] args) throws Exception { logger.info("Change to use example name resolver"); /** * Dial to "example:///resolver.example.grpc.io", use {@link ExampleNameResolver} to create connection + * "resolver.example.grpc.io" is converted to {@link java.net.URI.path} */ channel = ManagedChannelBuilder.forTarget( - String.format("%s:///%s", exampleScheme, "resolver.example.grpc.io")) + String.format("%s:///%s", exampleScheme, exampleServiceName)) .defaultLoadBalancingPolicy("round_robin") .usePlaintext() .build(); From 9bd897c311b0243e5e2b1fb1342c798f30ee5f70 Mon Sep 17 00:00:00 2001 From: Smityz Date: Wed, 30 Nov 2022 22:42:25 +0800 Subject: [PATCH 4/4] split examples & fix warnings --- examples/BUILD.bazel | 18 +++ examples/build.gradle | 16 +++ .../loadbalance/ExampleNameResolver.java | 4 +- .../loadbalance/LoadBalanceClient.java | 17 +-- .../loadbalance/LoadBalanceServer.java | 19 ++- .../nameresolve/ExampleNameResolver.java | 108 ++++++++++++++++++ .../ExampleNameResolverProvider.java | 47 ++++++++ .../nameresolve/NameResolveClient.java | 85 ++++++++++++++ .../nameresolve/NameResolveServer.java | 94 +++++++++++++++ 9 files changed, 383 insertions(+), 25 deletions(-) create mode 100644 examples/src/main/java/io/grpc/examples/nameresolve/ExampleNameResolver.java create mode 100644 examples/src/main/java/io/grpc/examples/nameresolve/ExampleNameResolverProvider.java create mode 100644 examples/src/main/java/io/grpc/examples/nameresolve/NameResolveClient.java create mode 100644 examples/src/main/java/io/grpc/examples/nameresolve/NameResolveServer.java diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index c8b0452f135..2a5ed52b35c 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -154,3 +154,21 @@ java_binary( ], ) +java_binary( + name = "name-resolve-client", + testonly = 1, + main_class = "io.grpc.examples.nameresolve.NameResolveClient", + runtime_deps = [ + ":examples", + ], +) + +java_binary( + name = "name-resolve-server", + testonly = 1, + main_class = "io.grpc.examples.nameresolve.NameResolveServer", + runtime_deps = [ + ":examples", + ], +) + diff --git a/examples/build.gradle b/examples/build.gradle index 30eb70ea3b4..79ab93a242f 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -154,6 +154,20 @@ task loadBalanceClient(type: CreateStartScripts) { classpath = startScripts.classpath } +task nameResolveServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.nameresolve.NameResolveServer' + applicationName = 'name-resolve-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +task nameResolveClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.nameresolve.NameResolveClient' + applicationName = 'name-resolve-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + applicationDistribution.into('bin') { from(routeGuideServer) from(routeGuideClient) @@ -168,5 +182,7 @@ applicationDistribution.into('bin') { from(manualFlowControlServer) from(loadBalanceServer) from(loadBalanceClient) + from(nameResolveServer) + from(nameResolveClient) fileMode = 0755 } diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java index 8ddab205257..f562f0ac107 100644 --- a/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java +++ b/examples/src/main/java/io/grpc/examples/loadbalance/ExampleNameResolver.java @@ -20,12 +20,10 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.NameResolver; import io.grpc.Status; -import jdk.nashorn.internal.ir.annotations.Immutable; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -86,7 +84,7 @@ private void resolve() { // convert to socket address .map(this::toSocketAddress) // every socket address is a single EquivalentAddressGroup, so they can be accessed randomly - .map(address -> Arrays.asList(address)) + .map(Arrays::asList) .map(this::addrToEquivalentAddressGroup) .collect(Collectors.toList()); diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java index 82b07d4879a..97444922871 100644 --- a/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java +++ b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceClient.java @@ -21,7 +21,6 @@ import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; - import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,9 +53,10 @@ public void greet(String name) { public static void main(String[] args) throws Exception { NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider()); - logger.info("Use default DNS resolver"); - ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:50051") - .defaultLoadBalancingPolicy("round_robin") + String target = String.format("%s:///%s", exampleScheme, exampleServiceName); + + logger.info("Use default first_pick load balance policy"); + ManagedChannel channel = ManagedChannelBuilder.forTarget(target) .usePlaintext() .build(); try { @@ -68,13 +68,8 @@ public static void main(String[] args) throws Exception { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } - logger.info("Change to use example name resolver"); - /** - * Dial to "example:///resolver.example.grpc.io", use {@link ExampleNameResolver} to create connection - * "resolver.example.grpc.io" is converted to {@link java.net.URI.path} - */ - channel = ManagedChannelBuilder.forTarget( - String.format("%s:///%s", exampleScheme, exampleServiceName)) + logger.info("Change to round_robin policy"); + channel = ManagedChannelBuilder.forTarget(target) .defaultLoadBalancingPolicy("round_robin") .usePlaintext() .build(); diff --git a/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceServer.java b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceServer.java index 3d3311a3d6e..c97d209497a 100644 --- a/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceServer.java +++ b/examples/src/main/java/io/grpc/examples/loadbalance/LoadBalanceServer.java @@ -43,18 +43,15 @@ private void start() throws IOException { .start(); logger.info("Server started, listening on " + port); } - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - try { - LoadBalanceServer.this.stop(); - } catch (InterruptedException e) { - e.printStackTrace(System.err); - } - System.err.println("*** server shut down"); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + LoadBalanceServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); } - }); + System.err.println("*** server shut down"); + })); } private void stop() throws InterruptedException { diff --git a/examples/src/main/java/io/grpc/examples/nameresolve/ExampleNameResolver.java b/examples/src/main/java/io/grpc/examples/nameresolve/ExampleNameResolver.java new file mode 100644 index 00000000000..95bf20dd580 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/nameresolve/ExampleNameResolver.java @@ -0,0 +1,108 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.nameresolve; + +import com.google.common.collect.ImmutableMap; +import io.grpc.EquivalentAddressGroup; +import io.grpc.NameResolver; +import io.grpc.Status; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.grpc.examples.loadbalance.LoadBalanceClient.exampleServiceName; + +public class ExampleNameResolver extends NameResolver { + + private final URI uri; + private final Map> addrStore; + private Listener2 listener; + + public ExampleNameResolver(URI targetUri) { + this.uri = targetUri; + // This is a fake name resolver, so we just hard code the address here. + addrStore = ImmutableMap.>builder() + .put(exampleServiceName, + Stream.iterate(NameResolveServer.startPort, p -> p + 1) + .limit(NameResolveServer.serverCount) + .map(port -> new InetSocketAddress("localhost", port)) + .collect(Collectors.toList()) + ) + .build(); + } + + @Override + public String getServiceAuthority() { + // Be consistent with behavior in grpc-go, authority is saved in Host field of URI. + if (uri.getHost() != null) { + return uri.getHost(); + } + return "no host"; + } + + @Override + public void shutdown() { + } + + @Override + public void start(Listener2 listener) { + this.listener = listener; + this.resolve(); + } + + @Override + public void refresh() { + this.resolve(); + } + + private void resolve() { + List addresses = addrStore.get(uri.getPath().substring(1)); + try { + List equivalentAddressGroup = addresses.stream() + // convert to socket address + .map(this::toSocketAddress) + // every socket address is a single EquivalentAddressGroup, so they can be accessed randomly + .map(Arrays::asList) + .map(this::addrToEquivalentAddressGroup) + .collect(Collectors.toList()); + + ResolutionResult resolutionResult = ResolutionResult.newBuilder() + .setAddresses(equivalentAddressGroup) + .build(); + + this.listener.onResult(resolutionResult); + + } catch (Exception e) { + // when error occurs, notify listener + this.listener.onError(Status.UNAVAILABLE.withDescription("Unable to resolve host ").withCause(e)); + } + } + + private SocketAddress toSocketAddress(InetSocketAddress address) { + return new InetSocketAddress(address.getHostName(), address.getPort()); + } + + private EquivalentAddressGroup addrToEquivalentAddressGroup(List addrList) { + return new EquivalentAddressGroup(addrList); + } +} diff --git a/examples/src/main/java/io/grpc/examples/nameresolve/ExampleNameResolverProvider.java b/examples/src/main/java/io/grpc/examples/nameresolve/ExampleNameResolverProvider.java new file mode 100644 index 00000000000..cd05f3214f6 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/nameresolve/ExampleNameResolverProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.nameresolve; + +import io.grpc.NameResolver; +import io.grpc.NameResolverProvider; + +import java.net.URI; + +import static io.grpc.examples.loadbalance.LoadBalanceClient.exampleScheme; + +public class ExampleNameResolverProvider extends NameResolverProvider { + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + return new ExampleNameResolver(targetUri); + } + + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; + } + + @Override + // gRPC choose the first NameResolverProvider that supports the target URI scheme. + public String getDefaultScheme() { + return exampleScheme; + } +} diff --git a/examples/src/main/java/io/grpc/examples/nameresolve/NameResolveClient.java b/examples/src/main/java/io/grpc/examples/nameresolve/NameResolveClient.java new file mode 100644 index 00000000000..ac6fdd32549 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/nameresolve/NameResolveClient.java @@ -0,0 +1,85 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.nameresolve; + +import io.grpc.*; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; + +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class NameResolveClient { + public static final String exampleScheme = "example"; + public static final String exampleServiceName = "lb.example.grpc.io"; + private static final Logger logger = Logger.getLogger(NameResolveClient.class.getName()); + private final GreeterGrpc.GreeterBlockingStub blockingStub; + + public NameResolveClient(Channel channel) { + blockingStub = GreeterGrpc.newBlockingStub(channel); + } + + public static void main(String[] args) throws Exception { + NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider()); + + logger.info("Use default DNS resolver"); + ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:50051") + .usePlaintext() + .build(); + try { + NameResolveClient client = new NameResolveClient(channel); + for (int i = 0; i < 5; i++) { + client.greet("request" + i); + } + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + + logger.info("Change to use example name resolver"); + /* + Dial to "example:///resolver.example.grpc.io", use {@link ExampleNameResolver} to create connection + "resolver.example.grpc.io" is converted to {@link java.net.URI.path} + */ + channel = ManagedChannelBuilder.forTarget( + String.format("%s:///%s", exampleScheme, exampleServiceName)) + .defaultLoadBalancingPolicy("round_robin") + .usePlaintext() + .build(); + try { + NameResolveClient client = new NameResolveClient(channel); + for (int i = 0; i < 5; i++) { + client.greet("request" + i); + } + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } + + public void greet(String name) { + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response; + try { + response = blockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + logger.info("Greeting: " + response.getMessage()); + } +} diff --git a/examples/src/main/java/io/grpc/examples/nameresolve/NameResolveServer.java b/examples/src/main/java/io/grpc/examples/nameresolve/NameResolveServer.java new file mode 100644 index 00000000000..0a402485906 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/nameresolve/NameResolveServer.java @@ -0,0 +1,94 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.nameresolve; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.stub.StreamObserver; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class NameResolveServer { + static public final int serverCount = 3; + static public final int startPort = 50051; + private static final Logger logger = Logger.getLogger(NameResolveServer.class.getName()); + private Server[] servers; + + public static void main(String[] args) throws IOException, InterruptedException { + final NameResolveServer server = new NameResolveServer(); + server.start(); + server.blockUntilShutdown(); + } + + private void start() throws IOException { + servers = new Server[serverCount]; + for (int i = 0; i < serverCount; i++) { + int port = startPort + i; + servers[i] = ServerBuilder.forPort(port) + .addService(new GreeterImpl(port)) + .build() + .start(); + logger.info("Server started, listening on " + port); + } + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + NameResolveServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + })); + } + + private void stop() throws InterruptedException { + for (int i = 0; i < serverCount; i++) { + if (servers[i] != null) { + servers[i].shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + } + + private void blockUntilShutdown() throws InterruptedException { + for (int i = 0; i < serverCount; i++) { + if (servers[i] != null) { + servers[i].awaitTermination(); + } + } + } + + static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + + int port; + + public GreeterImpl(int port) { + this.port = port; + } + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName() + " from server<" + this.port + ">").build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } +}