From 69e8204066fd4bfc442b7abda80a0802ea21ae61 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 11 Sep 2020 14:56:03 -0700 Subject: [PATCH] xds: promote XdsNameResolver2 (#7416) --- .../grpc/xds/ClusterManagerLoadBalancer.java | 2 +- .../java/io/grpc/xds/XdsNameResolver.java | 496 +++++++------ .../java/io/grpc/xds/XdsNameResolver2.java | 374 ---------- .../io/grpc/xds/XdsNameResolverProvider.java | 65 +- .../io/grpc/xds/XdsNameResolverProvider2.java | 132 ---- .../xds/ClusterManagerLoadBalancerTest.java | 2 +- .../io/grpc/xds/XdsNameResolver2Test.java | 554 -------------- .../xds/XdsNameResolverIntegrationTest.java | 570 -------------- .../java/io/grpc/xds/XdsNameResolverTest.java | 698 ++++++++++++------ 9 files changed, 804 insertions(+), 2089 deletions(-) delete mode 100644 xds/src/main/java/io/grpc/xds/XdsNameResolver2.java delete mode 100644 xds/src/main/java/io/grpc/xds/XdsNameResolverProvider2.java delete mode 100644 xds/src/test/java/io/grpc/xds/XdsNameResolver2Test.java delete mode 100644 xds/src/test/java/io/grpc/xds/XdsNameResolverIntegrationTest.java diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java index f8901fb0ebe..91b7a93151f 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java @@ -145,7 +145,7 @@ private void updateOverallBalancingState() { @Override public PickResult pickSubchannel(PickSubchannelArgs args) { String clusterName = - args.getCallOptions().getOption(XdsNameResolver2.CLUSTER_SELECTION_KEY); + args.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY); SubchannelPicker delegate = childPickers.get(clusterName); if (delegate == null) { return diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index a94360848fe..22f74f7f795 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -19,44 +19,38 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import com.google.gson.Gson; -import com.google.re2j.Pattern; import io.grpc.Attributes; -import io.grpc.EquivalentAddressGroup; +import io.grpc.CallOptions; +import io.grpc.InternalConfigSelector; import io.grpc.InternalLogId; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.Metadata; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.SynchronizationContext; -import io.grpc.internal.BackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.ClusterWeight; -import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyProtoData.Route; import io.grpc.xds.EnvoyProtoData.RouteAction; -import io.grpc.xds.RouteMatch.FractionMatcher; -import io.grpc.xds.RouteMatch.HeaderMatcher; -import io.grpc.xds.RouteMatch.PathMatcher; +import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; import io.grpc.xds.XdsClient.ConfigUpdate; import io.grpc.xds.XdsClient.ConfigWatcher; -import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool; -import io.grpc.xds.XdsClient.XdsChannelFactory; -import io.grpc.xds.XdsClient.XdsClientFactory; +import io.grpc.xds.XdsClient.XdsClientPoolFactory; import io.grpc.xds.XdsLogger.XdsLogLevel; -import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import javax.annotation.Nullable; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; /** * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme. @@ -68,36 +62,45 @@ */ final class XdsNameResolver extends NameResolver { + static final CallOptions.Key CLUSTER_SELECTION_KEY = + CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY"); + private final XdsLogger logger; private final String authority; - private final XdsChannelFactory channelFactory; - private final SynchronizationContext syncContext; - private final ScheduledExecutorService timeService; private final ServiceConfigParser serviceConfigParser; - private final BackoffPolicy.Provider backoffPolicyProvider; - private final Supplier stopwatchSupplier; + private final SynchronizationContext syncContext; private final Bootstrapper bootstrapper; + private final XdsClientPoolFactory xdsClientPoolFactory; + private final ThreadSafeRandom random; + private final ConcurrentMap clusterRefs = new ConcurrentHashMap<>(); + private final ConfigSelector configSelector = new ConfigSelector(); - @Nullable + private volatile List routes = Collections.emptyList(); + private Listener2 listener; private ObjectPool xdsClientPool; - @Nullable private XdsClient xdsClient; + XdsNameResolver(String name, + ServiceConfigParser serviceConfigParser, + SynchronizationContext syncContext, + XdsClientPoolFactory xdsClientPoolFactory) { + this(name, serviceConfigParser, syncContext, Bootstrapper.getInstance(), + xdsClientPoolFactory, ThreadSafeRandomImpl.instance); + } + XdsNameResolver( String name, - Args args, - BackoffPolicy.Provider backoffPolicyProvider, - Supplier stopwatchSupplier, - XdsChannelFactory channelFactory, - Bootstrapper bootstrapper) { + ServiceConfigParser serviceConfigParser, + SynchronizationContext syncContext, + Bootstrapper bootstrapper, + XdsClientPoolFactory xdsClientPoolFactory, + ThreadSafeRandom random) { authority = GrpcUtil.checkAuthority(checkNotNull(name, "name")); - this.channelFactory = checkNotNull(channelFactory, "channelFactory"); - this.syncContext = checkNotNull(args.getSynchronizationContext(), "syncContext"); - this.timeService = checkNotNull(args.getScheduledExecutorService(), "timeService"); - this.serviceConfigParser = checkNotNull(args.getServiceConfigParser(), "serviceConfigParser"); - this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); + this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser"); + this.syncContext = checkNotNull(syncContext, "syncContext"); this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper"); + this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); + this.random = checkNotNull(random, "random"); logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name)); logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name); } @@ -109,78 +112,242 @@ public String getServiceAuthority() { @Override public void start(Listener2 listener) { + this.listener = checkNotNull(listener, "listener"); BootstrapInfo bootstrapInfo; try { bootstrapInfo = bootstrapper.readBootstrap(); } catch (Exception e) { - listener.onError(Status.UNAVAILABLE.withDescription("Failed to bootstrap").withCause(e)); - return; - } - final List serverList = bootstrapInfo.getServers(); - final Node node = bootstrapInfo.getNode(); - if (serverList.isEmpty()) { listener.onError( - Status.UNAVAILABLE.withDescription("No management server provided by bootstrap")); + Status.UNAVAILABLE.withDescription("Failed to load xDS bootstrap").withCause(e)); return; } - - XdsClientFactory xdsClientFactory = new XdsClientFactory() { - @Override - XdsClient createXdsClient() { - return - new XdsClientImpl( - authority, - serverList, - channelFactory, - node, - syncContext, - timeService, - backoffPolicyProvider, - stopwatchSupplier); - } - }; - xdsClientPool = new RefCountedXdsClientObjectPool(xdsClientFactory); + xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo); xdsClient = xdsClientPool.getObject(); - xdsClient.watchConfigData(authority, new ConfigWatcherImpl(listener)); + xdsClient.watchConfigData(authority, new ConfigWatcherImpl()); } - private class ConfigWatcherImpl implements ConfigWatcher { + @Override + public void shutdown() { + logger.log(XdsLogLevel.INFO, "Shutdown"); + if (xdsClient != null) { + xdsClient = xdsClientPool.returnObject(xdsClient); + } + } - final Listener2 listener; + @VisibleForTesting + static Map generateServiceConfigWithMethodTimeoutConfig(long timeoutNano) { + String timeout = timeoutNano / 1_000_000_000.0 + "s"; + Map methodConfig = new HashMap<>(); + methodConfig.put( + "name", Collections.singletonList(Collections.emptyMap())); + methodConfig.put("timeout", timeout); + return Collections.singletonMap( + "methodConfig", Collections.singletonList(Collections.unmodifiableMap(methodConfig))); + } - ConfigWatcherImpl(Listener2 listener) { - this.listener = listener; + @VisibleForTesting + static Map generateServiceConfigWithLoadBalancingConfig(Collection clusters) { + Map childPolicy = new HashMap<>(); + for (String cluster : clusters) { + List>> lbPolicy = + Collections.singletonList( + Collections.singletonMap( + "cds_experimental", Collections.singletonMap("cluster", cluster))); + childPolicy.put(cluster, Collections.singletonMap("lbPolicy", lbPolicy)); } + return Collections.singletonMap("loadBalancingConfig", + Collections.singletonList( + Collections.singletonMap( + "cluster_manager_experimental", Collections.singletonMap( + "childPolicy", Collections.unmodifiableMap(childPolicy))))); + } - @Override - public void onConfigChanged(ConfigUpdate update) { + @VisibleForTesting + XdsClient getXdsClient() { + return xdsClient; + } + + private void updateResolutionResult() { + Map rawServiceConfig = + generateServiceConfigWithLoadBalancingConfig(clusterRefs.keySet()); + if (logger.isLoggable(XdsLogLevel.INFO)) { logger.log( - XdsLogLevel.INFO, - "Received config update with {0} routes from xDS client {1}", - update.getRoutes().size(), - xdsClient); - Map rawLbConfig = generateXdsRoutingRawConfig(update.getRoutes()); - Map serviceConfig = - ImmutableMap.of("loadBalancingConfig", ImmutableList.of(rawLbConfig)); + XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig)); + } + ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig); + Attributes attrs = + Attributes.newBuilder() + .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) + .set(InternalConfigSelector.KEY, configSelector) + .build(); + ResolutionResult result = + ResolutionResult.newBuilder() + .setAttributes(attrs) + .setServiceConfig(parsedServiceConfig) + .build(); + listener.onResult(result); + } + + private final class ConfigSelector extends InternalConfigSelector { + @Override + public Result selectConfig(PickSubchannelArgs args) { + // Index ASCII headers by keys. + Map> asciiHeaders = new HashMap<>(); + Metadata headers = args.getHeaders(); + for (String headerName : headers.keys()) { + if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + continue; + } + Metadata.Key key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER); + asciiHeaders.put(headerName, headers.getAll(key)); + } + String cluster = null; + Route selectedRoute = null; + do { + for (Route route : routes) { + if (route.getRouteMatch().matches( + "/" + args.getMethodDescriptor().getFullMethodName(), asciiHeaders)) { + selectedRoute = route; + break; + } + } + if (selectedRoute == null) { + return Result.forError( + Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC")); + } + RouteAction action = selectedRoute.getRouteAction(); + if (action.getCluster() != null) { + cluster = action.getCluster(); + } else if (action.getWeightedCluster() != null) { + int totalWeight = 0; + for (ClusterWeight weightedCluster : action.getWeightedCluster()) { + totalWeight += weightedCluster.getWeight(); + } + int select = random.nextInt(totalWeight); + int accumulator = 0; + for (ClusterWeight weightedCluster : action.getWeightedCluster()) { + accumulator += weightedCluster.getWeight(); + if (select < accumulator) { + cluster = weightedCluster.getName(); + break; + } + } + } + } while (!retainCluster(cluster)); + // TODO(chengyuanzhang): avoid service config generation and parsing for each call. + Map rawServiceConfig = + generateServiceConfigWithMethodTimeoutConfig( + selectedRoute.getRouteAction().getTimeoutNano()); if (logger.isLoggable(XdsLogLevel.INFO)) { - logger.log( - XdsLogLevel.INFO, - "Generated service config:\n{0}", - new Gson().toJson(serviceConfig)); + logger.log(XdsLogLevel.INFO, + "Generated service config (method config):\n{0}", new Gson().toJson(rawServiceConfig)); + } + ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig); + Object config = parsedServiceConfig.getConfig(); + if (config == null) { + releaseCluster(cluster); + return Result.forError( + parsedServiceConfig.getError().augmentDescription( + "Failed to parse service config (method config)")); + } + final String finalCluster = cluster; + class SelectionCompleted implements Runnable { + @Override + public void run() { + releaseCluster(finalCluster); + } } - Attributes attrs = - Attributes.newBuilder() - .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) - .build(); - ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(serviceConfig); - ResolutionResult result = - ResolutionResult.newBuilder() - .setAddresses(ImmutableList.of()) - .setAttributes(attrs) - .setServiceConfig(parsedServiceConfig) + return + Result.newBuilder() + .setCallOptions(args.getCallOptions().withOption(CLUSTER_SELECTION_KEY, cluster)) + .setConfig(config) + .setCommittedCallback(new SelectionCompleted()) .build(); - listener.onResult(result); + } + + private boolean retainCluster(String cluster) { + AtomicInteger refCount = clusterRefs.get(cluster); + if (refCount == null) { + return false; + } + int count; + do { + count = refCount.get(); + if (count == 0) { + return false; + } + } while (!refCount.compareAndSet(count, count + 1)); + return true; + } + + private void releaseCluster(final String cluster) { + int count = clusterRefs.get(cluster).decrementAndGet(); + if (count == 0) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (clusterRefs.get(cluster).get() == 0) { + clusterRefs.remove(cluster); + updateResolutionResult(); + } + } + }); + } + } + } + + // https://github.com/google/error-prone/issues/1767 + @SuppressWarnings("ModifyCollectionInEnhancedForLoop") + private class ConfigWatcherImpl implements ConfigWatcher { + private Set existingClusters; + + @Override + public void onConfigChanged(ConfigUpdate update) { + Set clusters = new HashSet<>(); + for (Route route : update.getRoutes()) { + RouteAction action = route.getRouteAction(); + if (action.getCluster() != null) { + clusters.add(action.getCluster()); + } else if (action.getWeightedCluster() != null) { + for (ClusterWeight weighedCluster : action.getWeightedCluster()) { + clusters.add(weighedCluster.getName()); + } + } + } + Set addedClusters = + existingClusters == null ? clusters : Sets.difference(clusters, existingClusters); + Set deletedClusters = + existingClusters == null + ? Collections.emptySet() : Sets.difference(existingClusters, clusters); + existingClusters = clusters; + boolean shouldUpdateResult = false; + for (String cluster : addedClusters) { + if (clusterRefs.containsKey(cluster)) { + clusterRefs.get(cluster).incrementAndGet(); + } else { + clusterRefs.put(cluster, new AtomicInteger(1)); + shouldUpdateResult = true; + } + } + // Update service config to include newly added clusters. + if (shouldUpdateResult) { + updateResolutionResult(); + } + // Make newly added clusters selectable by config selector and deleted clusters no longer + // selectable. + routes = update.getRoutes(); + shouldUpdateResult = false; + for (String cluster : deletedClusters) { + int count = clusterRefs.get(cluster).decrementAndGet(); + if (count == 0) { + clusterRefs.remove(cluster); + shouldUpdateResult = true; + } + } + if (shouldUpdateResult) { + updateResolutionResult(); + } } @Override @@ -191,6 +358,7 @@ public void onResourceDoesNotExist(String resourceName) { ResolutionResult result = ResolutionResult.newBuilder() .setServiceConfig(parsedServiceConfig) + // let channel take action for no config selector .build(); listener.onResult(result); } @@ -203,152 +371,4 @@ public void onError(Status error) { listener.onError(error); } } - - @VisibleForTesting - static ImmutableMap generateXdsRoutingRawConfig(List routes) { - List rawRoutes = new ArrayList<>(); - Map rawActions = new LinkedHashMap<>(); - Map existingActions = new HashMap<>(); - for (Route route : routes) { - RouteAction routeAction = route.getRouteAction(); - String actionName; - Map actionPolicy; - if (existingActions.containsKey(routeAction)) { - actionName = existingActions.get(routeAction); - } else { - if (routeAction.getCluster() != null) { - actionName = "cds:" + routeAction.getCluster(); - actionPolicy = generateCdsRawConfig(routeAction.getCluster()); - } else { - StringBuilder sb = new StringBuilder("weighted:"); - List clusterWeights = routeAction.getWeightedCluster(); - for (ClusterWeight clusterWeight : clusterWeights) { - sb.append(clusterWeight.getName()).append('_'); - } - sb.append(routeAction.hashCode()); - actionName = sb.toString(); - if (rawActions.containsKey(actionName)) { - // Just in case of hash collision, append existingActions.size() to make actionName - // unique. However, in case of collision, when new ConfigUpdate is received, actions - // and actionNames might be associated differently from the previous update, but it - // is just suboptimal and won't cause a problem. - actionName = actionName + "_" + existingActions.size(); - } - actionPolicy = generateWeightedTargetRawConfig(clusterWeights); - } - existingActions.put(routeAction, actionName); - List childPolicies = ImmutableList.of(actionPolicy); - rawActions.put(actionName, ImmutableMap.of("childPolicy", childPolicies)); - } - ImmutableMap configRoute = convertToRawRoute(route.getRouteMatch(), actionName); - rawRoutes.add(configRoute); - } - return ImmutableMap.of( - XdsLbPolicies.XDS_ROUTING_POLICY_NAME, - ImmutableMap.of( - "route", Collections.unmodifiableList(rawRoutes), - "action", Collections.unmodifiableMap(rawActions))); - } - - @VisibleForTesting - static ImmutableMap convertToRawRoute(RouteMatch routeMatch, String actionName) { - ImmutableMap.Builder configRouteBuilder = new ImmutableMap.Builder<>(); - - PathMatcher pathMatcher = routeMatch.getPathMatch(); - String path = pathMatcher.getPath(); - String prefix = pathMatcher.getPrefix(); - Pattern regex = pathMatcher.getRegEx(); - if (path != null) { - configRouteBuilder.put("path", path); - } - if (prefix != null) { - configRouteBuilder.put("prefix", prefix); - } - if (regex != null) { - configRouteBuilder.put("regex", regex.pattern()); - } - - ImmutableList.Builder rawHeaderMatcherListBuilder = new ImmutableList.Builder<>(); - List headerMatchers = routeMatch.getHeaderMatchers(); - for (HeaderMatcher headerMatcher : headerMatchers) { - ImmutableMap.Builder rawHeaderMatcherBuilder = new ImmutableMap.Builder<>(); - rawHeaderMatcherBuilder.put("name", headerMatcher.getName()); - String exactMatch = headerMatcher.getExactMatch(); - Pattern regexMatch = headerMatcher.getRegExMatch(); - HeaderMatcher.Range rangeMatch = headerMatcher.getRangeMatch(); - Boolean presentMatch = headerMatcher.getPresentMatch(); - String prefixMatch = headerMatcher.getPrefixMatch(); - String suffixMatch = headerMatcher.getSuffixMatch(); - if (exactMatch != null) { - rawHeaderMatcherBuilder.put("exactMatch", exactMatch); - } - if (regexMatch != null) { - rawHeaderMatcherBuilder.put("regexMatch", regexMatch.pattern()); - } - if (rangeMatch != null) { - rawHeaderMatcherBuilder - .put( - "rangeMatch", - ImmutableMap.of("start", rangeMatch.getStart(), "end", rangeMatch.getEnd())); - } - if (presentMatch != null) { - rawHeaderMatcherBuilder.put("presentMatch", presentMatch); - } - if (prefixMatch != null) { - rawHeaderMatcherBuilder.put("prefixMatch", prefixMatch); - } - if (suffixMatch != null) { - rawHeaderMatcherBuilder.put("suffixMatch", suffixMatch); - } - rawHeaderMatcherBuilder.put("invertMatch", headerMatcher.isInvertedMatch()); - rawHeaderMatcherListBuilder.add(rawHeaderMatcherBuilder.build()); - } - ImmutableList rawHeaderMatchers = rawHeaderMatcherListBuilder.build(); - if (!rawHeaderMatchers.isEmpty()) { - configRouteBuilder.put("headers", rawHeaderMatchers); - } - - FractionMatcher matchFraction = routeMatch.getFractionMatch(); - if (matchFraction != null) { - configRouteBuilder - .put( - "matchFraction", - ImmutableMap.of( - "numerator", matchFraction.getNumerator(), - "denominator", matchFraction.getDenominator())); - } - - configRouteBuilder.put("action", actionName); - return configRouteBuilder.build(); - } - - @VisibleForTesting - static ImmutableMap generateWeightedTargetRawConfig( - List clusterWeights) { - Map targets = new LinkedHashMap<>(); - for (ClusterWeight clusterWeight : clusterWeights) { - Map childPolicy = generateCdsRawConfig(clusterWeight.getName()); - Map weightedConfig = ImmutableMap.of( - "weight", - (double) clusterWeight.getWeight(), - "childPolicy", - ImmutableList.of(childPolicy)); - targets.put(clusterWeight.getName(), weightedConfig); - } - return ImmutableMap.of( - XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME, - ImmutableMap.of("targets", targets)); - } - - private static ImmutableMap generateCdsRawConfig(String clusterName) { - return ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of("cluster", clusterName)); - } - - @Override - public void shutdown() { - logger.log(XdsLogLevel.INFO, "Shutdown"); - if (xdsClient != null) { - xdsClient = xdsClientPool.returnObject(xdsClient); - } - } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver2.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver2.java deleted file mode 100644 index 123ddd53f48..00000000000 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver2.java +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import io.grpc.Attributes; -import io.grpc.CallOptions; -import io.grpc.InternalConfigSelector; -import io.grpc.InternalLogId; -import io.grpc.LoadBalancer.PickSubchannelArgs; -import io.grpc.Metadata; -import io.grpc.NameResolver; -import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.internal.GrpcUtil; -import io.grpc.internal.ObjectPool; -import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.EnvoyProtoData.ClusterWeight; -import io.grpc.xds.EnvoyProtoData.Route; -import io.grpc.xds.EnvoyProtoData.RouteAction; -import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; -import io.grpc.xds.XdsClient.ConfigUpdate; -import io.grpc.xds.XdsClient.ConfigWatcher; -import io.grpc.xds.XdsClient.XdsClientPoolFactory; -import io.grpc.xds.XdsLogger.XdsLogLevel; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme. - * - *

Resolving a gRPC target involves contacting the control plane management server via xDS - * protocol to retrieve service information and produce a service config to the caller. - * - * @see XdsNameResolverProvider2 - */ -final class XdsNameResolver2 extends NameResolver { - - static final CallOptions.Key CLUSTER_SELECTION_KEY = - CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY"); - - private final XdsLogger logger; - private final String authority; - private final ServiceConfigParser serviceConfigParser; - private final SynchronizationContext syncContext; - private final Bootstrapper bootstrapper; - private final XdsClientPoolFactory xdsClientPoolFactory; - private final ThreadSafeRandom random; - private final ConcurrentMap clusterRefs = new ConcurrentHashMap<>(); - private final ConfigSelector configSelector = new ConfigSelector(); - - private volatile List routes = Collections.emptyList(); - private Listener2 listener; - private ObjectPool xdsClientPool; - private XdsClient xdsClient; - - XdsNameResolver2(String name, - ServiceConfigParser serviceConfigParser, - SynchronizationContext syncContext, - XdsClientPoolFactory xdsClientPoolFactory) { - this(name, serviceConfigParser, syncContext, Bootstrapper.getInstance(), - xdsClientPoolFactory, ThreadSafeRandomImpl.instance); - } - - XdsNameResolver2( - String name, - ServiceConfigParser serviceConfigParser, - SynchronizationContext syncContext, - Bootstrapper bootstrapper, - XdsClientPoolFactory xdsClientPoolFactory, - ThreadSafeRandom random) { - authority = GrpcUtil.checkAuthority(checkNotNull(name, "name")); - this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser"); - this.syncContext = checkNotNull(syncContext, "syncContext"); - this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper"); - this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); - this.random = checkNotNull(random, "random"); - logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name)); - logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name); - } - - @Override - public String getServiceAuthority() { - return authority; - } - - @Override - public void start(Listener2 listener) { - this.listener = checkNotNull(listener, "listener"); - BootstrapInfo bootstrapInfo; - try { - bootstrapInfo = bootstrapper.readBootstrap(); - } catch (Exception e) { - listener.onError( - Status.UNAVAILABLE.withDescription("Failed to load xDS bootstrap").withCause(e)); - return; - } - xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo); - xdsClient = xdsClientPool.getObject(); - xdsClient.watchConfigData(authority, new ConfigWatcherImpl()); - } - - @Override - public void shutdown() { - logger.log(XdsLogLevel.INFO, "Shutdown"); - if (xdsClient != null) { - xdsClient = xdsClientPool.returnObject(xdsClient); - } - } - - @VisibleForTesting - static Map generateServiceConfigWithMethodTimeoutConfig(long timeoutNano) { - String timeout = timeoutNano / 1_000_000_000.0 + "s"; - Map methodConfig = new HashMap<>(); - methodConfig.put( - "name", Collections.singletonList(Collections.emptyMap())); - methodConfig.put("timeout", timeout); - return Collections.singletonMap( - "methodConfig", Collections.singletonList(Collections.unmodifiableMap(methodConfig))); - } - - @VisibleForTesting - static Map generateServiceConfigWithLoadBalancingConfig(Collection clusters) { - Map childPolicy = new HashMap<>(); - for (String cluster : clusters) { - List>> lbPolicy = - Collections.singletonList( - Collections.singletonMap( - "cds_experimental", Collections.singletonMap("cluster", cluster))); - childPolicy.put(cluster, Collections.singletonMap("lbPolicy", lbPolicy)); - } - return Collections.singletonMap("loadBalancingConfig", - Collections.singletonList( - Collections.singletonMap( - "cluster_manager_experimental", Collections.singletonMap( - "childPolicy", Collections.unmodifiableMap(childPolicy))))); - } - - @VisibleForTesting - XdsClient getXdsClient() { - return xdsClient; - } - - private void updateResolutionResult() { - Map rawServiceConfig = - generateServiceConfigWithLoadBalancingConfig(clusterRefs.keySet()); - if (logger.isLoggable(XdsLogLevel.INFO)) { - logger.log( - XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig)); - } - ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig); - Attributes attrs = - Attributes.newBuilder() - .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) - .set(InternalConfigSelector.KEY, configSelector) - .build(); - ResolutionResult result = - ResolutionResult.newBuilder() - .setAttributes(attrs) - .setServiceConfig(parsedServiceConfig) - .build(); - listener.onResult(result); - } - - private final class ConfigSelector extends InternalConfigSelector { - @Override - public Result selectConfig(PickSubchannelArgs args) { - // Index ASCII headers by keys. - Map> asciiHeaders = new HashMap<>(); - Metadata headers = args.getHeaders(); - for (String headerName : headers.keys()) { - if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { - continue; - } - Metadata.Key key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER); - asciiHeaders.put(headerName, headers.getAll(key)); - } - String cluster = null; - Route selectedRoute = null; - do { - for (Route route : routes) { - if (route.getRouteMatch().matches( - "/" + args.getMethodDescriptor().getFullMethodName(), asciiHeaders)) { - selectedRoute = route; - break; - } - } - if (selectedRoute == null) { - return Result.forError( - Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC")); - } - RouteAction action = selectedRoute.getRouteAction(); - if (action.getCluster() != null) { - cluster = action.getCluster(); - } else if (action.getWeightedCluster() != null) { - int totalWeight = 0; - for (ClusterWeight weightedCluster : action.getWeightedCluster()) { - totalWeight += weightedCluster.getWeight(); - } - int select = random.nextInt(totalWeight); - int accumulator = 0; - for (ClusterWeight weightedCluster : action.getWeightedCluster()) { - accumulator += weightedCluster.getWeight(); - if (select < accumulator) { - cluster = weightedCluster.getName(); - break; - } - } - } - } while (!retainCluster(cluster)); - // TODO(chengyuanzhang): avoid service config generation and parsing for each call. - Map rawServiceConfig = - generateServiceConfigWithMethodTimeoutConfig( - selectedRoute.getRouteAction().getTimeoutNano()); - if (logger.isLoggable(XdsLogLevel.INFO)) { - logger.log(XdsLogLevel.INFO, - "Generated service config (method config):\n{0}", new Gson().toJson(rawServiceConfig)); - } - ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig); - Object config = parsedServiceConfig.getConfig(); - if (config == null) { - releaseCluster(cluster); - return Result.forError( - parsedServiceConfig.getError().augmentDescription( - "Failed to parse service config (method config)")); - } - final String finalCluster = cluster; - class SelectionCompleted implements Runnable { - @Override - public void run() { - releaseCluster(finalCluster); - } - } - - return - Result.newBuilder() - .setCallOptions(args.getCallOptions().withOption(CLUSTER_SELECTION_KEY, cluster)) - .setConfig(config) - .setCommittedCallback(new SelectionCompleted()) - .build(); - } - - private boolean retainCluster(String cluster) { - AtomicInteger refCount = clusterRefs.get(cluster); - if (refCount == null) { - return false; - } - int count; - do { - count = refCount.get(); - if (count == 0) { - return false; - } - } while (!refCount.compareAndSet(count, count + 1)); - return true; - } - - private void releaseCluster(final String cluster) { - int count = clusterRefs.get(cluster).decrementAndGet(); - if (count == 0) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (clusterRefs.get(cluster).get() == 0) { - clusterRefs.remove(cluster); - updateResolutionResult(); - } - } - }); - } - } - } - - // https://github.com/google/error-prone/issues/1767 - @SuppressWarnings("ModifyCollectionInEnhancedForLoop") - private class ConfigWatcherImpl implements ConfigWatcher { - private Set existingClusters; - - @Override - public void onConfigChanged(ConfigUpdate update) { - Set clusters = new HashSet<>(); - for (Route route : update.getRoutes()) { - RouteAction action = route.getRouteAction(); - if (action.getCluster() != null) { - clusters.add(action.getCluster()); - } else if (action.getWeightedCluster() != null) { - for (ClusterWeight weighedCluster : action.getWeightedCluster()) { - clusters.add(weighedCluster.getName()); - } - } - } - Set addedClusters = - existingClusters == null ? clusters : Sets.difference(clusters, existingClusters); - Set deletedClusters = - existingClusters == null - ? Collections.emptySet() : Sets.difference(existingClusters, clusters); - existingClusters = clusters; - boolean shouldUpdateResult = false; - for (String cluster : addedClusters) { - if (clusterRefs.containsKey(cluster)) { - clusterRefs.get(cluster).incrementAndGet(); - } else { - clusterRefs.put(cluster, new AtomicInteger(1)); - shouldUpdateResult = true; - } - } - // Update service config to include newly added clusters. - if (shouldUpdateResult) { - updateResolutionResult(); - } - // Make newly added clusters selectable by config selector and deleted clusters no longer - // selectable. - routes = update.getRoutes(); - shouldUpdateResult = false; - for (String cluster : deletedClusters) { - int count = clusterRefs.get(cluster).decrementAndGet(); - if (count == 0) { - clusterRefs.remove(cluster); - shouldUpdateResult = true; - } - } - if (shouldUpdateResult) { - updateResolutionResult(); - } - } - - @Override - public void onResourceDoesNotExist(String resourceName) { - logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName); - ConfigOrError parsedServiceConfig = - serviceConfigParser.parseServiceConfig(Collections.emptyMap()); - ResolutionResult result = - ResolutionResult.newBuilder() - .setServiceConfig(parsedServiceConfig) - // let channel take action for no config selector - .build(); - listener.onResult(result); - } - - @Override - public void onError(Status error) { - logger.log( - XdsLogLevel.WARNING, - "Received error from xDS client {0}: {1}", xdsClient, error.getDescription()); - listener.onError(error); - } - } -} diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java index 4a5e2e5df1f..d00929cc38f 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java @@ -16,14 +16,26 @@ package io.grpc.xds; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import io.grpc.Internal; import io.grpc.NameResolver.Args; import io.grpc.NameResolverProvider; +import io.grpc.SynchronizationContext; +import io.grpc.internal.BackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ObjectPool; +import io.grpc.xds.Bootstrapper.BootstrapInfo; +import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool; import io.grpc.xds.XdsClient.XdsChannelFactory; +import io.grpc.xds.XdsClient.XdsClientFactory; +import io.grpc.xds.XdsClient.XdsClientPoolFactory; import java.net.URI; +import java.util.concurrent.ScheduledExecutorService; /** * A provider for {@link XdsNameResolver}. @@ -43,21 +55,23 @@ public final class XdsNameResolverProvider extends NameResolverProvider { @Override public XdsNameResolver newNameResolver(URI targetUri, Args args) { if (SCHEME.equals(targetUri.getScheme())) { - String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath"); + String targetPath = checkNotNull(targetUri.getPath(), "targetPath"); Preconditions.checkArgument( targetPath.startsWith("/"), "the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri); String name = targetPath.substring(1); - return - new XdsNameResolver( + XdsClientPoolFactory xdsClientPoolFactory = + new RefCountedXdsClientPoolFactory( name, - args, - new ExponentialBackoffPolicy.Provider(), - GrpcUtil.STOPWATCH_SUPPLIER, XdsChannelFactory.getInstance(), - Bootstrapper.getInstance()); + args.getSynchronizationContext(), args.getScheduledExecutorService(), + new ExponentialBackoffPolicy.Provider(), + GrpcUtil.STOPWATCH_SUPPLIER); + return new XdsNameResolver( + name, args.getServiceConfigParser(), + args.getSynchronizationContext(), xdsClientPoolFactory); } return null; } @@ -78,4 +92,41 @@ protected int priority() { // resolver. return 4; } + + static class RefCountedXdsClientPoolFactory implements XdsClientPoolFactory { + private final String serviceName; + private final XdsChannelFactory channelFactory; + private final SynchronizationContext syncContext; + private final ScheduledExecutorService timeService; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final Supplier stopwatchSupplier; + + RefCountedXdsClientPoolFactory( + String serviceName, + XdsChannelFactory channelFactory, + SynchronizationContext syncContext, + ScheduledExecutorService timeService, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier) { + this.serviceName = checkNotNull(serviceName, "serviceName"); + this.channelFactory = checkNotNull(channelFactory, "channelFactory"); + this.syncContext = checkNotNull(syncContext, "syncContext"); + this.timeService = checkNotNull(timeService, "timeService"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); + } + + @Override + public ObjectPool newXdsClientObjectPool(final BootstrapInfo bootstrapInfo) { + XdsClientFactory xdsClientFactory = new XdsClientFactory() { + @Override + XdsClient createXdsClient() { + return new XdsClientImpl( + serviceName, bootstrapInfo.getServers(), channelFactory, bootstrapInfo.getNode(), + syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); + } + }; + return new RefCountedXdsClientObjectPool(xdsClientFactory); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider2.java b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider2.java deleted file mode 100644 index 44c97ef85d3..00000000000 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider2.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; -import io.grpc.Internal; -import io.grpc.NameResolver.Args; -import io.grpc.NameResolverProvider; -import io.grpc.SynchronizationContext; -import io.grpc.internal.BackoffPolicy; -import io.grpc.internal.ExponentialBackoffPolicy; -import io.grpc.internal.GrpcUtil; -import io.grpc.internal.ObjectPool; -import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool; -import io.grpc.xds.XdsClient.XdsChannelFactory; -import io.grpc.xds.XdsClient.XdsClientFactory; -import io.grpc.xds.XdsClient.XdsClientPoolFactory; -import java.net.URI; -import java.util.concurrent.ScheduledExecutorService; - -/** - * A provider for {@link XdsNameResolver2}. - * - *

