Skip to content

Commit

Permalink
Issue resilience4j#201 IncreaseInterval for SlowStartState added
Browse files Browse the repository at this point in the history
  • Loading branch information
hexmind committed Jul 19, 2023
1 parent d2b1e36 commit 3cfa94f
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ static <T> Supplier<Future<T>> decorateFuture(AdaptiveBulkhead bulkhead, Supplie
}
long start = bulkhead.getCurrentTimestamp();
try {
return new BulkheadFuture<T>(bulkhead, supplier.get(), start);
return new BulkheadFuture<>(bulkhead, supplier.get(), start);
} catch (Throwable e) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ public class AdaptiveBulkheadConfig implements Serializable {
private static final int DEFAULT_INCREASE_SUMMAND = 1;
private static final float DEFAULT_INCREASE_MULTIPLIER = 2f;
private static final float DEFAULT_DECREASE_MULTIPLIER = 0.5f;
private static final int DEFAULT_INCREASE_INTERVAL = 1;
private static final Function<Clock, Long> DEFAULT_TIMESTAMP_FUNCTION = clock -> System.nanoTime();
private static final TimeUnit DEFAULT_TIMESTAMP_UNIT = TimeUnit.NANOSECONDS;
private static final Predicate<Object> DEFAULT_RECORD_RESULT_PREDICATE = (Object object) -> false;
private static final boolean RESET_METRICS_ON_TRANSITION = false;
private static final boolean DEFAULT_RESET_METRICS_ON_TRANSITION = false;


@SuppressWarnings("unchecked")
Expand All @@ -83,14 +84,15 @@ public class AdaptiveBulkheadConfig implements Serializable {
private int minConcurrentCalls = DEFAULT_MIN_CONCURRENT_CALLS;
private int initialConcurrentCalls = DEFAULT_INITIAL_CONCURRENT_CALLS;
private int maxConcurrentCalls = DEFAULT_MAX_CONCURRENT_CALLS;
private int increaseSummand = DEFAULT_INCREASE_SUMMAND;
private int increaseAugend = DEFAULT_INCREASE_SUMMAND;
private float decreaseMultiplier = DEFAULT_DECREASE_MULTIPLIER;
private float increaseMultiplier = DEFAULT_INCREASE_MULTIPLIER;
private int increaseInterval = DEFAULT_INCREASE_INTERVAL;
private Duration maxWaitDuration = DEFAULT_MAX_WAIT_DURATION;
private transient Function<Clock, Long> currentTimestampFunction = DEFAULT_TIMESTAMP_FUNCTION;
private TimeUnit timestampUnit = DEFAULT_TIMESTAMP_UNIT;
private transient Predicate<Object> recordResultPredicate = DEFAULT_RECORD_RESULT_PREDICATE;
private boolean resetMetricsOnTransition = RESET_METRICS_ON_TRANSITION;
private boolean resetMetricsOnTransition = DEFAULT_RESET_METRICS_ON_TRANSITION;

private AdaptiveBulkheadConfig() {
}
Expand Down Expand Up @@ -144,8 +146,8 @@ public int getMaxConcurrentCalls() {
return maxConcurrentCalls;
}

public int getIncreaseSummand() {
return increaseSummand;
public int getIncreaseAugend() {
return increaseAugend;
}

public float getDecreaseMultiplier() {
Expand All @@ -156,6 +158,10 @@ public float getIncreaseMultiplier() {
return increaseMultiplier;
}

public int getIncreaseInterval() {
return increaseInterval;
}

public Duration getMaxWaitDuration() {
return maxWaitDuration;
}
Expand Down Expand Up @@ -191,7 +197,7 @@ public String toString() {
", minConcurrentCalls=" + minConcurrentCalls +
", initialConcurrentCalls=" + initialConcurrentCalls +
", maxConcurrentCalls=" + maxConcurrentCalls +
", increaseSummand=" + increaseSummand +
", increaseAugend=" + increaseAugend +
", decreaseMultiplier=" + decreaseMultiplier +
", increaseMultiplier=" + increaseMultiplier +
", maxWaitDuration=" + maxWaitDuration +
Expand Down Expand Up @@ -238,7 +244,7 @@ public static Builder builder(AdaptiveBulkheadConfig bulkheadConfig) {
return new Builder(bulkheadConfig);
}

public static class Builder {
public static class Builder implements Serializable {

private float failureRateThreshold = DEFAULT_FAILURE_RATE_THRESHOLD_PERCENTAGE;
private int minimumNumberOfCalls = DEFAULT_MINIMUM_NUMBER_OF_CALLS;
Expand All @@ -251,22 +257,23 @@ public static class Builder {
private int minConcurrentCalls = DEFAULT_MIN_CONCURRENT_CALLS;
private int maxConcurrentCalls = DEFAULT_MAX_CONCURRENT_CALLS;
private int initialConcurrentCalls = DEFAULT_INITIAL_CONCURRENT_CALLS;
private int increaseSummand = DEFAULT_INCREASE_SUMMAND;
private int increaseAugend = DEFAULT_INCREASE_SUMMAND;
private float decreaseMultiplier = DEFAULT_DECREASE_MULTIPLIER;
private float increaseMultiplier = DEFAULT_INCREASE_MULTIPLIER;
private int increaseInterval = DEFAULT_INCREASE_INTERVAL;
private Duration maxWaitDuration = DEFAULT_MAX_WAIT_DURATION;
@Nullable
private Predicate<Throwable> recordExceptionPredicate;
private transient Predicate<Throwable> recordExceptionPredicate;
@Nullable
private Predicate<Throwable> ignoreExceptionPredicate;
private transient Predicate<Throwable> ignoreExceptionPredicate;
@SuppressWarnings("unchecked")
private Class<? extends Throwable>[] recordExceptions = new Class[0];
@SuppressWarnings("unchecked")
private Class<? extends Throwable>[] ignoreExceptions = new Class[0];
private transient Function<Clock, Long> currentTimestampFunction = DEFAULT_TIMESTAMP_FUNCTION;
private TimeUnit timestampUnit = DEFAULT_TIMESTAMP_UNIT;
private Predicate<Object> recordResultPredicate = DEFAULT_RECORD_RESULT_PREDICATE;
private boolean resetMetricsOnTransition = RESET_METRICS_ON_TRANSITION;
private transient Predicate<Object> recordResultPredicate = DEFAULT_RECORD_RESULT_PREDICATE;
private boolean resetMetricsOnTransition = DEFAULT_RESET_METRICS_ON_TRANSITION;

private Builder() {
}
Expand All @@ -285,9 +292,10 @@ private Builder(AdaptiveBulkheadConfig baseConfig) {
this.minConcurrentCalls = baseConfig.minConcurrentCalls;
this.maxConcurrentCalls = baseConfig.maxConcurrentCalls;
this.initialConcurrentCalls = baseConfig.initialConcurrentCalls;
this.increaseSummand = baseConfig.increaseSummand;
this.increaseAugend = baseConfig.increaseAugend;
this.decreaseMultiplier = baseConfig.decreaseMultiplier;
this.increaseMultiplier = baseConfig.increaseMultiplier;
this.increaseInterval = baseConfig.increaseInterval;
this.currentTimestampFunction = baseConfig.currentTimestampFunction;
this.timestampUnit = baseConfig.timestampUnit;
this.recordResultPredicate = baseConfig.recordResultPredicate;
Expand All @@ -303,8 +311,8 @@ private Builder(AdaptiveBulkheadConfig baseConfig) {
* percentage which means that all recorded calls must be slower than {@link
* #slowCallDurationThreshold(Duration)}.
*
* @param slowCallRateThreshold the slow calls threshold in percentage
* @return the AdaptiveBulkheadConfig.Builder
* @param slowCallRateThreshold the slow calls' threshold in percentage
* @return the Builder
* @throws IllegalArgumentException if {@code slowCallRateThreshold <= 0 ||
* slowCallRateThreshold > 100}
*/
Expand All @@ -319,10 +327,10 @@ public Builder slowCallRateThreshold(float slowCallRateThreshold) {

/**
* Configures the duration threshold above which calls are considered as slow and increase
* the slow calls percentage. Default value is 60 seconds.
* the slow calls' percentage. Default value is 60 seconds.
*
* @param slowCallDurationThreshold the duration above which calls are considered as slow
* @return the AdaptiveBulkheadConfig.Builder
* @return the Builder
* @throws IllegalArgumentException if {@code slowCallDurationThreshold.toNanos() < 1}
*/
public Builder slowCallDurationThreshold(Duration slowCallDurationThreshold) {
Expand All @@ -344,7 +352,7 @@ public Builder slowCallDurationThreshold(Duration slowCallDurationThreshold) {
*
* @param slidingWindowSize the size of the sliding window when the AdaptiveBulkhead is
* closed.
* @return the AdaptiveBulkheadConfig.Builder
* @return the Builder
* @throws IllegalArgumentException if {@code slidingWindowSize < 1}
*/
public Builder slidingWindowSize(int slidingWindowSize) {
Expand All @@ -363,7 +371,7 @@ public Builder slidingWindowSize(int slidingWindowSize) {
*
* @param slidingWindowType the type of the sliding window. Either COUNT_BASED or
* TIME_BASED.
* @return the AdaptiveBulkheadConfig.Builder
* @return the Builder
*/
public Builder slidingWindowType(SlidingWindowType slidingWindowType) {
this.slidingWindowType = slidingWindowType;
Expand All @@ -377,7 +385,7 @@ public Builder slidingWindowType(SlidingWindowType slidingWindowType) {
* percentage.
*
* @param failureRateThreshold the failure rate threshold in percentage
* @return the AdaptiveBulkheadConfig.Builder
* @return the Builder
* @throws IllegalArgumentException if {@code failureRateThreshold <= 0 ||
* failureRateThreshold > 100}
*/
Expand Down Expand Up @@ -499,12 +507,12 @@ public final Builder initialConcurrentCalls(int initialConcurrentCalls) {
return this;
}

public final Builder increaseSummand(int increaseSummand) {
if (increaseSummand <= 0) {
public final Builder increaseAugend(int increaseAugend) {
if (increaseAugend <= 0) {
throw new IllegalArgumentException(
"increaseSummand must greater than 0");
"increaseAugend must greater than 0");
}
this.increaseSummand = increaseSummand;
this.increaseAugend = increaseAugend;
return this;
}

Expand All @@ -526,6 +534,21 @@ public final Builder increaseMultiplier(float increaseMultiplier) {
return this;
}

/**
* Increase concurrency limit every nth success in SlowStartState where n is a value of increaseInterval.
*
* @param increaseInterval minimum 1
* @return the Builder
*/
public final Builder increaseInterval(int increaseInterval) {
if (increaseInterval < 1) {
throw new IllegalArgumentException(
"increaseInterval must be at least 1");
}
this.increaseInterval = increaseInterval;
return this;
}

/**
* Configures a maximum amount of time which the calling thread will wait to enter the
* bulkhead. If bulkhead has space available, entry is guaranteed and immediate. If bulkhead
Expand All @@ -537,7 +560,7 @@ public final Builder increaseMultiplier(float increaseMultiplier) {
* most likely have a negative effect on application throughput.
*
* @param maxWaitDuration maximum wait time for bulkhead entry
* @return the BulkheadConfig.Builder
* @return the Builder
*/
public AdaptiveBulkheadConfig.Builder maxWaitDuration(Duration maxWaitDuration) {
if (maxWaitDuration.toMillis() < 0) {
Expand All @@ -556,7 +579,7 @@ public AdaptiveBulkheadConfig.Builder maxWaitDuration(Duration maxWaitDuration)
*
* @param currentTimestampFunction function that computes current timestamp.
* @param timeUnit TimeUnit of timestamp returned by the function.
* @return the AdaptiveBulkheadConfig.Builder
* @return the Builder
*/
public Builder currentTimestampFunction(Function<Clock, Long> currentTimestampFunction, TimeUnit timeUnit) {
this.timestampUnit = timeUnit;
Expand Down Expand Up @@ -597,9 +620,10 @@ public AdaptiveBulkheadConfig build() {
config.minConcurrentCalls = minConcurrentCalls;
config.maxConcurrentCalls = maxConcurrentCalls;
config.initialConcurrentCalls = initialConcurrentCalls;
config.increaseSummand = increaseSummand;
config.increaseAugend = increaseAugend;
config.decreaseMultiplier = decreaseMultiplier;
config.increaseMultiplier = increaseMultiplier;
config.increaseInterval = increaseInterval;
config.maxWaitDuration = maxWaitDuration;
config.recordExceptionPredicate = PredicateCreator
.createExceptionsPredicate(recordExceptionPredicate, recordExceptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class AdaptationCalculator {
*/
int increment() {
return fitToRange(
metrics.getMaxAllowedConcurrentCalls() + config.getIncreaseSummand());
metrics.getMaxAllowedConcurrentCalls() + config.getIncreaseAugend());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public AdaptiveBulkheadMetrics(AdaptiveBulkheadConfig adaptiveBulkheadConfig,
* @return the ThresholdResult
*/
public AdaptiveBulkheadState.ThresholdResult onSuccess(long nanoseconds) {
return record(nanoseconds, true);
return recordCall(nanoseconds, true);
}

/**
Expand All @@ -86,10 +86,10 @@ public AdaptiveBulkheadState.ThresholdResult onSuccess(long nanoseconds) {
* @return the ThresholdResult
*/
public AdaptiveBulkheadState.ThresholdResult onError(long nanoseconds) {
return record(nanoseconds, false);
return recordCall(nanoseconds, false);
}

private AdaptiveBulkheadState.ThresholdResult record(long nanoseconds, boolean success) {
private AdaptiveBulkheadState.ThresholdResult recordCall(long nanoseconds, boolean success) {
Outcome outcome = Outcome.of(nanoseconds > slowCallDurationThresholdInNanos, success);
Snapshot snapshot = slidingWindowMetrics.record(nanoseconds, TimeUnit.NANOSECONDS, outcome);
return thresholdsExcess(snapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ public void incrementLimit() {

@Override
public void increaseLimit() {
changeConcurrencyLimit(adaptationCalculator.increase());
if (metrics.getNumberOfBufferedCalls() % config.getIncreaseInterval() == 0) {
changeConcurrencyLimit(adaptationCalculator.increase());
} else {
LOG.trace("'{}' ignored an increase of concurrency limit", name);
}
}

@Override
Expand Down Expand Up @@ -266,6 +270,7 @@ public void onThresholdExcess(ThresholdResult thresholdResult) {
switch (thresholdResult) {
case BELOW_FAULT_RATE -> stateReference.get().onBelowThresholds();
case ABOVE_FAULT_RATE -> stateReference.get().onAboveThresholds();
case UNRELIABLE -> LOG.trace("cannot calculate the reliable error or slow calls rate");
}
}
}
Expand Down Expand Up @@ -297,7 +302,7 @@ private void tryPublishEvent(AdaptiveBulkheadEvent event) {
try {
eventProcessor.consumeEvent(event);
LOG.debug("'{}' published an event {}: {}", name, event.getEventType(), event);
} catch (Throwable t) {
} catch (Exception t) {
LOG.debug("'{}' consumer failure: Event {} not published:", name, event.getEventType(), t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,26 @@ public void testInvalidIncreaseMultiplierConfig() {
.hasMessage("increaseMultiplier must greater than 1");
}

@Test
public void testIncreaseIntervalConfig() {
int increaseInterval = 2;
assertThat(AdaptiveBulkheadConfig.custom()
.increaseInterval(increaseInterval)
.build()
.getIncreaseInterval())
.isEqualTo(increaseInterval)
.isNotEqualTo(DEFAULT_BULKHEAD.getIncreaseInterval());
}

@Test
public void testInvalidIncreaseIntervalConfig() {
assertThatThrownBy(() -> AdaptiveBulkheadConfig.custom()
.increaseInterval(0)
.build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("increaseInterval must be at least 1");
}

@Test
public void testSlowCallDurationThresholdConfig() {
assertThatThrownBy(() -> AdaptiveBulkheadConfig.custom()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void shouldReturnFailureWithBulkheadFullException() throws Exception {
AdaptiveBulkhead bulkhead = AdaptiveBulkhead.of("test", config);
bulkhead.tryAcquirePermission();
bulkhead.tryAcquirePermission();
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isZero();
given(future.get()).willReturn("Hello world");
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);
Supplier<Future<String>> supplier = AdaptiveBulkhead.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void shouldReturnTheCorrectName() {

@Test
public void testToString() {
assertThat(bulkhead.toString()).isEqualTo("AdaptiveBulkhead 'test'");
assertThat(bulkhead).hasToString("AdaptiveBulkhead 'test'");
}

@Test
Expand Down

0 comments on commit 3cfa94f

Please sign in to comment.