Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ private InProcessChannelBuilder(String name) {
this.name = Preconditions.checkNotNull(name, "name");
// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
setRecordStats(false);
setStatsRecordStartedRpcs(false);
setStatsRecordFinishedRpcs(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ private InProcessServerBuilder(String name) {
this.name = Preconditions.checkNotNull(name, "name");
// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
setRecordStats(false);
setStatsRecordStartedRpcs(false);
setStatsRecordFinishedRpcs(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ protected final int maxInboundMessageSize() {
}

private boolean statsEnabled = true;
private boolean recordStats = true;
private boolean recordStartedRpcs = true;
private boolean recordFinishedRpcs = true;
private boolean tracingEnabled = true;

@Nullable
Expand Down Expand Up @@ -296,11 +297,19 @@ protected void setStatsEnabled(boolean value) {
}

/**
* Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true.
* Enabled by default.
* Disable or enable stats recording for RPC upstarts. Effective only if {@link
* #setStatsEnabled} is set to true. Enabled by default.
*/
protected void setRecordStats(boolean value) {
recordStats = value;
protected void setStatsRecordStartedRpcs(boolean value) {
recordStartedRpcs = value;
}

/**
* Disable or enable stats recording for RPC completions. Effective only if {@link
* #setStatsEnabled} is set to true. Enabled by default.
*/
protected void setStatsRecordFinishedRpcs(boolean value) {
recordFinishedRpcs = value;
}

/**
Expand Down Expand Up @@ -348,7 +357,8 @@ final List<ClientInterceptor> getEffectiveInterceptors() {
}
// First interceptor runs last (see ClientInterceptors.intercept()), so that no
// other interceptor can override the tracer factory we set in CallOptions.
effectiveInterceptors.add(0, censusStats.getClientInterceptor(recordStats));
effectiveInterceptors.add(
0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs));
}
if (tracingEnabled) {
CensusTracingModule censusTracing =
Expand Down
22 changes: 16 additions & 6 deletions core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public List<ServerServiceDefinition> getServices() {
private CensusStatsModule censusStatsOverride;

private boolean statsEnabled = true;
private boolean recordStats = true;
private boolean recordStartedRpcs = true;
private boolean recordFinishedRpcs = true;
private boolean tracingEnabled = true;

@Override
Expand Down Expand Up @@ -195,11 +196,19 @@ protected void setStatsEnabled(boolean value) {
}

/**
* Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true.
* Enabled by default.
* Disable or enable stats recording for RPC upstarts. Effective only if {@link
* #setStatsEnabled} is set to true. Enabled by default.
*/
protected void setRecordStats(boolean value) {
recordStats = value;
protected void setStatsRecordStartedRpcs(boolean value) {
recordStartedRpcs = value;
}

/**
* Disable or enable stats recording for RPC completions. Effective only if {@link
* #setStatsEnabled} is set to true. Enabled by default.
*/
protected void setStatsRecordFinishedRpcs(boolean value) {
recordFinishedRpcs = value;
}

/**
Expand Down Expand Up @@ -230,7 +239,8 @@ final List<ServerStreamTracer.Factory> getTracerFactories() {
if (censusStats == null) {
censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true);
}
tracerFactories.add(censusStats.getServerTracerFactory(recordStats));
tracerFactories.add(
censusStats.getServerTracerFactory(recordStartedRpcs, recordFinishedRpcs));
}
if (tracingEnabled) {
CensusTracingModule censusTracing =
Expand Down
84 changes: 52 additions & 32 deletions core/src/main/java/io/grpc/internal/CensusStatsModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.grpc.internal;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
Expand Down Expand Up @@ -53,7 +52,6 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Provides factories for {@link StreamTracer} that records stats to Census.
Expand Down Expand Up @@ -133,22 +131,25 @@ public TagContext parseBytes(byte[] serialized) {
*/
@VisibleForTesting
ClientCallTracer newClientCallTracer(
TagContext parentCtx, String fullMethodName, boolean recordStats) {
return new ClientCallTracer(this, parentCtx, fullMethodName, recordStats);
TagContext parentCtx, String fullMethodName,
boolean recordStartedRpcs, boolean recordFinishedRpcs) {
return new ClientCallTracer(
this, parentCtx, fullMethodName, recordStartedRpcs, recordFinishedRpcs);
}

/**
* Returns the server tracer factory.
*/
ServerStreamTracer.Factory getServerTracerFactory(boolean recordStats) {
return new ServerTracerFactory(recordStats);
ServerStreamTracer.Factory getServerTracerFactory(
boolean recordStartedRpcs, boolean recordFinishedRpcs) {
return new ServerTracerFactory(recordStartedRpcs, recordFinishedRpcs);
}

/**
* Returns the client interceptor that facilitates Census-based stats reporting.
*/
ClientInterceptor getClientInterceptor(boolean recordStats) {
return new StatsClientInterceptor(recordStats);
ClientInterceptor getClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
return new StatsClientInterceptor(recordStartedRpcs, recordFinishedRpcs);
}

private static final class ClientTracer extends ClientStreamTracer {
Expand Down Expand Up @@ -221,18 +222,27 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory {
private volatile ClientTracer streamTracer;
private volatile int callEnded;
private final TagContext parentCtx;
private final boolean recordStats;
private final TagContext startCtx;
private final boolean recordFinishedRpcs;

ClientCallTracer(
CensusStatsModule module,
TagContext parentCtx,
String fullMethodName,
boolean recordStats) {
boolean recordStartedRpcs,
boolean recordFinishedRpcs) {
this.module = module;
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.parentCtx = checkNotNull(parentCtx);
this.startCtx =
module.tagger.toBuilder(parentCtx)
.put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)).build();
this.stopwatch = module.stopwatchSupplier.get().start();
this.recordStats = recordStats;
this.recordFinishedRpcs = recordFinishedRpcs;
if (recordStartedRpcs) {
module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT, 1)
.record(startCtx);
}
}

@Override
Expand Down Expand Up @@ -262,7 +272,7 @@ void callEnded(Status status) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
}
if (!recordStats) {
if (!recordFinishedRpcs) {
return;
}
stopwatch.stop();
Expand All @@ -272,7 +282,8 @@ void callEnded(Status status) {
tracer = BLANK_CLIENT_TRACER;
}
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// The metrics are in double
.put(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT, 1)
// The latency is double value
.put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI)
.put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
.put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
Expand All @@ -290,8 +301,7 @@ void callEnded(Status status) {
measureMap.record(
module
.tagger
.toBuilder(parentCtx)
.put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName))
.toBuilder(startCtx)
.put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
.build());
}
Expand All @@ -315,12 +325,11 @@ private static final class ServerTracer extends ServerStreamTracer {

private final CensusStatsModule module;
private final String fullMethodName;
@Nullable
private final TagContext parentCtx;
private volatile int streamClosed;
private final Stopwatch stopwatch;
private final Tagger tagger;
private final boolean recordStats;
private final boolean recordFinishedRpcs;
private volatile long outboundMessageCount;
private volatile long inboundMessageCount;
private volatile long outboundWireSize;
Expand All @@ -334,13 +343,18 @@ private static final class ServerTracer extends ServerStreamTracer {
TagContext parentCtx,
Supplier<Stopwatch> stopwatchSupplier,
Tagger tagger,
boolean recordStats) {
boolean recordStartedRpcs,
boolean recordFinishedRpcs) {
this.module = module;
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.stopwatch = stopwatchSupplier.get().start();
this.tagger = tagger;
this.recordStats = recordStats;
this.recordFinishedRpcs = recordFinishedRpcs;
if (recordStartedRpcs) {
module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_SERVER_STARTED_COUNT, 1)
.record(parentCtx);
}
}

@Override
Expand Down Expand Up @@ -384,13 +398,14 @@ public void streamClosed(Status status) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
if (!recordStats) {
if (!recordFinishedRpcs) {
return;
}
stopwatch.stop();
long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// The metrics are in double
.put(RpcMeasureConstants.RPC_SERVER_FINISHED_COUNT, 1)
// The latency is double value
.put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI)
.put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount)
.put(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount)
Expand All @@ -401,11 +416,10 @@ public void streamClosed(Status status) {
if (!status.isOk()) {
measureMap.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1);
}
TagContext ctx = firstNonNull(parentCtx, tagger.empty());
measureMap.record(
module
.tagger
.toBuilder(ctx)
.toBuilder(parentCtx)
.put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
.build());
}
Expand All @@ -421,10 +435,12 @@ public Context filterContext(Context context) {

@VisibleForTesting
final class ServerTracerFactory extends ServerStreamTracer.Factory {
private final boolean recordStats;
private final boolean recordStartedRpcs;
private final boolean recordFinishedRpcs;

ServerTracerFactory(boolean recordStats) {
this.recordStats = recordStats;
ServerTracerFactory(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
this.recordStartedRpcs = recordStartedRpcs;
this.recordFinishedRpcs = recordFinishedRpcs;
}

@Override
Expand All @@ -444,16 +460,19 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
parentCtx,
stopwatchSupplier,
tagger,
recordStats);
recordStartedRpcs,
recordFinishedRpcs);
}
}

@VisibleForTesting
final class StatsClientInterceptor implements ClientInterceptor {
private final boolean recordStats;
private final boolean recordStartedRpcs;
private final boolean recordFinishedRpcs;

StatsClientInterceptor(boolean recordStats) {
this.recordStats = recordStats;
StatsClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
this.recordStartedRpcs = recordStartedRpcs;
this.recordFinishedRpcs = recordFinishedRpcs;
}

@Override
Expand All @@ -462,7 +481,8 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
// New RPCs on client-side inherit the tag context from the current Context.
TagContext parentCtx = tagger.getCurrentTagContext();
final ClientCallTracer tracerFactory =
newClientCallTracer(parentCtx, method.getFullMethodName(), recordStats);
newClientCallTracer(parentCtx, method.getFullMethodName(),
recordStartedRpcs, recordFinishedRpcs);
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
Expand Down
Loading