Skip to content

Commit

Permalink
Issue resilience4j#201 AdaptiveBulkhead states simplified (resilience…
Browse files Browse the repository at this point in the history
  • Loading branch information
hexmind committed Jul 19, 2023
1 parent 993b8a1 commit d2b1e36
Show file tree
Hide file tree
Showing 15 changed files with 523 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public class AdaptiveBulkheadConfig implements Serializable {
private static final float DEFAULT_DECREASE_MULTIPLIER = 0.5f;
private static final Function<Clock, Long> DEFAULT_TIMESTAMP_FUNCTION = clock -> System.nanoTime();
private static final TimeUnit DEFAULT_TIMESTAMP_UNIT = TimeUnit.NANOSECONDS;
private transient Predicate<Object> recordResultPredicate = DEFAULT_RECORD_RESULT_PREDICATE;
private static final Predicate<Object> DEFAULT_RECORD_RESULT_PREDICATE = (Object object) -> false;
private static final boolean RESET_METRICS_ON_TRANSITION = false;


@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -88,7 +89,8 @@ public class AdaptiveBulkheadConfig implements Serializable {
private Duration maxWaitDuration = DEFAULT_MAX_WAIT_DURATION;
private transient Function<Clock, Long> currentTimestampFunction = DEFAULT_TIMESTAMP_FUNCTION;
private TimeUnit timestampUnit = DEFAULT_TIMESTAMP_UNIT;
private static final Predicate<Object> DEFAULT_RECORD_RESULT_PREDICATE = (Object object) -> false;
private transient Predicate<Object> recordResultPredicate = DEFAULT_RECORD_RESULT_PREDICATE;
private boolean resetMetricsOnTransition = RESET_METRICS_ON_TRANSITION;

private AdaptiveBulkheadConfig() {
}
Expand Down Expand Up @@ -170,6 +172,10 @@ public Predicate<Object> getRecordResultPredicate() {
return recordResultPredicate;
}

public boolean isResetMetricsOnTransition() {
return resetMetricsOnTransition;
}

@Override
public String toString() {
return "AdaptiveBulkheadConfig{" +
Expand Down Expand Up @@ -260,6 +266,7 @@ public static class Builder {
private transient Function<Clock, Long> currentTimestampFunction = DEFAULT_TIMESTAMP_FUNCTION;
private TimeUnit timestampUnit = DEFAULT_TIMESTAMP_UNIT;
private Predicate<Object> recordResultPredicate = DEFAULT_RECORD_RESULT_PREDICATE;
private boolean resetMetricsOnTransition = RESET_METRICS_ON_TRANSITION;

private Builder() {
}
Expand All @@ -284,6 +291,7 @@ private Builder(AdaptiveBulkheadConfig baseConfig) {
this.currentTimestampFunction = baseConfig.currentTimestampFunction;
this.timestampUnit = baseConfig.timestampUnit;
this.recordResultPredicate = baseConfig.recordResultPredicate;
this.resetMetricsOnTransition = baseConfig.resetMetricsOnTransition;
}

/**
Expand Down Expand Up @@ -570,6 +578,11 @@ public Builder recordResult(Predicate<Object> recordResultPredicate) {
return this;
}

public Builder resetMetricsOnTransition(boolean resetMetricsOnTransition) {
this.resetMetricsOnTransition = resetMetricsOnTransition;
return this;
}

public AdaptiveBulkheadConfig build() {
AdaptiveBulkheadConfig config = new AdaptiveBulkheadConfig();
config.slidingWindowType = slidingWindowType;
Expand Down Expand Up @@ -597,6 +610,7 @@ public AdaptiveBulkheadConfig build() {
config.currentTimestampFunction = currentTimestampFunction;
config.timestampUnit = timestampUnit;
config.recordResultPredicate = recordResultPredicate;
config.resetMetricsOnTransition = resetMetricsOnTransition;
return config;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.github.resilience4j.bulkhead.adaptive.internal;

import java.util.concurrent.atomic.AtomicBoolean;

final class Activity {

private final AtomicBoolean active = new AtomicBoolean(true);

boolean isActive() {
return active.get();
}

boolean tryDeactivate() {
return active.compareAndSet(true, false);
}

@Override
public String toString() {
return "Activity{" +
"active=" + active +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

class AdaptationCalculator {

private final AdaptiveBulkhead adaptiveBulkhead;
private final AdaptiveBulkheadConfig config;
private final AdaptiveBulkhead.Metrics metrics;

AdaptationCalculator(AdaptiveBulkhead adaptiveBulkhead) {
this.adaptiveBulkhead = adaptiveBulkhead;
AdaptationCalculator(AdaptiveBulkheadConfig config, AdaptiveBulkhead.Metrics metrics) {
this.config = config;
this.metrics = metrics;
}

/**
Expand All @@ -36,7 +38,7 @@ class AdaptationCalculator {
*/
int increment() {
return fitToRange(
getActualConcurrencyLimit() + getConfig().getIncreaseSummand());
metrics.getMaxAllowedConcurrentCalls() + config.getIncreaseSummand());
}

/**
Expand All @@ -46,7 +48,7 @@ int increment() {
*/
int increase() {
return fitToRange(
(int) (getActualConcurrencyLimit() * getConfig().getIncreaseMultiplier()));
(int) Math.ceil(metrics.getMaxAllowedConcurrentCalls() * config.getIncreaseMultiplier()));
}

/**
Expand All @@ -56,19 +58,14 @@ int increase() {
*/
int decrease() {
return fitToRange(
(int) (getActualConcurrencyLimit() * getConfig().getDecreaseMultiplier()));
(int) Math.floor(metrics.getMaxAllowedConcurrentCalls() * config.getDecreaseMultiplier()));
}

private AdaptiveBulkheadConfig getConfig() {
return adaptiveBulkhead.getBulkheadConfig();
}

private int getActualConcurrencyLimit() {
return adaptiveBulkhead.getMetrics().getMaxAllowedConcurrentCalls();
}

private int fitToRange(int concurrencyLimitProposal) {
return Math.min(getConfig().getMaxConcurrentCalls(),
Math.max(getConfig().getMinConcurrentCalls(), concurrencyLimitProposal));
private int fitToRange(int proposal) {
if (proposal < config.getMinConcurrentCalls()) {
return config.getMinConcurrentCalls();
} else {
return Math.min(proposal, config.getMaxConcurrentCalls());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ class AdaptiveBulkheadMetrics implements AdaptiveBulkhead.Metrics {
private final float slowCallRateThreshold;
private final long slowCallDurationThresholdInNanos;
private final int minimumNumberOfCalls;
private final Bulkhead.Metrics internalBulkheadMetrics;
private final Bulkhead.Metrics innerMetrics;

private AdaptiveBulkheadMetrics(int slidingWindowSize,
AdaptiveBulkheadConfig.SlidingWindowType slidingWindowType,
AdaptiveBulkheadConfig adaptiveBulkheadConfig,
Bulkhead.Metrics internalBulkheadMetrics,
Bulkhead.Metrics innerMetrics,
Clock clock) {
if (slidingWindowType == AdaptiveBulkheadConfig.SlidingWindowType.COUNT_BASED) {
this.slidingWindowMetrics = new FixedSizeSlidingWindowMetrics(slidingWindowSize);
Expand All @@ -58,57 +58,56 @@ private AdaptiveBulkheadMetrics(int slidingWindowSize,
this.slowCallRateThreshold = adaptiveBulkheadConfig.getSlowCallRateThreshold();
this.slowCallDurationThresholdInNanos = adaptiveBulkheadConfig
.getSlowCallDurationThreshold().toNanos();
this.internalBulkheadMetrics = internalBulkheadMetrics;
this.innerMetrics = innerMetrics;
}

public AdaptiveBulkheadMetrics(AdaptiveBulkheadConfig adaptiveBulkheadConfig,
Bulkhead.Metrics internalBulkheadMetrics,
Clock clock) {
Bulkhead.Metrics innerMetrics,
Clock clock) {
this(adaptiveBulkheadConfig.getSlidingWindowSize(),
adaptiveBulkheadConfig.getSlidingWindowType(),
adaptiveBulkheadConfig,
internalBulkheadMetrics,
innerMetrics,
clock);
}

/**
* Records a successful call and checks if the thresholds are exceeded.
* Records a successful call
*
* @return the result of the check
* @return the ThresholdResult
*/
public Result onSuccess(long nanoseconds) {
return checkIfThresholdsExceeded(record(nanoseconds, true));
public AdaptiveBulkheadState.ThresholdResult onSuccess(long nanoseconds) {
return record(nanoseconds, true);
}

/**
* Records a failed call and checks if the thresholds are exceeded.
* Records a failed call
*
* @return the result of the check
* @return the ThresholdResult
*/
public Result onError(long nanoseconds) {
return checkIfThresholdsExceeded(record(nanoseconds, false));
public AdaptiveBulkheadState.ThresholdResult onError(long nanoseconds) {
return record(nanoseconds, false);
}

private Snapshot record(long nanoseconds, boolean success) {
boolean slow = nanoseconds > slowCallDurationThresholdInNanos;
return slidingWindowMetrics.record(nanoseconds, TimeUnit.NANOSECONDS, Outcome.of(slow, success));
private AdaptiveBulkheadState.ThresholdResult record(long nanoseconds, boolean success) {
Outcome outcome = Outcome.of(nanoseconds > slowCallDurationThresholdInNanos, success);
Snapshot snapshot = slidingWindowMetrics.record(nanoseconds, TimeUnit.NANOSECONDS, outcome);
return thresholdsExcess(snapshot);
}

/**
* Checks if the failure rate is above the threshold or if the slow calls percentage is above
* the threshold.
* Checks if the failure rate is above the threshold or if the slow calls' percentage is above the threshold.
*
* @param snapshot a metrics snapshot
* @return false, if the thresholds haven't been exceeded.
* @return thresholds haven been exceeded
*/
private Result checkIfThresholdsExceeded(Snapshot snapshot) {
private AdaptiveBulkheadState.ThresholdResult thresholdsExcess(Snapshot snapshot) {
if (isBelowMinimumNumberOfCalls(snapshot)) {
return Result.UNRELIABLE_THRESHOLDS;
} else if (snapshot.getFailureRate() >= failureRateThreshold
|| snapshot.getSlowCallRate() >= slowCallRateThreshold) {
return Result.ABOVE_THRESHOLDS;
return AdaptiveBulkheadState.ThresholdResult.UNRELIABLE;
} else if (isAboveFaultRate(snapshot)) {
return AdaptiveBulkheadState.ThresholdResult.ABOVE_FAULT_RATE;
} else {
return Result.BELOW_THRESHOLDS;
return AdaptiveBulkheadState.ThresholdResult.BELOW_FAULT_RATE;
}
}

Expand All @@ -117,6 +116,11 @@ private boolean isBelowMinimumNumberOfCalls(Snapshot snapshot) {
|| snapshot.getTotalNumberOfCalls() < minimumNumberOfCalls;
}

private boolean isAboveFaultRate(Snapshot snapshot) {
return snapshot.getFailureRate() >= failureRateThreshold
|| snapshot.getSlowCallRate() >= slowCallRateThreshold;
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -185,30 +189,15 @@ Snapshot getSnapshot() {
*/
@Override
public int getAvailableConcurrentCalls() {
return internalBulkheadMetrics.getAvailableConcurrentCalls();
return innerMetrics.getAvailableConcurrentCalls();
}

/**
* {@inheritDoc}
*/
@Override
public int getMaxAllowedConcurrentCalls() {
return internalBulkheadMetrics.getMaxAllowedConcurrentCalls();
}

enum Result {
/**
* Is below the error or slow calls rate.
*/
BELOW_THRESHOLDS,
/**
* Is above the error or slow calls rate.
*/
ABOVE_THRESHOLDS,
/**
* Is below minimum number of calls which are required (per sliding window period) before
* the Adaptive Bulkhead can calculate the reliable error or slow calls rate.
*/
UNRELIABLE_THRESHOLDS
return innerMetrics.getMaxAllowedConcurrentCalls();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,32 @@

import io.github.resilience4j.bulkhead.adaptive.AdaptiveBulkhead;

import java.util.concurrent.TimeUnit;

interface AdaptiveBulkheadState {

boolean tryAcquirePermission();
enum ThresholdResult {

/**
* Is below the error or slow calls rate.
*/
BELOW_FAULT_RATE,

void acquirePermission();
/**
* Is above the error or slow calls rate.
*/
ABOVE_FAULT_RATE,

void releasePermission();
/**
* Is below minimum number of calls which are required (per sliding window period) before
* the Adaptive Bulkhead can calculate the reliable error or slow calls rate.
*/
UNRELIABLE
}

void onError(long startTime, TimeUnit timeUnit, Throwable throwable);
void onBelowThresholds();

void onSuccess(long startTime, TimeUnit timeUnit);
void onAboveThresholds();

AdaptiveBulkhead.State getState();

boolean isActive();
}
Loading

0 comments on commit d2b1e36

Please sign in to comment.