Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLoadBalancerProvider.XDS_POLICY_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand All @@ -35,6 +36,7 @@
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.LocalityStore.LocalityStoreImpl;
Expand Down Expand Up @@ -314,12 +316,7 @@ protected Helper delegate() {
fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName())
.newLoadBalancer(fallbackBalancerHelper);
fallbackBalancerHelper.balancer = fallbackBalancer;
// TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(fallbackServers)
.setAttributes(fallbackAttributes)
.build());
propagateFallbackAddresses();
}

void updateFallbackServers(
Expand All @@ -334,12 +331,7 @@ void updateFallbackServers(
this.fallbackPolicy = fallbackPolicy;
if (fallbackBalancer != null) {
if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) {
// TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(fallbackServers)
.setAttributes(fallbackAttributes)
.build());
propagateFallbackAddresses();
} else {
fallbackBalancer.shutdown();
fallbackBalancer = null;
Expand All @@ -348,6 +340,38 @@ void updateFallbackServers(
}
}

private void propagateFallbackAddresses() {
String fallbackPolicyName = fallbackPolicy.getPolicyName();
List<EquivalentAddressGroup> servers = fallbackServers;

// Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy
// does not support grpclb-v1 balancer addresses, then we need to exclude them from the list.
if (!fallbackPolicyName.equals("grpclb") && !fallbackPolicyName.equals(XDS_POLICY_NAME)) {
ImmutableList.Builder<EquivalentAddressGroup> backends = ImmutableList.builder();
for (EquivalentAddressGroup eag : fallbackServers) {
if (eag.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) == null) {
backends.add(eag);
}
}
servers = backends.build();
}

// TODO(zhangkun83): FIXME(#5496): this is a temporary hack.
if (servers.isEmpty()
&& !fallbackBalancer.canHandleEmptyAddressListFromNameResolution()) {
fallbackBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address."
+ " addrs=" + fallbackServers + ", attrs=" + fallbackAttributes));
} else {
// TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(fallbackAttributes)
.build());
}
}

void startFallbackTimer() {
if (fallbackTimer == null) {
class FallbackTask implements Runnable {
Expand Down
4 changes: 3 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
@Internal
public final class XdsLoadBalancerProvider extends LoadBalancerProvider {

static final String XDS_POLICY_NAME = "xds_experimental";

private static final LbConfig DEFAULT_FALLBACK_POLICY =
new LbConfig("round_robin", ImmutableMap.<String, Void>of());

Expand All @@ -55,7 +57,7 @@ public int getPriority() {

@Override
public String getPolicyName() {
return "xds_experimental";
return XDS_POLICY_NAME;
}

@Override
Expand Down
113 changes: 108 additions & 5 deletions xds/src/test/java/io/grpc/xds/FallbackManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import com.google.common.collect.ImmutableList;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.EquivalentAddressGroup;
Expand All @@ -29,11 +30,15 @@
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.xds.XdsLoadBalancer.FallbackManager;
import java.util.ArrayList;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +47,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand Down Expand Up @@ -70,7 +76,7 @@ public int getPriority() {

@Override
public String getPolicyName() {
return "test_policy";
return fallbackPolicy.getPolicyName();
}

@Override
Expand Down Expand Up @@ -127,10 +133,10 @@ public void setUp() {
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
doReturn(channelLogger).when(helper).getChannelLogger();
fallbackPolicy = new LbConfig("test_policy", new HashMap<String, Void>());
lbRegistry.register(fakeRoundRonbinLbProvider);
lbRegistry.register(fakeFallbackLbProvider);
fallbackManager = new FallbackManager(helper, lbRegistry);
fallbackPolicy = new LbConfig("test_policy", new HashMap<String, Void>());
}

@After
Expand All @@ -141,7 +147,8 @@ public void tearDown() {
@Test
public void useFallbackWhenTimeout() {
fallbackManager.startFallbackTimer();
List<EquivalentAddressGroup> eags = new ArrayList<>();
List<EquivalentAddressGroup> eags = ImmutableList.of(
new EquivalentAddressGroup(ImmutableList.<SocketAddress>of(new InetSocketAddress(8080))));
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);

Expand All @@ -164,10 +171,106 @@ public void useFallbackWhenTimeout() {
.build());
}

@Test
public void fallback_handleBackendsEagsOnly() {
fallbackManager.startFallbackTimer();
EquivalentAddressGroup eag0 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8080)));
Attributes attributes = Attributes
.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address")
.build();
EquivalentAddressGroup eag1 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8081)), attributes);
EquivalentAddressGroup eag2 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8082)));
List<EquivalentAddressGroup> eags = ImmutableList.of(eag0, eag1, eag2);
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);

fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);

assertThat(fallbackManager.isInFallbackMode()).isTrue();
verify(fakeFallbackLb).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.of(eag0, eag2))
.setAttributes(
Attributes.newBuilder()
.set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG,
fallbackPolicy.getRawConfigValue())
.build())
.build());
}

@Test
public void fallback_handleGrpclbAddresses() {
lbRegistry.deregister(fakeFallbackLbProvider);
fallbackPolicy = new LbConfig("grpclb", new HashMap<String, Void>());
lbRegistry.register(fakeFallbackLbProvider);

fallbackManager.startFallbackTimer();
EquivalentAddressGroup eag0 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8080)));
Attributes attributes = Attributes
.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address")
.build();
EquivalentAddressGroup eag1 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8081)), attributes);
EquivalentAddressGroup eag2 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8082)));
List<EquivalentAddressGroup> eags = ImmutableList.of(eag0, eag1, eag2);
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);

fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);

assertThat(fallbackManager.isInFallbackMode()).isTrue();
verify(fakeFallbackLb).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(eags)
.setAttributes(
Attributes.newBuilder()
.set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG,
fallbackPolicy.getRawConfigValue())
.build())
.build());
}

@Test
public void fallback_onlyGrpclbAddresses_NoBackendAddress() {
lbRegistry.deregister(fakeFallbackLbProvider);
fallbackPolicy = new LbConfig("not_grpclb", new HashMap<String, Void>());
lbRegistry.register(fakeFallbackLbProvider);

fallbackManager.startFallbackTimer();
Attributes attributes = Attributes
.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address")
.build();
EquivalentAddressGroup eag1 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8081)), attributes);
EquivalentAddressGroup eag2 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8082)), attributes);
List<EquivalentAddressGroup> eags = ImmutableList.of(eag1, eag2);
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);

fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);

assertThat(fallbackManager.isInFallbackMode()).isTrue();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(fakeFallbackLb).handleNameResolutionError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
}

@Test
public void cancelFallback() {
fallbackManager.startFallbackTimer();
List<EquivalentAddressGroup> eags = new ArrayList<>();
List<EquivalentAddressGroup> eags = ImmutableList.of(
new EquivalentAddressGroup(ImmutableList.<SocketAddress>of(new InetSocketAddress(8080))));
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);

Expand Down
3 changes: 3 additions & 0 deletions xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ public void onCompleted() {

doReturn(oobChannel1).doReturn(oobChannel2).doReturn(oobChannel3)
.when(helper).createResolvingOobChannel(anyString());

// To write less tedious code for tests, allow fallbackBalancer to handle empty address list.
doReturn(true).when(fallbackBalancer1).canHandleEmptyAddressListFromNameResolution();
}

@After
Expand Down