From 6866bf3322e142cfb0ba9471d5e59f498e5c4299 Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Thu, 7 Jul 2022 02:21:39 -0700 Subject: [PATCH] Implement ignore_resource_deletion resource handling logic --- .../java/io/grpc/xds/ClientXdsClient.java | 100 ++++++++++++------ 1 file changed, 70 insertions(+), 30 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index f7f0d9e0623..8fd18f7e719 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -2156,8 +2156,7 @@ public void run() { ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { - subscriber.stopTimer(); - logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName); + subscriber.cancelResourceWatch(); ldsResourceSubscribers.remove(resourceName); if (subscriber.xdsChannel != null) { subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS); @@ -2194,8 +2193,7 @@ public void run() { ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { - subscriber.stopTimer(); - logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName); + subscriber.cancelResourceWatch(); rdsResourceSubscribers.remove(resourceName); if (subscriber.xdsChannel != null) { subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS); @@ -2232,8 +2230,7 @@ public void run() { ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { - subscriber.stopTimer(); - logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName); + subscriber.cancelResourceWatch(); cdsResourceSubscribers.remove(resourceName); if (subscriber.xdsChannel != null) { subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS); @@ -2270,8 +2267,7 @@ public void run() { ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { - subscriber.stopTimer(); - logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName); + subscriber.cancelResourceWatch(); edsResourceSubscribers.remove(resourceName); if (subscriber.xdsChannel != null) { subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS); @@ -2320,7 +2316,7 @@ public void run() { Bootstrapper.BootstrapInfo getBootstrapInfo() { return bootstrapInfo; } - + @Override public String toString() { return logId.toString(); @@ -2370,29 +2366,17 @@ private void handleResourceUpdate( } else if (type == ResourceType.LDS || type == ResourceType.CDS) { if (subscriber.data != null && invalidResources.contains(resourceName)) { // Update is rejected but keep using the cached data. - if (type == ResourceType.LDS) { - LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data; - io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager(); - if (hcm != null) { - String rdsName = hcm.rdsName(); - if (rdsName != null) { - retainedResources.add(rdsName); - } - } - } else { - CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data; - String edsName = cdsUpdate.edsServiceName(); - if (edsName == null) { - edsName = cdsUpdate.clusterName(); - } - retainedResources.add(edsName); - } + retainDependentResource(subscriber, retainedResources); } else if (invalidResources.contains(resourceName)) { subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail)); } else { // For State of the World services, notify watchers when their watched resource is missing // from the ADS update. subscriber.onAbsent(); + // Retain any dependent resources if the resource deletion is ignored per server setting. + if (!subscriber.absent) { + retainDependentResource(subscriber, retainedResources); + } } } } @@ -2409,6 +2393,28 @@ private void handleResourceUpdate( } } + private void retainDependentResource( + ResourceSubscriber subscriber, Set retainedResources) { + if (subscriber.data == null) { + return; + } + String resourceName = null; + if (subscriber.type == ResourceType.LDS) { + LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data; + io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager(); + if (hcm != null) { + resourceName = hcm.rdsName(); + } + } else if (subscriber.type == ResourceType.CDS) { + CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data; + resourceName = cdsUpdate.edsServiceName(); + } + + if (resourceName != null) { + retainedResources.add(resourceName); + } + } + private static final class ParsedResource { private final ResourceUpdate resourceUpdate; private final Any rawResource; @@ -2431,15 +2437,18 @@ private Any getRawResource() { * Tracks a single subscribed resource. */ private final class ResourceSubscriber { - private final ServerInfo serverInfo; + @Nullable private final ServerInfo serverInfo; @Nullable private final AbstractXdsClient xdsChannel; private final ResourceType type; private final String resource; private final Set watchers = new HashSet<>(); - private ResourceUpdate data; + @Nullable private ResourceUpdate data; private boolean absent; - private ScheduledHandle respTimer; - private ResourceMetadata metadata; + // Tracks whether the deletion has been ignored per server request. + // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md + private boolean resourceDeletionIgnored; + @Nullable private ScheduledHandle respTimer; + @Nullable private ResourceMetadata metadata; @Nullable private String errorDescription; ResourceSubscriber(ResourceType type, String resource) { @@ -2533,6 +2542,19 @@ void stopTimer() { } } + void cancelResourceWatch() { + if (isWatched()) { + throw new IllegalStateException("Can't cancel resource watch with active watchers present"); + } + stopTimer(); + String message = "Unsubscribing {0} resource {1} from server {2}"; + if (resourceDeletionIgnored) { + message += " for which we previously ignored a deletion"; + } + logger.log(XdsLogLevel.INFO, message, type, resource, + serverInfo != null ? serverInfo.target() : "unknown"); + } + boolean isWatched() { return !watchers.isEmpty(); } @@ -2547,6 +2569,12 @@ void onData(ParsedResource parsedResource, String version, long updateTime) { ResourceUpdate oldData = this.data; this.data = parsedResource.getResourceUpdate(); absent = false; + if (resourceDeletionIgnored) { + logger.log(XdsLogLevel.INFO, "xds server {0}: server returned new version of resource " + + "for which we previously ignored a deletion: type {1} name {2}", + serverInfo != null ? serverInfo.target() : "unknown", type, resource); + resourceDeletionIgnored = false; + } if (!Objects.equals(oldData, data)) { for (ResourceWatcher watcher : watchers) { notifyWatcher(watcher, data); @@ -2558,6 +2586,18 @@ void onAbsent() { if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; } + + // Ignore deletion when the server instructs to, and the resource is reusable. + if (data != null && serverInfo != null && serverInfo.ignoreResourceDeletion()) { + if (!resourceDeletionIgnored) { + logger.log(XdsLogLevel.WARNING, + "xds server {0}: ignoring deletion for resource type {1} name {2}}", + serverInfo.target(), type, resource); + resourceDeletionIgnored = true; + } + return; + } + logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); if (!absent) { data = null;