diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 3202bb703ae..31d4d318f71 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -1843,7 +1843,7 @@ public void handleEdsResponse( private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment) throws ResourceInvalidException { - Set priorities = new HashSet<>(); + Map> priorities = new HashMap<>(); Map localityLbEndpointsMap = new LinkedHashMap<>(); List dropOverloads = new ArrayList<>(); int maxPriority = -1; @@ -1859,14 +1859,20 @@ private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assi } LocalityLbEndpoints localityLbEndpoints = structOrError.getStruct(); - maxPriority = Math.max(maxPriority, localityLbEndpoints.priority()); - priorities.add(localityLbEndpoints.priority()); + int priority = localityLbEndpoints.priority(); + maxPriority = Math.max(maxPriority, priority); // Note endpoints with health status other than HEALTHY and UNKNOWN are still // handed over to watching parties. It is watching parties' responsibility to // filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy(). - localityLbEndpointsMap.put( - parseLocality(localityLbEndpointsProto.getLocality()), - localityLbEndpoints); + Locality locality = parseLocality(localityLbEndpointsProto.getLocality()); + localityLbEndpointsMap.put(locality, localityLbEndpoints); + if (!priorities.containsKey(priority)) { + priorities.put(priority, new HashSet<>()); + } + if (!priorities.get(priority).add(locality)) { + throw new ResourceInvalidException("ClusterLoadAssignment has duplicate locality:" + + locality + " for priority:" + priority); + } } if (priorities.size() != maxPriority + 1) { throw new ResourceInvalidException("ClusterLoadAssignment has sparse priorities"); diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index bd662b1da0d..5e57add5c13 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -2342,6 +2342,29 @@ public void edsResourceUpdated() { verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); } + @Test + public void edsDuplicateLocalityInTheSamePriority() { + DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher); + verifyResourceMetadataRequested(EDS, EDS_RESOURCE); + + // Updated EDS response. + Any updatedClusterLoadAssignment = Any.pack(mf.buildClusterLoadAssignment(EDS_RESOURCE, + ImmutableList.of( + mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", + mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3), 2, 1), + mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", + mf.buildLbEndpoint("172.44.2.3", 8080, "healthy", 10), 2, 1) + ), + ImmutableList.of())); + call.sendResponse(EDS, updatedClusterLoadAssignment, "0", "0001"); + String errorMsg = "EDS response ClusterLoadAssignment" + + " \'cluster-load-assignment.googleapis.com\' " + + "validation error: ClusterLoadAssignment has duplicate " + + "locality:Locality{region=region2, zone=zone2, subZone=subzone2} for priority:1"; + call.verifyRequestNack(EDS, EDS_RESOURCE, "", "0001", NODE, ImmutableList.of( + errorMsg)); + } + @Test public void edsResourceDeletedByCds() { String resource = "backend-service.googleapis.com";