diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java index 64d7fd7c3..dafc3c1e9 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java @@ -280,9 +280,14 @@ public DeltaWatch createDeltaWatch( return watch; } } else if (hasClusterChanged && requestResourceType.equals(ResourceType.ENDPOINT)) { - ResponseState responseState = respondDeltaTracked( + Map> snapshotResources = snapshot.resources(request.getResourceType()); + List removedResources = findRemovedResources(watch, + snapshotResources); + Map> changedResources = findChangedResources(watch, snapshotResources); + ResponseState responseState = respondDelta( watch, - snapshot.resources(request.getResourceType()), + changedResources, + removedResources, version, group); if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { @@ -304,8 +309,13 @@ public DeltaWatch createDeltaWatch( } // Otherwise, version is different, the watch may be responded immediately - ResponseState responseState = respondDeltaTracked(watch, - snapshot.resources(request.getResourceType()), + Map> snapshotResources = snapshot.resources(request.getResourceType()); + List removedResources = findRemovedResources(watch, + snapshotResources); + Map> changedResources = findChangedResources(watch, snapshotResources); + ResponseState responseState = respondDelta(watch, + changedResources, + removedResources, version, group); if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { @@ -470,8 +480,10 @@ protected void respondWithSpecificOrder(T group, .filter(s -> watch.trackedResources().get(s) != null) .collect(Collectors.toList()); - ResponseState responseState = respondDeltaTracked(watch, - snapshotChangedResources, + Map> changedResources = findChangedResources(watch, snapshotChangedResources); + + ResponseState responseState = respondDelta(watch, + changedResources, removedResources, version, group); @@ -551,22 +563,30 @@ private boolean respond(Watch watch, U snapshot, T group) { return false; } - /** - * Responds a delta watch using resource version comparison. - * - * @return if the watch has been responded. - */ - private ResponseState respondDeltaTracked(DeltaWatch watch, - Map> snapshotResources, - String version, - T group) { + private List findRemovedResources(DeltaWatch watch, Map> snapshotResources) { // remove resources for which client has a tracked version but do not exist in snapshot - List removedResources = watch.trackedResources().keySet() + return watch.trackedResources().keySet() .stream() .filter(s -> !snapshotResources.containsKey(s)) .collect(Collectors.toList()); + } - return respondDeltaTracked(watch, snapshotResources, removedResources, version, group); + private Map> findChangedResources(DeltaWatch watch, + Map> snapshotResources) { + return snapshotResources.entrySet() + .stream() + .filter(entry -> { + if (watch.pendingResources().contains(entry.getKey())) { + return true; + } + String resourceVersion = watch.trackedResources().get(entry.getKey()); + if (resourceVersion == null) { + // resource is not tracked, should respond it only if watch is wildcard + return watch.isWildcard(); + } + return !entry.getValue().version().equals(resourceVersion); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } private ResponseState respondDeltaTracked(DeltaWatch watch,