Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1843,7 +1843,7 @@ public void handleEdsResponse(

private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
throws ResourceInvalidException {
Set<Integer> priorities = new HashSet<>();
Map<Integer, Set<Locality>> priorities = new HashMap<>();
Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap = new LinkedHashMap<>();
List<DropOverload> dropOverloads = new ArrayList<>();
int maxPriority = -1;
Expand All @@ -1859,14 +1859,20 @@ private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assi
}

LocalityLbEndpoints localityLbEndpoints = structOrError.getStruct();
maxPriority = Math.max(maxPriority, localityLbEndpoints.priority());
priorities.add(localityLbEndpoints.priority());
int priority = localityLbEndpoints.priority();
maxPriority = Math.max(maxPriority, priority);
// Note endpoints with health status other than HEALTHY and UNKNOWN are still
// handed over to watching parties. It is watching parties' responsibility to
// filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy().
localityLbEndpointsMap.put(
parseLocality(localityLbEndpointsProto.getLocality()),
localityLbEndpoints);
Locality locality = parseLocality(localityLbEndpointsProto.getLocality());
localityLbEndpointsMap.put(locality, localityLbEndpoints);
if (!priorities.containsKey(priority)) {
priorities.put(priority, new HashSet<>());
}
if (!priorities.get(priority).add(locality)) {
throw new ResourceInvalidException("ClusterLoadAssignment has duplicate locality:"
+ locality + " for priority:" + priority);
}
}
if (priorities.size() != maxPriority + 1) {
throw new ResourceInvalidException("ClusterLoadAssignment has sparse priorities");
Expand Down
23 changes: 23 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2342,6 +2342,29 @@ public void edsResourceUpdated() {
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
}

@Test
public void edsDuplicateLocalityInTheSamePriority() {
DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher);
verifyResourceMetadataRequested(EDS, EDS_RESOURCE);

// Updated EDS response.
Any updatedClusterLoadAssignment = Any.pack(mf.buildClusterLoadAssignment(EDS_RESOURCE,
ImmutableList.of(
mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2",
mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3), 2, 1),
mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2",
mf.buildLbEndpoint("172.44.2.3", 8080, "healthy", 10), 2, 1)
),
ImmutableList.<Message>of()));
call.sendResponse(EDS, updatedClusterLoadAssignment, "0", "0001");
String errorMsg = "EDS response ClusterLoadAssignment"
+ " \'cluster-load-assignment.googleapis.com\' "
+ "validation error: ClusterLoadAssignment has duplicate "
+ "locality:Locality{region=region2, zone=zone2, subZone=subzone2} for priority:1";
call.verifyRequestNack(EDS, EDS_RESOURCE, "", "0001", NODE, ImmutableList.of(
errorMsg));
}

@Test
public void edsResourceDeletedByCds() {
String resource = "backend-service.googleapis.com";
Expand Down