Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
21352b7
MAINTAINERS.md: Add Shiva
shivaspeaks Aug 11, 2025
458735d
Merge branch 'grpc:master' into master
shivaspeaks Sep 30, 2025
4f064fa
Update XdsTrustManagerFactoryTest.java
shivaspeaks Sep 30, 2025
0f96110
refactor and add documentation
shivaspeaks Oct 13, 2025
d7ef02a
Merge branch 'master' of https://github.com/grpc/grpc-java
shivaspeaks Oct 15, 2025
405217b
Merge branch 'grpc:master' into master
shivaspeaks Oct 22, 2025
587969d
Merge branch 'grpc:master' into master
shivaspeaks Oct 24, 2025
b1d0d6d
xds: client watcher API changes
shivaspeaks Oct 24, 2025
3fcff8d
xds: client watcher API changes
shivaspeaks Oct 24, 2025
bf905c2
xds: client watcher API changes
shivaspeaks Oct 24, 2025
4f5f3b3
xds: client watcher API changes
shivaspeaks Oct 24, 2025
494a75a
xds: client watcher API changes
shivaspeaks Oct 27, 2025
a5117ad
xds: client watcher API changes
shivaspeaks Oct 27, 2025
352ec9e
xds: client watcher API changes
shivaspeaks Oct 27, 2025
148f105
xds: client watcher API changes
shivaspeaks Oct 27, 2025
0c78954
xds: client watcher API changes
shivaspeaks Oct 27, 2025
b5571d1
xds: client watcher API changes
shivaspeaks Oct 27, 2025
c3a43b2
watcher api changes
shivaspeaks Oct 27, 2025
910b20b
watcher api changes
shivaspeaks Oct 27, 2025
319c113
Merge pull request #3 from shivaspeaks/client-watcher-changes-2
shivaspeaks Oct 27, 2025
2baf738
watcher api changes
shivaspeaks Oct 27, 2025
a51ac28
Merge branch 'master' of https://github.com/shivaspeaks/grpc-java
shivaspeaks Oct 27, 2025
3779aea
Merge branch 'client-watcher-changes-2' of https://github.com/shivasp…
shivaspeaks Oct 27, 2025
4ba6de4
address comments
shivaspeaks Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cronet/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ dependencies {

task javadocs(type: Javadoc) {
source = android.sourceSets.main.java.srcDirs
classpath += files(android.getBootClasspath())
// classpath += files(android.getBootClasspath())
classpath += files({
android.libraryVariants.collect { variant ->
variant.javaCompileProvider.get().classpath
Expand Down
48 changes: 24 additions & 24 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,9 @@ private abstract class XdsWatcherBase<T extends ResourceUpdate>

@Nullable
private StatusOr<T> data;
@Nullable
@SuppressWarnings("unused")
private Status ambientError;


private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
Expand All @@ -640,42 +643,39 @@ private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
}

@Override
public void onError(Status error) {
checkNotNull(error, "error");
public void onResourceChanged(StatusOr<T> update) {
if (cancelled) {
return;
}
// Don't update configuration on error, if we've already received configuration
if (!hasDataValue()) {
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s: %s",
toContextString(), error.getCode(), error.getDescription())));
maybePublishConfig();
}
}
ambientError = null;
if (update.hasValue()) {
data = update;
subscribeToChildren(update.getValue());
} else {
Status status = update.getStatus();
Status translatedStatus = Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s. Details: %s%s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : "",
nodeInfo()));

@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
data = StatusOr.fromStatus(translatedStatus);
}

checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
toContextString() + " does not exist" + nodeInfo()));
maybePublishConfig();
}

