Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: Outlier detection tracer delegation #10459

Merged
merged 3 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 33 additions & 25 deletions util/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,47 +394,55 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {

Subchannel subchannel = pickResult.getSubchannel();
if (subchannel != null) {
return PickResult.withSubchannel(subchannel,
new ResultCountingClientStreamTracerFactory(
subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY)));
return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory(
subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY),
pickResult.getStreamTracerFactory()));
}

return pickResult;
}

/**
* Builds instances of {@link ResultCountingClientStreamTracer}.
* Builds instances of a {@link ClientStreamTracer} that increments the call count in the
* tracker for each closed stream.
*/
class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory {

private final AddressTracker tracker;

ResultCountingClientStreamTracerFactory(AddressTracker tracker) {
this.tracker = tracker;
}

@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return new ResultCountingClientStreamTracer(tracker);
}
}
@Nullable
private final ClientStreamTracer.Factory delegateFactory;

/**
* Counts the results (successful/unsuccessful) of a particular {@link
* OutlierDetectionSubchannel}s streams and increments the counter in the associated {@link
* AddressTracker}.
*/
class ResultCountingClientStreamTracer extends ClientStreamTracer {

AddressTracker tracker;

public ResultCountingClientStreamTracer(AddressTracker tracker) {
ResultCountingClientStreamTracerFactory(AddressTracker tracker,
@Nullable ClientStreamTracer.Factory delegateFactory) {
this.tracker = tracker;
this.delegateFactory = delegateFactory;
}

@Override
public void streamClosed(Status status) {
tracker.incrementCallCount(status.isOk());
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
if (delegateFactory != null) {
ClientStreamTracer delegateTracer = delegateFactory.newClientStreamTracer(info, headers);
return new ForwardingClientStreamTracer() {
@Override
protected ClientStreamTracer delegate() {
return delegateTracer;
}

@Override
public void streamClosed(Status status) {
tracker.incrementCallCount(status.isOk());
delegate().streamClosed(status);
}
};
} else {
return new ClientStreamTracer() {
@Override
public void streamClosed(Status status) {
tracker.incrementCallCount(status.isOk());
}
};
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
Expand Down Expand Up @@ -96,6 +97,10 @@ public class OutlierDetectionLoadBalancerTest {
private Helper mockHelper;
@Mock
private SocketAddress mockSocketAddress;
@Mock
private ClientStreamTracer.Factory mockStreamTracerFactory;
@Mock
private ClientStreamTracer mockStreamTracer;

@Captor
private ArgumentCaptor<ConnectivityState> connectivityStateCaptor;
Expand Down Expand Up @@ -193,6 +198,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}
});

// when(mockStreamTracerFactory.newClientStreamTracer(isA(StreamInfo.class),
// isA(Metadata.class))).thenReturn(mockStreamTracer);
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
when(mockStreamTracerFactory.newClientStreamTracer(any(),
any())).thenReturn(mockStreamTracer);

loadBalancer = new OutlierDetectionLoadBalancer(mockHelper, fakeClock.getTimeProvider());
}

Expand Down Expand Up @@ -355,6 +365,38 @@ public void delegatePick() throws Exception {
readySubchannel);
}

/**
* Any ClientStreamTracer.Factory set by the delegate picker should still get used.
*/
@Test
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
public void delegatePickTracerFactoryPreserved() throws Exception {
OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder()
.setSuccessRateEjection(new SuccessRateEjection.Builder().build())
.setChildPolicy(new PolicySelection(fakeLbProvider, null)).build();

loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers.get(0)));

// Make one of the subchannels READY.
final Subchannel readySubchannel = subchannels.values().iterator().next();
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));

verify(mockHelper, times(2)).updateBalancingState(stateCaptor.capture(),
pickerCaptor.capture());

// Make sure that we can pick the single READY subchannel.
SubchannelPicker picker = pickerCaptor.getAllValues().get(1);
PickResult pickResult = picker.pickSubchannel(mock(PickSubchannelArgs.class));

// Calls to a stream tracer created with the factory in the result should make it to a stream
// tracer the underlying LB/picker is using.
ClientStreamTracer clientStreamTracer = pickResult.getStreamTracerFactory()
.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata());
clientStreamTracer.inboundHeaders();
// The underlying fake LB provider is configured with a factory that returns a mock stream
// tracer.
verify(mockStreamTracer).inboundHeaders();
}

/**
* The success rate algorithm leaves a healthy set of addresses alone.
*/
Expand Down Expand Up @@ -1121,7 +1163,7 @@ void assertEjectedSubchannels(Set<SocketAddress> addresses) {
}

/** Round robin like fake load balancer. */
private static final class FakeLoadBalancer extends LoadBalancer {
private final class FakeLoadBalancer extends LoadBalancer {
private final Helper helper;

List<Subchannel> subchannelList;
Expand Down Expand Up @@ -1159,7 +1201,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (lastPickIndex < 0 || lastPickIndex > subchannelList.size() - 1) {
lastPickIndex = 0;
}
return PickResult.withSubchannel(subchannelList.get(lastPickIndex++));
return PickResult.withSubchannel(subchannelList.get(lastPickIndex++),
mockStreamTracerFactory);
}
};
helper.updateBalancingState(state, picker);
Expand Down