Skip to content

Commit

Permalink
make publishing of ThingSnapshotTaken event configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Feb 7, 2024
1 parent beb7318 commit aa8d8fa
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import javax.annotation.concurrent.ThreadSafe;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.eclipse.ditto.base.api.persistence.PersistenceLifecycle;
import org.eclipse.ditto.base.api.persistence.SnapshotTaken;
import org.eclipse.ditto.base.model.entity.Revision;
Expand All @@ -35,10 +38,6 @@

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;

/**
* A {@link org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter} for snapshotting a
* {@link org.eclipse.ditto.things.model.Thing}.
Expand All @@ -48,25 +47,31 @@ public final class ThingMongoSnapshotAdapter extends AbstractMongoSnapshotAdapte

private static final Logger LOGGER = LoggerFactory.getLogger(ThingMongoSnapshotAdapter.class);

static final String THING_SNAPSHOT_TAKEN_EVENT_PUBLISHING_ENABLED =
"thing-snapshot-taken-event-publishing-enabled";

private final ActorRef pubSubMediator;
private final boolean snapshotTakenEventPublishingEnabled;

/**
* @param actorSystem the actor system in which to load the extension
* @param config the config of the extension.
*/
@SuppressWarnings("unused")
public ThingMongoSnapshotAdapter(final ActorSystem actorSystem, final Config config) {
this(DistributedPubSub.get(actorSystem).mediator());
this(DistributedPubSub.get(actorSystem).mediator(), config);

}

/**
* Constructs a new {@code ThingMongoSnapshotAdapter}.
*
* @param pubSubMediator Pekko pubsub mediator with which to publish snapshot events.
*/
public ThingMongoSnapshotAdapter(final ActorRef pubSubMediator) {
public ThingMongoSnapshotAdapter(final ActorRef pubSubMediator, final Config config) {
super(LOGGER);
this.pubSubMediator = pubSubMediator;
snapshotTakenEventPublishingEnabled = config.getBoolean(THING_SNAPSHOT_TAKEN_EVENT_PUBLISHING_ENABLED);
}

@Override
Expand Down Expand Up @@ -95,20 +100,22 @@ protected Optional<JsonField> getRevisionJsonField(final Thing entity) {

@Override
protected void onSnapshotStoreConversion(final Thing thing, final JsonObject thingJson) {
final Optional<ThingId> thingId = thing.getEntityId();
if (thingId.isPresent()) {
final var thingSnapshotTaken = ThingSnapshotTaken.newBuilder(thingId.get(),
thing.getRevision().map(Revision::toLong).orElse(0L),
thing.getLifecycle()
.map(ThingLifecycle::name)
.flatMap(PersistenceLifecycle::forName)
.orElse(PersistenceLifecycle.ACTIVE),
thingJson)
.timestamp(Instant.now())
.build();
publishThingSnapshotTaken(thingSnapshotTaken);
} else {
LOGGER.warn("Could not publish snapshot taken event for thing <{}>.", thing);
if (snapshotTakenEventPublishingEnabled) {
final Optional<ThingId> thingId = thing.getEntityId();
if (thingId.isPresent()) {
final var thingSnapshotTaken = ThingSnapshotTaken.newBuilder(thingId.get(),
thing.getRevision().map(Revision::toLong).orElse(0L),
thing.getLifecycle()
.map(ThingLifecycle::name)
.flatMap(PersistenceLifecycle::forName)
.orElse(PersistenceLifecycle.ACTIVE),
thingJson)
.timestamp(Instant.now())
.build();
publishThingSnapshotTaken(thingSnapshotTaken);
} else {
LOGGER.warn("Could not publish snapshot taken event for thing <{}>.", thing);
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion things/service/src/main/resources/things.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ ditto {
"org.eclipse.ditto.things.service.enforcement.pre.ModifyToCreateThingTransformer", // always keep this as first transformer in order to guarantee that all following transformers know that the command is creating a thing instead of modifying it
"org.eclipse.ditto.things.service.signaltransformation.placeholdersubstitution.ThingsPlaceholderSubstitution"
]
snapshot-adapter = "org.eclipse.ditto.things.service.persistence.serializer.ThingMongoSnapshotAdapter"
snapshot-adapter = {
extension-class = "org.eclipse.ditto.things.service.persistence.serializer.ThingMongoSnapshotAdapter"
extension-config {
thing-snapshot-taken-event-publishing-enabled = true
thing-snapshot-taken-event-publishing-enabled = ${?THING_SNAPSHOT_TAKEN_EVENT_PUBLISHING_ENABLED}
}
}
}

mongodb {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.persistence.SnapshotMetadata;
import org.apache.pekko.persistence.SnapshotOffer;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.assertj.core.api.JUnitSoftAssertions;
import org.bson.BsonDocument;
import org.eclipse.ditto.base.api.persistence.PersistenceLifecycle;
Expand All @@ -30,12 +37,7 @@
import org.junit.Rule;
import org.junit.Test;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.persistence.SnapshotMetadata;
import org.apache.pekko.persistence.SnapshotOffer;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
import com.typesafe.config.ConfigFactory;

/**
* Unit test for {@link ThingMongoSnapshotAdapter}.
Expand All @@ -56,7 +58,9 @@ public final class ThingMongoSnapshotAdapterTest {
public void setUp() {
system = ActorSystem.create();
pubSubProbe = TestProbe.apply(system);
underTest = new ThingMongoSnapshotAdapter(pubSubProbe.ref());
underTest = new ThingMongoSnapshotAdapter(pubSubProbe.ref(), ConfigFactory.parseMap(
Map.of(ThingMongoSnapshotAdapter.THING_SNAPSHOT_TAKEN_EVENT_PUBLISHING_ENABLED, true)
));
}

@After
Expand Down

0 comments on commit aa8d8fa

Please sign in to comment.