Skip to content

Commit

Permalink
Add test for validating search consistency in case of conflicting sea…
Browse files Browse the repository at this point in the history
…rch updaters

Tests scenario where cluster sharding contains 2 updaters for same thing. (Can happen on failing cluster restarts)

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Mar 14, 2022
1 parent ad0ef9f commit 65a9e7a
Show file tree
Hide file tree
Showing 5 changed files with 564 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import com.mongodb.reactivestreams.client.MongoDatabase;

import akka.NotUsed;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ClassicActorSystemProvider;
import akka.stream.Attributes;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
Expand Down Expand Up @@ -113,16 +113,16 @@ public static SearchUpdaterStream of(final UpdaterConfig updaterConfig,
/**
* Start a perpetual search updater stream killed only by the kill-switch.
*
* @param actorContext where to create actors for this stream.
* @param actorSystem where to create actors for this stream.
* @return kill-switch to terminate the stream.
*/
public KillSwitch start(final ActorContext actorContext) {
public KillSwitch start(final ClassicActorSystemProvider actorSystem) {
final Source<Source<AbstractWriteModel, NotUsed>, NotUsed> restartSource = createRestartSource();
final Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> restartSink = createRestartSink();

return restartSource.viaMat(KillSwitches.single(), Keep.right())
.toMat(restartSink, Keep.left())
.run(actorContext.system());
.run(actorSystem);
}

private Source<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private SearchUpdaterRootActor(final SearchConfig searchConfig,
SearchUpdaterStream.of(updaterConfig, actorSystem, thingsShard, policiesShard, updaterShard,
changeQueueActor, dittoMongoClient.getDefaultDatabase(), blockedNamespaces,
searchUpdateMapper);
updaterStreamKillSwitch = searchUpdaterStream.start(getContext());
updaterStreamKillSwitch = searchUpdaterStream.start(actorSystem);

final var searchUpdaterPersistence =
MongoThingsSearchUpdaterPersistence.of(dittoMongoClient.getDefaultDatabase(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.thingsearch.service.updater.actors;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Loading

0 comments on commit 65a9e7a

Please sign in to comment.