Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions census/src/main/java/io/grpc/census/CensusTracingModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.census;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
Expand Down Expand Up @@ -124,8 +125,8 @@ public SpanContext parseBytes(byte[] serialized) {
*/
@VisibleForTesting
CallAttemptsTracerFactory newClientCallTracer(
@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
return new CallAttemptsTracerFactory(parentSpan, method);
@Nullable Span clientSpan, MethodDescriptor<?, ?> method) {
return new CallAttemptsTracerFactory(clientSpan, method);
}

/**
Expand Down Expand Up @@ -248,17 +249,11 @@ final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
private final Span span;
private final String fullMethodName;

CallAttemptsTracerFactory(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
CallAttemptsTracerFactory(@Nullable Span clientSpan, MethodDescriptor<?, ?> method) {
checkNotNull(method, "method");
this.isSampledToLocalTracing = method.isSampledToLocalTracing();
this.fullMethodName = method.getFullMethodName();
this.span =
censusTracer
.spanBuilderWithExplicitParent(
generateTraceSpanName(false, fullMethodName),
parentSpan)
.setRecordEvents(true)
.startSpan();
this.span = clientSpan;
}

@Override
Expand Down Expand Up @@ -461,13 +456,20 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
final CallAttemptsTracerFactory tracerFactory =
newClientCallTracer(
io.opencensus.trace.unsafe.ContextUtils.getValue(Context.current()), method);
Span parentSpan = io.opencensus.trace.unsafe.ContextUtils.getValue(Context.current());
Span clientSpan = censusTracer
.spanBuilderWithExplicitParent(
generateTraceSpanName(false, method.getFullMethodName()),
parentSpan)
.setRecordEvents(true)
.startSpan();

final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
ClientCall<ReqT, RespT> call =
next.newCall(
method,
callOptions.withStreamTracerFactory(tracerFactory));
callOptions.withStreamTracerFactory(tracerFactory)
.withOption(CLIENT_TRACE_SPAN_CONTEXT_KEY, clientSpan.getContext()));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_STATUS;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.stats.Aggregation;
import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.View;
import io.opencensus.trace.SpanContext;
import java.util.Arrays;

// TODO(dnvindhya): Remove metric and view definitions from this class once it is moved to
Expand All @@ -42,6 +44,9 @@
@VisibleForTesting
public final class ObservabilityCensusConstants {

public static CallOptions.Key<SpanContext> CLIENT_TRACE_SPAN_CONTEXT_KEY
= CallOptions.Key.createWithDefault("Client span context for tracing", SpanContext.INVALID);

static final Aggregation AGGREGATION_WITH_BYTES_HISTOGRAM =
RpcViewConstants.GRPC_CLIENT_SENT_BYTES_PER_RPC_VIEW.getAggregation();

Expand Down
21 changes: 9 additions & 12 deletions census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
import static io.grpc.census.internal.ObservabilityCensusConstants.API_LATENCY_PER_CALL;
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -317,6 +318,10 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
capturedCallOptions.get().getStreamTracerFactories().get(1)
instanceof CensusStatsModule.CallAttemptsTracerFactory);

// The interceptor adds client SpanContext to CallOptions
assertTrue(capturedCallOptions.get().getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY).isValid());
assertTrue(capturedCallOptions.get().getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY) != null);

// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);
Expand Down Expand Up @@ -738,12 +743,10 @@ private void assertPerCallMetrics(double expectedLatencyValue) {
@Test
public void clientBasicTracingDefaultSpan() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, method);
censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), ArgumentMatchers.<Span>isNull());
verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), eq(spyClientSpan));
verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
Expand Down Expand Up @@ -797,7 +800,7 @@ public void clientBasicTracingDefaultSpan() {
@Test
public void clientTracingSampledToLocalSpanStore() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, sampledMethod);
censusTracing.newClientCallTracer(spyClientSpan, sampledMethod);
callTracer.callEnded(Status.OK);

verify(spyClientSpan).end(
Expand Down Expand Up @@ -867,10 +870,7 @@ record = statsRecorder.pollRecord();
@Test
public void clientStreamNeverCreatedStillRecordTracing() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
verify(spyClientSpanBuilder).setRecordEvents(eq(true));
censusTracing.newClientCallTracer(spyClientSpan, method);

callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
verify(spyClientSpan).end(
Expand Down Expand Up @@ -1046,18 +1046,15 @@ public void statsHeaderMalformed() {
@Test
public void traceHeadersPropagateSpanContext() throws Exception {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer streamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
streamTracer.streamCreated(Attributes.EMPTY, headers);

verify(mockTracingPropagationHandler).toByteArray(same(fakeAttemptSpanContext));
verifyNoMoreInteractions(mockTracingPropagationHandler);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), same(spyClientSpan));
verify(spyClientSpanBuilder).setRecordEvents(eq(true));
verifyNoMoreInteractions(tracer);
assertTrue(headers.containsKey(censusTracing.tracingHeader));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void wrapUp() {
@Test
public void clientBasicTracingUncompressedSizeAnnotation() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, method);
censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.gcp.observability.logging.TraceLoggingHelper;
import io.opencensus.common.Duration;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
Expand All @@ -58,7 +59,8 @@
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class GcpObservability implements AutoCloseable {
private static final int METRICS_EXPORT_INTERVAL = 30;
private static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
@VisibleForTesting
static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
"google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService",
"google.devtools.cloudtrace.v2.TraceService");
private static GcpObservability instance = null;
Expand All @@ -77,9 +79,11 @@ public static synchronized GcpObservability grpcInit() throws IOException {
if (instance == null) {
GlobalLocationTags globalLocationTags = new GlobalLocationTags();
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(
observabilityConfig.getProjectId());
Sink sink = new GcpLogSink(observabilityConfig.getProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
SERVICES_TO_EXCLUDE);
globalLocationTags.getLocationTags(), observabilityConfig,
SERVICES_TO_EXCLUDE, traceLoggingHelper);
LogHelper helper = new LogHelper(sink);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.getInstance(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.gcp.observability.interceptors;

import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;

import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.CallOptions;
Expand All @@ -33,6 +35,7 @@
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.SpanContext;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -92,6 +95,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
// Get the stricter deadline to calculate the timeout once the call starts
final Deadline deadline = LogHelper.min(callOptions.getDeadline(),
Context.current().getDeadline());
final SpanContext clientSpanContext = callOptions.getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY);

FilterParams filterParams = filterHelper.logRpcMethod(method.getFullMethodName(), true);
if (!filterParams.log()) {
Expand Down Expand Up @@ -122,7 +126,8 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
maxHeaderBytes,
EventLogger.CLIENT,
callId,
null);
null,
clientSpanContext);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
Expand All @@ -148,7 +153,8 @@ public void onMessage(RespT message) {
message,
maxMessageBytes,
EventLogger.CLIENT,
callId);
callId,
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
Expand All @@ -168,7 +174,8 @@ public void onHeaders(Metadata headers) {
maxHeaderBytes,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
LogHelper.getPeerAddress(getAttributes()),
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
Expand All @@ -189,7 +196,8 @@ public void onClose(Status status, Metadata trailers) {
maxHeaderBytes,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
LogHelper.getPeerAddress(getAttributes()),
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
Expand All @@ -212,7 +220,8 @@ public void sendMessage(ReqT message) {
message,
maxMessageBytes,
EventLogger.CLIENT,
callId);
callId,
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
Expand All @@ -229,7 +238,8 @@ public void halfClose() {
methodName,
authority,
EventLogger.CLIENT,
callId);
callId,
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
Expand All @@ -246,7 +256,8 @@ public void cancel(String message, Throwable cause) {
methodName,
authority,
EventLogger.CLIENT,
callId);
callId,
clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.unsafe.ContextHandleUtils;
import java.net.SocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -91,6 +94,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
Deadline deadline = Context.current().getDeadline();
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
Span span = ContextHandleUtils.getValue(ContextHandleUtils.currentContext());
final SpanContext serverSpanContext = span == null ? SpanContext.INVALID : span.getContext();

FilterParams filterParams =
filterHelper.logRpcMethod(call.getMethodDescriptor().getFullMethodName(), false);
Expand All @@ -113,7 +118,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
maxHeaderBytes,
EventLogger.SERVER,
callId,
peerAddress);
peerAddress,
serverSpanContext);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
Expand All @@ -139,7 +145,8 @@ public void sendHeaders(Metadata headers) {
maxHeaderBytes,
EventLogger.SERVER,
callId,
null);
null,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
Expand All @@ -160,7 +167,8 @@ public void sendMessage(RespT message) {
message,
maxMessageBytes,
EventLogger.SERVER,
callId);
callId,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
Expand All @@ -181,7 +189,8 @@ public void close(Status status, Metadata trailers) {
maxHeaderBytes,
EventLogger.SERVER,
callId,
null);
null,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
Expand All @@ -206,7 +215,8 @@ public void onMessage(ReqT message) {
message,
maxMessageBytes,
EventLogger.SERVER,
callId);
callId,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
Expand All @@ -223,7 +233,8 @@ public void onHalfClose() {
methodName,
authority,
EventLogger.SERVER,
callId);
callId,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
Expand All @@ -240,7 +251,8 @@ public void onCancel() {
methodName,
authority,
EventLogger.SERVER,
callId);
callId,
serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
Expand Down
Loading