Skip to content

Commit

Permalink
xds: add EdsLoadBalancerProvider to shortcut CDS to EDS flow skipping…
Browse files Browse the repository at this point in the history
… fallback

The `EdsLoadBalancerProvider` provides `LookasideLb` (Will rename `LookasideLb` to `EdsLoadBalancer` in future, but kept the name now to show better diff) with no-op callbacks for fallback.

- `CdsLoadBalancer` will load `EdsLoadBalancerProvider/LookasideLb` directly skipping fallback.

- The EDS-only flow is unchanged, still loading `XdsLoadBalancerProvider/XdsLoadBalancer2`, keeping current fallback behavior and producing horrible error message when both the primary and fallback policy fail.
  • Loading branch information
dapengzhang0 committed Jan 30, 2020
1 parent 4ad3acb commit f04c492
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 9 deletions.
4 changes: 2 additions & 2 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLoadBalancerProvider.XDS_POLICY_NAME;
import static io.grpc.xds.EdsLoadBalancerProvider.EDS_POLICY_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -308,7 +308,7 @@ public void onClusterChanged(ClusterUpdate newUpdate) {
/* lrsServerName = */ newUpdate.getLrsServerName());
updateSslContextProvider(newUpdate.getUpstreamTlsContext());
if (edsBalancer == null) {
edsBalancer = lbRegistry.getProvider(XDS_POLICY_NAME).newLoadBalancer(helper);
edsBalancer = lbRegistry.getProvider(EDS_POLICY_NAME).newLoadBalancer(helper);
}
edsBalancer.handleResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(edsConfig).build());
Expand Down
75 changes: 75 additions & 0 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.xds.LookasideLb.EndpointUpdateCallback;
import java.util.Map;

/**
* The provider for the "eds" balancing policy. This class should not be directly referenced in
* code. The policy should be accessed through {@link io.grpc.LoadBalancerRegistry#getProvider}
* with the name "eds_experimental").
*/
@Internal
public class EdsLoadBalancerProvider extends LoadBalancerProvider {

static final String EDS_POLICY_NAME = "eds_experimental";

@Override
public boolean isAvailable() {
return true;
}

@Override
public int getPriority() {
return 5;
}

@Override
public String getPolicyName() {
return EDS_POLICY_NAME;
}

@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new LookasideLb(
helper,
new EndpointUpdateCallback() {
@Override
public void onWorking() {}

@Override
public void onError() {}

@Override
public void onAllDrop() {}
});
}

@Override
public ConfigOrError parseLoadBalancingPolicyConfig(
Map<String, ?> rawLoadBalancingPolicyConfig) {
return XdsLoadBalancerProvider.parseLoadBalancingConfigPolicy(
rawLoadBalancingPolicyConfig, LoadBalancerRegistry.getDefaultRegistry());
}
}
6 changes: 6 additions & 0 deletions xds/src/main/java/io/grpc/xds/LookasideLb.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ public void onEndpointChanged(EndpointUpdate endpointUpdate) {
public void onError(Status error) {
channelLogger.log(ChannelLogLevel.ERROR, "EDS load balancer received an error: {0}", error);
endpointUpdateCallback.onError();
// If we get an error before getting any valid result, we should put the channel in
// TRANSIENT_FAILURE; if they get an error after getting a valid result, we keep using the
// previous channel state.
if (!firstEndpointUpdateReceived) {
lookasideLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
io.grpc.xds.CdsLoadBalancerProvider
io.grpc.xds.EdsLoadBalancerProvider
io.grpc.xds.XdsLoadBalancerProvider
12 changes: 6 additions & 6 deletions xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLoadBalancerProvider.XDS_POLICY_NAME;
import static io.grpc.xds.EdsLoadBalancerProvider.EDS_POLICY_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
Expand Down Expand Up @@ -95,7 +95,7 @@ XdsClient createXdsClient() {
);

private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
private final LoadBalancerProvider fakeXdsLoadBlancerProvider = new LoadBalancerProvider() {
private final LoadBalancerProvider fakeEdsLoadBlancerProvider = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
return true;
Expand All @@ -108,7 +108,7 @@ public int getPriority() {

@Override
public String getPolicyName() {
return XDS_POLICY_NAME;
return EDS_POLICY_NAME;
}

@Override
Expand Down Expand Up @@ -150,7 +150,7 @@ public void setUp() {
doReturn(channelLogger).when(helper).getChannelLogger();
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
lbRegistry.register(fakeXdsLoadBlancerProvider);
lbRegistry.register(fakeEdsLoadBlancerProvider);
cdsLoadBalancer = new CdsLoadBalancer(helper, lbRegistry, mockTlsContextManager);
}

Expand Down Expand Up @@ -523,8 +523,8 @@ public void clusterWatcher_onErrorCalledBeforeAndAfterOnClusterChanged() throws

@Test
public void cdsBalancerIntegrateWithEdsBalancer() throws Exception {
lbRegistry.deregister(fakeXdsLoadBlancerProvider);
lbRegistry.register(new XdsLoadBalancerProvider());
lbRegistry.deregister(fakeEdsLoadBlancerProvider);
lbRegistry.register(new EdsLoadBalancerProvider());

ResolvedAddresses resolvedAddresses1 = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
Expand Down
28 changes: 27 additions & 1 deletion xds/src/test/java/io/grpc/xds/LookasideLbTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -663,13 +663,39 @@ LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry,
}

@Test
public void verifyErrorPropagation() {
public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() {
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));

verify(edsUpdateCallback, never()).onError();
// Forwarding 20 seconds so that the xds client will deem EDS resource not available.
fakeClock.forwardTime(20, TimeUnit.SECONDS);
verify(edsUpdateCallback).onError();
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
}

@Test
public void verifyErrorPropagation_withPreviousEndpointUpdateReceived() {
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
// Endpoint update received.
ClusterLoadAssignment clusterLoadAssignment =
buildClusterLoadAssignment("edsServiceName1",
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.of(buildDropOverload("throttle", 1000)));
receiveEndpointUpdate(clusterLoadAssignment);

verify(helper, never()).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
verify(edsUpdateCallback, never()).onError();

// XdsClient stream receives an error.
responseObserver.onError(new RuntimeException("fake error"));
verify(helper, never()).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
verify(edsUpdateCallback).onError();
}

/**
Expand Down

0 comments on commit f04c492

Please sign in to comment.