Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logger to OutlierDetectionLoadBalancer #9880

Merged
60 changes: 52 additions & 8 deletions core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ConnectivityState;
Expand Down Expand Up @@ -73,23 +75,28 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
private ScheduledHandle detectionTimerHandle;
private Long detectionTimerStartNanos;

private final ChannelLogger logger;

private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY
= Attributes.Key.create("addressTrackerKey");

/**
* Creates a new instance of {@link OutlierDetectionLoadBalancer}.
*/
public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) {
logger = helper.getChannelLogger();
childHelper = new ChildHelper(checkNotNull(helper, "helper"));
switchLb = new GracefulSwitchLoadBalancer(childHelper);
trackerMap = new AddressTrackerMap();
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
this.timeProvider = timeProvider;
logger.log(ChannelLogLevel.DEBUG, "OutlierDetection lb created.");
}

@Override
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
OutlierDetectionLoadBalancerConfig config
= (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();

Expand Down Expand Up @@ -129,7 +136,7 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
trackerMap.resetCallCounters();
}

detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config),
detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger),
initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService);
} else if (detectionTimerHandle != null) {
// Outlier detection is not configured, but we have a lingering timer. Let's cancel it and
Expand Down Expand Up @@ -162,9 +169,11 @@ public void shutdown() {
class DetectionTimer implements Runnable {

OutlierDetectionLoadBalancerConfig config;
ChannelLogger logger;

DetectionTimer(OutlierDetectionLoadBalancerConfig config) {
DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger) {
this.config = config;
this.logger = logger;
}

@Override
Expand All @@ -173,7 +182,7 @@ public void run() {

trackerMap.swapCounters();

for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config)) {
for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) {
algo.ejectOutliers(trackerMap, detectionTimerStartNanos);
}

Expand Down Expand Up @@ -235,9 +244,11 @@ class OutlierDetectionSubchannel extends ForwardingSubchannel {
private boolean ejected;
private ConnectivityStateInfo lastSubchannelState;
private SubchannelStateListener subchannelStateListener;
private final ChannelLogger logger;

OutlierDetectionSubchannel(Subchannel delegate) {
this.delegate = delegate;
this.logger = delegate.getChannelLogger();
}

@Override
Expand Down Expand Up @@ -316,12 +327,14 @@ void eject() {
ejected = true;
subchannelStateListener.onSubchannelState(
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
logger.log(ChannelLogLevel.INFO, "Subchannel ejected: {0}", this);
}

void uneject() {
ejected = false;
if (lastSubchannelState != null) {
subchannelStateListener.onSubchannelState(lastSubchannelState);
logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this);
}
}

Expand Down Expand Up @@ -353,6 +366,13 @@ public void onSubchannelState(ConnectivityStateInfo newState) {
}
}
}

@Override
public String toString() {
return "OutlierDetectionSubchannel{"
+ "addresses=" + delegate.getAllAddresses()
+ '}';
}
}


Expand Down Expand Up @@ -576,6 +596,13 @@ void reset() {
failureCount.set(0);
}
}

@Override
public String toString() {
return "AddressTracker{"
+ "subchannels=" + subchannels
+ '}';
}
}

/**
Expand Down Expand Up @@ -684,13 +711,14 @@ interface OutlierEjectionAlgorithm {

/** Builds a list of algorithms that are enabled in the given config. */
@Nullable
static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config) {
static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config,
ChannelLogger logger) {
ImmutableList.Builder<OutlierEjectionAlgorithm> algoListBuilder = ImmutableList.builder();
if (config.successRateEjection != null) {
algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config));
algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config, logger));
}
if (config.failurePercentageEjection != null) {
algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config));
algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config, logger));
}
return algoListBuilder.build();
}
Expand All @@ -705,9 +733,13 @@ static class SuccessRateOutlierEjectionAlgorithm implements OutlierEjectionAlgor

private final OutlierDetectionLoadBalancerConfig config;

SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config) {
private final ChannelLogger logger;

SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
ChannelLogger logger) {
checkArgument(config.successRateEjection != null, "success rate ejection config is null");
this.config = config;
this.logger = logger;
}

@Override
Expand Down Expand Up @@ -744,6 +776,11 @@ public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)

// If success rate is below the threshold, eject the address.
if (tracker.successRate() < requiredSuccessRate) {
logger.log(ChannelLogLevel.DEBUG,
"SuccessRate algorithm detected outlier: {0}. "
+ "Parameters: successRate={1}, mean={2}, stdev={3}, "
+ "requiredSuccessRate={4}",
tracker, tracker.successRate(), mean, stdev, requiredSuccessRate);
// Only eject some addresses based on the enforcement percentage.
if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) {
tracker.ejectSubchannels(ejectionTimeNanos);
Expand Down Expand Up @@ -781,8 +818,12 @@ static class FailurePercentageOutlierEjectionAlgorithm implements OutlierEjectio

private final OutlierDetectionLoadBalancerConfig config;

FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config) {
private final ChannelLogger logger;

FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
ChannelLogger logger) {
this.config = config;
this.logger = logger;
}

@Override
Expand Down Expand Up @@ -814,6 +855,9 @@ public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)
// If the failure rate is above the threshold, we should eject...
double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100;
if (tracker.failureRate() > maxFailureRate) {
logger.log(ChannelLogLevel.DEBUG,
"FailurePercentage algorithm detected outlier: {0}, failureRate={1}",
tracker, tracker.failureRate());
// ...but only enforce this based on the enforcement percentage.
if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) {
tracker.ejectSubchannels(ejectionTimeNanos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.grpc.ChannelLogger;
import io.grpc.InternalServiceProviders;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
Expand Down Expand Up @@ -66,6 +67,9 @@ LoadBalancerProvider.class, getClass().getClassLoader())) {
@Test
public void providesLoadBalancer() {
Helper helper = mock(Helper.class);
ChannelLogger channelLogger = mock(ChannelLogger.class);

when(helper.getChannelLogger()).thenReturn(channelLogger);
when(helper.getSynchronizationContext()).thenReturn(syncContext);
when(helper.getScheduledExecutorService()).thenReturn(mock(ScheduledExecutorService.class));
assertThat(provider.newLoadBalancer(helper))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.grpc.ChannelLogger;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
Expand Down Expand Up @@ -165,6 +166,9 @@ public void setUp() {
subchannel4 = subchannelIterator.next();
subchannel5 = subchannelIterator.next();

ChannelLogger channelLogger = mock(ChannelLogger.class);

when(mockHelper.getChannelLogger()).thenReturn(channelLogger);
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
when(mockHelper.getScheduledExecutorService()).thenReturn(
fakeClock.getScheduledExecutorService());
Expand All @@ -174,6 +178,7 @@ public void setUp() {
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0];
final Subchannel subchannel = subchannels.get(args.getAddresses());
when(subchannel.getChannelLogger()).thenReturn(channelLogger);
when(subchannel.getAllAddresses()).thenReturn(args.getAddresses());
when(subchannel.getAttributes()).thenReturn(args.getAttributes());
doAnswer(new Answer<Void>() {
Expand Down