3939import io .grpc .StreamTracer ;
4040import io .opencensus .contrib .grpc .metrics .RpcMeasureConstants ;
4141import io .opencensus .stats .MeasureMap ;
42+ import io .opencensus .stats .Stats ;
4243import io .opencensus .stats .StatsRecorder ;
4344import io .opencensus .tags .TagContext ;
4445import io .opencensus .tags .TagValue ;
4546import io .opencensus .tags .Tagger ;
47+ import io .opencensus .tags .Tags ;
4648import io .opencensus .tags .propagation .TagContextBinarySerializer ;
4749import io .opencensus .tags .propagation .TagContextSerializationException ;
4850import java .util .concurrent .TimeUnit ;
@@ -74,21 +76,33 @@ final class CensusStatsModule {
7476 private final Supplier <Stopwatch > stopwatchSupplier ;
7577 @ VisibleForTesting
7678 final Metadata .Key <TagContext > statsHeader ;
77- private final StatsClientInterceptor clientInterceptor = new StatsClientInterceptor ();
78- private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory ();
7979 private final boolean propagateTags ;
80- private final boolean recordStats ;
8180
81+ /**
82+ * Creates a {@link CensusStatsModule} with the default OpenCensus implementation.
83+ */
84+ CensusStatsModule (Supplier <Stopwatch > stopwatchSupplier , boolean propagateTags ) {
85+ this (
86+ Tags .getTagger (),
87+ Tags .getTagPropagationComponent ().getBinarySerializer (),
88+ Stats .getStatsRecorder (),
89+ stopwatchSupplier ,
90+ propagateTags );
91+ }
92+
93+ /**
94+ * Creates a {@link CensusStatsModule} with the given OpenCensus implementation.
95+ */
8296 CensusStatsModule (
8397 final Tagger tagger ,
8498 final TagContextBinarySerializer tagCtxSerializer ,
8599 StatsRecorder statsRecorder , Supplier <Stopwatch > stopwatchSupplier ,
86- boolean propagateTags , boolean recordStats ) {
100+ boolean propagateTags ) {
87101 this .tagger = checkNotNull (tagger , "tagger" );
88102 this .statsRecorder = checkNotNull (statsRecorder , "statsRecorder" );
103+ checkNotNull (tagCtxSerializer , "tagCtxSerializer" );
89104 this .stopwatchSupplier = checkNotNull (stopwatchSupplier , "stopwatchSupplier" );
90105 this .propagateTags = propagateTags ;
91- this .recordStats = recordStats ;
92106 this .statsHeader =
93107 Metadata .Key .of ("grpc-tags-bin" , new Metadata .BinaryMarshaller <TagContext >() {
94108 @ Override
@@ -118,22 +132,23 @@ public TagContext parseBytes(byte[] serialized) {
118132 * Creates a {@link ClientCallTracer} for a new call.
119133 */
120134 @ VisibleForTesting
121- ClientCallTracer newClientCallTracer (TagContext parentCtx , String fullMethodName ) {
122- return new ClientCallTracer (this , parentCtx , fullMethodName );
135+ ClientCallTracer newClientCallTracer (
136+ TagContext parentCtx , String fullMethodName , boolean recordStats ) {
137+ return new ClientCallTracer (this , parentCtx , fullMethodName , recordStats );
123138 }
124139
125140 /**
126141 * Returns the server tracer factory.
127142 */
128- ServerStreamTracer .Factory getServerTracerFactory () {
129- return serverTracerFactory ;
143+ ServerStreamTracer .Factory getServerTracerFactory (boolean recordStats ) {
144+ return new ServerTracerFactory ( recordStats ) ;
130145 }
131146
132147 /**
133148 * Returns the client interceptor that facilitates Census-based stats reporting.
134149 */
135- ClientInterceptor getClientInterceptor () {
136- return clientInterceptor ;
150+ ClientInterceptor getClientInterceptor (boolean recordStats ) {
151+ return new StatsClientInterceptor ( recordStats ) ;
137152 }
138153
139154 private static final class ClientTracer extends ClientStreamTracer {
@@ -206,12 +221,18 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory {
206221 private volatile ClientTracer streamTracer ;
207222 private volatile int callEnded ;
208223 private final TagContext parentCtx ;
224+ private final boolean recordStats ;
209225
210- ClientCallTracer (CensusStatsModule module , TagContext parentCtx , String fullMethodName ) {
226+ ClientCallTracer (
227+ CensusStatsModule module ,
228+ TagContext parentCtx ,
229+ String fullMethodName ,
230+ boolean recordStats ) {
211231 this .module = module ;
212232 this .parentCtx = checkNotNull (parentCtx , "parentCtx" );
213233 this .fullMethodName = checkNotNull (fullMethodName , "fullMethodName" );
214234 this .stopwatch = module .stopwatchSupplier .get ().start ();
235+ this .recordStats = recordStats ;
215236 }
216237
217238 @ Override
@@ -241,7 +262,7 @@ void callEnded(Status status) {
241262 if (callEndedUpdater .getAndSet (this , 1 ) != 0 ) {
242263 return ;
243264 }
244- if (!module . recordStats ) {
265+ if (!recordStats ) {
245266 return ;
246267 }
247268 stopwatch .stop ();
@@ -299,6 +320,7 @@ private static final class ServerTracer extends ServerStreamTracer {
299320 private volatile int streamClosed ;
300321 private final Stopwatch stopwatch ;
301322 private final Tagger tagger ;
323+ private final boolean recordStats ;
302324 private volatile long outboundMessageCount ;
303325 private volatile long inboundMessageCount ;
304326 private volatile long outboundWireSize ;
@@ -311,12 +333,14 @@ private static final class ServerTracer extends ServerStreamTracer {
311333 String fullMethodName ,
312334 TagContext parentCtx ,
313335 Supplier <Stopwatch > stopwatchSupplier ,
314- Tagger tagger ) {
336+ Tagger tagger ,
337+ boolean recordStats ) {
315338 this .module = module ;
316339 this .fullMethodName = checkNotNull (fullMethodName , "fullMethodName" );
317340 this .parentCtx = checkNotNull (parentCtx , "parentCtx" );
318341 this .stopwatch = stopwatchSupplier .get ().start ();
319342 this .tagger = tagger ;
343+ this .recordStats = recordStats ;
320344 }
321345
322346 @ Override
@@ -360,7 +384,7 @@ public void streamClosed(Status status) {
360384 if (streamClosedUpdater .getAndSet (this , 1 ) != 0 ) {
361385 return ;
362386 }
363- if (!module . recordStats ) {
387+ if (!recordStats ) {
364388 return ;
365389 }
366390 stopwatch .stop ();
@@ -397,6 +421,12 @@ public Context filterContext(Context context) {
397421
398422 @ VisibleForTesting
399423 final class ServerTracerFactory extends ServerStreamTracer .Factory {
424+ private final boolean recordStats ;
425+
426+ ServerTracerFactory (boolean recordStats ) {
427+ this .recordStats = recordStats ;
428+ }
429+
400430 @ Override
401431 public ServerStreamTracer newServerStreamTracer (String fullMethodName , Metadata headers ) {
402432 TagContext parentCtx = headers .get (statsHeader );
@@ -409,19 +439,30 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
409439 .put (RpcMeasureConstants .RPC_METHOD , TagValue .create (fullMethodName ))
410440 .build ();
411441 return new ServerTracer (
412- CensusStatsModule .this , fullMethodName , parentCtx , stopwatchSupplier , tagger );
442+ CensusStatsModule .this ,
443+ fullMethodName ,
444+ parentCtx ,
445+ stopwatchSupplier ,
446+ tagger ,
447+ recordStats );
413448 }
414449 }
415450
416451 @ VisibleForTesting
417452 final class StatsClientInterceptor implements ClientInterceptor {
453+ private final boolean recordStats ;
454+
455+ StatsClientInterceptor (boolean recordStats ) {
456+ this .recordStats = recordStats ;
457+ }
458+
418459 @ Override
419460 public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
420461 MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , Channel next ) {
421462 // New RPCs on client-side inherit the tag context from the current Context.
422463 TagContext parentCtx = tagger .getCurrentTagContext ();
423464 final ClientCallTracer tracerFactory =
424- newClientCallTracer (parentCtx , method .getFullMethodName ());
465+ newClientCallTracer (parentCtx , method .getFullMethodName (), recordStats );
425466 ClientCall <ReqT , RespT > call =
426467 next .newCall (method , callOptions .withStreamTracerFactory (tracerFactory ));
427468 return new SimpleForwardingClientCall <ReqT , RespT >(call ) {
0 commit comments