diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index b225b01af7a..0106da79028 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -35,6 +35,7 @@ import io.grpc.internal.ObjectPool; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingSubchannel; +import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; import io.grpc.xds.Endpoints.DropOverload; @@ -87,7 +88,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer { private CallCounterProvider callCounterProvider; private ClusterDropStats dropStats; private ClusterImplLbHelper childLbHelper; - private LoadBalancer childLb; + private GracefulSwitchLoadBalancer childSwitchLb; ClusterImplLoadBalancer(Helper helper) { this(helper, ThreadSafeRandomImpl.instance); @@ -120,7 +121,7 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { childLbHelper = new ClusterImplLbHelper( callCounterProvider.getOrCreate(config.cluster, config.edsServiceName), config.lrsServerInfo); - childLb = config.childPolicy.getProvider().newLoadBalancer(childLbHelper); + childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper); // Assume load report server does not change throughout cluster lifetime. if (config.lrsServerInfo != null) { dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName); @@ -129,7 +130,9 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { childLbHelper.updateDropPolicies(config.dropCategories); childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests); childLbHelper.updateSslContextProviderSupplier(config.tlsContext); - childLb.handleResolvedAddresses( + + childSwitchLb.switchTo(config.childPolicy.getProvider()); + childSwitchLb.handleResolvedAddresses( resolvedAddresses.toBuilder() .setAttributes(attributes) .setLoadBalancingPolicyConfig(config.childPolicy.getConfig()) @@ -139,8 +142,8 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { @Override public void handleNameResolutionError(Status error) { - if (childLb != null) { - childLb.handleNameResolutionError(error); + if (childSwitchLb != null) { + childSwitchLb.handleNameResolutionError(error); } else { helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker(error)); } @@ -151,8 +154,8 @@ public void shutdown() { if (dropStats != null) { dropStats.release(); } - if (childLb != null) { - childLb.shutdown(); + if (childSwitchLb != null) { + childSwitchLb.shutdown(); if (childLbHelper != null) { childLbHelper.updateSslContextProviderSupplier(null); childLbHelper = null; diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index f553b558c9f..00ecd05e399 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -165,6 +165,41 @@ public void handleResolvedAddresses_propagateToChildPolicy() { .isSameInstanceAs(xdsClientPool); } + /** + * If the control plane switches from using the legacy lb_policy field in the xDS Cluster proto + * to the newer load_balancing_policy then the child policy can switch from weighted_target to + * xds_wrr_locality (this could happen the opposite way as well). This test assures that this + * results in the child LB changing if this were to happen. If this is not done correctly the new + * configuration would be given to the old LB implementation which would cause a channel panic. + */ + @Test + public void handleResolvedAddresses_childPolicyChanges() { + FakeLoadBalancerProvider weightedTargetProvider = + new FakeLoadBalancerProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); + Object weightedTargetConfig = new Object(); + ClusterImplConfig configWithWeightedTarget = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, + LRS_SERVER_INFO, + null, Collections.emptyList(), + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); + deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWeightedTarget); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); + assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); + assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig); + + FakeLoadBalancerProvider wrrLocalityProvider = + new FakeLoadBalancerProvider(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME); + Object wrrLocalityConfig = new Object(); + ClusterImplConfig configWithWrrLocality = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, + LRS_SERVER_INFO, + null, Collections.emptyList(), + new PolicySelection(wrrLocalityProvider, wrrLocalityConfig), null); + deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWrrLocality); + childBalancer = Iterables.getOnlyElement(downstreamBalancers); + assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME); + assertThat(childBalancer.config).isSameInstanceAs(wrrLocalityConfig); + } + @Test public void nameResolutionError_beforeChildPolicyInstantiated_returnErrorPickerToUpstream() { loadBalancer.handleNameResolutionError(Status.UNIMPLEMENTED.withDescription("not found"));