Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Schepens <sebastian.schepens@mercadolibre.com>
  • Loading branch information
sschepens committed Oct 23, 2020
1 parent f6b5f3b commit c75d114
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 24 deletions.
28 changes: 15 additions & 13 deletions cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,21 @@ public static class V3 {
ROUTE,
SECRET);

public static final Map<String, String> V3_TYPE_URLS_TO_V2 = ImmutableMap.of(
Resources.V3.CLUSTER_TYPE_URL, Resources.V2.CLUSTER_TYPE_URL,
Resources.V3.ENDPOINT_TYPE_URL, Resources.V2.ENDPOINT_TYPE_URL,
Resources.V3.LISTENER_TYPE_URL, Resources.V2.LISTENER_TYPE_URL,
Resources.V3.ROUTE_TYPE_URL, Resources.V2.ROUTE_TYPE_URL,
Resources.V3.SECRET_TYPE_URL, Resources.V2.SECRET_TYPE_URL);

public static final Map<String, String> V2_TYPE_URLS_TO_V3 = ImmutableMap.of(
Resources.V2.CLUSTER_TYPE_URL, Resources.V3.CLUSTER_TYPE_URL,
Resources.V2.ENDPOINT_TYPE_URL, Resources.V3.ENDPOINT_TYPE_URL,
Resources.V2.LISTENER_TYPE_URL, Resources.V3.LISTENER_TYPE_URL,
Resources.V2.ROUTE_TYPE_URL, Resources.V3.ROUTE_TYPE_URL,
Resources.V2.SECRET_TYPE_URL, Resources.V3.SECRET_TYPE_URL);
public static final Map<String, String> V3_TYPE_URLS_TO_V2 = ImmutableMap.<String, String>builder()
.put(Resources.V3.CLUSTER_TYPE_URL, Resources.V2.CLUSTER_TYPE_URL)
.put(Resources.V3.ENDPOINT_TYPE_URL, Resources.V2.ENDPOINT_TYPE_URL)
.put(Resources.V3.LISTENER_TYPE_URL, Resources.V2.LISTENER_TYPE_URL)
.put(Resources.V3.ROUTE_TYPE_URL, Resources.V2.ROUTE_TYPE_URL)
.put(Resources.V3.SECRET_TYPE_URL, Resources.V2.SECRET_TYPE_URL)
.build();

public static final Map<String, String> V2_TYPE_URLS_TO_V3 = ImmutableMap.<String, String>builder()
.put(Resources.V2.CLUSTER_TYPE_URL, Resources.V3.CLUSTER_TYPE_URL)
.put(Resources.V2.ENDPOINT_TYPE_URL, Resources.V3.ENDPOINT_TYPE_URL)
.put(Resources.V2.LISTENER_TYPE_URL, Resources.V3.LISTENER_TYPE_URL)
.put(Resources.V2.ROUTE_TYPE_URL, Resources.V3.ROUTE_TYPE_URL)
.put(Resources.V2.SECRET_TYPE_URL, Resources.V3.SECRET_TYPE_URL)
.build();

