Skip to content

Commit

Permalink
Merge pull request #1288 from bosch-io/bugfix/pubsub-self-address
Browse files Browse the repository at this point in the history
Test and fix false positives of ClusterStateSyncBehavior due to unserialized local actor refs
  • Loading branch information
thjaeckle committed Jan 24, 2022
2 parents e94c554 + 82372db commit 93b816c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,15 @@ private void doRemoveSubscriber(final ActorRef subscriber) {
// NOT thread-safe
private void writeLocalDData() {
final LiteralUpdate diff = createAndSetDDataUpdate();
ackDData.getWriter()
.put(ownAddress, diff, (Replicator.WriteConsistency) Replicator.writeLocal())
.whenComplete((unused, error) -> {
if (error != null) {
log.error(error, "Failed to update local DData");
}
});
if (!diff.isEmpty()) {
ackDData.getWriter()
.put(ownAddress, diff, (Replicator.WriteConsistency) Replicator.writeLocal())
.whenComplete((unused, error) -> {
if (error != null) {
log.error(error, "Failed to update local DData");
}
});
}
}

// NOT thread-safe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
Expand All @@ -24,6 +25,7 @@

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Timers;
import akka.cluster.Cluster;
Expand Down Expand Up @@ -106,9 +108,10 @@ default AbstractActor.Receive getClusterStateSyncBehavior() {
default void syncClusterState(final Control trigger) {
log().info("Start to sync cluster state");

final var self = self();
final var resultFuture = getDData().getReader()
.getAllShards((Replicator.ReadConsistency) Replicator.readLocal())
.thenApply(this::checkClusterState)
.thenApply(maps -> checkClusterState(maps, self))
.handle((result, error) -> result != null ? result : new SyncError(error));

Patterns.pipe(resultFuture, context().dispatcher()).to(self());
Expand Down Expand Up @@ -161,15 +164,17 @@ default void removeStaleAddresses(final Set<Address> staleAddresses) {
* Compare distributed data addresses against the current cluster state.
*
* @param maps The content of the distributed data.
* @param self the self actor reference.
* @return result of comparing distributed data addresses against the cluster state.
*/
default SyncResult checkClusterState(final List<? extends ORMultiMap<T, ?>> maps) {
default SyncResult checkClusterState(final List<? extends ORMultiMap<T, ?>> maps, final ActorRef self) {
final var clusterState = getCluster().state();
final var clusterAddresses = getClusterMemberAddresses(clusterState);
final var clusterAddresses = getClusterMemberAddresses(clusterState, self);
final var ddataAddresses = getDDataAddresses(maps);
final var isSelfMemberInCluster = isMemberStayingInCluster(getCluster().selfMember());
if (isSelfMemberInCluster) {
final boolean isMyAddressMissing = !ddataAddresses.contains(getCluster().selfAddress());
final boolean isMyAddressMissing = !ddataAddresses.contains(getCluster().selfAddress()) &&
!ddataAddresses.contains(self.path().address());
final Set<Address> staleAddresses = ddataAddresses.stream()
.filter(address -> !clusterAddresses.contains(address))
.collect(Collectors.toSet());
Expand Down Expand Up @@ -216,15 +221,18 @@ static boolean isMemberStayingInCluster(final Member member) {

/**
* Retrieve a set of addresses of cluster members that will stay in the cluster.
* Enhance the address set with the unserialized self address.
*
* @param clusterState The cluster state.
* @param self reference of this actor.
* @return The addresses of members staying in the cluster.
*/
static Set<Address> getClusterMemberAddresses(final ClusterEvent.CurrentClusterState clusterState) {
return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
static Set<Address> getClusterMemberAddresses(final ClusterEvent.CurrentClusterState clusterState,
final ActorRef self) {
final var clusterMemberStream = StreamSupport.stream(clusterState.getMembers().spliterator(), false)
.filter(ClusterStateSyncBehavior::isMemberStayingInCluster)
.map(Member::address)
.collect(Collectors.toSet());
.map(Member::address);
return Stream.concat(clusterMemberStream, Stream.of(self.path().address())).collect(Collectors.toSet());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,12 @@ private CompletionStage<SubscriptionsReader> performDDataOp(final boolean localS
final LiteralUpdate nextUpdate = subscriptions.export();
// take snapshot to give to the subscriber; clear accumulated incremental changes.
snapshot = subscriptions.snapshot();
ddataOp = ddata.getWriter().put(subscriber, nextUpdate.diff(previousUpdate), writeConsistency);
final var diff = nextUpdate.diff(previousUpdate);
if (!diff.isEmpty()) {
ddataOp = ddata.getWriter().put(subscriber, nextUpdate.diff(previousUpdate), writeConsistency);
} else {
ddataOp = CompletableFuture.completedStage(null);
}
previousUpdate = nextUpdate;
topicSizeMetric.set(subscriptions.estimateSize());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ public void clusterStateOutOfSync() {
final var addressMap = Map.of(nonexistentAddress, "unknown-value");
final DData<Address, String, LiteralUpdate> ddata = mockDistributedData(addressMap);
final ActorRef underTest = system1.actorOf(AckUpdater.props(config, ownAddress, ddata));
underTest.tell(DeclareAcks.of(underTest, "group", Set.of("label")), getRef());
Mockito.verify(ddata.getWriter(), Mockito.timeout(5000)).put(eq(ownAddress), any(), any());

// WHEN: AckUpdater is requested to sync against the cluster state
underTest.tell(ClusterStateSyncBehavior.Control.SYNC_CLUSTER_STATE, ActorRef.noSender());
Expand All @@ -249,7 +251,7 @@ public void clusterStateOutOfSync() {
}

@Test
public void clusterStateInSync() {
public void clusterStateInSync() throws Exception {
new TestKit(system1) {{
// GIVEN: distributed data contains entry for the current cluster member and no extraneous entries
final var config = PubSubConfig.of(system1);
Expand All @@ -264,10 +266,9 @@ public void clusterStateInSync() {
// THEN: AckUpdater does not modify ddata more than needed
Mockito.verify(ddata.getReader(), Mockito.timeout(5000))
.getAllShards(eq((Replicator.ReadConsistency) Replicator.readLocal()));
Mockito.verify(ddata.getWriter(), Mockito.timeout(5000).times(1))
.put(eq(ownAddress), any(), any());
Mockito.verify(ddata.getWriter(), Mockito.never())
.removeAddress(any(), eq((Replicator.WriteConsistency) Replicator.writeLocal()));
Thread.sleep(3000);
Mockito.verify(ddata.getWriter(), Mockito.never()).put(eq(ownAddress), any(), any());
Mockito.verify(ddata.getWriter(), Mockito.never()).removeAddress(any(), any());
}};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.internal.utils.pubsub.actors;

import static org.eclipse.ditto.internal.utils.pubsub.actors.ClusterMemberRemovedAware.writeLocal;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;

Expand All @@ -21,6 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

import org.eclipse.ditto.internal.utils.pubsub.api.SubAck;
import org.eclipse.ditto.internal.utils.pubsub.api.Subscribe;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataReader;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataWriter;
Expand Down Expand Up @@ -77,10 +80,10 @@ public void clusterStateOutOfSync() {
final CompressedDData ddata = mockDistributedData(addressMap);
final ActorRef underTest = system.actorOf(SubUpdater.props(config, subscriber.ref(), ddata));

// WHEN: AckUpdater is requested to sync against the cluster state
// WHEN: SubUpdater is requested to sync against the cluster state
underTest.tell(ClusterStateSyncBehavior.Control.SYNC_CLUSTER_STATE, ActorRef.noSender());

// THEN: AckUpdater removes the extraneous entry
// THEN: SubUpdater removes the extraneous entry
Mockito.verify(ddata.getReader(), Mockito.timeout(5000))
.getAllShards(eq((Replicator.ReadConsistency) Replicator.readLocal()));
Mockito.verify(ddata.getWriter(), Mockito.timeout(5000))
Expand All @@ -93,23 +96,28 @@ public void clusterStateOutOfSync() {
}

@Test
public void clusterStateInSync() {
public void clusterStateInSync() throws Exception {
new TestKit(system) {{
// GIVEN: distributed data contains entry for the current cluster member and no extraneous entries
final var subscriberRef = TestProbe.apply(system).ref();
final var cluster = Cluster.get(system);
final var config = PubSubConfig.of(system);
final var addressMap = Map.of(mockActorRefWithAddress(subscriberRef, cluster), "unknown-value");
final var addressMap = Map.of(subscriberRef, "unknown-value");
final CompressedDData ddata = mockDistributedData(addressMap);
final ActorRef underTest = system.actorOf(SubUpdater.props(config, subscriberRef, ddata));

// WHEN: AckUpdater is requested to sync against the cluster state
// GIVEN: local subscriptions are not empty
final var subscribe = Subscribe.of(List.of("topic"), subscriberRef, writeLocal(), true, null);
underTest.tell(subscribe, getRef());
expectMsgClass(SubAck.class);

// WHEN: SubUpdater is requested to sync against the cluster state
underTest.tell(ClusterStateSyncBehavior.Control.SYNC_CLUSTER_STATE, ActorRef.noSender());

// THEN: AckUpdater does not modify ddata more than needed
// THEN: SubUpdater does not modify ddata more than needed
Mockito.verify(ddata.getReader(), Mockito.timeout(5000))
.getAllShards(eq((Replicator.ReadConsistency) Replicator.readLocal()));
Mockito.verify(ddata.getWriter(), Mockito.never()).put(any(), any(), any());
Thread.sleep(3000);
Mockito.verify(ddata.getWriter(), Mockito.times(1)).put(any(), any(), any());
Mockito.verify(ddata.getWriter(), Mockito.never()).removeAddress(any(), any());
}};
}
Expand All @@ -133,6 +141,7 @@ private static CompressedDData mockDistributedData(final Map<ActorRef, String> r
final var writer = Mockito.mock(DDataWriter.class);
Mockito.when(mock.getReader()).thenReturn(reader);
Mockito.when(mock.getWriter()).thenReturn(writer);
Mockito.when(mock.getSeeds()).thenReturn(List.of(1, 2));
Mockito.when(reader.get(any(), any())).thenReturn(CompletableFuture.completedStage(Optional.of(map)));
Mockito.when(reader.getAllShards(any())).thenReturn(CompletableFuture.completedStage(List.of(map)));
Mockito.when(writer.put(any(), any(), any())).thenReturn(CompletableFuture.completedStage(null));
Expand Down

0 comments on commit 93b816c

Please sign in to comment.