Skip to content

Commit

Permalink
xds: update (local) configurations atomically for each LDS/RDS resour…
Browse files Browse the repository at this point in the history
…ce update (#8011)

Fixes inconsistent state of XdsNameResolver with most recently received xDS configurations. The full suite of configurations should always be updated whenever receiving new resource updates, including updates that revoke currently in-use resources. Reference counts for currently in-use clusters should also be cleaned up properly. Otherwise, re-receiving (after being revoked) the same resource can be treated as if the configuration never changed.
  • Loading branch information
voidzcy committed Mar 25, 2021
1 parent ccedd85 commit ad2b264
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 259 deletions.
126 changes: 76 additions & 50 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ final class XdsNameResolver extends NameResolver {
private final ThreadSafeRandom random;
private final FilterRegistry filterRegistry;
private final XxHash64 hashFunc = XxHash64.INSTANCE;
// Clusters (with reference counts) to which new/existing requests can be/are routed.
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
private final ConfigSelector configSelector = new ConfigSelector();

Expand Down Expand Up @@ -638,12 +639,10 @@ private class ResolveState implements LdsResourceWatcher {
// let channel take action for no config selector
.build();
private boolean stopped;
private Set<String> existingClusters;
@Nullable
private String rdsResource;
private Set<String> existingClusters; // clusters to which new requests can be routed
@Nullable
private RdsResourceWatcher rdsWatcher;
private LdsUpdate update;
private RouteDiscoveryState routeDiscoveryState;

@Override
public void onChanged(final LdsUpdate update) {
Expand All @@ -654,20 +653,16 @@ public void run() {
return;
}
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
ResolveState.this.update = update;
List<VirtualHost> virtualHosts = update.virtualHosts;
String rdsName = update.rdsName;
if (rdsName != null && rdsName.equals(rdsResource)) {
return;
}
cleanUpRdsWatcher();
cleanUpRouteDiscoveryState();
if (virtualHosts != null) {
updateRoutes(virtualHosts);
updateRoutes(virtualHosts, update.httpMaxStreamDurationNano, update.filterChain);
} else {
rdsResource = rdsName;
rdsWatcher = new RdsResourceWatcherImpl();
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsResource);
xdsClient.watchRdsResource(rdsResource, rdsWatcher);
routeDiscoveryState = new RouteDiscoveryState(
rdsName, update.httpMaxStreamDurationNano, update.filterChain);
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
xdsClient.watchRdsResource(rdsName, routeDiscoveryState);
}
}
});
Expand All @@ -681,9 +676,6 @@ public void run() {
if (stopped) {
return;
}
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}", xdsClient, error.getDescription());
listener.onError(error);
}
});
Expand All @@ -698,8 +690,8 @@ public void run() {
return;
}
logger.log(XdsLogLevel.INFO, "LDS resource {0} unavailable", resourceName);
cleanUpRdsWatcher();
listener.onResult(emptyResult);
cleanUpRouteDiscoveryState();
cleanUpRoutes();
}
});
}
Expand All @@ -712,39 +704,43 @@ private void start() {
private void stop() {
logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", authority);
stopped = true;
cleanUpRdsWatcher();
cleanUpRouteDiscoveryState();
xdsClient.cancelLdsResourceWatch(authority, this);
}

private void updateRoutes(List<VirtualHost> virtualHosts) {
private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
@Nullable List<NamedFilterConfig> filterConfigs) {
VirtualHost virtualHost = findVirtualHostForHostName(virtualHosts, authority);
if (virtualHost == null) {
logger.log(XdsLogLevel.WARNING,
"Failed to find virtual host matching hostname {0}", authority);
listener.onResult(emptyResult);
cleanUpRoutes();
return;
}

// A router filter is required for request routing. For backward compatibility, routing
// is always enabled for gRPC clients without HttpFilter support.
List<Route> routes = virtualHost.routes();
List<NamedFilterConfig> filterChain = null;
if (update.filterChain != null) {
if (filterConfigs != null) {
boolean hasRouter = false;
filterChain = new ArrayList<>(update.filterChain.size());
for (NamedFilterConfig namedFilter : update.filterChain) {
filterChain = new ArrayList<>(filterConfigs.size());
for (NamedFilterConfig namedFilter : filterConfigs) {
filterChain.add(namedFilter);
if (namedFilter.filterConfig.equals(RouterFilter.ROUTER_CONFIG)) {
hasRouter = true;
break;
}
}
if (!hasRouter) {
// Fail all RPCs if a router filter is not present. Reference counts for all currently
// selectable clusters should be reclaimed.
filterChain.add(LAME_FILTER);
routingConfig = new RoutingConfig(
update.httpMaxStreamDurationNano, Collections.<Route>emptyList(), filterChain,
virtualHost.filterConfigOverrides());
updateResolutionResult();
return;
routes = Collections.emptyList();
}
}
List<Route> routes = virtualHost.routes();

// Populate all clusters to which requests can be routed to through the virtual host.
Set<String> clusters = new HashSet<>();
for (Route route : routes) {
RouteAction action = route.routeAction();
Expand All @@ -756,13 +752,15 @@ private void updateRoutes(List<VirtualHost> virtualHosts) {
}
}
}

// Updates channel's load balancing config whenever the set of selectable clusters changes.
boolean shouldUpdateResult = existingClusters == null;
Set<String> addedClusters =
existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
Set<String> deletedClusters =
existingClusters == null
? Collections.<String>emptySet() : Sets.difference(existingClusters, clusters);
existingClusters = clusters;
boolean shouldUpdateResult = false;
for (String cluster : addedClusters) {
if (clusterRefs.containsKey(cluster)) {
clusterRefs.get(cluster).incrementAndGet();
Expand All @@ -777,9 +775,10 @@ private void updateRoutes(List<VirtualHost> virtualHosts) {
}
// Make newly added clusters selectable by config selector and deleted clusters no longer
// selectable.
routingConfig = new RoutingConfig(
update.httpMaxStreamDurationNano, routes, filterChain,
virtualHost.filterConfigOverrides());
routingConfig =
new RoutingConfig(
httpMaxStreamDurationNano, routes, filterChain,
virtualHost.filterConfigOverrides());
shouldUpdateResult = false;
for (String cluster : deletedClusters) {
int count = clusterRefs.get(cluster).decrementAndGet();
Expand All @@ -793,26 +792,56 @@ private void updateRoutes(List<VirtualHost> virtualHosts) {
}
}

private void cleanUpRdsWatcher() {
if (rdsWatcher != null) {
logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsResource);
xdsClient.cancelRdsResourceWatch(rdsResource, rdsWatcher);
rdsResource = null;
rdsWatcher = null;
private void cleanUpRoutes() {
if (existingClusters != null) {
for (String cluster : existingClusters) {
int count = clusterRefs.get(cluster).decrementAndGet();
if (count == 0) {
clusterRefs.remove(cluster);
}
}
existingClusters = null;
}
routingConfig = RoutingConfig.empty;
listener.onResult(emptyResult);
}

private class RdsResourceWatcherImpl implements RdsResourceWatcher {
private void cleanUpRouteDiscoveryState() {
if (routeDiscoveryState != null) {
String rdsName = routeDiscoveryState.resourceName;
logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
xdsClient.cancelRdsResourceWatch(rdsName, routeDiscoveryState);
routeDiscoveryState = null;
}
}

/**
* Discovery state for RouteConfiguration resource. One instance for each Listener resource
* update.
*/
private class RouteDiscoveryState implements RdsResourceWatcher {
private final String resourceName;
private final long httpMaxStreamDurationNano;
@Nullable
private final List<NamedFilterConfig> filterConfigs;

private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
@Nullable List<NamedFilterConfig> filterConfigs) {
this.resourceName = resourceName;
this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
this.filterConfigs = filterConfigs;
}

@Override
public void onChanged(final RdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (RdsResourceWatcherImpl.this != rdsWatcher) {
if (RouteDiscoveryState.this != routeDiscoveryState) {
return;
}
updateRoutes(update.virtualHosts);
logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
}
});
}
Expand All @@ -822,12 +851,9 @@ public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (RdsResourceWatcherImpl.this != rdsWatcher) {
if (RouteDiscoveryState.this != routeDiscoveryState) {
return;
}
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}", xdsClient, error.getDescription());
listener.onError(error);
}
});
Expand All @@ -838,11 +864,11 @@ public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (RdsResourceWatcherImpl.this != rdsWatcher) {
if (RouteDiscoveryState.this != routeDiscoveryState) {
return;
}
logger.log(XdsLogLevel.INFO, "RDS resource {0} unavailable", resourceName);
listener.onResult(emptyResult);
cleanUpRoutes();
}
});
}
Expand Down

0 comments on commit ad2b264

Please sign in to comment.