public static final Map<String, ResourceType> TYPE_URLS_TO_RESOURCE_TYPE =
new ImmutableMap.Builder<String, ResourceType>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,11 @@ public Collection<T> groups() {
public synchronized void setSnapshot(T group, U snapshot) {
// we take a writeLock to prevent watches from being created while we update the snapshot
ConcurrentMap<ResourceType, CacheStatusInfo<T>> status;
U previousSnapshot;
writeLock.lock();
try {
// Update the existing snapshot entry.
snapshots.put(group, snapshot);
previousSnapshot = snapshots.put(group, snapshot);
status = statuses.get(group);
} finally {
writeLock.unlock();
Expand All @@ -379,7 +380,7 @@ public synchronized void setSnapshot(T group, U snapshot) {

// Responses should be in specific order and typeUrls has a list of resources in the right
// order.
respondWithSpecificOrder(group, snapshot, status);
respondWithSpecificOrder(group, previousSnapshot, snapshot, status);
}

/**
Expand All @@ -403,7 +404,7 @@ public StatusInfo statusInfo(T group) {

@VisibleForTesting
protected void respondWithSpecificOrder(T group,
U snapshot,
U previousSnapshot, U snapshot,
ConcurrentMap<ResourceType, CacheStatusInfo<T>> statusMap) {
for (ResourceType resourceType : RESOURCE_TYPES_IN_ORDER) {
CacheStatusInfo<T> status = statusMap.get(resourceType);
Expand Down Expand Up @@ -435,6 +436,53 @@ protected void respondWithSpecificOrder(T group,
// Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
return false;
});

Map<String, SnapshotResource<?>> previousResources = previousSnapshot == null
? Collections.emptyMap()
: previousSnapshot.resources(resourceType);
Map<String, SnapshotResource<?>> snapshotResources = snapshot.resources(resourceType);

Map<String, SnapshotResource<?>> snapshotChangedResources = snapshotResources.entrySet()
.stream()
.filter(entry -> {
SnapshotResource<?> snapshotResource = previousResources.get(entry.getKey());
return snapshotResource == null || !snapshotResource.version().equals(entry.getValue().version());
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Set<String> snapshotRemovedResources = previousResources.keySet()
.stream()
.filter(s -> !snapshotResources.containsKey(s))
.collect(Collectors.toSet());

status.deltaWatchesRemoveIf((id, watch) -> {
String version = snapshot.version(watch.request().getResourceType(), Collections.emptyList());

if (!watch.version().equals(version)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("responding to open watch {}[{}] with new version {}",
id,
String.join(", ", watch.trackedResources().keySet()),
version);
}

List<String> removedResources = snapshotRemovedResources.stream()
.filter(s -> watch.trackedResources().get(s) != null)
.collect(Collectors.toList());

ResponseState responseState = respondDeltaTracked(watch,
snapshotChangedResources,
removedResources,
version,
group);
// Discard the watch if it was responded or cancelled.
// A new watch will be created for future snapshots once envoy ACKs the response.
return ResponseState.RESPONDED.equals(responseState) || ResponseState.CANCELLED.equals(responseState);
}

// Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
return false;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,9 @@ Set<String> pendingResources(String typeUrl) {

@Override
boolean isWildcard(String typeUrl) {
return typeUrl.equals(Resources.V2.CLUSTER_TYPE_URL)
|| typeUrl.equals(Resources.V3.CLUSTER_TYPE_URL)
|| typeUrl.equals(Resources.V2.LISTENER_TYPE_URL)
|| typeUrl.equals(Resources.V3.LISTENER_TYPE_URL);
Resources.ResourceType resourceType = Resources.TYPE_URLS_TO_RESOURCE_TYPE.get(typeUrl);
return Resources.ResourceType.CLUSTER.equals(resourceType)
|| Resources.ResourceType.LISTENER.equals(resourceType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ public class XdsDeltaDiscoveryRequestStreamObserver<V, X, Y> extends DeltaDiscov
super(defaultTypeUrl, responseObserver, streamId, executor, discoveryServer);
this.trackedResources = new HashMap<>();
this.pendingResources = new HashSet<>();
this.isWildcard = defaultTypeUrl.equals(Resources.V2.CLUSTER_TYPE_URL)
|| defaultTypeUrl.equals(Resources.V3.CLUSTER_TYPE_URL)
|| defaultTypeUrl.equals(Resources.V2.LISTENER_TYPE_URL)
|| defaultTypeUrl.equals(Resources.V3.LISTENER_TYPE_URL);
Resources.ResourceType resourceType = Resources.TYPE_URLS_TO_RESOURCE_TYPE.get(defaultTypeUrl);
this.isWildcard = Resources.ResourceType.CLUSTER.equals(resourceType)
|| Resources.ResourceType.LISTENER.equals(resourceType);
this.responses = new ConcurrentHashMap<>();
}

Expand Down

0 comments on commit c75d114

Please sign in to comment.