Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to filtering duplicated signals #1038

Merged
merged 2 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
## [Unreleased]

### ...
### Bugfixes

#### ([1038](https://github.com/allegro/hermes/pull/1038)) Fix to filtering duplicated signals

## 1.0.3 (10.06.2019)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType;

import java.time.Clock;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SignalsFilter {

Expand All @@ -27,14 +25,14 @@ class SignalsFilter {
this.clock = clock;
}

Set<Signal> filterSignals(List<Signal> signals) {
Set<Signal> filteredSignals = Collections.newSetFromMap(new LinkedHashMap<>(signals.size()));
List<Signal> filterSignals(List<Signal> signals) {
List<Signal> filteredSignals = new ArrayList<>(signals.size());

for (Signal signal : signals) {
boolean merged = merge(filteredSignals, signal);
if (!merged) {
if (signal.canExecuteNow(clock.millis())) {
filteredSignals.add(signal);
addWithoutDuplicationMergeableSignals(filteredSignals, signal);
} else {
taskQueue.offer(signal);
}
Expand All @@ -44,7 +42,17 @@ Set<Signal> filterSignals(List<Signal> signals) {
return filteredSignals;
}

private boolean merge(Set<Signal> filteredSignals, Signal signal) {
private void addWithoutDuplicationMergeableSignals(List<Signal> filteredSignals, Signal signal) {
if (MERGEABLE_SIGNALS.containsKey(signal.getType())) {
if (!filteredSignals.contains(signal)) {
filteredSignals.add(signal);
}
} else {
filteredSignals.add(signal);
}
}

private boolean merge(List<Signal> filteredSignals, Signal signal) {
SignalType signalTypeToMerge = MERGEABLE_SIGNALS.get(signal.getType());
if (signalTypeToMerge != null) {
return filteredSignals.remove(signal.createChild(signalTypeToMerge));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package pl.allegro.tech.hermes.consumers.supervisor.process

import pl.allegro.tech.hermes.api.SubscriptionName
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset
import pl.allegro.tech.hermes.consumers.queue.MpscQueue
import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType
import spock.lang.Specification
import spock.lang.Unroll

import java.time.Clock
import java.time.Instant
import java.time.ZoneId

import static pl.allegro.tech.hermes.api.SubscriptionName.fromString
import static pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset.subscriptionPartitionOffset
import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.COMMIT
import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.RETRANSMIT
import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.START
import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.STOP
import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.UPDATE_SUBSCRIPTION
import static pl.allegro.tech.hermes.consumers.supervisor.process.Signal.SignalType.UPDATE_TOPIC

class SignalsFilterTest extends Specification {

private final Clock clock = Clock.fixed(Instant.ofEpochMilli(1024), ZoneId.systemDefault())
Expand All @@ -21,57 +31,88 @@ class SignalsFilterTest extends Specification {
def "should filter out contradicting signals like START & STOP or STOP & START for the same subscription"() {
given:
List<Signal> signals = [
Signal.of(SignalType.STOP, subscription('A')),
Signal.of(SignalType.STOP, subscription('B')),
Signal.of(SignalType.STOP, subscription('C')),
Signal.of(SignalType.START, subscription('D')),
Signal.of(SignalType.START, subscription('A'))
Signal.of(STOP, subscription('A')),
Signal.of(STOP, subscription('B')),
Signal.of(STOP, subscription('C')),
Signal.of(START, subscription('D')),
Signal.of(START, subscription('A'))
]

when:
Set<Signal> filteredSignals = filter.filterSignals(signals)
List<Signal> filteredSignals = filter.filterSignals(signals)

then:
filteredSignals == [
Signal.of(SignalType.STOP, subscription('B')),
Signal.of(SignalType.STOP, subscription('C')),
Signal.of(SignalType.START, subscription('D'))
] as Set
Signal.of(STOP, subscription('B')),
Signal.of(STOP, subscription('C')),
Signal.of(START, subscription('D'))
]
}

def "should remove duplicate signals keeping the most recent one"() {
@Unroll
def "should remove duplicated signals #signalType"() {
given:
List<Signal> signals = [
Signal.of(SignalType.UPDATE_TOPIC, subscription('A'), 'first-update'),
Signal.of(SignalType.UPDATE_TOPIC, subscription('A'), 'second-update'),
Signal.of(signalType, subscription('A')),
Signal.of(signalType, subscription('A')),
Signal.of(signalType, subscription('B'))
]

when:
Set<Signal> filteredSignals = filter.filterSignals(signals)
List<Signal> filteredSignals = filter.filterSignals(signals)

then:
filteredSignals == [
Signal.of(SignalType.UPDATE_TOPIC, subscription('A')),
] as Set
filteredSignals
Signal.of(signalType, subscription('A')),
Signal.of(signalType, subscription('B'))
]

where:
signalType << [STOP, START]
}

@Unroll
def "should not remove duplicated #signalType signals"() {
given:
List<Signal> signals = [
Signal.of(signalType, subscription('A'), firstSignalPayload),
Signal.of(signalType, subscription('A'), secondSignalPayload)
]

when:
List<Signal> filteredSignals = filter.filterSignals(signals)

then:
filteredSignals == signals

where:
signalType | firstSignalPayload | secondSignalPayload
UPDATE_TOPIC | 'first-update' | 'second-update'
UPDATE_SUBSCRIPTION | 'first-update' | 'second-update'
COMMIT | [offset(1, 10), offset(2, 11)] | [offset(1, 11)]
RETRANSMIT | null | null
}

def "should remove signals that should be executed later and put them back on task queue"() {
given:
Object payload = null
List<Signal> signals = [
Signal.of(SignalType.START, subscription('A'), payload, 2048),
Signal.of(START, subscription('A'), payload, 2048),
]

when:
Set<Signal> filteredSignals = filter.filterSignals(signals)
List<Signal> filteredSignals = filter.filterSignals(signals)

then:
filteredSignals == [] as Set
taskQueue.drain({ s -> s == Signal.of(SignalType.START, subscription('A')) })
filteredSignals == []
taskQueue.drain({ s -> s == Signal.of(START, subscription('A')) })
}

private static SubscriptionName subscription(String suffix) {
return fromString("group.topic\$sub$suffix")
}

private SubscriptionName subscription(String suffix) {
return SubscriptionName.fromString("group.topic\$sub$suffix")
private static SubscriptionPartitionOffset offset(int partition, long offset) {
return subscriptionPartitionOffset("group_topic", 'group.topic$sub', partition, offset)
}
}