Skip to content

Commit

Permalink
xds: fix ServerXdsClient to return subscribed resources only for LDS (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjaypujare authored Dec 10, 2020
1 parent e8d2188 commit 821a953
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 17 deletions.
36 changes: 22 additions & 14 deletions xds/src/main/java/io/grpc/xds/ServerXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,25 @@ final class ServerXdsClient extends AbstractXdsClient {
@Nullable
private ListenerWatcher listenerWatcher;
private int listenerPort = -1;
private final boolean newServerApi;
private final boolean useNewApiForListenerQuery;
@Nullable private final String instanceIp;
private String grpcServerResourceId;
@Nullable
private ScheduledHandle ldsRespTimer;

ServerXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier,
boolean newServerApi, String instanceIp, String grpcServerResourceId) {
ServerXdsClient(
XdsChannel channel,
Node node,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier,
boolean useNewApiForListenerQuery,
String instanceIp,
String grpcServerResourceId) {
super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
this.newServerApi = channel.isUseProtocolV3() && newServerApi;
this.useNewApiForListenerQuery = channel.isUseProtocolV3() && useNewApiForListenerQuery;
this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0");
this.grpcServerResourceId =
(grpcServerResourceId != null) ? grpcServerResourceId : "grpc/server";
this.grpcServerResourceId = grpcServerResourceId != null ? grpcServerResourceId : "grpc/server";
}

@Override
Expand All @@ -78,7 +83,7 @@ void watchListenerData(final int port, final ListenerWatcher watcher) {
listenerWatcher = checkNotNull(watcher, "watcher");
checkArgument(port > 0, "port needs to be > 0");
listenerPort = port;
if (newServerApi) {
if (useNewApiForListenerQuery) {
String listeningAddress = instanceIp + ":" + listenerPort;
grpcServerResourceId =
grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress;
Expand All @@ -89,7 +94,7 @@ void watchListenerData(final int port, final ListenerWatcher watcher) {
@Override
public void run() {
getLogger().log(XdsLogLevel.INFO, "Started watching listener for port {0}", port);
if (!newServerApi) {
if (!useNewApiForListenerQuery) {
updateNodeMetadataForListenerRequest(port);
}
adjustResourceSubscription(ResourceType.LDS);
Expand All @@ -107,7 +112,10 @@ public void run() {
@Nullable
@Override
Collection<String> getSubscribedResources(ResourceType type) {
if (newServerApi) {
if (type != ResourceType.LDS) {
return null;
}
if (useNewApiForListenerQuery) {
return ImmutableList.<String>of(grpcServerResourceId);
} else {
return Collections.emptyList();
Expand Down Expand Up @@ -175,17 +183,17 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
}

private boolean isRequestedListener(Listener listener) {
if (newServerApi) {
if (useNewApiForListenerQuery) {
return grpcServerResourceId.equals(listener.getName())
&& listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
&& isAddressMatching(listener.getAddress(), listenerPort);
&& listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
&& isAddressMatching(listener.getAddress(), listenerPort);
}
return isAddressMatching(listener.getAddress(), 15001)
&& hasMatchingFilter(listener.getFilterChainsList());
}

private boolean isAddressMatching(Address address, int portToMatch) {
return address.hasSocketAddress() && (address.getSocketAddress().getPortValue() == portToMatch);
return address.hasSocketAddress() && address.getSocketAddress().getPortValue() == portToMatch;
}

private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public boolean shouldAccept(Runnable command) {
private ListenerWatcher listenerWatcher;

private ManagedChannel channel;
private XdsClient xdsClient;
private ServerXdsClient xdsClient;

@Before
public void setUp() throws IOException {
Expand Down Expand Up @@ -531,6 +531,7 @@ public void streamClosedAndRetry() {
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(requestObserver);

// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
Expand All @@ -551,6 +552,7 @@ public void streamClosedAndRetry() {
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(requestObserver);

// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
Expand All @@ -571,6 +573,7 @@ public void streamClosedAndRetry() {
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(requestObserver);

// Management server sends back a LDS response.
response = buildDiscoveryResponse("1", listeners,
Expand All @@ -595,6 +598,7 @@ public void streamClosedAndRetry() {
.onNext(eq(buildDiscoveryRequest(NODE, "1",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(requestObserver);

// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
Expand All @@ -616,7 +620,7 @@ public void streamClosedAndRetry() {
ResourceType.LDS.typeUrl(), "")));

verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
backoffPolicy2, requestObserver);
}

static Listener buildListenerWithFilterChain(String name, int portValue, String address,
Expand Down
7 changes: 6 additions & 1 deletion xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ public void streamClosedAndRetry() {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);

final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
Expand Down Expand Up @@ -675,6 +676,7 @@ public void streamClosedAndRetry() {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);

// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
Expand All @@ -694,6 +696,7 @@ public void streamClosedAndRetry() {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);

// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
Expand All @@ -713,6 +716,7 @@ public void streamClosedAndRetry() {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);

// Management server sends back a LDS response.
response = buildDiscoveryResponseV2("1", listeners,
Expand All @@ -736,6 +740,7 @@ public void streamClosedAndRetry() {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);

// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
Expand All @@ -756,7 +761,7 @@ public void streamClosedAndRetry() {
ResourceType.LDS.typeUrlV2(), "")));

verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
backoffPolicy2, requestObserver);
}

static Listener buildListenerWithFilterChain(String name, int portValue, String address,
Expand Down

0 comments on commit 821a953

Please sign in to comment.