Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
@ThreadSafe
public abstract class ClientStreamTracer extends StreamTracer {
/**
* The call was delayed due to waiting for name resolution result.
*/
public static final CallOptions.Key<Boolean> NAME_RESOLUTION_DELAYED =
CallOptions.Key.createWithDefault("io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED",
false);

/**
* The stream is being created on a ready transport.
Expand Down
4 changes: 4 additions & 0 deletions census/src/main/java/io/grpc/census/CensusTracingModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.census;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -269,6 +270,9 @@ public ClientStreamTracer newClientStreamTracer(
"previous-rpc-attempts", AttributeValue.longAttributeValue(info.getPreviousAttempts()));
attemptSpan.putAttribute(
"transparent-retry", AttributeValue.booleanAttributeValue(info.isTransparentRetry()));
if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED)) {
span.addAnnotation("Delayed name resolution complete");
}
return new ClientTracer(attemptSpan, span, tracingHeader, isSampledToLocalTracing);
}

Expand Down
5 changes: 4 additions & 1 deletion census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
Expand Down Expand Up @@ -132,7 +133,8 @@ public class CensusModulesTest {
private static final CallOptions CALL_OPTIONS =
CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
ClientStreamTracer.StreamInfo.newBuilder().build();
ClientStreamTracer.StreamInfo.newBuilder()
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, true)).build();

private static class StringInputStream extends InputStream {
final String string;
Expand Down Expand Up @@ -768,6 +770,7 @@ public void clientBasicTracingDefaultSpan() {
.putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0));
inOrder.verify(spyAttemptSpan)
.putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false));
inOrder.verify(spyClientSpan).addAnnotation("Delayed name resolution complete");
inOrder.verify(spyAttemptSpan).addAnnotation("Delayed LB pick complete");
inOrder.verify(spyAttemptSpan, times(2)).addMessageEvent(messageEventCaptor.capture());
List<MessageEvent> events = messageEventCaptor.getAllValues();
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
Expand Down Expand Up @@ -1085,7 +1086,8 @@ void reprocess() {
ClientCall<ReqT, RespT> realCall;
Context previous = context.attach();
try {
realCall = newClientCall(method, callOptions);
CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true);
realCall = newClientCall(method, delayResolutionOption);
} finally {
context.detach(previous);
}
Expand Down
45 changes: 45 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
Expand Down Expand Up @@ -1071,6 +1072,50 @@ public void loadBalancerThrowsInHandleResolvedAddresses() {
verifyPanicMode(ex);
}

@Test
public void delayedNameResolution() {
ClientStream mockStream = mock(ClientStream.class);
final ClientStreamTracer tracer = new ClientStreamTracer() {};
ClientStreamTracer.Factory factory = new ClientStreamTracer.Factory() {
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return tracer;
}
};
FakeNameResolverFactory nsFactory = new FakeNameResolverFactory.Builder(expectedUri)
.setResolvedAtStart(false).build();
channelBuilder.nameResolverFactory(nsFactory);
createChannel();

CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory);
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
call.start(mockCallListener, new Metadata());

nsFactory.allResolved();
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
when(mockTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
ArgumentMatchers.<ClientStreamTracer[]>any()))
.thenReturn(mockStream);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel));

updateBalancingStateSafely(helper, READY, mockPicker);
assertEquals(2, executor.runDueTasks());

verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
verify(mockTransport).newStream(
same(method), any(Metadata.class), callOptionsCaptor.capture(),
tracersCaptor.capture());
assertThat(Arrays.asList(tracersCaptor.getValue()).contains(tracer)).isTrue();
assertThat(callOptionsCaptor.getValue().getOption(NAME_RESOLUTION_DELAYED)).isTrue();
}

@Test
public void nameResolvedAfterChannelShutdown() {
// Delay the success of name resolution until allResolved() is called.
Expand Down