Skip to content

Commit ca9a41a

Browse files
authored
core: record RPC upstarts to Census. (#3708)
RPC upstarts are counted into metrics RPC_{CLIENT,SERVER}_STARTED_COUNT. In addition, RPC completions are counted into metrics RPC_{CLIENT,SERVER}_FINISHED_COUNT. From these metrics, users will be able to derive count of RPCs that are currently active.
1 parent bd32d6f commit ca9a41a

File tree

8 files changed

+324
-128
lines changed

8 files changed

+324
-128
lines changed

core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ private InProcessChannelBuilder(String name) {
7171
this.name = Preconditions.checkNotNull(name, "name");
7272
// In-process transport should not record its traffic to the stats module.
7373
// https://github.com/grpc/grpc-java/issues/2284
74-
setRecordStats(false);
74+
setStatsRecordStartedRpcs(false);
75+
setStatsRecordFinishedRpcs(false);
7576
}
7677

7778
@Override

core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ private InProcessServerBuilder(String name) {
8181
this.name = Preconditions.checkNotNull(name, "name");
8282
// In-process transport should not record its traffic to the stats module.
8383
// https://github.com/grpc/grpc-java/issues/2284
84-
setRecordStats(false);
84+
setStatsRecordStartedRpcs(false);
85+
setStatsRecordFinishedRpcs(false);
8586
}
8687

8788
@Override

core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ protected final int maxInboundMessageSize() {
144144
}
145145

146146
private boolean statsEnabled = true;
147-
private boolean recordStats = true;
147+
private boolean recordStartedRpcs = true;
148+
private boolean recordFinishedRpcs = true;
148149
private boolean tracingEnabled = true;
149150

150151
@Nullable
@@ -296,11 +297,19 @@ protected void setStatsEnabled(boolean value) {
296297
}
297298

298299
/**
299-
* Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true.
300-
* Enabled by default.
300+
* Disable or enable stats recording for RPC upstarts. Effective only if {@link
301+
* #setStatsEnabled} is set to true. Enabled by default.
301302
*/
302-
protected void setRecordStats(boolean value) {
303-
recordStats = value;
303+
protected void setStatsRecordStartedRpcs(boolean value) {
304+
recordStartedRpcs = value;
305+
}
306+
307+
/**
308+
* Disable or enable stats recording for RPC completions. Effective only if {@link
309+
* #setStatsEnabled} is set to true. Enabled by default.
310+
*/
311+
protected void setStatsRecordFinishedRpcs(boolean value) {
312+
recordFinishedRpcs = value;
304313
}
305314

306315
/**
@@ -348,7 +357,8 @@ final List<ClientInterceptor> getEffectiveInterceptors() {
348357
}
349358
// First interceptor runs last (see ClientInterceptors.intercept()), so that no
350359
// other interceptor can override the tracer factory we set in CallOptions.
351-
effectiveInterceptors.add(0, censusStats.getClientInterceptor(recordStats));
360+
effectiveInterceptors.add(
361+
0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs));
352362
}
353363
if (tracingEnabled) {
354364
CensusTracingModule censusTracing =

core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ public List<ServerServiceDefinition> getServices() {
103103
private CensusStatsModule censusStatsOverride;
104104

105105
private boolean statsEnabled = true;
106-
private boolean recordStats = true;
106+
private boolean recordStartedRpcs = true;
107+
private boolean recordFinishedRpcs = true;
107108
private boolean tracingEnabled = true;
108109

109110
@Override
@@ -207,11 +208,19 @@ protected void setStatsEnabled(boolean value) {
207208
}
208209

209210
/**
210-
* Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true.
211-
* Enabled by default.
211+
* Disable or enable stats recording for RPC upstarts. Effective only if {@link
212+
* #setStatsEnabled} is set to true. Enabled by default.
212213
*/
213-
protected void setRecordStats(boolean value) {
214-
recordStats = value;
214+
protected void setStatsRecordStartedRpcs(boolean value) {
215+
recordStartedRpcs = value;
216+
}
217+
218+
/**
219+
* Disable or enable stats recording for RPC completions. Effective only if {@link
220+
* #setStatsEnabled} is set to true. Enabled by default.
221+
*/
222+
protected void setStatsRecordFinishedRpcs(boolean value) {
223+
recordFinishedRpcs = value;
215224
}
216225

217226
/**
@@ -242,7 +251,8 @@ final List<ServerStreamTracer.Factory> getTracerFactories() {
242251
if (censusStats == null) {
243252
censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true);
244253
}
245-
tracerFactories.add(censusStats.getServerTracerFactory(recordStats));
254+
tracerFactories.add(
255+
censusStats.getServerTracerFactory(recordStartedRpcs, recordFinishedRpcs));
246256
}
247257
if (tracingEnabled) {
248258
CensusTracingModule censusTracing =

core/src/main/java/io/grpc/internal/CensusStatsModule.java

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.grpc.internal;
1818

19-
import static com.google.common.base.MoreObjects.firstNonNull;
2019
import static com.google.common.base.Preconditions.checkNotNull;
2120
import static com.google.common.base.Preconditions.checkState;
2221
import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
@@ -53,7 +52,6 @@
5352
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
5453
import java.util.logging.Level;
5554
import java.util.logging.Logger;
56-
import javax.annotation.Nullable;
5755

5856
/**
5957
* Provides factories for {@link StreamTracer} that records stats to Census.
@@ -133,22 +131,25 @@ public TagContext parseBytes(byte[] serialized) {
133131
*/
134132
@VisibleForTesting
135133
ClientCallTracer newClientCallTracer(
136-
TagContext parentCtx, String fullMethodName, boolean recordStats) {
137-
return new ClientCallTracer(this, parentCtx, fullMethodName, recordStats);
134+
TagContext parentCtx, String fullMethodName,
135+
boolean recordStartedRpcs, boolean recordFinishedRpcs) {
136+
return new ClientCallTracer(
137+
this, parentCtx, fullMethodName, recordStartedRpcs, recordFinishedRpcs);
138138
}
139139

140140
/**
141141
* Returns the server tracer factory.
142142
*/
143-
ServerStreamTracer.Factory getServerTracerFactory(boolean recordStats) {
144-
return new ServerTracerFactory(recordStats);
143+
ServerStreamTracer.Factory getServerTracerFactory(
144+
boolean recordStartedRpcs, boolean recordFinishedRpcs) {
145+
return new ServerTracerFactory(recordStartedRpcs, recordFinishedRpcs);
145146
}
146147

147148
/**
148149
* Returns the client interceptor that facilitates Census-based stats reporting.
149150
*/
150-
ClientInterceptor getClientInterceptor(boolean recordStats) {
151-
return new StatsClientInterceptor(recordStats);
151+
ClientInterceptor getClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
152+
return new StatsClientInterceptor(recordStartedRpcs, recordFinishedRpcs);
152153
}
153154

154155
private static final class ClientTracer extends ClientStreamTracer {
@@ -221,18 +222,27 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory {
221222
private volatile ClientTracer streamTracer;
222223
private volatile int callEnded;
223224
private final TagContext parentCtx;
224-
private final boolean recordStats;
225+
private final TagContext startCtx;
226+
private final boolean recordFinishedRpcs;
225227

226228
ClientCallTracer(
227229
CensusStatsModule module,
228230
TagContext parentCtx,
229231
String fullMethodName,
230-
boolean recordStats) {
232+
boolean recordStartedRpcs,
233+
boolean recordFinishedRpcs) {
231234
this.module = module;
232-
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
233235
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
236+
this.parentCtx = checkNotNull(parentCtx);
237+
this.startCtx =
238+
module.tagger.toBuilder(parentCtx)
239+
.put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)).build();
234240
this.stopwatch = module.stopwatchSupplier.get().start();
235-
this.recordStats = recordStats;
241+
this.recordFinishedRpcs = recordFinishedRpcs;
242+
if (recordStartedRpcs) {
243+
module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT, 1)
244+
.record(startCtx);
245+
}
236246
}
237247

238248
@Override
@@ -262,7 +272,7 @@ void callEnded(Status status) {
262272
if (callEndedUpdater.getAndSet(this, 1) != 0) {
263273
return;
264274
}
265-
if (!recordStats) {
275+
if (!recordFinishedRpcs) {
266276
return;
267277
}
268278
stopwatch.stop();
@@ -272,7 +282,8 @@ void callEnded(Status status) {
272282
tracer = BLANK_CLIENT_TRACER;
273283
}
274284
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
275-
// The metrics are in double
285+
.put(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT, 1)
286+
// The latency is double value
276287
.put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI)
277288
.put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
278289
.put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
@@ -290,8 +301,7 @@ void callEnded(Status status) {
290301
measureMap.record(
291302
module
292303
.tagger
293-
.toBuilder(parentCtx)
294-
.put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName))
304+
.toBuilder(startCtx)
295305
.put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
296306
.build());
297307
}
@@ -315,12 +325,11 @@ private static final class ServerTracer extends ServerStreamTracer {
315325

316326
private final CensusStatsModule module;
317327
private final String fullMethodName;
318-
@Nullable
319328
private final TagContext parentCtx;
320329
private volatile int streamClosed;
321330
private final Stopwatch stopwatch;
322331
private final Tagger tagger;
323-
private final boolean recordStats;
332+
private final boolean recordFinishedRpcs;
324333
private volatile long outboundMessageCount;
325334
private volatile long inboundMessageCount;
326335
private volatile long outboundWireSize;
@@ -334,13 +343,18 @@ private static final class ServerTracer extends ServerStreamTracer {
334343
TagContext parentCtx,
335344
Supplier<Stopwatch> stopwatchSupplier,
336345
Tagger tagger,
337-
boolean recordStats) {
346+
boolean recordStartedRpcs,
347+
boolean recordFinishedRpcs) {
338348
this.module = module;
339349
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
340350
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
341351
this.stopwatch = stopwatchSupplier.get().start();
342352
this.tagger = tagger;
343-
this.recordStats = recordStats;
353+
this.recordFinishedRpcs = recordFinishedRpcs;
354+
if (recordStartedRpcs) {
355+
module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_SERVER_STARTED_COUNT, 1)
356+
.record(parentCtx);
357+
}
344358
}
345359

346360
@Override
@@ -384,13 +398,14 @@ public void streamClosed(Status status) {
384398
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
385399
return;
386400
}
387-
if (!recordStats) {
401+
if (!recordFinishedRpcs) {
388402
return;
389403
}
390404
stopwatch.stop();
391405
long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
392406
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
393-
// The metrics are in double
407+
.put(RpcMeasureConstants.RPC_SERVER_FINISHED_COUNT, 1)
408+
// The latency is double value
394409
.put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI)
395410
.put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount)
396411
.put(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount)
@@ -401,11 +416,10 @@ public void streamClosed(Status status) {
401416
if (!status.isOk()) {
402417
measureMap.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1);
403418
}
404-
TagContext ctx = firstNonNull(parentCtx, tagger.empty());
405419
measureMap.record(
406420
module
407421
.tagger
408-
.toBuilder(ctx)
422+
.toBuilder(parentCtx)
409423
.put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
410424
.build());
411425
}
@@ -421,10 +435,12 @@ public Context filterContext(Context context) {
421435

422436
@VisibleForTesting
423437
final class ServerTracerFactory extends ServerStreamTracer.Factory {
424-
private final boolean recordStats;
438+
private final boolean recordStartedRpcs;
439+
private final boolean recordFinishedRpcs;
425440

426-
ServerTracerFactory(boolean recordStats) {
427-
this.recordStats = recordStats;
441+
ServerTracerFactory(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
442+
this.recordStartedRpcs = recordStartedRpcs;
443+
this.recordFinishedRpcs = recordFinishedRpcs;
428444
}
429445

430446
@Override
@@ -444,16 +460,19 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
444460
parentCtx,
445461
stopwatchSupplier,
446462
tagger,
447-
recordStats);
463+
recordStartedRpcs,
464+
recordFinishedRpcs);
448465
}
449466
}
450467

451468
@VisibleForTesting
452469
final class StatsClientInterceptor implements ClientInterceptor {
453-
private final boolean recordStats;
470+
private final boolean recordStartedRpcs;
471+
private final boolean recordFinishedRpcs;
454472

455-
StatsClientInterceptor(boolean recordStats) {
456-
this.recordStats = recordStats;
473+
StatsClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
474+
this.recordStartedRpcs = recordStartedRpcs;
475+
this.recordFinishedRpcs = recordFinishedRpcs;
457476
}
458477

459478
@Override
@@ -462,7 +481,8 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
462481
// New RPCs on client-side inherit the tag context from the current Context.
463482
TagContext parentCtx = tagger.getCurrentTagContext();
464483
final ClientCallTracer tracerFactory =
465-
newClientCallTracer(parentCtx, method.getFullMethodName(), recordStats);
484+
newClientCallTracer(parentCtx, method.getFullMethodName(),
485+
recordStartedRpcs, recordFinishedRpcs);
466486
ClientCall<ReqT, RespT> call =
467487
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
468488
return new SimpleForwardingClientCall<ReqT, RespT>(call) {

0 commit comments

Comments
 (0)