diff --git a/CHANGELOG.md b/CHANGELOG.md index 4884116cd9..8cbfe31c16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ ## [Unreleased] -### ... +### Bugfixes + +#### ([1038](https://github.com/allegro/hermes/pull/1038)) Fix to filtering duplicated signals ## 1.0.3 (10.06.2019) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilter.java index 4209f89c17..ff2b56f963 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilter.java @@ -5,11 +5,9 @@ import pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType; import java.time.Clock; -import java.util.Collections; -import java.util.LinkedHashMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; class SignalsFilter { @@ -27,14 +25,14 @@ class SignalsFilter { this.clock = clock; } - Set filterSignals(List signals) { - Set filteredSignals = Collections.newSetFromMap(new LinkedHashMap<>(signals.size())); + List filterSignals(List signals) { + List filteredSignals = new ArrayList<>(signals.size()); for (Signal signal : signals) { boolean merged = merge(filteredSignals, signal); if (!merged) { if (signal.canExecuteNow(clock.millis())) { - filteredSignals.add(signal); + addWithoutDuplicationMergeableSignals(filteredSignals, signal); } else { taskQueue.offer(signal); } @@ -44,7 +42,17 @@ Set filterSignals(List signals) { return filteredSignals; } - private boolean merge(Set filteredSignals, Signal signal) { + private void addWithoutDuplicationMergeableSignals(List filteredSignals, Signal signal) { + if (MERGEABLE_SIGNALS.containsKey(signal.getType())) { + if (!filteredSignals.contains(signal)) { + filteredSignals.add(signal); + } + } else { + filteredSignals.add(signal); + } + } + + private boolean merge(List filteredSignals, Signal signal) { SignalType signalTypeToMerge = MERGEABLE_SIGNALS.get(signal.getType()); if (signalTypeToMerge != null) { return filteredSignals.remove(signal.createChild(signalTypeToMerge)); diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilterTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilterTest.groovy index 9e68dfb1ea..6724d4ed80 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilterTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/SignalsFilterTest.groovy @@ -1,15 +1,25 @@ package pl.allegro.tech.hermes.consumers.supervisor.process import pl.allegro.tech.hermes.api.SubscriptionName +import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset import pl.allegro.tech.hermes.consumers.queue.MpscQueue import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue -import pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType import spock.lang.Specification +import spock.lang.Unroll import java.time.Clock import java.time.Instant import java.time.ZoneId +import static pl.allegro.tech.hermes.api.SubscriptionName.fromString +import static pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset.subscriptionPartitionOffset +import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.COMMIT +import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.RETRANSMIT +import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.START +import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.STOP +import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.UPDATE_SUBSCRIPTION +import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.UPDATE_TOPIC + class SignalsFilterTest extends Specification { private final Clock clock = Clock.fixed(Instant.ofEpochMilli(1024), ZoneId.systemDefault()) @@ -21,57 +31,88 @@ class SignalsFilterTest extends Specification { def "should filter out contradicting signals like START & STOP or STOP & START for the same subscription"() { given: List signals = [ - Signal.of(SignalType.STOP, subscription('A')), - Signal.of(SignalType.STOP, subscription('B')), - Signal.of(SignalType.STOP, subscription('C')), - Signal.of(SignalType.START, subscription('D')), - Signal.of(SignalType.START, subscription('A')) + Signal.of(STOP, subscription('A')), + Signal.of(STOP, subscription('B')), + Signal.of(STOP, subscription('C')), + Signal.of(START, subscription('D')), + Signal.of(START, subscription('A')) ] when: - Set filteredSignals = filter.filterSignals(signals) + List filteredSignals = filter.filterSignals(signals) then: filteredSignals == [ - Signal.of(SignalType.STOP, subscription('B')), - Signal.of(SignalType.STOP, subscription('C')), - Signal.of(SignalType.START, subscription('D')) - ] as Set + Signal.of(STOP, subscription('B')), + Signal.of(STOP, subscription('C')), + Signal.of(START, subscription('D')) + ] } - def "should remove duplicate signals keeping the most recent one"() { + @Unroll + def "should remove duplicated signals #signalType"() { given: List signals = [ - Signal.of(SignalType.UPDATE_TOPIC, subscription('A'), 'first-update'), - Signal.of(SignalType.UPDATE_TOPIC, subscription('A'), 'second-update'), + Signal.of(signalType, subscription('A')), + Signal.of(signalType, subscription('A')), + Signal.of(signalType, subscription('B')) ] when: - Set filteredSignals = filter.filterSignals(signals) + List filteredSignals = filter.filterSignals(signals) then: filteredSignals == [ - Signal.of(SignalType.UPDATE_TOPIC, subscription('A')), - ] as Set - filteredSignals + Signal.of(signalType, subscription('A')), + Signal.of(signalType, subscription('B')) + ] + + where: + signalType << [STOP, START] + } + + @Unroll + def "should not remove duplicated #signalType signals"() { + given: + List signals = [ + Signal.of(signalType, subscription('A'), firstSignalPayload), + Signal.of(signalType, subscription('A'), secondSignalPayload) + ] + + when: + List filteredSignals = filter.filterSignals(signals) + + then: + filteredSignals == signals + + where: + signalType | firstSignalPayload | secondSignalPayload + UPDATE_TOPIC | 'first-update' | 'second-update' + UPDATE_SUBSCRIPTION | 'first-update' | 'second-update' + COMMIT | [offset(1, 10), offset(2, 11)] | [offset(1, 11)] + RETRANSMIT | null | null } def "should remove signals that should be executed later and put them back on task queue"() { given: Object payload = null List signals = [ - Signal.of(SignalType.START, subscription('A'), payload, 2048), + Signal.of(START, subscription('A'), payload, 2048), ] when: - Set filteredSignals = filter.filterSignals(signals) + List filteredSignals = filter.filterSignals(signals) then: - filteredSignals == [] as Set - taskQueue.drain({ s -> s == Signal.of(SignalType.START, subscription('A')) }) + filteredSignals == [] + taskQueue.drain({ s -> s == Signal.of(START, subscription('A')) }) + } + + private static SubscriptionName subscription(String suffix) { + return fromString("group.topic\$sub$suffix") } - private SubscriptionName subscription(String suffix) { - return SubscriptionName.fromString("group.topic\$sub$suffix") + private static SubscriptionPartitionOffset offset(int partition, long offset) { + return subscriptionPartitionOffset("group_topic", 'group.topic$sub', partition, offset) } }