-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xds: manage load stats for all clusters in XdsClient #7299
Changes from all commits
d2df5c1
92f53bc
1e1b00b
e2955ea
26d39bc
887e00c
58cb691
84232bd
210a803
754b69f
30d8c17
4d9d25e
b035d3e
bd48b69
84e6579
d35e66e
801b23d
6b4199d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,27 +54,10 @@ final class ClientLoadCounter { | |
private final AtomicLong callsIssued = new AtomicLong(); | ||
private final MetricRecorder[] metricRecorders = new MetricRecorder[THREAD_BALANCING_FACTOR]; | ||
|
||
// True if this counter continues to record stats after next snapshot. Otherwise, it will be | ||
// discarded. | ||
private boolean active; | ||
|
||
ClientLoadCounter() { | ||
for (int i = 0; i < THREAD_BALANCING_FACTOR; i++) { | ||
metricRecorders[i] = new MetricRecorder(); | ||
} | ||
active = true; | ||
} | ||
|
||
/** | ||
* Must only be used for testing. | ||
*/ | ||
@VisibleForTesting | ||
ClientLoadCounter(long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Replaced by individual setters, as the way of testing has changed. |
||
this(); | ||
this.callsSucceeded.set(callsSucceeded); | ||
this.callsInProgress.set(callsInProgress); | ||
this.callsFailed.set(callsFailed); | ||
this.callsIssued.set(callsIssued); | ||
} | ||
|
||
void recordCallStarted() { | ||
|
@@ -98,12 +81,8 @@ void recordMetric(String name, double value) { | |
} | ||
|
||
/** | ||
* Generates a snapshot for load stats recorded in this counter. Successive snapshots represent | ||
* load stats recorded for the interval since the previous snapshot. So taking a snapshot clears | ||
* the counter state except for ongoing RPC recordings. | ||
* | ||
* <p>This method is not thread-safe and must be called from {@link | ||
* io.grpc.LoadBalancer.Helper#getSynchronizationContext()}. | ||
* Generates a snapshot for load stats recorded in this counter for the interval between calls | ||
* of this method. | ||
*/ | ||
ClientLoadSnapshot snapshot() { | ||
Map<String, MetricValue> aggregatedValues = new HashMap<>(); | ||
|
@@ -127,12 +106,24 @@ ClientLoadSnapshot snapshot() { | |
aggregatedValues); | ||
} | ||
|
||
void setActive(boolean value) { | ||
active = value; | ||
@VisibleForTesting | ||
void setCallsIssued(long callsIssued) { | ||
this.callsIssued.set(callsIssued); | ||
} | ||
|
||
@VisibleForTesting | ||
void setCallsInProgress(long callsInProgress) { | ||
this.callsInProgress.set(callsInProgress); | ||
} | ||
|
||
boolean isActive() { | ||
return active; | ||
@VisibleForTesting | ||
void setCallsSucceeded(long callsSucceeded) { | ||
this.callsSucceeded.set(callsSucceeded); | ||
} | ||
|
||
@VisibleForTesting | ||
void setCallsFailed(long callsFailed) { | ||
this.callsFailed.set(callsFailed); | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
import io.grpc.xds.EnvoyProtoData.Locality; | ||
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; | ||
import io.grpc.xds.EnvoyProtoData.Node; | ||
import io.grpc.xds.LoadStatsManager.LoadStatsStore; | ||
import io.grpc.xds.LocalityStore.LocalityStoreFactory; | ||
import io.grpc.xds.XdsClient.EndpointUpdate; | ||
import io.grpc.xds.XdsClient.EndpointWatcher; | ||
|
@@ -208,11 +209,9 @@ public void shutdown() { | |
*/ | ||
private final class ClusterEndpointsBalancerFactory extends LoadBalancer.Factory { | ||
@Nullable final String clusterServiceName; | ||
final LoadStatsStore loadStatsStore; | ||
|
||
ClusterEndpointsBalancerFactory(@Nullable String clusterServiceName) { | ||
this.clusterServiceName = clusterServiceName; | ||
loadStatsStore = new LoadStatsStoreImpl(clusterName, clusterServiceName); | ||
} | ||
|
||
@Override | ||
|
@@ -248,6 +247,7 @@ final class ClusterEndpointsBalancer extends LoadBalancer { | |
ClusterEndpointsBalancer(Helper helper) { | ||
this.helper = helper; | ||
resourceName = clusterServiceName != null ? clusterServiceName : clusterName; | ||
LoadStatsStore loadStatsStore = xdsClient.addClientStats(clusterName, clusterServiceName); | ||
localityStore = | ||
localityStoreFactory.newLocalityStore(logId, helper, lbRegistry, loadStatsStore); | ||
endpointWatcher = new EndpointWatcherImpl(); | ||
|
@@ -267,22 +267,12 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { | |
throw new AssertionError("Can only report load to the same management server"); | ||
} | ||
if (!isReportingLoad) { | ||
logger.log( | ||
XdsLogLevel.INFO, | ||
"Start reporting loads for cluster: {0}, cluster_service: {1}", | ||
clusterName, | ||
clusterServiceName); | ||
xdsClient.reportClientStats(clusterName, clusterServiceName, loadStatsStore); | ||
xdsClient.reportClientStats(); | ||
isReportingLoad = true; | ||
} | ||
} else { | ||
if (isReportingLoad) { | ||
logger.log( | ||
XdsLogLevel.INFO, | ||
"Stop reporting loads for cluster: {0}, cluster_service: {1}", | ||
clusterName, | ||
clusterServiceName); | ||
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName); | ||
xdsClient.cancelClientStatsReport(); | ||
isReportingLoad = false; | ||
} | ||
} | ||
|
@@ -304,15 +294,11 @@ public boolean canHandleEmptyAddressListFromNameResolution() { | |
@Override | ||
public void shutdown() { | ||
if (isReportingLoad) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, forgot to put that back. Basically, nothing has changed in terms of the control flow. Only added XdsClient APIs for creating stats objects. |
||
logger.log( | ||
XdsLogLevel.INFO, | ||
"Stop reporting loads for cluster: {0}, cluster_service: {1}", | ||
clusterName, | ||
clusterServiceName); | ||
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName); | ||
xdsClient.cancelClientStatsReport(); | ||
isReportingLoad = false; | ||
} | ||
localityStore.reset(); | ||
voidzcy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
xdsClient.removeClientStats(clusterName, clusterServiceName); | ||
xdsClient.cancelEndpointDataWatch(resourceName, endpointWatcher); | ||
logger.log( | ||
XdsLogLevel.INFO, | ||
|
@@ -365,12 +351,7 @@ public void onEndpointChanged(EndpointUpdate endpointUpdate) { | |
public void onResourceDoesNotExist(String resourceName) { | ||
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName); | ||
if (isReportingLoad) { | ||
logger.log( | ||
XdsLogLevel.INFO, | ||
"Stop reporting loads for cluster: {0}, cluster_service: {1}", | ||
clusterName, | ||
clusterServiceName); | ||
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName); | ||
xdsClient.cancelClientStatsReport(); | ||
isReportingLoad = false; | ||
} | ||
localityStore.reset(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a reminder, `active` is true if this counter is currently in use by some LB policy to track loads sent to a specific locality. We do not mark internally whether a counter is still in use by some LB policy; instead, we ref-count the usage of this counter externally by wrapping the counter with `ReferenceCounted`.