Skip to content

Commit

Permalink
Issue resilience4j#201 BulkheadOnLimitIncreasedEvent, BulkheadOnLimit…
Browse files Browse the repository at this point in the history
…DecreasedEvent merged into one event
  • Loading branch information
hexmind authored and Tomasz Skowroński committed Jan 5, 2024
1 parent a4d0c7e commit a46ae24
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,10 +647,7 @@ interface Metrics extends Bulkhead.Metrics {
*/
interface AdaptiveEventPublisher extends EventPublisher<AdaptiveBulkheadEvent> {

// TODO Maybe we can replace these 2 events by 1 BulkheadOnLimitChangedEvent(oldValue, newValue)
EventPublisher onLimitIncreased(EventConsumer<BulkheadOnLimitIncreasedEvent> eventConsumer);

EventPublisher onLimitDecreased(EventConsumer<BulkheadOnLimitDecreasedEvent> eventConsumer);
EventPublisher onLimitChanged(EventConsumer<BulkheadOnLimitChangedEvent> eventConsumer);

EventPublisher onSuccess(EventConsumer<BulkheadOnSuccessEvent> eventConsumer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,11 @@ public interface AdaptiveBulkheadEvent {
*/
enum Type {
/**
* A AdaptiveBulkheadEvent which informs that a limit has been increased
* A AdaptiveBulkheadEvent which informs that a limit has been changed
*/
LIMIT_INCREASED(false),
LIMIT_CHANGED(false),
/**
* A AdaptiveBulkheadEvent which informs that a limit has been decreased
*/
LIMIT_DECREASED(false),
/**
* An adaptive bulkhead event which informs that an error has been recorded
* A AdaptiveBulkheadEvent which informs that a limit has been changed
*/
ERROR(false),
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,39 @@
import io.github.resilience4j.core.lang.NonNull;

/**
* A BulkheadEvent which informs that a limit has been increased
* A BulkheadEvent which informs that a limit has been changed
*/
public class BulkheadOnLimitIncreasedEvent extends AbstractAdaptiveBulkheadEvent {
public class BulkheadOnLimitChangedEvent extends AbstractAdaptiveBulkheadEvent {

private final int oldMaxConcurrentCalls;
private final int newMaxConcurrentCalls;

public BulkheadOnLimitIncreasedEvent(String bulkheadName, int newMaxConcurrentCalls) {
public BulkheadOnLimitChangedEvent(String bulkheadName, int oldMaxConcurrentCalls, int newMaxConcurrentCalls) {
super(bulkheadName);
this.oldMaxConcurrentCalls = oldMaxConcurrentCalls;
this.newMaxConcurrentCalls = newMaxConcurrentCalls;
}

@NonNull
@Override
public Type getEventType() {
return Type.LIMIT_INCREASED;
return Type.LIMIT_CHANGED;
}

public int getNewMaxConcurrentCalls() {
return newMaxConcurrentCalls;
}

public boolean isIncrease() {
return newMaxConcurrentCalls > oldMaxConcurrentCalls;
}

@Override
public String toString() {
return String.format("%s: Bulkhead '%s' recorded a limit increase: %s",
return String.format("%s: Bulkhead '%s' recorded a limit change from %s to %s",
getCreationTime(),
getBulkheadName(),
oldMaxConcurrentCalls,
newMaxConcurrentCalls
);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ public Type getEventType() {
return Type.STATE_TRANSITION;
}

public AdaptiveBulkhead.State getFromState() {
return fromState;
}

public AdaptiveBulkhead.State getToState() {
return toState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,9 @@ class AdaptiveBulkheadEventProcessor extends EventProcessor<AdaptiveBulkheadEven
AdaptiveBulkhead.AdaptiveEventPublisher, EventConsumer<AdaptiveBulkheadEvent> {

@Override
public EventPublisher<?> onLimitIncreased(
EventConsumer<BulkheadOnLimitIncreasedEvent> eventConsumer) {
registerConsumer(BulkheadOnLimitIncreasedEvent.class.getName(), eventConsumer);
return this;
}

@Override
public EventPublisher<?> onLimitDecreased(
EventConsumer<BulkheadOnLimitDecreasedEvent> eventConsumer) {
registerConsumer(BulkheadOnLimitDecreasedEvent.class.getName(), eventConsumer);
public EventPublisher<?> onLimitChanged(
EventConsumer<BulkheadOnLimitChangedEvent> eventConsumer) {
registerConsumer(BulkheadOnLimitChangedEvent.class.getName(), eventConsumer);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class AdaptiveBulkheadStateMachine implements AdaptiveBulkhead {
private final AdaptationCalculator adaptationCalculator;

public AdaptiveBulkheadStateMachine(@NonNull String name,
@NonNull AdaptiveBulkheadConfig adaptiveBulkheadConfig) {
@NonNull AdaptiveBulkheadConfig adaptiveBulkheadConfig) {
this.name = name;
this.adaptiveBulkheadConfig = Objects
.requireNonNull(adaptiveBulkheadConfig, "Config must not be null");
Expand Down Expand Up @@ -104,7 +104,7 @@ public void onSuccess(long duration, TimeUnit durationUnit) {
@Override
public void onError(long startTime, TimeUnit durationUnit, Throwable throwable) {
if (adaptiveBulkheadConfig.getIgnoreExceptionPredicate().test(throwable)) {
releasePermission();
releasePermission();
publishBulkheadEvent(new BulkheadOnIgnoreEvent(
shortName(innerBulkhead), throwable));
} else if (startTime != 0
Expand Down Expand Up @@ -155,7 +155,7 @@ public void transitionToSlowStart() {
}

private void stateTransition(State newState,
UnaryOperator<AdaptiveBulkheadState> newStateGenerator) {
UnaryOperator<AdaptiveBulkheadState> newStateGenerator) {
LOG.debug("stateTransition to {}", newState);
AdaptiveBulkheadState previous = stateReference.getAndUpdate(newStateGenerator);
publishBulkheadEvent(new BulkheadOnStateTransitionEvent(
Expand All @@ -164,12 +164,11 @@ private void stateTransition(State newState,

private void changeConcurrencyLimit(int newValue) {
int oldValue = innerBulkhead.getBulkheadConfig().getMaxConcurrentCalls();
if (newValue > oldValue) {
if (newValue != oldValue) {
changeInternals(oldValue, newValue);
publishBulkheadOnLimitIncreasedEvent(newValue);
} else if (newValue < oldValue) {
changeInternals(oldValue, newValue);
publishBulkheadOnLimitDecreasedEvent(newValue);
publishBulkheadOnLimitChangedEvent(oldValue, newValue);
} else {
LOG.trace("change of concurrency limit is ignored");
}
}

Expand All @@ -184,16 +183,11 @@ private void changeInternals(int oldValue, int newValue) {
}
}

private void publishBulkheadOnLimitIncreasedEvent(int maxConcurrentCalls) {
publishBulkheadEvent(new BulkheadOnLimitIncreasedEvent(
private void publishBulkheadOnLimitChangedEvent(int oldMaxConcurrentCalls, int newMaxConcurrentCalls) {
publishBulkheadEvent(new BulkheadOnLimitChangedEvent(
shortName(innerBulkhead),
maxConcurrentCalls));
}

private void publishBulkheadOnLimitDecreasedEvent(int maxConcurrentCalls) {
publishBulkheadEvent(new BulkheadOnLimitDecreasedEvent(
shortName(innerBulkhead),
maxConcurrentCalls));
oldMaxConcurrentCalls,
newMaxConcurrentCalls));
}

/**
Expand Down Expand Up @@ -366,7 +360,7 @@ public AdaptiveBulkhead.State getState() {
* Get metrics of the AdaptiveBulkhead
*/
@Override
public AdaptiveBulkheadMetrics getMetrics() {
public AdaptiveBulkheadMetrics getMetrics() {
return adaptiveBulkheadMetrics;
}

Expand All @@ -380,7 +374,7 @@ public String toString() {
/**
* @param eventSupplier the event supplier to be pushed to consumers
*/
private void publishBulkheadEvent(AdaptiveBulkheadEvent eventSupplier) {
private void publishBulkheadEvent(AdaptiveBulkheadEvent eventSupplier) {
if (eventProcessor.hasConsumers()) {
eventProcessor.consumeEvent(eventSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import io.github.resilience4j.bulkhead.adaptive.AdaptiveBulkhead;
import io.github.resilience4j.bulkhead.adaptive.AdaptiveBulkheadConfig;
import io.github.resilience4j.bulkhead.adaptive.event.BulkheadOnLimitDecreasedEvent;
import io.github.resilience4j.bulkhead.adaptive.event.BulkheadOnLimitIncreasedEvent;
import io.github.resilience4j.bulkhead.adaptive.event.BulkheadOnLimitChangedEvent;
import io.github.resilience4j.bulkhead.adaptive.event.BulkheadOnStateTransitionEvent;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -21,8 +20,7 @@ public class AdaptiveBulkheadStateMachineTest {
public static final int RATE_THRESHOLD = 50;

private AdaptiveBulkheadStateMachine bulkhead;
private final List<BulkheadOnLimitIncreasedEvent> limitIncreases = new LinkedList<>();
private final List<BulkheadOnLimitDecreasedEvent> limitDecreases = new LinkedList<>();
private final List<BulkheadOnLimitChangedEvent> limitChanges = new LinkedList<>();
private final List<BulkheadOnStateTransitionEvent> stateTransitions = new LinkedList<>();

@Before
Expand All @@ -39,8 +37,7 @@ public void setup() {
.slowCallDurationThreshold(Duration.ofMillis(SLOW_CALL_DURATION_THRESHOLD))
.build();
bulkhead = (AdaptiveBulkheadStateMachine) AdaptiveBulkhead.of("test", config);
bulkhead.getEventPublisher().onLimitIncreased(limitIncreases::add);
bulkhead.getEventPublisher().onLimitDecreased(limitDecreases::add);
bulkhead.getEventPublisher().onLimitChanged(limitChanges::add);
bulkhead.getEventPublisher().onStateTransition(stateTransitions::add);
}

Expand All @@ -54,16 +51,13 @@ public void testTransitionToCongestionAvoidance() {
for (int i = 0; i < 10; i++) {
onError(failure);
}

assertThat(limitIncreases)
.extracting(BulkheadOnLimitIncreasedEvent::getNewMaxConcurrentCalls)
.containsExactly(24, 48, 96);
assertThat(limitDecreases)
.extracting(BulkheadOnLimitDecreasedEvent::getNewMaxConcurrentCalls)
.containsExactly(48, 24, 12);
assertThat(stateTransitions)
.extracting(BulkheadOnStateTransitionEvent::getFromState)
.containsExactly(AdaptiveBulkhead.State.SLOW_START);

assertThat(limitChanges)
.extracting(BulkheadOnLimitChangedEvent::getNewMaxConcurrentCalls)
.containsExactly(24, 48, 96, 48, 24, 12);
assertThat(limitChanges)
.extracting(BulkheadOnLimitChangedEvent::isIncrease)
.containsExactly(true, true, true, false, false, false);
assertThat(stateTransitions)
.extracting(BulkheadOnStateTransitionEvent::getToState)
.containsExactly(AdaptiveBulkhead.State.CONGESTION_AVOIDANCE);
Expand All @@ -77,14 +71,12 @@ public void testCongestionAvoidanceBelowThresholds() {
onSuccess();
}

assertThat(limitDecreases)
.isEmpty();
assertThat(limitIncreases)
.extracting(BulkheadOnLimitIncreasedEvent::getNewMaxConcurrentCalls)
assertThat(limitChanges)
.extracting(BulkheadOnLimitChangedEvent::getNewMaxConcurrentCalls)
.containsExactly(13, 14, 15);
assertThat(stateTransitions)
.extracting(BulkheadOnStateTransitionEvent::getFromState)
.containsExactly(AdaptiveBulkhead.State.SLOW_START);
assertThat(limitChanges)
.extracting(BulkheadOnLimitChangedEvent::isIncrease)
.containsExactly(true, true, true);
assertThat(stateTransitions)
.extracting(BulkheadOnStateTransitionEvent::getToState)
.containsExactly(AdaptiveBulkhead.State.CONGESTION_AVOIDANCE);
Expand All @@ -100,14 +92,12 @@ public void testCongestionAvoidanceAboveThresholds() {
onError(failure);
}

assertThat(limitIncreases)
.isEmpty();
assertThat(limitDecreases)
.extracting(BulkheadOnLimitDecreasedEvent::getNewMaxConcurrentCalls)
assertThat(limitChanges)
.extracting(BulkheadOnLimitChangedEvent::getNewMaxConcurrentCalls)
.containsExactly(6, 3, 2);
assertThat(stateTransitions)
.extracting(BulkheadOnStateTransitionEvent::getFromState)
.containsExactly(AdaptiveBulkhead.State.SLOW_START);
assertThat(limitChanges)
.extracting(BulkheadOnLimitChangedEvent::isIncrease)
.containsExactly(false, false, false);
assertThat(stateTransitions)
.extracting(BulkheadOnStateTransitionEvent::getToState)
.containsExactly(AdaptiveBulkhead.State.CONGESTION_AVOIDANCE);
Expand Down

0 comments on commit a46ae24

Please sign in to comment.