Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ public abstract class ClientStreamTracer extends StreamTracer {
public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadata headers) {
}

/**
* Name resolution is completed and the connection starts getting established. This method is only
* invoked on the streams that encounter such delay.
*
* </p>gRPC buffers the client call if the remote address and configurations, e.g. timeouts and
* retry policy, are not ready. Asynchronously gRPC internally does the name resolution to get
* this information. The streams that are processed immediately on ready transports by the time
* the RPC comes do not go through the pending process, thus this callback will not be invoked.
*/
public void createPendingStream() {
}

/**
* Headers has been sent to the socket.
*/
Expand Down
9 changes: 9 additions & 0 deletions census/src/main/java/io/grpc/census/CensusTracingModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ private final class ClientTracer extends ClientStreamTracer {
final Metadata.Key<SpanContext> tracingHeader;
final boolean isSampledToLocalTracing;
volatile int seqNo;
boolean isPendingStream;

ClientTracer(
Span span, Span parentSpan, Metadata.Key<SpanContext> tracingHeader,
Expand All @@ -315,6 +316,14 @@ public void streamCreated(Attributes transportAtts, Metadata headers) {
headers.discardAll(tracingHeader);
headers.put(tracingHeader, span.getContext());
}
if (isPendingStream) {
span.addAnnotation("Delayed LB pick complete");
}
}

@Override
public void createPendingStream() {
isPendingStream = true;
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ public void clientBasicTracingDefaultSpan() {
censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.createPendingStream();
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), eq(spyClientSpan));
Expand All @@ -767,6 +768,7 @@ public void clientBasicTracingDefaultSpan() {
.putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0));
inOrder.verify(spyAttemptSpan)
.putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false));
inOrder.verify(spyAttemptSpan).addAnnotation("Delayed LB pick complete");
inOrder.verify(spyAttemptSpan, times(2)).addMessageEvent(messageEventCaptor.capture());
List<MessageEvent> events = messageEventCaptor.getAllValues();
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ private PendingStream createPendingStream(
if (getPendingStreamsCount() == 1) {
syncContext.executeLater(reportTransportInUse);
}
for (ClientStreamTracer streamTracer : tracers) {
streamTracer.createPendingStream();
}
return pendingStream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public void streamCreated(Attributes transportAttrs, Metadata headers) {
delegate().streamCreated(transportAttrs, headers);
}

@Override
public void createPendingStream() {
delegate().createPendingStream();
}

@Override
public void outboundHeaders() {
delegate().outboundHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public void streamCreated(Attributes transportAttrs, Metadata headers) {
delegate().streamCreated(transportAttrs, headers);
}

@Override
public void createPendingStream() {
delegate().createPendingStream();
}

@Override
public void outboundHeaders() {
delegate().outboundHeaders();
Expand Down