@Override
public void onChanged(T update) {
checkNotNull(update, "update");
public void onAmbientError(Status error) {
if (cancelled) {
return;
}

this.data = StatusOr.fromValue(update);
subscribeToChildren(update);
maybePublishConfig();
ambientError = error.withDescription(
String.format("Ambient error for %s: %s. Details: %s%s",
toContextString(),
error.getCode(),
error.getDescription() != null ? error.getDescription() : "",
nodeInfo()));
}

protected abstract void subscribeToChildren(T update);
Expand Down
135 changes: 62 additions & 73 deletions xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.GrpcUtil;
Expand Down Expand Up @@ -382,18 +383,30 @@ private DiscoveryState(String resourceName) {
}

@Override
public void onChanged(final LdsUpdate update) {
public void onResourceChanged(final StatusOr<LdsUpdate> update) {
if (stopped) {
return;
}
logger.log(Level.FINEST, "Received Lds update {0}", update);
if (update.listener() == null) {
onResourceDoesNotExist("Non-API");

if (!update.hasValue()) {
Status status = update.getStatus();
StatusException statusException = Status.UNAVAILABLE.withDescription(
String.format("Listener %s unavailable: %s", resourceName, status.getDescription()))
.withCause(status.asException())
.asException();
handleConfigNotFoundOrMismatch(statusException);
return;
}

String ldsAddress = update.listener().address();
if (ldsAddress == null || update.listener().protocol() != Protocol.TCP
final LdsUpdate ldsUpdate = update.getValue();
logger.log(Level.FINEST, "Received Lds update {0}", ldsUpdate);
if (ldsUpdate.listener() == null) {
handleConfigNotFoundOrMismatch(
Status.NOT_FOUND.withDescription("Listener is null in LdsUpdate").asException());
return;
}
String ldsAddress = ldsUpdate.listener().address();
if (ldsAddress == null || ldsUpdate.listener().protocol() != Protocol.TCP
|| !ipAddressesMatch(ldsAddress)) {
handleConfigNotFoundOrMismatch(
Status.UNKNOWN.withDescription(
Expand All @@ -402,16 +415,14 @@ public void onChanged(final LdsUpdate update) {
listenerAddress, ldsAddress)).asException());
return;
}

if (!pendingRds.isEmpty()) {
// filter chain state has not yet been applied to filterChainSelectorManager and there
// are two sets of sslContextProviderSuppliers, so we release the old ones.
releaseSuppliersInFlight();
pendingRds.clear();
}

filterChains = update.listener().filterChains();
defaultFilterChain = update.listener().defaultFilterChain();
// Filters are loaded even if the server isn't serving yet.
filterChains = ldsUpdate.listener().filterChains();
defaultFilterChain = ldsUpdate.listener().defaultFilterChain();
updateActiveFilters();

List<FilterChain> allFilterChains = filterChains;
Expand Down Expand Up @@ -450,43 +461,33 @@ public void onChanged(final LdsUpdate update) {
}
}

private boolean ipAddressesMatch(String ldsAddress) {
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
return false;
}
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
return listenerIp.equals(ldsIp);
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
if (stopped) {
return;
}
StatusException statusException = Status.UNAVAILABLE.withDescription(
String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
xdsClient.getBootstrapInfo().node().getId())).asException();
handleConfigNotFoundOrMismatch(statusException);
}

@Override
public void onError(final Status error) {
public void onAmbientError(final Status error) {
if (stopped) {
return;
}
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);

if (!isServing) {
listener.onNotServing(errorWithNodeId.asException());
}
}

private boolean ipAddressesMatch(String ldsAddress) {
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
return false;
}
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
return listenerIp.equals(ldsIp);
}

private void shutdown() {
stopped = true;
cleanUpRouteDiscoveryStates();
Expand Down Expand Up @@ -775,54 +776,42 @@ private RouteDiscoveryState(String resourceName) {
}

@Override
public void onChanged(final RdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
if (savedVirtualHosts == null && !isPending) {
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
}
savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
updateRdsRoutingConfig();
maybeUpdateSelector();
public void onResourceChanged(final StatusOr<RdsUpdate> update) {
syncContext.execute(() -> {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return; // Watcher has been cancelled.
}
});
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
if (update.hasValue()) {
if (savedVirtualHosts == null && !isPending) {
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
}
logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
savedVirtualHosts = ImmutableList.copyOf(update.getValue().virtualHosts);
} else {
logger.log(Level.WARNING, "Rds {0} unavailable: {1}",
new Object[]{resourceName, update.getStatus()});
savedVirtualHosts = null;
updateRdsRoutingConfig();
maybeUpdateSelector();
}
// In both cases, a change has occurred that requires a config update.
updateRdsRoutingConfig();
maybeUpdateSelector();
});
}

@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
new Object[]{resourceName, errorWithNodeId});
maybeUpdateSelector();
public void onAmbientError(final Status error) {
syncContext.execute(() -> {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return; // Watcher has been cancelled.
}
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
new Object[]{resourceName, errorWithNodeId});

// Per gRFC A88, ambient errors should not trigger a configuration change.
// Therefore, we do NOT call maybeUpdateSelector() here.
});
}

Expand Down
4 changes: 3 additions & 1 deletion xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) {
ignoreResourceDeletion = true;
}
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
}
Expand Down
41 changes: 13 additions & 28 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,44 +453,29 @@ private void handleRpcStreamClosed(Status status) {
stopwatch.reset();
}

Status newStatus = status;
if (responseReceived) {
// A closed ADS stream after a successful response is not considered an error. Servers may
// close streams for various reasons during normal operation, such as load balancing or
// underlying connection hitting its max connection age limit (see gRFC A9).
if (!status.isOk()) {
newStatus = Status.OK;
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
+ "response was received, so this will not be treated as an error. Cause: {2}",
status.getCode(), status.getDescription(), status.getCause());
} else {
logger.log(XdsLogLevel.DEBUG,
"ADS stream closed by server after a response was received");
}
} else {
// If the ADS stream is closed without ever having received a response from the server, then
// the XdsClient should consider that a connectivity error (see gRFC A57).
Status statusToPropagate = status;
if (!responseReceived && status.isOk()) {
// If the ADS stream is closed with OK without ever having received a response,
// it is a connectivity error (see gRFC A57).
statusToPropagate = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
}
if (!statusToPropagate.isOk()) {
inError = true;
if (status.isOk()) {
newStatus = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
}
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
logger.log(XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
statusToPropagate.getCode(), statusToPropagate.getDescription(),
statusToPropagate.getCause());
}

close(newStatus.asException());

// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
// concurrently with the stopwatch and schedule.
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
close(status.asException());
rpcRetryTimer =
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived);
xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived);
}

private void close(Exception error) {
Expand Down
Loading