It resolves a target URI whose scheme is {@code "xds"}. The authority of the - * target URI is never used for current release. The path of the target URI, excluding the leading - * slash {@code '/'}, will indicate the name to use in the VHDS query. - * - *

This class should not be directly referenced in code. The resolver should be accessed - * through {@link io.grpc.NameResolverRegistry} with the URI scheme "xds". - */ -@Internal -public final class XdsNameResolverProvider2 extends NameResolverProvider { - - private static final String SCHEME = "xds"; - - @Override - public XdsNameResolver2 newNameResolver(URI targetUri, Args args) { - if (SCHEME.equals(targetUri.getScheme())) { - String targetPath = checkNotNull(targetUri.getPath(), "targetPath"); - Preconditions.checkArgument( - targetPath.startsWith("/"), - "the path component (%s) of the target (%s) must start with '/'", - targetPath, - targetUri); - String name = targetPath.substring(1); - XdsClientPoolFactory xdsClientPoolFactory = - new RefCountedXdsClientPoolFactory( - name, - XdsChannelFactory.getInstance(), - args.getSynchronizationContext(), args.getScheduledExecutorService(), - new ExponentialBackoffPolicy.Provider(), - GrpcUtil.STOPWATCH_SUPPLIER); - return new XdsNameResolver2( - name, args.getServiceConfigParser(), - args.getSynchronizationContext(), xdsClientPoolFactory); - } - return null; - } - - @Override - public String getDefaultScheme() { - return SCHEME; - } - - @Override - protected boolean isAvailable() { - return true; - } - - @Override - protected int priority() { - // Set priority value to be < 5 as we still want DNS resolver to be the primary default - // resolver. - return 4; - } - - static class RefCountedXdsClientPoolFactory implements XdsClientPoolFactory { - private final String serviceName; - private final XdsChannelFactory channelFactory; - private final SynchronizationContext syncContext; - private final ScheduledExecutorService timeService; - private final BackoffPolicy.Provider backoffPolicyProvider; - private final Supplier stopwatchSupplier; - - RefCountedXdsClientPoolFactory( - String serviceName, - XdsChannelFactory channelFactory, - SynchronizationContext syncContext, - ScheduledExecutorService timeService, - BackoffPolicy.Provider backoffPolicyProvider, - Supplier stopwatchSupplier) { - this.serviceName = checkNotNull(serviceName, "serviceName"); - this.channelFactory = checkNotNull(channelFactory, "channelFactory"); - this.syncContext = checkNotNull(syncContext, "syncContext"); - this.timeService = checkNotNull(timeService, "timeService"); - this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); - } - - @Override - public ObjectPool newXdsClientObjectPool(final BootstrapInfo bootstrapInfo) { - XdsClientFactory xdsClientFactory = new XdsClientFactory() { - @Override - XdsClient createXdsClient() { - return new XdsClientImpl( - serviceName, bootstrapInfo.getServers(), channelFactory, bootstrapInfo.getNode(), - syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); - } - }; - return new RefCountedXdsClientObjectPool(xdsClientFactory); - } - } -} diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java index 3e89a8b0231..9bd7a8d08fc 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java @@ -276,7 +276,7 @@ private static PickResult pickSubchannel(SubchannelPicker picker, String name) { .build(), new Metadata(), CallOptions.DEFAULT.withOption( - XdsNameResolver2.CLUSTER_SELECTION_KEY, name)); + XdsNameResolver.CLUSTER_SELECTION_KEY, name)); return picker.pickSubchannel(args); } diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolver2Test.java b/xds/src/test/java/io/grpc/xds/XdsNameResolver2Test.java deleted file mode 100644 index 31872a385ee..00000000000 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolver2Test.java +++ /dev/null @@ -1,554 +0,0 @@ -/* - * Copyright 2020 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import io.grpc.CallOptions; -import io.grpc.InternalConfigSelector; -import io.grpc.InternalConfigSelector.Result; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.MethodDescriptor.MethodType; -import io.grpc.NameResolver; -import io.grpc.NameResolver.ConfigOrError; -import io.grpc.NameResolver.ResolutionResult; -import io.grpc.NameResolver.ServiceConfigParser; -import io.grpc.Status; -import io.grpc.Status.Code; -import io.grpc.SynchronizationContext; -import io.grpc.internal.JsonParser; -import io.grpc.internal.JsonUtil; -import io.grpc.internal.ObjectPool; -import io.grpc.internal.PickSubchannelArgsImpl; -import io.grpc.testing.TestMethodDescriptors; -import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.EnvoyProtoData.ClusterWeight; -import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.EnvoyProtoData.Route; -import io.grpc.xds.EnvoyProtoData.RouteAction; -import io.grpc.xds.XdsClient.XdsClientPoolFactory; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -/** Unit tests for {@link XdsNameResolver2}. */ -// TODO(chengyuanzhang): should do tests with ManagedChannelImpl. -@RunWith(JUnit4.class) -public class XdsNameResolver2Test { - private static final String AUTHORITY = "foo.googleapis.com:80"; - @Rule - public final MockitoRule mocks = MockitoJUnit.rule(); - private final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - throw new AssertionError(e); - } - }); - private final ServiceConfigParser serviceConfigParser = new ServiceConfigParser() { - @Override - public ConfigOrError parseServiceConfig(Map rawServiceConfig) { - return ConfigOrError.fromConfig(rawServiceConfig); - } - }; - private final FakeXdsClientPoolFactory xdsClientPoolFactory = new FakeXdsClientPoolFactory(); - private final String cluster1 = "cluster-foo.googleapis.com"; - private final String cluster2 = "cluster-bar.googleapis.com"; - private final CallInfo call1 = new CallInfo("HelloService", "hi"); - private final CallInfo call2 = new CallInfo("GreetService", "bye"); - - @Mock - private ThreadSafeRandom mockRandom; - @Mock - private NameResolver.Listener2 mockListener; - @Captor - private ArgumentCaptor resolutionResultCaptor; - @Captor - ArgumentCaptor errorCaptor; - private XdsNameResolver2 resolver; - - @Before - public void setUp() { - Bootstrapper bootstrapper = new Bootstrapper() { - @Override - public BootstrapInfo readBootstrap() { - return new BootstrapInfo( - ImmutableList.of( - new ServerInfo( - "trafficdirector.googleapis.com", - ImmutableList.of(), ImmutableList.of())), - Node.newBuilder().build(), - null); - } - }; - resolver = new XdsNameResolver2(AUTHORITY, serviceConfigParser, syncContext, bootstrapper, - xdsClientPoolFactory, mockRandom); - } - - @Test - public void resolve_failToBootstrap() { - Bootstrapper bootstrapper = new Bootstrapper() { - @Override - public BootstrapInfo readBootstrap() throws IOException { - throw new IOException("Fail to read bootstrap file"); - } - }; - resolver = new XdsNameResolver2(AUTHORITY, serviceConfigParser, syncContext, bootstrapper, - xdsClientPoolFactory, mockRandom); - resolver.start(mockListener); - verify(mockListener).onError(errorCaptor.capture()); - Status error = errorCaptor.getValue(); - assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Failed to load xDS bootstrap"); - assertThat(error.getCause()).hasMessageThat().isEqualTo("Fail to read bootstrap file"); - } - - @SuppressWarnings("unchecked") - @Test - public void resolve_resourceNotFound() { - resolver.start(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverResourceNotFound(); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - assertThat((Map) result.getServiceConfig().getConfig()).isEmpty(); - } - - @Test - public void resolve_encounterError() { - resolver.start(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverError(Status.UNAVAILABLE.withDescription("server unreachable")); - verify(mockListener).onError(errorCaptor.capture()); - Status error = errorCaptor.getValue(); - assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("server unreachable"); - } - - @Test - public void resolve_simpleCallSucceeds() { - InternalConfigSelector configSelector = resolveToClusters(); - Result selectResult = assertCallSelectResult(call1, configSelector, cluster1, 15.0); - selectResult.getCommittedCallback().run(); - verifyNoMoreInteractions(mockListener); - } - - @Test - public void resolve_simpleCallFailedToRoute() { - InternalConfigSelector configSelector = resolveToClusters(); - CallInfo call = new CallInfo("FooService", "barMethod"); - Result selectResult = configSelector.selectConfig( - new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); - Status status = selectResult.getStatus(); - assertThat(status.isOk()).isFalse(); - assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(status.getDescription()).isEqualTo("Could not find xDS route matching RPC"); - verifyNoMoreInteractions(mockListener); - } - - @SuppressWarnings("unchecked") - @Test - public void resolve_resourceUpdateAfterCallStarted() { - InternalConfigSelector configSelector = resolveToClusters(); - Result selectResult = assertCallSelectResult(call1, configSelector, cluster1, 15.0); - - reset(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverRoutes( - Arrays.asList( - new Route( - new RouteMatch(null, call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), - new Route( - new RouteMatch(null, call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - // Updated service config still contains cluster1 while it is removed resource. New calls no - // longer routed to cluster1. - assertServiceConfigForLoadBalancingConfig( - Arrays.asList(cluster1, cluster2, "another-cluster"), - (Map) result.getServiceConfig().getConfig()); - assertThat(result.getAttributes().get(InternalConfigSelector.KEY)) - .isSameInstanceAs(configSelector); - assertCallSelectResult(call1, configSelector, "another-cluster", 20.0); - - selectResult.getCommittedCallback().run(); // completes previous call - verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); - result = resolutionResultCaptor.getValue(); - assertServiceConfigForLoadBalancingConfig( - Arrays.asList(cluster2, "another-cluster"), - (Map) result.getServiceConfig().getConfig()); - verifyNoMoreInteractions(mockListener); - } - - @SuppressWarnings("unchecked") - @Test - public void resolve_resourceUpdatedBeforeCallStarted() { - InternalConfigSelector configSelector = resolveToClusters(); - reset(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverRoutes( - Arrays.asList( - new Route( - new RouteMatch(null, call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), - new Route( - new RouteMatch(null, call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); - // Two consecutive service config updates: one for removing clcuster1, - // one for adding "another=cluster". - verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertServiceConfigForLoadBalancingConfig( - Arrays.asList(cluster2, "another-cluster"), - (Map) result.getServiceConfig().getConfig()); - assertThat(result.getAttributes().get(InternalConfigSelector.KEY)) - .isSameInstanceAs(configSelector); - assertCallSelectResult(call1, configSelector, "another-cluster", 20.0); - - verifyNoMoreInteractions(mockListener); - } - - @SuppressWarnings("unchecked") - @Test - public void resolve_raceBetweenCallAndRepeatedResourceUpdate() { - InternalConfigSelector configSelector = resolveToClusters(); - assertCallSelectResult(call1, configSelector, cluster1, 15.0); - - reset(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverRoutes( - Arrays.asList( - new Route( - new RouteMatch(null, call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), - new Route( - new RouteMatch(null, call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); - - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertServiceConfigForLoadBalancingConfig( - Arrays.asList(cluster1, cluster2, "another-cluster"), - (Map) result.getServiceConfig().getConfig()); - - xdsClient.deliverRoutes( - Arrays.asList( - new Route( - new RouteMatch(null, call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), "another-cluster", null)), - new Route( - new RouteMatch(null, call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); - verifyNoMoreInteractions(mockListener); // no cluster added/deleted - assertCallSelectResult(call1, configSelector, "another-cluster", 15.0); - } - - @Test - public void resolve_raceBetweenClusterReleasedAndResourceUpdateAddBackAgain() { - InternalConfigSelector configSelector = resolveToClusters(); - Result result = assertCallSelectResult(call1, configSelector, cluster1, 15.0); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverRoutes( - Collections.singletonList( - new Route( - new RouteMatch(null, call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); - xdsClient.deliverRoutes( - Arrays.asList( - new Route( - new RouteMatch(null, call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster1, null)), - new Route( - new RouteMatch(null, call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); - result.getCommittedCallback().run(); - verifyNoMoreInteractions(mockListener); - } - - @SuppressWarnings("unchecked") - @Test - public void resolve_simpleCallSucceeds_routeToWeightedCluster() { - when(mockRandom.nextInt(anyInt())).thenReturn(90, 10); - resolver.start(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverRoutes( - Arrays.asList( - new Route( - new RouteMatch(null, call1.getFullMethodNameForPath()), - new RouteAction( - TimeUnit.SECONDS.toNanos(20L), null, - Arrays.asList( - new ClusterWeight(cluster1, 20), new ClusterWeight(cluster2, 80)))))); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - assertServiceConfigForLoadBalancingConfig( - Arrays.asList(cluster1, cluster2), (Map) result.getServiceConfig().getConfig()); - assertThat(result.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL)).isNotNull(); - InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - Result selectResult = configSelector.selectConfig( - new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); - assertThat(selectResult.getStatus().isOk()).isTrue(); - assertThat(selectResult.getCallOptions().getOption(XdsNameResolver2.CLUSTER_SELECTION_KEY)) - .isEqualTo(cluster2); - assertServiceConfigForMethodConfig(20.0, (Map) selectResult.getConfig()); - - selectResult = configSelector.selectConfig( - new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); - assertThat(selectResult.getStatus().isOk()).isTrue(); - assertThat(selectResult.getCallOptions().getOption(XdsNameResolver2.CLUSTER_SELECTION_KEY)) - .isEqualTo(cluster1); - assertServiceConfigForMethodConfig(20.0, (Map) selectResult.getConfig()); - } - - @SuppressWarnings("unchecked") - private static Result assertCallSelectResult( - CallInfo call, InternalConfigSelector configSelector, String expectedCluster, - double expectedTimeoutSec) { - Result result = configSelector.selectConfig( - new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); - assertThat(result.getStatus().isOk()).isTrue(); - assertThat(result.getCallOptions().getOption(XdsNameResolver2.CLUSTER_SELECTION_KEY)) - .isEqualTo(expectedCluster); - assertServiceConfigForMethodConfig(expectedTimeoutSec, (Map) result.getConfig()); - return result; - } - - @SuppressWarnings("unchecked") - private InternalConfigSelector resolveToClusters() { - resolver.start(mockListener); - FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverRoutes( - Arrays.asList( - new Route( - new RouteMatch(null, call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster1, null)), - new Route( - new RouteMatch(null, call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - assertServiceConfigForLoadBalancingConfig( - Arrays.asList(cluster1, cluster2), (Map) result.getServiceConfig().getConfig()); - assertThat(result.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL)).isNotNull(); - return result.getAttributes().get(InternalConfigSelector.KEY); - } - - /** - * Verifies the raw service config contains a single method config for method with the - * specified timeout. - */ - private static void assertServiceConfigForMethodConfig( - double timeoutSec, Map actualServiceConfig) { - List> rawMethodConfigs = - JsonUtil.getListOfObjects(actualServiceConfig, "methodConfig"); - Map methodConfig = Iterables.getOnlyElement(rawMethodConfigs); - List> methods = JsonUtil.getListOfObjects(methodConfig, "name"); - assertThat(Iterables.getOnlyElement(methods)).isEmpty(); - assertThat(JsonUtil.getString(methodConfig, "timeout")).isEqualTo(timeoutSec + "s"); - } - - /** - * Verifies the raw service config contains an xDS load balancing config for the given clusters. - */ - private static void assertServiceConfigForLoadBalancingConfig( - List clusters, Map actualServiceConfig) { - List> rawLbConfigs = - JsonUtil.getListOfObjects(actualServiceConfig, "loadBalancingConfig"); - Map lbConfig = Iterables.getOnlyElement(rawLbConfigs); - assertThat(lbConfig.keySet()).containsExactly("cluster_manager_experimental"); - Map clusterManagerLbConfig = - JsonUtil.getObject(lbConfig, "cluster_manager_experimental"); - Map clusterManagerChildLbPolicies = - JsonUtil.getObject(clusterManagerLbConfig, "childPolicy"); - assertThat(clusterManagerChildLbPolicies.keySet()).containsExactlyElementsIn(clusters); - for (String cluster : clusters) { - Map childLbConfig = JsonUtil.getObject(clusterManagerChildLbPolicies, cluster); - assertThat(childLbConfig.keySet()).containsExactly("lbPolicy"); - List> childLbConfigValues = - JsonUtil.getListOfObjects(childLbConfig, "lbPolicy"); - Map cdsLbPolicy = Iterables.getOnlyElement(childLbConfigValues); - assertThat(cdsLbPolicy.keySet()).containsExactly("cds_experimental"); - assertThat(JsonUtil.getObject(cdsLbPolicy, "cds_experimental")) - .containsExactly("cluster", cluster); - } - } - - @SuppressWarnings("unchecked") - @Test - public void generateServiceConfig_forLoadBalancingConfig() throws IOException { - List clusters = Arrays.asList("cluster-foo", "cluster-bar", "cluster-baz"); - String expectedServiceConfigJson = "{\n" - + " \"loadBalancingConfig\": [{\n" - + " \"cluster_manager_experimental\": {\n" - + " \"childPolicy\": {\n" - + " \"cluster-foo\": {\n" - + " \"lbPolicy\": [{\n" - + " \"cds_experimental\": {\n" - + " \"cluster\": \"cluster-foo\"\n" - + " }\n" - + " }]\n" - + " },\n" - + " \"cluster-bar\": {\n" - + " \"lbPolicy\": [{\n" - + " \"cds_experimental\": {\n" - + " \"cluster\": \"cluster-bar\"\n" - + " }\n" - + " }]\n" - + " },\n" - + " \"cluster-baz\": {\n" - + " \"lbPolicy\": [{\n" - + " \"cds_experimental\": {\n" - + " \"cluster\": \"cluster-baz\"\n" - + " }\n" - + " }]\n" - + " }\n" - + " }\n" - + " }\n" - + " }]\n" - + "}"; - Map expectedServiceConfig = - (Map) JsonParser.parse(expectedServiceConfigJson); - assertThat(XdsNameResolver2.generateServiceConfigWithLoadBalancingConfig(clusters)) - .isEqualTo(expectedServiceConfig); - } - - @SuppressWarnings("unchecked") - @Test - public void generateServiceConfig_forMethodTimeoutConfig() throws IOException { - long timeoutNano = TimeUnit.SECONDS.toNanos(1L) + 1L; // 1.0000000001s - String expectedServiceConfigJson = "{\n" - + " \"methodConfig\": [{\n" - + " \"name\": [ {} ],\n" - + " \"timeout\": \"1.000000001s\"\n" - + " }]\n" - + "}"; - Map expectedServiceConfig = - (Map) JsonParser.parse(expectedServiceConfigJson); - assertThat(XdsNameResolver2.generateServiceConfigWithMethodTimeoutConfig(timeoutNano)) - .isEqualTo(expectedServiceConfig); - } - - private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory { - @Override - public ObjectPool newXdsClientObjectPool(BootstrapInfo bootstrapInfo) { - return new ObjectPool() { - @Override - public XdsClient getObject() { - return new FakeXdsClient(); - } - - @Override - public XdsClient returnObject(Object object) { - return null; - } - }; - } - } - - private class FakeXdsClient extends XdsClient { - private String resource; - private ConfigWatcher watcher; - - @Override - void watchConfigData(String targetAuthority, ConfigWatcher watcher) { - resource = targetAuthority; - this.watcher = watcher; - } - - @Override - void shutdown() { - // no-op - } - - void deliverRoutes(final List routes) { - syncContext.execute(new Runnable() { - @Override - public void run() { - watcher.onConfigChanged(ConfigUpdate.newBuilder().addRoutes(routes).build()); - } - }); - } - - void deliverError(final Status error) { - syncContext.execute(new Runnable() { - @Override - public void run() { - watcher.onError(error); - } - }); - } - - void deliverResourceNotFound() { - Preconditions.checkState(resource != null, "no resource subscribed"); - syncContext.execute(new Runnable() { - @Override - public void run() { - watcher.onResourceDoesNotExist(resource); - } - }); - } - } - - private static final class CallInfo { - private final String service; - private final String method; - private final MethodDescriptor methodDescriptor; - - private CallInfo(String service, String method) { - this.service = service; - this.method = method; - methodDescriptor = - MethodDescriptor.newBuilder() - .setType(MethodType.UNARY).setFullMethodName(service + "/" + method) - .setRequestMarshaller(TestMethodDescriptors.voidMarshaller()) - .setResponseMarshaller(TestMethodDescriptors.voidMarshaller()) - .build(); - } - - private String getFullMethodNameForPath() { - return "/" + service + "/" + method; - } - } -} diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverIntegrationTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverIntegrationTest.java deleted file mode 100644 index e843fd14a40..00000000000 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverIntegrationTest.java +++ /dev/null @@ -1,570 +0,0 @@ -/* - * Copyright 2020 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponseV2; -import static io.grpc.xds.XdsClientTestHelper.buildListenerV2; -import static io.grpc.xds.XdsClientTestHelper.buildRouteConfigurationV2; -import static io.grpc.xds.XdsClientTestHelper.buildVirtualHostV2; -import static io.grpc.xds.XdsNameResolverTest.assertCdsPolicy; -import static io.grpc.xds.XdsNameResolverTest.assertWeightedTargetPolicy; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.protobuf.Any; -import com.google.protobuf.UInt32Value; -import io.envoyproxy.envoy.api.v2.DiscoveryRequest; -import io.envoyproxy.envoy.api.v2.DiscoveryResponse; -import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource; -import io.envoyproxy.envoy.api.v2.core.ConfigSource; -import io.envoyproxy.envoy.api.v2.route.Route; -import io.envoyproxy.envoy.api.v2.route.RouteAction; -import io.envoyproxy.envoy.api.v2.route.RouteMatch; -import io.envoyproxy.envoy.api.v2.route.VirtualHost; -import io.envoyproxy.envoy.api.v2.route.WeightedCluster; -import io.envoyproxy.envoy.api.v2.route.WeightedCluster.ClusterWeight; -import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; -import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; -import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; -import io.grpc.ChannelLogger; -import io.grpc.ManagedChannel; -import io.grpc.NameResolver; -import io.grpc.NameResolver.ConfigOrError; -import io.grpc.NameResolver.ResolutionResult; -import io.grpc.NameResolver.ServiceConfigParser; -import io.grpc.Status; -import io.grpc.Status.Code; -import io.grpc.SynchronizationContext; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.internal.BackoffPolicy; -import io.grpc.internal.FakeClock; -import io.grpc.internal.GrpcUtil; -import io.grpc.internal.ObjectPool; -import io.grpc.stub.StreamObserver; -import io.grpc.testing.GrpcCleanupRule; -import io.grpc.xds.Bootstrapper.ServerInfo; -import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.XdsClient.XdsChannel; -import io.grpc.xds.XdsClient.XdsChannelFactory; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -/** Tests for {@link XdsNameResolver} with xDS service. */ -@RunWith(JUnit4.class) -// TODO(creamsoup) use parsed service config -public class XdsNameResolverIntegrationTest { - private static final String AUTHORITY = "foo.googleapis.com:80"; - private static final Node FAKE_BOOTSTRAP_NODE = - Node.newBuilder().setId("XdsNameResolverTest").build(); - - @Rule - public final MockitoRule mocks = MockitoJUnit.rule(); - @Rule - public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - - private final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - throw new AssertionError(e); - } - }); - - private final FakeClock fakeClock = new FakeClock(); - private final Queue> responseObservers = new ArrayDeque<>(); - private final ServiceConfigParser serviceConfigParser = new ServiceConfigParser() { - @Override - public ConfigOrError parseServiceConfig(Map rawServiceConfig) { - return ConfigOrError.fromConfig(rawServiceConfig); - } - }; - - private final NameResolver.Args args = - NameResolver.Args.newBuilder() - .setDefaultPort(8080) - .setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR) - .setSynchronizationContext(syncContext) - .setServiceConfigParser(serviceConfigParser) - .setScheduledExecutorService(fakeClock.getScheduledExecutorService()) - .setChannelLogger(mock(ChannelLogger.class)) - .build(); - - ArgumentCaptor resolutionResultCaptor = ArgumentCaptor.forClass(null); - - @Mock - private BackoffPolicy.Provider backoffPolicyProvider; - @Mock - private NameResolver.Listener2 mockListener; - - private XdsChannelFactory channelFactory; - private XdsNameResolver xdsNameResolver; - - @Before - public void setUp() throws IOException { - final String serverName = InProcessServerBuilder.generateName(); - AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() { - @Override - public StreamObserver streamAggregatedResources( - final StreamObserver responseObserver) { - responseObservers.offer(responseObserver); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - return requestObserver; - } - }; - - cleanupRule.register( - InProcessServerBuilder - .forName(serverName) - .addService(serviceImpl) - .directExecutor() - .build() - .start()); - final ManagedChannel channel = - cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); - - channelFactory = new XdsChannelFactory() { - @Override - XdsChannel createChannel(List servers) { - assertThat(Iterables.getOnlyElement(servers).getServerUri()).isEqualTo(serverName); - return new XdsChannel(channel, false); - } - }; - Bootstrapper bootstrapper = new Bootstrapper() { - @Override - public BootstrapInfo readBootstrap() { - List serverList = - ImmutableList.of(new ServerInfo(serverName, ImmutableList.of(), null)); - return new BootstrapInfo(serverList, FAKE_BOOTSTRAP_NODE, null); - } - }; - xdsNameResolver = - new XdsNameResolver( - AUTHORITY, - args, - backoffPolicyProvider, - fakeClock.getStopwatchSupplier(), - channelFactory, - bootstrapper); - assertThat(responseObservers).isEmpty(); - } - - @After - public void tearDown() { - xdsNameResolver.shutdown(); - } - - @Test - public void resolve_bootstrapProvidesNoTrafficDirectorInfo() { - Bootstrapper bootstrapper = new Bootstrapper() { - @Override - public BootstrapInfo readBootstrap() { - return new BootstrapInfo(ImmutableList.of(), FAKE_BOOTSTRAP_NODE, null); - } - }; - - XdsNameResolver resolver = - new XdsNameResolver( - AUTHORITY, - args, - backoffPolicyProvider, - fakeClock.getStopwatchSupplier(), - channelFactory, - bootstrapper); - resolver.start(mockListener); - ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); - verify(mockListener).onError(statusCaptor.capture()); - assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(statusCaptor.getValue().getDescription()) - .isEqualTo("No management server provided by bootstrap"); - } - - @Test - public void resolve_failToBootstrap() { - Bootstrapper bootstrapper = new Bootstrapper() { - @Override - public BootstrapInfo readBootstrap() throws IOException { - throw new IOException("Fail to read bootstrap file"); - } - }; - - XdsNameResolver resolver = - new XdsNameResolver( - AUTHORITY, - args, - backoffPolicyProvider, - fakeClock.getStopwatchSupplier(), - channelFactory, - bootstrapper); - resolver.start(mockListener); - ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(null); - verify(mockListener).onError(errorCaptor.capture()); - Status error = errorCaptor.getValue(); - assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Failed to bootstrap"); - assertThat(error.getCause()).hasMessageThat().isEqualTo("Fail to read bootstrap file"); - } - - @Test - public void resolve_passXdsClientPoolInResult() { - xdsNameResolver.start(mockListener); - assertThat(responseObservers).hasSize(1); - StreamObserver responseObserver = responseObservers.poll(); - - // Simulate receiving an LDS response that contains cluster resolution directly in-line. - String clusterName = "cluster-foo.googleapis.com"; - responseObserver.onNext( - buildLdsResponseForCluster("0", AUTHORITY, clusterName, "0000")); - - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - ObjectPool xdsClientPool = result.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL); - assertThat(xdsClientPool).isNotNull(); - } - - @SuppressWarnings("unchecked") - @Test - public void resolve_ResourceNotFound() { - xdsNameResolver.start(mockListener); - assertThat(responseObservers).hasSize(1); - StreamObserver responseObserver = responseObservers.poll(); - - // Simulate receiving an LDS response that does not contain requested resource. - String clusterName = "cluster-bar.googleapis.com"; - responseObserver.onNext( - buildLdsResponseForCluster("0", "bar.googleapis.com", clusterName, "0000")); - - fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - assertThat((Map) result.getServiceConfig().getConfig()).isEmpty(); - } - - @Test - @SuppressWarnings("unchecked") - public void resolve_resourceUpdated() { - xdsNameResolver.start(mockListener); - assertThat(responseObservers).hasSize(1); - StreamObserver responseObserver = responseObservers.poll(); - - // Simulate receiving an LDS response that contains cluster resolution directly in-line. - responseObserver.onNext( - buildLdsResponseForCluster("0", AUTHORITY, "cluster-foo.googleapis.com", "0000")); - - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - Map serviceConfig = (Map) result.getServiceConfig().getConfig(); - - List> rawLbConfigs = - (List>) serviceConfig.get("loadBalancingConfig"); - Map lbConfig = Iterables.getOnlyElement(rawLbConfigs); - assertThat(lbConfig.keySet()).containsExactly("xds_routing_experimental"); - Map rawConfigValues = (Map) lbConfig.get("xds_routing_experimental"); - Map> actions = - (Map>) rawConfigValues.get("action"); - List> routes = (List>) rawConfigValues.get("route"); - Map route = Iterables.getOnlyElement(routes); - assertThat(route.keySet()).containsExactly("prefix", "action"); - assertThat((String) route.get("prefix")).isEqualTo(""); - assertCdsPolicy(actions.get(route.get("action")), "cluster-foo.googleapis.com"); - - // Simulate receiving another LDS response that tells client to do RDS. - String routeConfigName = "route-foo.googleapis.com"; - responseObserver.onNext( - buildLdsResponseForRdsResource("1", AUTHORITY, routeConfigName, "0001")); - - // Client sent an RDS request for resource "route-foo.googleapis.com" (Omitted in this test). - - // Simulate receiving an RDS response that contains the resource "route-foo.googleapis.com" - // with cluster resolution for "foo.googleapis.com". - responseObserver.onNext( - buildRdsResponseForCluster("0", routeConfigName, AUTHORITY, - "cluster-blade.googleapis.com", "0000")); - - verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); - result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - serviceConfig = (Map) result.getServiceConfig().getConfig(); - rawLbConfigs = (List>) serviceConfig.get("loadBalancingConfig"); - lbConfig = Iterables.getOnlyElement(rawLbConfigs); - assertThat(lbConfig.keySet()).containsExactly("xds_routing_experimental"); - rawConfigValues = (Map) lbConfig.get("xds_routing_experimental"); - actions = (Map>) rawConfigValues.get("action"); - routes = (List>) rawConfigValues.get("route"); - route = Iterables.getOnlyElement(routes); - assertThat(route.keySet()).containsExactly("prefix", "action"); - assertThat((String) route.get("prefix")).isEqualTo(""); - assertCdsPolicy(actions.get(route.get("action")), "cluster-blade.googleapis.com"); - } - - @Test - @SuppressWarnings("unchecked") - public void resolve_xdsRoutingLoadBalancing() { - xdsNameResolver.start(mockListener); - assertThat(responseObservers).hasSize(1); - StreamObserver responseObserver = responseObservers.poll(); - - // Simulate receiving an LDS response that contains routes resolution directly in-line. - List protoRoutes = - ImmutableList.of( - // path match, routed to cluster - Route.newBuilder() - .setMatch(RouteMatch.newBuilder().setPath("/fooSvc/hello")) - .setRoute(buildClusterRoute("cluster-hello.googleapis.com")) - .build(), - // prefix match, routed to cluster - Route.newBuilder() - .setMatch(RouteMatch.newBuilder().setPrefix("/fooSvc/")) - .setRoute(buildClusterRoute("cluster-foo.googleapis.com")) - .build(), - // path match, routed to weighted clusters - Route.newBuilder() - .setMatch(RouteMatch.newBuilder().setPath("/barSvc/hello")) - .setRoute(buildWeightedClusterRoute(ImmutableMap.of( - "cluster-hello.googleapis.com", 40, "cluster-hello2.googleapis.com", 60))) - .build(), - // prefix match, routed to weighted clusters - Route.newBuilder() - .setMatch(RouteMatch.newBuilder().setPrefix("/barSvc/")) - .setRoute( - buildWeightedClusterRoute( - ImmutableMap.of( - "cluster-bar.googleapis.com", 30, "cluster-bar2.googleapis.com", 70))) - .build(), - // default with prefix = "/", routed to cluster - Route.newBuilder() - .setMatch(RouteMatch.newBuilder().setPrefix("/")) - .setRoute(buildClusterRoute("cluster-hello.googleapis.com")) - .build()); - HttpConnectionManager httpConnectionManager = - HttpConnectionManager.newBuilder() - .setRouteConfig( - buildRouteConfigurationV2( - "route-foo.googleapis.com", // doesn't matter - ImmutableList.of(buildVirtualHostForRoutes(AUTHORITY, protoRoutes)))) - .build(); - List listeners = - ImmutableList.of(Any.pack(buildListenerV2(AUTHORITY, Any.pack(httpConnectionManager)))); - responseObserver.onNext( - buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000")); - - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - Map serviceConfig = (Map) result.getServiceConfig().getConfig(); - - List> rawLbConfigs = - (List>) serviceConfig.get("loadBalancingConfig"); - Map lbConfig = Iterables.getOnlyElement(rawLbConfigs); - assertThat(lbConfig.keySet()).containsExactly("xds_routing_experimental"); - Map rawConfigValues = (Map) lbConfig.get("xds_routing_experimental"); - assertThat(rawConfigValues.keySet()).containsExactly("action", "route"); - Map> actions = - (Map>) rawConfigValues.get("action"); - List> routes = (List>) rawConfigValues.get("route"); - assertThat(actions).hasSize(4); - assertThat(routes).hasSize(5); - - Map route0 = routes.get(0); - assertThat(route0.keySet()).containsExactly("path", "action"); - assertThat((String) route0.get("path")).isEqualTo("/fooSvc/hello"); - assertCdsPolicy(actions.get(route0.get("action")), "cluster-hello.googleapis.com"); - - Map route1 = routes.get(1); - assertThat(route1.keySet()).containsExactly("prefix", "action"); - assertThat((String) route1.get("prefix")).isEqualTo("/fooSvc/"); - assertCdsPolicy(actions.get(route1.get("action")), "cluster-foo.googleapis.com"); - - Map route2 = routes.get(2); - assertThat(route2.keySet()).containsExactly("path", "action"); - assertThat((String) route2.get("path")).isEqualTo("/barSvc/hello"); - assertWeightedTargetPolicy( - actions.get(route2.get("action")), - ImmutableMap.of( - "cluster-hello.googleapis.com", 40, "cluster-hello2.googleapis.com", 60)); - - Map route3 = routes.get(3); - assertThat(route3.keySet()).containsExactly("prefix", "action"); - assertThat((String) route3.get("prefix")).isEqualTo("/barSvc/"); - assertWeightedTargetPolicy( - actions.get(route3.get("action")), - ImmutableMap.of( - "cluster-bar.googleapis.com", 30, "cluster-bar2.googleapis.com", 70)); - - Map route4 = routes.get(4); - assertThat(route4.keySet()).containsExactly("prefix", "action"); - assertThat((String) route4.get("prefix")).isEqualTo("/"); - assertCdsPolicy(actions.get(route4.get("action")), "cluster-hello.googleapis.com"); - } - - @Test - @SuppressWarnings("unchecked") - public void resolve_resourceNewlyAdded() { - xdsNameResolver.start(mockListener); - assertThat(responseObservers).hasSize(1); - StreamObserver responseObserver = responseObservers.poll(); - - // Simulate receiving an LDS response that does not contain requested resource. - responseObserver.onNext( - buildLdsResponseForCluster("0", "bar.googleapis.com", - "cluster-bar.googleapis.com", "0000")); - - fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - verify(mockListener).onResult(resolutionResultCaptor.capture()); - ResolutionResult result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - assertThat((Map) result.getServiceConfig().getConfig()).isEmpty(); - - // Simulate receiving another LDS response that contains cluster resolution directly in-line. - responseObserver.onNext( - buildLdsResponseForCluster("1", AUTHORITY, "cluster-foo.googleapis.com", - "0001")); - - verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); - result = resolutionResultCaptor.getValue(); - assertThat(result.getAddresses()).isEmpty(); - Map serviceConfig = (Map) result.getServiceConfig().getConfig(); - List> rawLbConfigs = - (List>) serviceConfig.get("loadBalancingConfig"); - Map lbConfig = Iterables.getOnlyElement(rawLbConfigs); - assertThat(lbConfig.keySet()).containsExactly("xds_routing_experimental"); - Map rawConfigValues = (Map) lbConfig.get("xds_routing_experimental"); - Map> actions = - (Map>) rawConfigValues.get("action"); - List> routes = (List>) rawConfigValues.get("route"); - Map route = Iterables.getOnlyElement(routes); - assertThat(route.keySet()).containsExactly("prefix", "action"); - assertThat((String) route.get("prefix")).isEqualTo(""); - assertCdsPolicy(actions.get(route.get("action")), "cluster-foo.googleapis.com"); - } - - /** - * Builds an LDS DiscoveryResponse containing the mapping of given host to - * the given cluster name directly in-line. Clients receiving this response is - * able to resolve cluster name for the given host immediately. - */ - private static DiscoveryResponse buildLdsResponseForCluster( - String versionInfo, String host, String clusterName, String nonce) { - List listeners = ImmutableList.of( - Any.pack(buildListenerV2(host, // target Listener resource - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig( - buildRouteConfigurationV2("route-foo.googleapis.com", // doesn't matter - ImmutableList.of( - buildVirtualHostV2( - ImmutableList.of(host), // exact match - clusterName)))) - .build())))); - return buildDiscoveryResponseV2( - versionInfo, listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, nonce); - } - - /** - * Builds an LDS DiscoveryResponse containing the mapping of given host to - * the given RDS resource name. Clients receiving this response is able to - * send an RDS request for resolving the cluster name for the given host. - */ - private static DiscoveryResponse buildLdsResponseForRdsResource( - String versionInfo, String host, String routeConfigName, String nonce) { - Rds rdsConfig = - Rds.newBuilder() - // Must set to use ADS. - .setConfigSource( - ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) - .setRouteConfigName(routeConfigName) - .build(); - - List listeners = ImmutableList.of( - Any.pack( - buildListenerV2( - host, Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))); - return buildDiscoveryResponseV2( - versionInfo, listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, nonce); - } - - /** - * Builds an RDS DiscoveryResponse containing route configuration with the given name and a - * virtual host that matches the given host to the given cluster name. - */ - private static DiscoveryResponse buildRdsResponseForCluster( - String versionInfo, - String routeConfigName, - String host, - String clusterName, - String nonce) { - List routeConfigs = ImmutableList.of( - Any.pack( - buildRouteConfigurationV2( - routeConfigName, - ImmutableList.of( - buildVirtualHostV2(ImmutableList.of(host), clusterName))))); - return buildDiscoveryResponseV2( - versionInfo, routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS_V2, nonce); - } - - private static RouteAction buildClusterRoute(String clusterName) { - return RouteAction.newBuilder().setCluster(clusterName).build(); - } - - /** - * Builds a RouteAction for a weighted cluster route. The given map is keyed by cluster name and - * valued by the weight of the cluster. - */ - private static RouteAction buildWeightedClusterRoute(Map clusterWeights) { - WeightedCluster.Builder builder = WeightedCluster.newBuilder(); - for (Map.Entry entry : clusterWeights.entrySet()) { - builder.addClusters( - ClusterWeight.newBuilder() - .setName(entry.getKey()) - .setWeight(UInt32Value.of(entry.getValue()))); - } - return RouteAction.newBuilder() - .setWeightedClusters(builder) - .build(); - } - - private static VirtualHost buildVirtualHostForRoutes(String domain, List routes) { - return VirtualHost.newBuilder() - .setName("virtualhost00.googleapis.com") // don't care - .addAllDomains(ImmutableList.of(domain)) - .addAllRoutes(routes) - .build(); - } -} diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 16ce37a6e51..b1142380a6f 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -17,264 +17,538 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; -import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME; -import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; -import static io.grpc.xds.XdsLbPolicies.XDS_ROUTING_POLICY_NAME; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.re2j.Pattern; +import io.grpc.CallOptions; +import io.grpc.InternalConfigSelector; +import io.grpc.InternalConfigSelector.Result; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.NameResolver; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.NameResolver.ResolutionResult; +import io.grpc.NameResolver.ServiceConfigParser; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; import io.grpc.internal.JsonParser; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.PickSubchannelArgsImpl; +import io.grpc.testing.TestMethodDescriptors; +import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.EnvoyProtoData.ClusterWeight; +import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyProtoData.Route; import io.grpc.xds.EnvoyProtoData.RouteAction; -import io.grpc.xds.RouteMatch.FractionMatcher; -import io.grpc.xds.RouteMatch.HeaderMatcher; -import io.grpc.xds.RouteMatch.PathMatcher; +import io.grpc.xds.XdsClient.XdsClientPoolFactory; import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; /** Unit tests for {@link XdsNameResolver}. */ +// TODO(chengyuanzhang): should do tests with ManagedChannelImpl. @RunWith(JUnit4.class) public class XdsNameResolverTest { + private static final String AUTHORITY = "foo.googleapis.com:80"; + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final ServiceConfigParser serviceConfigParser = new ServiceConfigParser() { + @Override + public ConfigOrError parseServiceConfig(Map rawServiceConfig) { + return ConfigOrError.fromConfig(rawServiceConfig); + } + }; + private final FakeXdsClientPoolFactory xdsClientPoolFactory = new FakeXdsClientPoolFactory(); + private final String cluster1 = "cluster-foo.googleapis.com"; + private final String cluster2 = "cluster-bar.googleapis.com"; + private final CallInfo call1 = new CallInfo("HelloService", "hi"); + private final CallInfo call2 = new CallInfo("GreetService", "bye"); + + @Mock + private ThreadSafeRandom mockRandom; + @Mock + private NameResolver.Listener2 mockListener; + @Captor + private ArgumentCaptor resolutionResultCaptor; + @Captor + ArgumentCaptor errorCaptor; + private XdsNameResolver resolver; + + @Before + public void setUp() { + Bootstrapper bootstrapper = new Bootstrapper() { + @Override + public BootstrapInfo readBootstrap() { + return new BootstrapInfo( + ImmutableList.of( + new ServerInfo( + "trafficdirector.googleapis.com", + ImmutableList.of(), ImmutableList.of())), + Node.newBuilder().build(), + null); + } + }; + resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper, + xdsClientPoolFactory, mockRandom); + } @Test - public void generateWeightedTargetRawConfig() throws IOException { - List clusterWeights = + public void resolve_failToBootstrap() { + Bootstrapper bootstrapper = new Bootstrapper() { + @Override + public BootstrapInfo readBootstrap() throws IOException { + throw new IOException("Fail to read bootstrap file"); + } + }; + resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper, + xdsClientPoolFactory, mockRandom); + resolver.start(mockListener); + verify(mockListener).onError(errorCaptor.capture()); + Status error = errorCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(error.getDescription()).isEqualTo("Failed to load xDS bootstrap"); + assertThat(error.getCause()).hasMessageThat().isEqualTo("Fail to read bootstrap file"); + } + + @SuppressWarnings("unchecked") + @Test + public void resolve_resourceNotFound() { + resolver.start(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverResourceNotFound(); + verify(mockListener).onResult(resolutionResultCaptor.capture()); + ResolutionResult result = resolutionResultCaptor.getValue(); + assertThat(result.getAddresses()).isEmpty(); + assertThat((Map) result.getServiceConfig().getConfig()).isEmpty(); + } + + @Test + public void resolve_encounterError() { + resolver.start(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverError(Status.UNAVAILABLE.withDescription("server unreachable")); + verify(mockListener).onError(errorCaptor.capture()); + Status error = errorCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(error.getDescription()).isEqualTo("server unreachable"); + } + + @Test + public void resolve_simpleCallSucceeds() { + InternalConfigSelector configSelector = resolveToClusters(); + Result selectResult = assertCallSelectResult(call1, configSelector, cluster1, 15.0); + selectResult.getCommittedCallback().run(); + verifyNoMoreInteractions(mockListener); + } + + @Test + public void resolve_simpleCallFailedToRoute() { + InternalConfigSelector configSelector = resolveToClusters(); + CallInfo call = new CallInfo("FooService", "barMethod"); + Result selectResult = configSelector.selectConfig( + new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + Status status = selectResult.getStatus(); + assertThat(status.isOk()).isFalse(); + assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(status.getDescription()).isEqualTo("Could not find xDS route matching RPC"); + verifyNoMoreInteractions(mockListener); + } + + @SuppressWarnings("unchecked") + @Test + public void resolve_resourceUpdateAfterCallStarted() { + InternalConfigSelector configSelector = resolveToClusters(); + Result selectResult = assertCallSelectResult(call1, configSelector, cluster1, 15.0); + + reset(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverRoutes( Arrays.asList( - new ClusterWeight("cluster-foo", 30), new ClusterWeight("cluster-bar", 50)); - Map config = XdsNameResolver.generateWeightedTargetRawConfig(clusterWeights); - String expectedJson = "{\n" - + " \"weighted_target_experimental\": {\n" - + " \"targets\": {\n" - + " \"cluster-foo\": {\n" - + " \"weight\": 30,\n" - + " \"childPolicy\": [{\n" - + " \"cds_experimental\": {\n" - + " \"cluster\": \"cluster-foo\"\n" - + " }\n" - + " }]\n" - + " },\n" - + " \"cluster-bar\": {\n" - + " \"weight\": 50,\n" - + " \"childPolicy\": [{\n" - + " \"cds_experimental\": {\n" - + " \"cluster\": \"cluster-bar\"\n" - + " }\n" - + " }]\n" - + " }\n" - + " }\n" - + " }\n" - + "}"; - assertThat(config).isEqualTo(JsonParser.parse(expectedJson)); + new Route( + new RouteMatch(null, call1.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), + new Route( + new RouteMatch(null, call2.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + verify(mockListener).onResult(resolutionResultCaptor.capture()); + ResolutionResult result = resolutionResultCaptor.getValue(); + // Updated service config still contains cluster1 while it is removed resource. New calls no + // longer routed to cluster1. + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster1, cluster2, "another-cluster"), + (Map) result.getServiceConfig().getConfig()); + assertThat(result.getAttributes().get(InternalConfigSelector.KEY)) + .isSameInstanceAs(configSelector); + assertCallSelectResult(call1, configSelector, "another-cluster", 20.0); + + selectResult.getCommittedCallback().run(); // completes previous call + verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); + result = resolutionResultCaptor.getValue(); + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster2, "another-cluster"), + (Map) result.getServiceConfig().getConfig()); + verifyNoMoreInteractions(mockListener); } @SuppressWarnings("unchecked") @Test - public void generateXdsRoutingRawConfig() { - Route r1 = - new Route( - new RouteMatch( - new PathMatcher(null, "", null), Collections.emptyList(), - new FractionMatcher(10, 20)), - new RouteAction(15L, "cluster-foo", null)); - Route r2 = - new Route( - new RouteMatch( - new PathMatcher("/service/method", null, null), - Arrays.asList( - new HeaderMatcher(":scheme", "https", null, null, null, null, null, false)), - null), - new RouteAction( - 15L, - null, - Arrays.asList( - new ClusterWeight("cluster-foo", 20), - new ClusterWeight("cluster-bar", 20)))); - - Map config = - XdsNameResolver.generateXdsRoutingRawConfig(Arrays.asList(r1, r2)); - assertThat(config.keySet()).containsExactly("xds_routing_experimental"); - Map content = (Map) config.get(XDS_ROUTING_POLICY_NAME); - assertThat(content.keySet()).containsExactly("action", "route"); - Map> actions = (Map>) content.get("action"); - List> routes = (List>) content.get("route"); - assertThat(actions).hasSize(2); - assertThat(routes).hasSize(2); - - Map route0 = routes.get(0); - assertThat(route0.keySet()).containsExactly("prefix", "matchFraction", "action"); - assertThat((String) route0.get("prefix")).isEqualTo(""); - assertThat((Map) route0.get("matchFraction")) - .containsExactly("numerator", 10, "denominator", 20); - assertCdsPolicy(actions.get(route0.get("action")), "cluster-foo"); - - Map route1 = routes.get(1); - assertThat(route1.keySet()).containsExactly("path", "headers", "action"); - assertThat((String) route1.get("path")).isEqualTo("/service/method"); - Map header = Iterables.getOnlyElement((List>) route1.get("headers")); - assertThat(header) - .containsExactly("name", ":scheme", "exactMatch", "https", "invertMatch", false); - assertWeightedTargetPolicy( - actions.get(route1.get("action")), - ImmutableMap.of( - "cluster-foo", 20, "cluster-bar", 20)); + public void resolve_resourceUpdatedBeforeCallStarted() { + InternalConfigSelector configSelector = resolveToClusters(); + reset(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverRoutes( + Arrays.asList( + new Route( + new RouteMatch(null, call1.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), + new Route( + new RouteMatch(null, call2.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + // Two consecutive service config updates: one for removing clcuster1, + // one for adding "another=cluster". + verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); + ResolutionResult result = resolutionResultCaptor.getValue(); + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster2, "another-cluster"), + (Map) result.getServiceConfig().getConfig()); + assertThat(result.getAttributes().get(InternalConfigSelector.KEY)) + .isSameInstanceAs(configSelector); + assertCallSelectResult(call1, configSelector, "another-cluster", 20.0); + + verifyNoMoreInteractions(mockListener); } @SuppressWarnings("unchecked") @Test - public void generateXdsRoutingRawConfig_allowDuplicateMatchers() { - Route route = - new Route( - new RouteMatch( - new PathMatcher("/service/method", null, null), - Collections.emptyList(), null), - new RouteAction(15L, "cluster-foo", null)); - - Map config = - XdsNameResolver.generateXdsRoutingRawConfig(Arrays.asList(route, route)); - assertThat(config.keySet()).containsExactly(XDS_ROUTING_POLICY_NAME); - Map content = (Map) config.get(XDS_ROUTING_POLICY_NAME); - assertThat(content.keySet()).containsExactly("action", "route"); - Map actions = (Map) content.get("action"); - List routes = (List) content.get("route"); - assertThat(actions).hasSize(1); - assertThat(routes).hasSize(2); - assertThat(routes.get(0)).isEqualTo(routes.get(1)); + public void resolve_raceBetweenCallAndRepeatedResourceUpdate() { + InternalConfigSelector configSelector = resolveToClusters(); + assertCallSelectResult(call1, configSelector, cluster1, 15.0); + + reset(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverRoutes( + Arrays.asList( + new Route( + new RouteMatch(null, call1.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), + new Route( + new RouteMatch(null, call2.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + + verify(mockListener).onResult(resolutionResultCaptor.capture()); + ResolutionResult result = resolutionResultCaptor.getValue(); + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster1, cluster2, "another-cluster"), + (Map) result.getServiceConfig().getConfig()); + + xdsClient.deliverRoutes( + Arrays.asList( + new Route( + new RouteMatch(null, call1.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), "another-cluster", null)), + new Route( + new RouteMatch(null, call2.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + verifyNoMoreInteractions(mockListener); // no cluster added/deleted + assertCallSelectResult(call1, configSelector, "another-cluster", 15.0); + } + + @Test + public void resolve_raceBetweenClusterReleasedAndResourceUpdateAddBackAgain() { + InternalConfigSelector configSelector = resolveToClusters(); + Result result = assertCallSelectResult(call1, configSelector, cluster1, 15.0); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverRoutes( + Collections.singletonList( + new Route( + new RouteMatch(null, call2.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + xdsClient.deliverRoutes( + Arrays.asList( + new Route( + new RouteMatch(null, call1.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster1, null)), + new Route( + new RouteMatch(null, call2.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + result.getCommittedCallback().run(); + verifyNoMoreInteractions(mockListener); } @SuppressWarnings("unchecked") @Test - public void convertToRawRoute() throws IOException { - RouteMatch routeMatch1 = - new RouteMatch( - new PathMatcher("/service/method", null, null), - Collections.emptyList(), null); - String expectedJson1 = "{\n" - + " \"path\": \"/service/method\",\n" - + " \"action\": \"action_foo\"" - + "}"; - assertThat(XdsNameResolver.convertToRawRoute(routeMatch1, "action_foo")) - .isEqualTo(JsonParser.parse(expectedJson1)); - - RouteMatch routeMatch2 = - new RouteMatch( - new PathMatcher(null, "/", null), Collections.emptyList(), - new FractionMatcher(10, 100)); - Map rawRoute2 = XdsNameResolver.convertToRawRoute(routeMatch2, "action_foo"); - Map rawMatchFraction = (Map) rawRoute2.get("matchFraction"); - assertThat(rawMatchFraction).containsExactly("numerator", 10, "denominator", 100); - - RouteMatch routeMatch3 = - new RouteMatch( - new PathMatcher(null, "/", null), - Arrays.asList( - new HeaderMatcher("timeout", null, null, new HeaderMatcher.Range(0L, 10L), - null, null, null, false)), - null); - Map rawRoute3 = XdsNameResolver.convertToRawRoute(routeMatch3, "action_foo"); - Map header = - (Map) Iterables.getOnlyElement((List) rawRoute3.get("headers")); - assertThat((Map) header.get("rangeMatch")).containsExactly("start", 0L, "end", 10L); - - RouteMatch routeMatch4 = - new RouteMatch( - new PathMatcher(null, "/", null), - Arrays.asList( - new HeaderMatcher(":scheme", "https", null, null, null, null, null, false), - new HeaderMatcher( - ":path", null, Pattern.compile("google.*"), null, null, null, null, true), - new HeaderMatcher("timeout", null, null, null, true, null, null, false), - new HeaderMatcher(":authority", null, null, null, null, "google", null, false), - new HeaderMatcher(":authority", null, null, null, null, null, "grpc.io", false)), - null); + public void resolve_simpleCallSucceeds_routeToWeightedCluster() { + when(mockRandom.nextInt(anyInt())).thenReturn(90, 10); + resolver.start(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverRoutes( + Arrays.asList( + new Route( + new RouteMatch(null, call1.getFullMethodNameForPath()), + new RouteAction( + TimeUnit.SECONDS.toNanos(20L), null, + Arrays.asList( + new ClusterWeight(cluster1, 20), new ClusterWeight(cluster2, 80)))))); + verify(mockListener).onResult(resolutionResultCaptor.capture()); + ResolutionResult result = resolutionResultCaptor.getValue(); + assertThat(result.getAddresses()).isEmpty(); + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster1, cluster2), (Map) result.getServiceConfig().getConfig()); + assertThat(result.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL)).isNotNull(); + InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); + Result selectResult = configSelector.selectConfig( + new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + assertThat(selectResult.getStatus().isOk()).isTrue(); + assertThat(selectResult.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY)) + .isEqualTo(cluster2); + assertServiceConfigForMethodConfig(20.0, (Map) selectResult.getConfig()); - String expectedJson4 = "{\n" - + " \"prefix\": \"/\",\n" - + " \"headers\": [\n" - + " {\n" - + " \"name\": \":scheme\",\n" - + " \"exactMatch\": \"https\",\n" - + " \"invertMatch\": false\n" - + " },\n" - + " {\n" - + " \"name\": \":path\",\n" - + " \"regexMatch\": \"google.*\",\n" - + " \"invertMatch\": true\n" - + " },\n" - + " {\n" - + " \"name\": \"timeout\",\n" - + " \"presentMatch\": true,\n" - + " \"invertMatch\": false\n" - + " },\n" - + " {\n" - + " \"name\": \":authority\",\n" - + " \"prefixMatch\": \"google\",\n" - + " \"invertMatch\": false\n" - + " },\n" - + " {\n" - + " \"name\": \":authority\",\n" - + " \"suffixMatch\": \"grpc.io\",\n" - + " \"invertMatch\": false\n" - + " }\n" - + " ],\n" - + " \"action\": \"action_foo\"" - + "}"; - assertThat(XdsNameResolver.convertToRawRoute(routeMatch4, "action_foo")) - .isEqualTo(JsonParser.parse(expectedJson4)); + selectResult = configSelector.selectConfig( + new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + assertThat(selectResult.getStatus().isOk()).isTrue(); + assertThat(selectResult.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY)) + .isEqualTo(cluster1); + assertServiceConfigForMethodConfig(20.0, (Map) selectResult.getConfig()); } - /** Asserts that the given action contains a single CDS policy with the given cluster name. */ @SuppressWarnings("unchecked") - static void assertCdsPolicy(Map action, String clusterName) { - assertThat(action.keySet()).containsExactly("childPolicy"); - Map lbConfig = - Iterables.getOnlyElement((List>) action.get("childPolicy")); - assertThat(lbConfig.keySet()).containsExactly(CDS_POLICY_NAME); - Map rawConfigValues = (Map) lbConfig.get(CDS_POLICY_NAME); - assertThat(rawConfigValues).containsExactly("cluster", clusterName); + private static Result assertCallSelectResult( + CallInfo call, InternalConfigSelector configSelector, String expectedCluster, + double expectedTimeoutSec) { + Result result = configSelector.selectConfig( + new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY)) + .isEqualTo(expectedCluster); + assertServiceConfigForMethodConfig(expectedTimeoutSec, (Map) result.getConfig()); + return result; + } + + @SuppressWarnings("unchecked") + private InternalConfigSelector resolveToClusters() { + resolver.start(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverRoutes( + Arrays.asList( + new Route( + new RouteMatch(null, call1.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster1, null)), + new Route( + new RouteMatch(null, call2.getFullMethodNameForPath()), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + verify(mockListener).onResult(resolutionResultCaptor.capture()); + ResolutionResult result = resolutionResultCaptor.getValue(); + assertThat(result.getAddresses()).isEmpty(); + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster1, cluster2), (Map) result.getServiceConfig().getConfig()); + assertThat(result.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL)).isNotNull(); + return result.getAttributes().get(InternalConfigSelector.KEY); } /** - * Asserts that the given action contains a single weighted-target policy with the given cluster - * to weight mapping. + * Verifies the raw service config contains a single method config for method with the + * specified timeout. */ - @SuppressWarnings("unchecked") - static void assertWeightedTargetPolicy( - Map action, Map clusterWeights) { - assertThat(action.keySet()).containsExactly("childPolicy"); - Map lbConfig = - Iterables.getOnlyElement((List>) action.get("childPolicy")); - assertThat(lbConfig.keySet()).containsExactly(WEIGHTED_TARGET_POLICY_NAME); - Map rawConfigValues = (Map) lbConfig.get(WEIGHTED_TARGET_POLICY_NAME); - assertWeightedTargetConfigClusterWeights(rawConfigValues, clusterWeights); + private static void assertServiceConfigForMethodConfig( + double timeoutSec, Map actualServiceConfig) { + List> rawMethodConfigs = + JsonUtil.getListOfObjects(actualServiceConfig, "methodConfig"); + Map methodConfig = Iterables.getOnlyElement(rawMethodConfigs); + List> methods = JsonUtil.getListOfObjects(methodConfig, "name"); + assertThat(Iterables.getOnlyElement(methods)).isEmpty(); + assertThat(JsonUtil.getString(methodConfig, "timeout")).isEqualTo(timeoutSec + "s"); } /** - * Asserts that the given raw config is a weighted-target config with the given cluster to weight - * mapping. + * Verifies the raw service config contains an xDS load balancing config for the given clusters. */ + private static void assertServiceConfigForLoadBalancingConfig( + List clusters, Map actualServiceConfig) { + List> rawLbConfigs = + JsonUtil.getListOfObjects(actualServiceConfig, "loadBalancingConfig"); + Map lbConfig = Iterables.getOnlyElement(rawLbConfigs); + assertThat(lbConfig.keySet()).containsExactly("cluster_manager_experimental"); + Map clusterManagerLbConfig = + JsonUtil.getObject(lbConfig, "cluster_manager_experimental"); + Map clusterManagerChildLbPolicies = + JsonUtil.getObject(clusterManagerLbConfig, "childPolicy"); + assertThat(clusterManagerChildLbPolicies.keySet()).containsExactlyElementsIn(clusters); + for (String cluster : clusters) { + Map childLbConfig = JsonUtil.getObject(clusterManagerChildLbPolicies, cluster); + assertThat(childLbConfig.keySet()).containsExactly("lbPolicy"); + List> childLbConfigValues = + JsonUtil.getListOfObjects(childLbConfig, "lbPolicy"); + Map cdsLbPolicy = Iterables.getOnlyElement(childLbConfigValues); + assertThat(cdsLbPolicy.keySet()).containsExactly("cds_experimental"); + assertThat(JsonUtil.getObject(cdsLbPolicy, "cds_experimental")) + .containsExactly("cluster", cluster); + } + } + @SuppressWarnings("unchecked") - static void assertWeightedTargetConfigClusterWeights( - Map rawConfigValues, Map clusterWeight) { - assertThat(rawConfigValues.keySet()).containsExactly("targets"); - Map targets = (Map) rawConfigValues.get("targets"); - assertThat(targets.keySet()).isEqualTo(clusterWeight.keySet()); - for (String targetName : targets.keySet()) { - Map target = (Map) targets.get(targetName); - assertThat(target.keySet()).containsExactly("childPolicy", "weight"); - Map lbConfig = - Iterables.getOnlyElement((List>) target.get("childPolicy")); - assertThat(lbConfig.keySet()).containsExactly(CDS_POLICY_NAME); - Map rawClusterConfigValues = (Map) lbConfig.get(CDS_POLICY_NAME); - assertThat(rawClusterConfigValues).containsExactly("cluster", targetName); - assertThat(target.get("weight")).isEqualTo(clusterWeight.get(targetName)); + @Test + public void generateServiceConfig_forLoadBalancingConfig() throws IOException { + List clusters = Arrays.asList("cluster-foo", "cluster-bar", "cluster-baz"); + String expectedServiceConfigJson = "{\n" + + " \"loadBalancingConfig\": [{\n" + + " \"cluster_manager_experimental\": {\n" + + " \"childPolicy\": {\n" + + " \"cluster-foo\": {\n" + + " \"lbPolicy\": [{\n" + + " \"cds_experimental\": {\n" + + " \"cluster\": \"cluster-foo\"\n" + + " }\n" + + " }]\n" + + " },\n" + + " \"cluster-bar\": {\n" + + " \"lbPolicy\": [{\n" + + " \"cds_experimental\": {\n" + + " \"cluster\": \"cluster-bar\"\n" + + " }\n" + + " }]\n" + + " },\n" + + " \"cluster-baz\": {\n" + + " \"lbPolicy\": [{\n" + + " \"cds_experimental\": {\n" + + " \"cluster\": \"cluster-baz\"\n" + + " }\n" + + " }]\n" + + " }\n" + + " }\n" + + " }\n" + + " }]\n" + + "}"; + Map expectedServiceConfig = + (Map) JsonParser.parse(expectedServiceConfigJson); + assertThat(XdsNameResolver.generateServiceConfigWithLoadBalancingConfig(clusters)) + .isEqualTo(expectedServiceConfig); + } + + @SuppressWarnings("unchecked") + @Test + public void generateServiceConfig_forMethodTimeoutConfig() throws IOException { + long timeoutNano = TimeUnit.SECONDS.toNanos(1L) + 1L; // 1.0000000001s + String expectedServiceConfigJson = "{\n" + + " \"methodConfig\": [{\n" + + " \"name\": [ {} ],\n" + + " \"timeout\": \"1.000000001s\"\n" + + " }]\n" + + "}"; + Map expectedServiceConfig = + (Map) JsonParser.parse(expectedServiceConfigJson); + assertThat(XdsNameResolver.generateServiceConfigWithMethodTimeoutConfig(timeoutNano)) + .isEqualTo(expectedServiceConfig); + } + + private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory { + @Override + public ObjectPool newXdsClientObjectPool(BootstrapInfo bootstrapInfo) { + return new ObjectPool() { + @Override + public XdsClient getObject() { + return new FakeXdsClient(); + } + + @Override + public XdsClient returnObject(Object object) { + return null; + } + }; + } + } + + private class FakeXdsClient extends XdsClient { + private String resource; + private ConfigWatcher watcher; + + @Override + void watchConfigData(String targetAuthority, ConfigWatcher watcher) { + resource = targetAuthority; + this.watcher = watcher; + } + + @Override + void shutdown() { + // no-op + } + + void deliverRoutes(final List routes) { + syncContext.execute(new Runnable() { + @Override + public void run() { + watcher.onConfigChanged(ConfigUpdate.newBuilder().addRoutes(routes).build()); + } + }); + } + + void deliverError(final Status error) { + syncContext.execute(new Runnable() { + @Override + public void run() { + watcher.onError(error); + } + }); + } + + void deliverResourceNotFound() { + Preconditions.checkState(resource != null, "no resource subscribed"); + syncContext.execute(new Runnable() { + @Override + public void run() { + watcher.onResourceDoesNotExist(resource); + } + }); + } + } + + private static final class CallInfo { + private final String service; + private final String method; + private final MethodDescriptor methodDescriptor; + + private CallInfo(String service, String method) { + this.service = service; + this.method = method; + methodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodType.UNARY).setFullMethodName(service + "/" + method) + .setRequestMarshaller(TestMethodDescriptors.voidMarshaller()) + .setResponseMarshaller(TestMethodDescriptors.voidMarshaller()) + .build(); + } + + private String getFullMethodNameForPath() { + return "/" + service + "/" + method; } } }