Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: manage load stats for all clusters in XdsClient (backport 1.31.x) #7317

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 18 additions & 27 deletions xds/src/main/java/io/grpc/xds/ClientLoadCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,10 @@ final class ClientLoadCounter {
private final AtomicLong callsIssued = new AtomicLong();
private final MetricRecorder[] metricRecorders = new MetricRecorder[THREAD_BALANCING_FACTOR];

// True if this counter continues to record stats after next snapshot. Otherwise, it will be
// discarded.
private boolean active;

ClientLoadCounter() {
for (int i = 0; i < THREAD_BALANCING_FACTOR; i++) {
metricRecorders[i] = new MetricRecorder();
}
active = true;
}

/**
* Must only be used for testing.
*/
@VisibleForTesting
ClientLoadCounter(long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued) {
this();
this.callsSucceeded.set(callsSucceeded);
this.callsInProgress.set(callsInProgress);
this.callsFailed.set(callsFailed);
this.callsIssued.set(callsIssued);
}

void recordCallStarted() {
Expand All @@ -98,12 +81,8 @@ void recordMetric(String name, double value) {
}

/**
* Generates a snapshot for load stats recorded in this counter. Successive snapshots represent
* load stats recorded for the interval since the previous snapshot. So taking a snapshot clears
* the counter state except for ongoing RPC recordings.
*
* <p>This method is not thread-safe and must be called from {@link
* io.grpc.LoadBalancer.Helper#getSynchronizationContext()}.
* Generates a snapshot for load stats recorded in this counter for the interval between calls
* of this method.
*/
ClientLoadSnapshot snapshot() {
Map<String, MetricValue> aggregatedValues = new HashMap<>();
Expand All @@ -127,12 +106,24 @@ ClientLoadSnapshot snapshot() {
aggregatedValues);
}

void setActive(boolean value) {
active = value;
@VisibleForTesting
void setCallsIssued(long callsIssued) {
this.callsIssued.set(callsIssued);
}

@VisibleForTesting
void setCallsInProgress(long callsInProgress) {
this.callsInProgress.set(callsInProgress);
}

boolean isActive() {
return active;
@VisibleForTesting
void setCallsSucceeded(long callsSucceeded) {
this.callsSucceeded.set(callsSucceeded);
}

@VisibleForTesting
void setCallsFailed(long callsFailed) {
this.callsFailed.set(callsFailed);
}

/**
Expand Down
33 changes: 7 additions & 26 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.LoadStatsManager.LoadStatsStore;
import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
Expand Down Expand Up @@ -208,11 +209,9 @@ public void shutdown() {
*/
private final class ClusterEndpointsBalancerFactory extends LoadBalancer.Factory {
@Nullable final String clusterServiceName;
final LoadStatsStore loadStatsStore;

ClusterEndpointsBalancerFactory(@Nullable String clusterServiceName) {
this.clusterServiceName = clusterServiceName;
loadStatsStore = new LoadStatsStoreImpl(clusterName, clusterServiceName);
}

@Override
Expand Down Expand Up @@ -248,6 +247,7 @@ final class ClusterEndpointsBalancer extends LoadBalancer {
ClusterEndpointsBalancer(Helper helper) {
this.helper = helper;
resourceName = clusterServiceName != null ? clusterServiceName : clusterName;
LoadStatsStore loadStatsStore = xdsClient.addClientStats(clusterName, clusterServiceName);
localityStore =
localityStoreFactory.newLocalityStore(logId, helper, lbRegistry, loadStatsStore);
endpointWatcher = new EndpointWatcherImpl();
Expand All @@ -267,22 +267,12 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
throw new AssertionError("Can only report load to the same management server");
}
if (!isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Start reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.reportClientStats(clusterName, clusterServiceName, loadStatsStore);
xdsClient.reportClientStats();
isReportingLoad = true;
}
} else {
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
}
Expand All @@ -304,15 +294,11 @@ public boolean canHandleEmptyAddressListFromNameResolution() {
@Override
public void shutdown() {
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
localityStore.reset();
xdsClient.removeClientStats(clusterName, clusterServiceName);
xdsClient.cancelEndpointDataWatch(resourceName, endpointWatcher);
logger.log(
XdsLogLevel.INFO,
Expand Down Expand Up @@ -365,12 +351,7 @@ public void onEndpointChanged(EndpointUpdate endpointUpdate) {
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
localityStore.reset();
Expand Down
Loading