Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,22 @@ public LoadBalancer newLoadBalancer(Helper helper) {
return new AutoConfiguredLoadBalancer(helper);
}

private static final class EmptySubchannelPicker extends SubchannelPicker {
private static final class NoopLoadBalancer extends LoadBalancer {

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
public void handleResolvedAddressGroups(List<EquivalentAddressGroup> s, Attributes a) {}

@Override
public void handleNameResolutionError(Status error) {}

@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {}

@Override
public void shutdown() {}
}


@VisibleForTesting
static final class AutoConfiguredLoadBalancer extends LoadBalancer {
private final Helper helper;
Expand All @@ -75,9 +83,22 @@ static final class AutoConfiguredLoadBalancer extends LoadBalancer {
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
Map<String, Object> configMap = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
Factory newlbf = decideLoadBalancerFactory(servers, configMap);
Factory newlbf;
try {
newlbf = decideLoadBalancerFactory(servers, configMap);
} catch (RuntimeException e) {
Status s = Status.INTERNAL
.withDescription("Failed to pick a load balancer from service config")
.withCause(e);
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s));
delegate.shutdown();
delegateFactory = null;
delegate = new NoopLoadBalancer();
return;
}

if (newlbf != null && newlbf != delegateFactory) {
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptySubchannelPicker());
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker());
delegate.shutdown();
delegateFactory = newlbf;
delegate = delegateFactory.newLoadBalancer(helper);
Expand Down Expand Up @@ -181,4 +202,25 @@ static LoadBalancer.Factory decideLoadBalancerFactory(
return PickFirstBalancerFactory.getInstance();
}
}

private static final class EmptyPicker extends SubchannelPicker {

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
}

private static final class FailingPicker extends SubchannelPicker {
private final Status failure;

FailingPicker(Status failure) {
this.failure = failure;
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(failure);
}
}
}
107 changes: 107 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
Expand Down Expand Up @@ -85,6 +88,8 @@
import io.grpc.internal.Channelz.ChannelTrace;
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import io.grpc.stub.ClientCalls;
import io.grpc.testing.TestMethodDescriptors;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
Expand All @@ -97,12 +102,15 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -2558,6 +2566,105 @@ public double nextDouble() {
channel.isTerminated());
}

@Test
public void badServiceConfigIsRecoverable() throws Exception {
final List<EquivalentAddressGroup> addresses =
ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
final class FakeNameResolver extends NameResolver {
Listener listener;

@Override
public String getServiceAuthority() {
return "also fake";
}

@Override
public void start(Listener listener) {
this.listener = listener;
listener.onAddresses(addresses,
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
.build());
}

@Override
public void shutdown() {}
}

final class FakeNameResolverFactory extends NameResolver.Factory {
FakeNameResolver resolver;

@Nullable
@Override
public NameResolver newNameResolver(URI targetUri, Attributes params) {
return (resolver = new FakeNameResolver());
}

@Override
public String getDefaultScheme() {
return "fake";
}
}

FakeNameResolverFactory factory = new FakeNameResolverFactory();
final class CustomBuilder extends AbstractManagedChannelImplBuilder<CustomBuilder> {

CustomBuilder() {
super(TARGET);
this.executorPool = ManagedChannelImplTest.this.executorPool;
this.channelz = ManagedChannelImplTest.this.channelz;
}

@Override
protected ClientTransportFactory buildTransportFactory() {
return mockTransportFactory;
}
}

ManagedChannel mychannel = new CustomBuilder()
.nameResolverFactory(factory)
.loadBalancerFactory(new AutoConfiguredLoadBalancerFactory()).build();

ClientCall<Void, Void> call1 =
mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
executor.runDueTasks();
try {
future1.get();
Assert.fail();
} catch (ExecutionException e) {
assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
}

// ok the service config is bad, let's fix it.

factory.resolver.listener.onAddresses(addresses,
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
.build());

ClientCall<Void, Void> call2 = mychannel.newCall(
TestMethodDescriptors.voidMethod(),
CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);

timer.forwardTime(1234, TimeUnit.SECONDS);

executor.runDueTasks();
try {
future2.get();
Assert.fail();
} catch (ExecutionException e) {
assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
}

mychannel.shutdownNow();
}

private static final class ChannelBuilder
extends AbstractManagedChannelImplBuilder<ChannelBuilder> {

Expand Down