From a46ae24be5dde4c38068a4eae1283510c5d3e294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hexmind=20Tomasz=20Skowro=C5=84ski?= <5780288+hexmind@users.noreply.github.com> Date: Sat, 25 Mar 2023 15:35:46 +0100 Subject: [PATCH] Issue #201 BulkheadOnLimitIncreasedEvent, BulkheadOnLimitDecreasedEvent merged into one event --- .../bulkhead/adaptive/AdaptiveBulkhead.java | 5 +- .../adaptive/event/AdaptiveBulkheadEvent.java | 10 ++-- ....java => BulkheadOnLimitChangedEvent.java} | 17 ++++-- .../event/BulkheadOnLimitDecreasedEvent.java | 53 ------------------- .../event/BulkheadOnStateTransitionEvent.java | 4 -- .../AdaptiveBulkheadEventProcessor.java | 13 ++--- .../AdaptiveBulkheadStateMachine.java | 32 +++++------ .../AdaptiveBulkheadStateMachineTest.java | 50 +++++++---------- 8 files changed, 52 insertions(+), 132 deletions(-) rename resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/{BulkheadOnLimitIncreasedEvent.java => BulkheadOnLimitChangedEvent.java} (69%) delete mode 100644 resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitDecreasedEvent.java diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/AdaptiveBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/AdaptiveBulkhead.java index d88011b3a..e3c27146e 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/AdaptiveBulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/AdaptiveBulkhead.java @@ -647,10 +647,7 @@ interface Metrics extends Bulkhead.Metrics { */ interface AdaptiveEventPublisher extends EventPublisher { - // TODO Maybe we can replace these 2 events by 1 BulkheadOnLimitChangedEvent(oldValue, newValue) - EventPublisher onLimitIncreased(EventConsumer eventConsumer); - - EventPublisher onLimitDecreased(EventConsumer eventConsumer); + EventPublisher onLimitChanged(EventConsumer eventConsumer); EventPublisher onSuccess(EventConsumer eventConsumer); diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/AdaptiveBulkheadEvent.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/AdaptiveBulkheadEvent.java index 37237710c..6e8d65130 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/AdaptiveBulkheadEvent.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/AdaptiveBulkheadEvent.java @@ -34,15 +34,11 @@ public interface AdaptiveBulkheadEvent { */ enum Type { /** - * A AdaptiveBulkheadEvent which informs that a limit has been increased + * A AdaptiveBulkheadEvent which informs that a limit has been changed */ - LIMIT_INCREASED(false), + LIMIT_CHANGED(false), /** - * A AdaptiveBulkheadEvent which informs that a limit has been decreased - */ - LIMIT_DECREASED(false), - /** - * An adaptive bulkhead event which informs that an error has been recorded + * A AdaptiveBulkheadEvent which informs that a limit has been changed */ ERROR(false), /** diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitIncreasedEvent.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitChangedEvent.java similarity index 69% rename from resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitIncreasedEvent.java rename to resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitChangedEvent.java index 4f28d6a04..227367745 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitIncreasedEvent.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitChangedEvent.java @@ -21,32 +21,39 @@ import io.github.resilience4j.core.lang.NonNull; /** - * A BulkheadEvent which informs that a limit has been increased + * A BulkheadEvent which informs that a limit has been changed */ -public class BulkheadOnLimitIncreasedEvent extends AbstractAdaptiveBulkheadEvent { +public class BulkheadOnLimitChangedEvent extends AbstractAdaptiveBulkheadEvent { + private final int oldMaxConcurrentCalls; private final int newMaxConcurrentCalls; - public BulkheadOnLimitIncreasedEvent(String bulkheadName, int newMaxConcurrentCalls) { + public BulkheadOnLimitChangedEvent(String bulkheadName, int oldMaxConcurrentCalls, int newMaxConcurrentCalls) { super(bulkheadName); + this.oldMaxConcurrentCalls = oldMaxConcurrentCalls; this.newMaxConcurrentCalls = newMaxConcurrentCalls; } @NonNull @Override public Type getEventType() { - return Type.LIMIT_INCREASED; + return Type.LIMIT_CHANGED; } public int getNewMaxConcurrentCalls() { return newMaxConcurrentCalls; } + public boolean isIncrease() { + return newMaxConcurrentCalls > oldMaxConcurrentCalls; + } + @Override public String toString() { - return String.format("%s: Bulkhead '%s' recorded a limit increase: %s", + return String.format("%s: Bulkhead '%s' recorded a limit change from %s to %s", getCreationTime(), getBulkheadName(), + oldMaxConcurrentCalls, newMaxConcurrentCalls ); } diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitDecreasedEvent.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitDecreasedEvent.java deleted file mode 100644 index f4a16e152..000000000 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnLimitDecreasedEvent.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright 2019 Mahmoud Romeh - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ -package io.github.resilience4j.bulkhead.adaptive.event; - -import io.github.resilience4j.core.lang.NonNull; - -/** - * A BulkheadEvent which informs that a limit has been decreased - */ -public class BulkheadOnLimitDecreasedEvent extends AbstractAdaptiveBulkheadEvent { - - private final int newMaxConcurrentCalls; - - public BulkheadOnLimitDecreasedEvent(String bulkheadName, int newMaxConcurrentCalls) { - super(bulkheadName); - this.newMaxConcurrentCalls = newMaxConcurrentCalls; - } - - @NonNull - @Override - public Type getEventType() { - return Type.LIMIT_DECREASED; - } - - public int getNewMaxConcurrentCalls() { - return newMaxConcurrentCalls; - } - - @Override - public String toString() { - return String.format("%s: Bulkhead '%s' recorded a limit decrease: %s", - getCreationTime(), - getBulkheadName(), - newMaxConcurrentCalls - ); - } -} diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnStateTransitionEvent.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnStateTransitionEvent.java index 6b35181ee..d240fa987 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnStateTransitionEvent.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/event/BulkheadOnStateTransitionEvent.java @@ -41,10 +41,6 @@ public Type getEventType() { return Type.STATE_TRANSITION; } - public AdaptiveBulkhead.State getFromState() { - return fromState; - } - public AdaptiveBulkhead.State getToState() { return toState; } diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadEventProcessor.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadEventProcessor.java index 0d39d0cb8..b332b3189 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadEventProcessor.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadEventProcessor.java @@ -10,16 +10,9 @@ class AdaptiveBulkheadEventProcessor extends EventProcessor { @Override - public EventPublisher onLimitIncreased( - EventConsumer eventConsumer) { - registerConsumer(BulkheadOnLimitIncreasedEvent.class.getName(), eventConsumer); - return this; - } - - @Override - public EventPublisher onLimitDecreased( - EventConsumer eventConsumer) { - registerConsumer(BulkheadOnLimitDecreasedEvent.class.getName(), eventConsumer); + public EventPublisher onLimitChanged( + EventConsumer eventConsumer) { + registerConsumer(BulkheadOnLimitChangedEvent.class.getName(), eventConsumer); return this; } diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadStateMachine.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadStateMachine.java index 2eeeda8dc..a7518dff5 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadStateMachine.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadStateMachine.java @@ -53,7 +53,7 @@ public class AdaptiveBulkheadStateMachine implements AdaptiveBulkhead { private final AdaptationCalculator adaptationCalculator; public AdaptiveBulkheadStateMachine(@NonNull String name, - @NonNull AdaptiveBulkheadConfig adaptiveBulkheadConfig) { + @NonNull AdaptiveBulkheadConfig adaptiveBulkheadConfig) { this.name = name; this.adaptiveBulkheadConfig = Objects .requireNonNull(adaptiveBulkheadConfig, "Config must not be null"); @@ -104,7 +104,7 @@ public void onSuccess(long duration, TimeUnit durationUnit) { @Override public void onError(long startTime, TimeUnit durationUnit, Throwable throwable) { if (adaptiveBulkheadConfig.getIgnoreExceptionPredicate().test(throwable)) { - releasePermission(); + releasePermission(); publishBulkheadEvent(new BulkheadOnIgnoreEvent( shortName(innerBulkhead), throwable)); } else if (startTime != 0 @@ -155,7 +155,7 @@ public void transitionToSlowStart() { } private void stateTransition(State newState, - UnaryOperator newStateGenerator) { + UnaryOperator newStateGenerator) { LOG.debug("stateTransition to {}", newState); AdaptiveBulkheadState previous = stateReference.getAndUpdate(newStateGenerator); publishBulkheadEvent(new BulkheadOnStateTransitionEvent( @@ -164,12 +164,11 @@ private void stateTransition(State newState, private void changeConcurrencyLimit(int newValue) { int oldValue = innerBulkhead.getBulkheadConfig().getMaxConcurrentCalls(); - if (newValue > oldValue) { + if (newValue != oldValue) { changeInternals(oldValue, newValue); - publishBulkheadOnLimitIncreasedEvent(newValue); - } else if (newValue < oldValue) { - changeInternals(oldValue, newValue); - publishBulkheadOnLimitDecreasedEvent(newValue); + publishBulkheadOnLimitChangedEvent(oldValue, newValue); + } else { + LOG.trace("change of concurrency limit is ignored"); } } @@ -184,16 +183,11 @@ private void changeInternals(int oldValue, int newValue) { } } - private void publishBulkheadOnLimitIncreasedEvent(int maxConcurrentCalls) { - publishBulkheadEvent(new BulkheadOnLimitIncreasedEvent( + private void publishBulkheadOnLimitChangedEvent(int oldMaxConcurrentCalls, int newMaxConcurrentCalls) { + publishBulkheadEvent(new BulkheadOnLimitChangedEvent( shortName(innerBulkhead), - maxConcurrentCalls)); - } - - private void publishBulkheadOnLimitDecreasedEvent(int maxConcurrentCalls) { - publishBulkheadEvent(new BulkheadOnLimitDecreasedEvent( - shortName(innerBulkhead), - maxConcurrentCalls)); + oldMaxConcurrentCalls, + newMaxConcurrentCalls)); } /** @@ -366,7 +360,7 @@ public AdaptiveBulkhead.State getState() { * Get metrics of the AdaptiveBulkhead */ @Override - public AdaptiveBulkheadMetrics getMetrics() { + public AdaptiveBulkheadMetrics getMetrics() { return adaptiveBulkheadMetrics; } @@ -380,7 +374,7 @@ public String toString() { /** * @param eventSupplier the event supplier to be pushed to consumers */ - private void publishBulkheadEvent(AdaptiveBulkheadEvent eventSupplier) { + private void publishBulkheadEvent(AdaptiveBulkheadEvent eventSupplier) { if (eventProcessor.hasConsumers()) { eventProcessor.consumeEvent(eventSupplier); } diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadStateMachineTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadStateMachineTest.java index 319837752..7b98185e4 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadStateMachineTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/adaptive/internal/AdaptiveBulkheadStateMachineTest.java @@ -2,8 +2,7 @@ import io.github.resilience4j.bulkhead.adaptive.AdaptiveBulkhead; import io.github.resilience4j.bulkhead.adaptive.AdaptiveBulkheadConfig; -import io.github.resilience4j.bulkhead.adaptive.event.BulkheadOnLimitDecreasedEvent; -import io.github.resilience4j.bulkhead.adaptive.event.BulkheadOnLimitIncreasedEvent; +import io.github.resilience4j.bulkhead.adaptive.event.BulkheadOnLimitChangedEvent; import io.github.resilience4j.bulkhead.adaptive.event.BulkheadOnStateTransitionEvent; import org.junit.Before; import org.junit.Test; @@ -21,8 +20,7 @@ public class AdaptiveBulkheadStateMachineTest { public static final int RATE_THRESHOLD = 50; private AdaptiveBulkheadStateMachine bulkhead; - private final List limitIncreases = new LinkedList<>(); - private final List limitDecreases = new LinkedList<>(); + private final List limitChanges = new LinkedList<>(); private final List stateTransitions = new LinkedList<>(); @Before @@ -39,8 +37,7 @@ public void setup() { .slowCallDurationThreshold(Duration.ofMillis(SLOW_CALL_DURATION_THRESHOLD)) .build(); bulkhead = (AdaptiveBulkheadStateMachine) AdaptiveBulkhead.of("test", config); - bulkhead.getEventPublisher().onLimitIncreased(limitIncreases::add); - bulkhead.getEventPublisher().onLimitDecreased(limitDecreases::add); + bulkhead.getEventPublisher().onLimitChanged(limitChanges::add); bulkhead.getEventPublisher().onStateTransition(stateTransitions::add); } @@ -54,16 +51,13 @@ public void testTransitionToCongestionAvoidance() { for (int i = 0; i < 10; i++) { onError(failure); } - - assertThat(limitIncreases) - .extracting(BulkheadOnLimitIncreasedEvent::getNewMaxConcurrentCalls) - .containsExactly(24, 48, 96); - assertThat(limitDecreases) - .extracting(BulkheadOnLimitDecreasedEvent::getNewMaxConcurrentCalls) - .containsExactly(48, 24, 12); - assertThat(stateTransitions) - .extracting(BulkheadOnStateTransitionEvent::getFromState) - .containsExactly(AdaptiveBulkhead.State.SLOW_START); + + assertThat(limitChanges) + .extracting(BulkheadOnLimitChangedEvent::getNewMaxConcurrentCalls) + .containsExactly(24, 48, 96, 48, 24, 12); + assertThat(limitChanges) + .extracting(BulkheadOnLimitChangedEvent::isIncrease) + .containsExactly(true, true, true, false, false, false); assertThat(stateTransitions) .extracting(BulkheadOnStateTransitionEvent::getToState) .containsExactly(AdaptiveBulkhead.State.CONGESTION_AVOIDANCE); @@ -77,14 +71,12 @@ public void testCongestionAvoidanceBelowThresholds() { onSuccess(); } - assertThat(limitDecreases) - .isEmpty(); - assertThat(limitIncreases) - .extracting(BulkheadOnLimitIncreasedEvent::getNewMaxConcurrentCalls) + assertThat(limitChanges) + .extracting(BulkheadOnLimitChangedEvent::getNewMaxConcurrentCalls) .containsExactly(13, 14, 15); - assertThat(stateTransitions) - .extracting(BulkheadOnStateTransitionEvent::getFromState) - .containsExactly(AdaptiveBulkhead.State.SLOW_START); + assertThat(limitChanges) + .extracting(BulkheadOnLimitChangedEvent::isIncrease) + .containsExactly(true, true, true); assertThat(stateTransitions) .extracting(BulkheadOnStateTransitionEvent::getToState) .containsExactly(AdaptiveBulkhead.State.CONGESTION_AVOIDANCE); @@ -100,14 +92,12 @@ public void testCongestionAvoidanceAboveThresholds() { onError(failure); } - assertThat(limitIncreases) - .isEmpty(); - assertThat(limitDecreases) - .extracting(BulkheadOnLimitDecreasedEvent::getNewMaxConcurrentCalls) + assertThat(limitChanges) + .extracting(BulkheadOnLimitChangedEvent::getNewMaxConcurrentCalls) .containsExactly(6, 3, 2); - assertThat(stateTransitions) - .extracting(BulkheadOnStateTransitionEvent::getFromState) - .containsExactly(AdaptiveBulkhead.State.SLOW_START); + assertThat(limitChanges) + .extracting(BulkheadOnLimitChangedEvent::isIncrease) + .containsExactly(false, false, false); assertThat(stateTransitions) .extracting(BulkheadOnStateTransitionEvent::getToState) .containsExactly(AdaptiveBulkhead.State.CONGESTION_AVOIDANCE);