Skip to content

Commit

Permalink
made AckUpdater work with ddata sharding
Browse files Browse the repository at this point in the history
having a shard count > 1
* added a unit test testing with 2 systems and remote declared acks

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed May 12, 2021
1 parent 8e275e8 commit 391e3a2
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
Expand All @@ -40,17 +38,20 @@
import org.eclipse.ditto.internal.utils.pubsub.api.ReceiveRemoteAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoteAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoveSubscriberAcks;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DData;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.Grouped;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.GroupedRelation;
import org.eclipse.ditto.internal.utils.pubsub.ddata.literal.LiteralUpdate;
import org.eclipse.ditto.json.JsonValue;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORMultiMap;
import akka.cluster.ddata.Replicator;
import akka.event.LoggingAdapter;
Expand All @@ -75,6 +76,7 @@ public final class AckUpdater extends AbstractActorWithTimers implements Cluster
private final java.util.Set<ActorRef> ddataChangeRecipients;
private final java.util.Set<ActorRef> localChangeRecipients;
private final Gauge ackSizeMetric;
private final Map<Key<?>, Map<Address, List<Grouped<String>>>> cachedRemoteAcks;

private Map<String, Set<String>> remoteAckLabels = Map.of();
private Map<String, Set<String>> remoteGroups = Map.of();
Expand All @@ -89,6 +91,7 @@ protected AckUpdater(final PubSubConfig config,
ddataChangeRecipients = new HashSet<>();
localChangeRecipients = new HashSet<>();
ackSizeMetric = DittoMetrics.gauge("pubsub-ack-size-bytes");
cachedRemoteAcks = new HashMap<>();
subscribeForClusterMemberRemovedAware();
ackDData.getReader().receiveChanges(getSelf());
getTimers().startTimerAtFixedRate(Clock.TICK, Clock.TICK, config.getUpdateInterval());
Expand Down Expand Up @@ -195,16 +198,28 @@ private void tick(final Clock tick) {
private void onChanged(final Replicator.Changed<?> event) {
final Map<Address, List<Grouped<String>>> mmap = Grouped.deserializeORMultiMap(
((ORMultiMap<Address, String>) event.dataValue()), JsonValue::asString);
final List<Grouped<String>> remoteGroupedAckLabels = getRemoteGroupedAckLabelsOrderByAddress(mmap);

cachedRemoteAcks.put(event.key(), mmap);
// calculate all sharded maps:
final Map<Address, List<Grouped<String>>> completeMmap = new HashMap<>();
cachedRemoteAcks.values()
.forEach(map -> map.forEach((address, groups) ->
completeMmap.merge(address, groups, (g1, g2) -> {
g1.addAll(g2);
return g1;
}))
);

final List<Grouped<String>> remoteGroupedAckLabels = getRemoteGroupedAckLabelsOrderByAddress(completeMmap);
remoteGroups = getRemoteGroups(remoteGroupedAckLabels);
remoteAckLabels = getRemoteAckLabels(remoteGroupedAckLabels);

for (final ActorRef localLoser : getLocalLosers(mmap)) {
for (final ActorRef localLoser : getLocalLosers(completeMmap)) {
doRemoveSubscriber(localLoser);
failSubscribe(localLoser);
}

final RemoteAcksChanged ddataChanged = RemoteAcksChanged.of(mmap);
final RemoteAcksChanged ddataChanged = RemoteAcksChanged.of(completeMmap);
ddataChangeRecipients.forEach(recipient -> recipient.tell(ddataChanged, getSelf()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.internal.utils.pubsub.actors;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand All @@ -21,6 +23,8 @@
import org.eclipse.ditto.internal.utils.pubsub.LiteralDDataProvider;
import org.eclipse.ditto.internal.utils.pubsub.api.AcksDeclared;
import org.eclipse.ditto.internal.utils.pubsub.api.DeclareAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.ReceiveRemoteAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoteAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DData;
import org.eclipse.ditto.internal.utils.pubsub.ddata.literal.AbstractConfigAwareDDataProvider;
Expand Down Expand Up @@ -154,6 +158,58 @@ public void localDeclarationsWithoutGroupDoNotOverrideEachOther() {
}};
}

@Test
public void delclarationsInSeveralSystemsMaintainKnownRemoteDeclaredAcks() {
new TestKit(system1) {{
final ActorRef underTest1 = system1.actorOf(AckSupervisor.props(ddata1));
final ActorRef underTest2 = system2.actorOf(AckSupervisor.props(ddata2));
final TestProbe s1a = TestProbe.apply("s1-a", system1);
final TestProbe s1b = TestProbe.apply("s1-b", system1);
final TestProbe s2a = TestProbe.apply("s2-a", system2);
final TestProbe s2b = TestProbe.apply("s2-b", system2);

underTest1.tell(ReceiveRemoteAcks.of(getRef()), getRef());

final String someGroup = "some-group";
final String groupedTopic = "grouped-topic";
final String standaloneTopic1 = "standalone-topic-1";
final String standaloneTopic2 = "standalone-topic-2";

// GIVEN: ack labels are declared on cluster node 1
underTest1.tell(DeclareAcks.of(s1a.ref(), someGroup, Set.of(groupedTopic)), getRef());
expectMsgClass(AcksDeclared.class);
assertThat(expectMsgClass(RemoteAcksChanged.class).contains(groupedTopic)).isTrue();

// GIVEN:
// ack labels are declared on cluster node 2
underTest2.tell(DeclareAcks.of(s2a.ref(), someGroup, Set.of(groupedTopic)), getRef());
expectMsgClass(AcksDeclared.class);
assertThat(expectMsgClass(RemoteAcksChanged.class).contains(groupedTopic)).isTrue();
// ack labels are declared on cluster node 1 without group
underTest1.tell(DeclareAcks.of(s1a.ref(), null, Set.of(standaloneTopic1, standaloneTopic2)),
getRef());
expectMsgClass(AcksDeclared.class);
final RemoteAcksChanged remoteAcksChanged = expectMsgClass(RemoteAcksChanged.class);
assertThat(remoteAcksChanged.contains(groupedTopic)).isTrue();
assertThat(remoteAcksChanged.contains(standaloneTopic1)).isTrue();
assertThat(remoteAcksChanged.contains(standaloneTopic2)).isTrue();

// WHEN: ddata is replicated
waitForHeartBeats(system1, this);
waitForHeartBeats(system2, this);

// THEN:
// without group those ack labels may not be declared again:
underTest1.tell(DeclareAcks.of(s1b.ref(), null, Set.of(standaloneTopic1)), getRef());
expectMsgClass(AcknowledgementLabelNotUniqueException.class);
underTest2.tell(DeclareAcks.of(s2b.ref(), null, Set.of(standaloneTopic2)), getRef());
expectMsgClass(AcknowledgementLabelNotUniqueException.class);
// with same group as above an ack labels may be declared again:
underTest2.tell(DeclareAcks.of(s2b.ref(), someGroup, Set.of(groupedTopic)), getRef());
expectMsgClass(AcksDeclared.class);
}};
}

private Config getTestConf() {
return ConfigFactory.load("pubsub-factory-test.conf");
}
Expand Down

0 comments on commit 391e3a2

Please sign in to comment.