Skip to content

Commit

Permalink
Merge 21c96a4 into 89cd643
Browse files Browse the repository at this point in the history
  • Loading branch information
creamsoup committed Nov 15, 2019
2 parents 89cd643 + 21c96a4 commit b272f45
Show file tree
Hide file tree
Showing 7 changed files with 693 additions and 42 deletions.
5 changes: 3 additions & 2 deletions api/src/main/java/io/grpc/NameResolver.java
Expand Up @@ -315,6 +315,7 @@ public abstract static class Listener2 implements Listener {
@Deprecated
public final void onAddresses(
List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes) {
// TODO(jihuncho) need to promote Listener2 if we want to use ConfigOrError
onResult(
ResolutionResult.newBuilder().setAddresses(servers).setAttributes(attributes).build());
}
Expand All @@ -329,8 +330,8 @@ public final void onAddresses(
public abstract void onResult(ResolutionResult resolutionResult);

/**
* Handles an error from the resolver. The listener is responsible for eventually invoking
* {@link NameResolver#refresh()} to re-attempt resolution.
* Handles a name resolving error from the resolver. The listener is responsible for eventually
* invoking {@link NameResolver#refresh()} to re-attempt resolution.
*
* @param error a non-OK status
* @since 1.21.0
Expand Down
20 changes: 11 additions & 9 deletions core/src/main/java/io/grpc/internal/DnsNameResolver.java
Expand Up @@ -303,26 +303,28 @@ public void run() {
return;
}

Attributes.Builder attrs = Attributes.newBuilder();
ResolutionResult.Builder resultBuilder = ResolutionResult.newBuilder().setAddresses(servers);
if (!resolutionResults.txtRecords.isEmpty()) {
ConfigOrError serviceConfig =
parseServiceConfig(resolutionResults.txtRecords, random, getLocalHostname());
if (serviceConfig != null) {
if (serviceConfig.getError() != null) {
savedListener.onError(serviceConfig.getError());
return;
} else {
resultBuilder.setServiceConfig(serviceConfig);
if (serviceConfig.getError() == null) {
@SuppressWarnings("unchecked")
Map<String, ?> config = (Map<String, ?>) serviceConfig.getConfig();
attrs.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, config);
resultBuilder
.setAttributes(
Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, config)
.build());
}

}
} else {
logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{host});
}
ResolutionResult resolutionResult =
ResolutionResult.newBuilder().setAddresses(servers).setAttributes(attrs.build()).build();
savedListener.onResult(resolutionResult);

savedListener.onResult(resultBuilder.build());
}
}

Expand Down
54 changes: 38 additions & 16 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -130,6 +130,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
static final Status SUBCHANNEL_SHUTDOWN_STATUS =
Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");

private static final Map<String, ?> EMPTY_SERVICE_CONFIG = Collections.emptyMap();

private final InternalLogId logId;
private final String target;
private final NameResolverRegistry nameResolverRegistry;
Expand Down Expand Up @@ -1300,24 +1302,34 @@ private final class NameResolverListener extends NameResolver.Listener2 {
public void onResult(final ResolutionResult resolutionResult) {
final class NamesResolved implements Runnable {

@SuppressWarnings("ReferenceEquality")
@SuppressWarnings({"ReferenceEquality", "unchecked"})
@Override
public void run() {
List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
Attributes attrs = resolutionResult.getAttributes();
channelLogger.log(
ChannelLogLevel.DEBUG, "Resolved address: {0}, config={1}", servers, attrs);

boolean nrReplied = haveBackends == null;

if (haveBackends == null || !haveBackends) {
channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
haveBackends = true;
}

nameResolverBackoffPolicy = null;
ConfigOrError configOrError = resolutionResult.getServiceConfig();
Map<String, ?> serviceConfig = null;
if (configOrError != null) {
if (configOrError.getConfig() != null) {
serviceConfig = (Map<String, ?>) configOrError.getConfig();
} else if (nrReplied || defaultServiceConfig == null) {
// First DNS lookup has invalid service config, and cannot fall back to default(=null)
onError(configOrError.getError());
return;
}
}

// Assuming no error in config resolution for now.
final Map<String, ?> serviceConfig =
attrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
Map<String, ?> effectiveServiceConfig;
if (!lookUpServiceConfig) {
if (serviceConfig != null) {
Expand All @@ -1331,20 +1343,23 @@ public void run() {
// Otherwise, try to use the default config if available
if (serviceConfig != null) {
effectiveServiceConfig = serviceConfig;
} else {
} else if (defaultServiceConfig != null) {
effectiveServiceConfig = defaultServiceConfig;
if (defaultServiceConfig != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Received no service config, using default service config");
}
channelLogger.log(
ChannelLogLevel.INFO,
"Received no service config, using default service config");
} else {
effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
channelLogger.log(
ChannelLogLevel.INFO,
"Received no service config and default service config is not provided. "
+ "Falling back to empty service config");
}

// FIXME(notcarl): reference equality is not right (although not harmful) right now.
// Name resolver should return the same config if txt record is the same
if (effectiveServiceConfig != lastServiceConfig) {
channelLogger.log(ChannelLogLevel.INFO,
"Service config changed{0}", effectiveServiceConfig == null ? " to null" : "");
channelLogger.log(ChannelLogLevel.INFO, "Service config changed");
lastServiceConfig = effectiveServiceConfig;
}

Expand All @@ -1368,9 +1383,9 @@ public void run() {
}
Status handleResult = helper.lb.tryHandleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(effectiveAttrs)
.build());
.setAddresses(servers)
.setAttributes(effectiveAttrs)
.build());
if (!handleResult.isOk()) {
handleErrorInSyncContext(handleResult.augmentDescription(resolver + " was used"));
}
Expand Down Expand Up @@ -1399,13 +1414,20 @@ private void handleErrorInSyncContext(Status error) {
new Object[] {getLogId(), error});
if (haveBackends == null || haveBackends) {
channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
haveBackends = false;
}
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
return;
}
boolean nameResolverReplied = haveBackends == null;
if (nameResolverReplied) {
lastServiceConfig = defaultServiceConfig != null
? defaultServiceConfig : EMPTY_SERVICE_CONFIG;
haveBackends = false;
}

helper.lb.handleNameResolutionError(error);

if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt
Expand Down
Expand Up @@ -67,6 +67,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -225,17 +226,20 @@ public void newCallExitsIdleness() throws Exception {
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));

verify(mockNameResolver).start(nameResolverListenerCaptor.capture());
Attributes attr =
Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, new HashMap<String, Object>())
.build();
// Simulate new address resolved to make sure the LoadBalancer is correctly linked to
// the NameResolver.
ResolutionResult resolutionResult =
ResolutionResult.newBuilder()
.setAddresses(servers)
.setAttributes(Attributes.EMPTY)
.setAttributes(attr)
.build();
nameResolverListenerCaptor.getValue().onResult(resolutionResult);
verify(mockLoadBalancer).handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
.build());
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(attr).build());
}

@Test
Expand Down
43 changes: 31 additions & 12 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Expand Up @@ -154,6 +154,7 @@
/** Unit tests for {@link ManagedChannelImpl}. */
@RunWith(JUnit4.class)
public class ManagedChannelImplTest {

private static final int DEFAULT_PORT = 447;

private static final MethodDescriptor<String, Integer> method =
Expand All @@ -175,6 +176,10 @@ public class ManagedChannelImplTest {
.setUserAgent(USER_AGENT);
private static final String TARGET = "fake://" + SERVICE_NAME;
private static final String MOCK_POLICY_NAME = "mock_lb";
private static final Attributes EMPTY_SERVICE_CONFIG_ATTRIBUTES =
Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, new HashMap<String, Object>())
.build();
private URI expectedUri;
private final SocketAddress socketAddress =
new SocketAddress() {
Expand Down Expand Up @@ -751,7 +756,7 @@ public void noMoreCallbackAfterLoadBalancerShutdown() {
verify(mockLoadBalancer).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Arrays.asList(addressGroup))
.setAttributes(Attributes.EMPTY)
.setAttributes(EMPTY_SERVICE_CONFIG_ATTRIBUTES)
.build());

SubchannelStateListener stateListener1 = mock(SubchannelStateListener.class);
Expand Down Expand Up @@ -1101,6 +1106,7 @@ public void firstResolvedServerFailedToConnect() throws Exception {
inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Arrays.asList(addressGroup))
.setAttributes(EMPTY_SERVICE_CONFIG_ATTRIBUTES)
.build());
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
Expand Down Expand Up @@ -1250,6 +1256,7 @@ public void allServersFailedToConnect() throws Exception {
inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Arrays.asList(addressGroup))
.setAttributes(EMPTY_SERVICE_CONFIG_ATTRIBUTES)
.build());
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
Expand Down Expand Up @@ -2762,15 +2769,15 @@ public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends() throws Exc
.setAttributes(Attributes.EMPTY)
.build();
nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult1);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);

prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);

prevSize = getStats(channel).channelTrace.events.size();
nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);

prevSize = getStats(channel).channelTrace.events.size();
ResolutionResult resolutionResult2 = ResolutionResult.newBuilder()
Expand Down Expand Up @@ -2804,6 +2811,7 @@ public void channelTracing_serviceConfigChange() throws Exception {
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setAttributes(attributes)
.setServiceConfig(ConfigOrError.fromConfig(new HashMap<String, Object>()))
.build();
nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult1);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
Expand All @@ -2820,9 +2828,10 @@ public void channelTracing_serviceConfigChange() throws Exception {
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setAttributes(attributes)
.setServiceConfig(ConfigOrError.fromConfig(new HashMap<String, Object>()))
.build();
nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult2);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);

prevSize = getStats(channel).channelTrace.events.size();
Map<String, Object> serviceConfig = new HashMap<>();
Expand All @@ -2837,6 +2846,7 @@ public void channelTracing_serviceConfigChange() throws Exception {
new EquivalentAddressGroup(
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
.setAttributes(attributes)
.setServiceConfig(ConfigOrError.fromConfig(new HashMap<String, Object>()))
.build();
nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult3);
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
Expand Down Expand Up @@ -3400,15 +3410,17 @@ public String getServiceAuthority() {
@Override
public void start(Listener2 listener) {
this.listener = listener;
ImmutableMap<String, Object> serviceConfig =
ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom");
listener.onResult(
ResolutionResult.newBuilder()
.setAddresses(addresses)
.setAttributes(
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
.build())
.setServiceConfig(ConfigOrError.fromConfig(serviceConfig))
.build());
}

Expand Down Expand Up @@ -3461,15 +3473,17 @@ protected ClientTransportFactory buildTransportFactory() {

// ok the service config is bad, let's fix it.

ImmutableMap<String, Object> serviceConfig =
ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin");
factory.resolver.listener.onResult(
ResolutionResult.newBuilder()
.setAddresses(addresses)
.setAttributes(
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
.build())
.setServiceConfig(ConfigOrError.fromConfig(serviceConfig))
.build());

ClientCall<Void, Void> call2 = mychannel.newCall(
Expand Down Expand Up @@ -3886,7 +3900,7 @@ public void enableServiceConfigLookUp_resolverReturnsNoConfig_noDefaultConfig()
verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture());
assertThat(resultCaptor.getValue().getAddresses()).containsExactly(addressGroup);
Attributes actualAttrs = resultCaptor.getValue().getAttributes();
assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)).isNull();
assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)).isEmpty();
verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
} finally {
LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
Expand Down Expand Up @@ -4048,11 +4062,16 @@ void resolved() {
listener.onError(error);
return;
}
listener.onResult(
Attributes attr = nextResolvedAttributes.get();
Map<String, ?> config = attr.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
ResolutionResult.Builder builder =
ResolutionResult.newBuilder()
.setAddresses(servers)
.setAttributes(nextResolvedAttributes.get())
.build());
.setAttributes(attr);
if (config != null) {
builder.setServiceConfig(ConfigOrError.fromConfig(config));
}
listener.onResult(builder.build());
}

@Override public void shutdown() {
Expand Down

0 comments on commit b272f45

Please sign in to comment.