Skip to content

Commit

Permalink
core: add logger to OutlierDetectionLoadBalancer (#9880)
Browse files Browse the repository at this point in the history
  • Loading branch information
s-matyukevich committed Feb 14, 2023
1 parent 3d4d46d commit 67d6600
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
60 changes: 52 additions & 8 deletions core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ConnectivityState;
Expand Down Expand Up @@ -73,23 +75,28 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
private ScheduledHandle detectionTimerHandle;
private Long detectionTimerStartNanos;

private final ChannelLogger logger;

private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY
= Attributes.Key.create("addressTrackerKey");

/**
* Creates a new instance of {@link OutlierDetectionLoadBalancer}.
*/
public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) {
logger = helper.getChannelLogger();
childHelper = new ChildHelper(checkNotNull(helper, "helper"));
switchLb = new GracefulSwitchLoadBalancer(childHelper);
trackerMap = new AddressTrackerMap();
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
this.timeProvider = timeProvider;
logger.log(ChannelLogLevel.DEBUG, "OutlierDetection lb created.");
}

@Override
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
OutlierDetectionLoadBalancerConfig config
= (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();

Expand Down Expand Up @@ -129,7 +136,7 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
trackerMap.resetCallCounters();
}

detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config),
detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger),
initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService);
} else if (detectionTimerHandle != null) {
// Outlier detection is not configured, but we have a lingering timer. Let's cancel it and
Expand Down Expand Up @@ -162,9 +169,11 @@ public void shutdown() {
class DetectionTimer implements Runnable {

OutlierDetectionLoadBalancerConfig config;
ChannelLogger logger;

DetectionTimer(OutlierDetectionLoadBalancerConfig config) {
DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger) {
this.config = config;
this.logger = logger;
}

@Override
Expand All @@ -173,7 +182,7 @@ public void run() {

trackerMap.swapCounters();

for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config)) {
for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) {
algo.ejectOutliers(trackerMap, detectionTimerStartNanos);
}

Expand Down Expand Up @@ -235,9 +244,11 @@ class OutlierDetectionSubchannel extends ForwardingSubchannel {
private boolean ejected;
private ConnectivityStateInfo lastSubchannelState;
private SubchannelStateListener subchannelStateListener;
private final ChannelLogger logger;

OutlierDetectionSubchannel(Subchannel delegate) {
this.delegate = delegate;
this.logger = delegate.getChannelLogger();
}

@Override
Expand Down Expand Up @@ -316,12 +327,14 @@ void eject() {
ejected = true;
subchannelStateListener.onSubchannelState(
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
logger.log(ChannelLogLevel.INFO, "Subchannel ejected: {0}", this);
}

void uneject() {
ejected = false;
if (lastSubchannelState != null) {
subchannelStateListener.onSubchannelState(lastSubchannelState);
logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this);
}
}

Expand Down Expand Up @@ -353,6 +366,13 @@ public void onSubchannelState(ConnectivityStateInfo newState) {
}
}
}

@Override
public String toString() {
return "OutlierDetectionSubchannel{"
+ "addresses=" + delegate.getAllAddresses()
+ '}';
}
}


Expand Down Expand Up @@ -576,6 +596,13 @@ void reset() {
failureCount.set(0);
}
}

@Override
public String toString() {
return "AddressTracker{"
+ "subchannels=" + subchannels
+ '}';
}
}

/**
Expand Down Expand Up @@ -684,13 +711,14 @@ interface OutlierEjectionAlgorithm {

/** Builds a list of algorithms that are enabled in the given config. */
@Nullable
static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config) {
static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config,
ChannelLogger logger) {
ImmutableList.Builder<OutlierEjectionAlgorithm> algoListBuilder = ImmutableList.builder();
if (config.successRateEjection != null) {
algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config));
algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config, logger));
}
if (config.failurePercentageEjection != null) {
algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config));
algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config, logger));
}
return algoListBuilder.build();
}
Expand All @@ -705,9 +733,13 @@ static class SuccessRateOutlierEjectionAlgorithm implements OutlierEjectionAlgor

private final OutlierDetectionLoadBalancerConfig config;

SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config) {
private final ChannelLogger logger;

SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
ChannelLogger logger) {
checkArgument(config.successRateEjection != null, "success rate ejection config is null");
this.config = config;
this.logger = logger;
}

@Override
Expand Down Expand Up @@ -744,6 +776,11 @@ public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)

// If success rate is below the threshold, eject the address.
if (tracker.successRate() < requiredSuccessRate) {
logger.log(ChannelLogLevel.DEBUG,
"SuccessRate algorithm detected outlier: {0}. "
+ "Parameters: successRate={1}, mean={2}, stdev={3}, "
+ "requiredSuccessRate={4}",
tracker, tracker.successRate(), mean, stdev, requiredSuccessRate);
// Only eject some addresses based on the enforcement percentage.
if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) {
tracker.ejectSubchannels(ejectionTimeNanos);
Expand Down Expand Up @@ -781,8 +818,12 @@ static class FailurePercentageOutlierEjectionAlgorithm implements OutlierEjectio

private final OutlierDetectionLoadBalancerConfig config;

FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config) {
private final ChannelLogger logger;

FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
ChannelLogger logger) {
this.config = config;
this.logger = logger;
}

@Override
Expand Down Expand Up @@ -814,6 +855,9 @@ public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)
// If the failure rate is above the threshold, we should eject...
double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100;
if (tracker.failureRate() > maxFailureRate) {
logger.log(ChannelLogLevel.DEBUG,
"FailurePercentage algorithm detected outlier: {0}, failureRate={1}",
tracker, tracker.failureRate());
// ...but only enforce this based on the enforcement percentage.
if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) {
tracker.ejectSubchannels(ejectionTimeNanos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.grpc.ChannelLogger;
import io.grpc.InternalServiceProviders;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
Expand Down Expand Up @@ -66,6 +67,9 @@ LoadBalancerProvider.class, getClass().getClassLoader())) {
@Test
public void providesLoadBalancer() {
Helper helper = mock(Helper.class);
ChannelLogger channelLogger = mock(ChannelLogger.class);

when(helper.getChannelLogger()).thenReturn(channelLogger);
when(helper.getSynchronizationContext()).thenReturn(syncContext);
when(helper.getScheduledExecutorService()).thenReturn(mock(ScheduledExecutorService.class));
assertThat(provider.newLoadBalancer(helper))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.grpc.ChannelLogger;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
Expand Down Expand Up @@ -165,6 +166,9 @@ public void setUp() {
subchannel4 = subchannelIterator.next();
subchannel5 = subchannelIterator.next();

ChannelLogger channelLogger = mock(ChannelLogger.class);

when(mockHelper.getChannelLogger()).thenReturn(channelLogger);
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
when(mockHelper.getScheduledExecutorService()).thenReturn(
fakeClock.getScheduledExecutorService());
Expand All @@ -174,6 +178,7 @@ public void setUp() {
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0];
final Subchannel subchannel = subchannels.get(args.getAddresses());
when(subchannel.getChannelLogger()).thenReturn(channelLogger);
when(subchannel.getAllAddresses()).thenReturn(args.getAddresses());
when(subchannel.getAttributes()).thenReturn(args.getAttributes());
doAnswer(new Answer<Void>() {
Expand Down

0 comments on commit 67d6600

Please sign in to comment.