Skip to content

Commit

Permalink
remove streaming of thing events functionality because it isn't used …
Browse files Browse the repository at this point in the history
…anymore;

deleted ThingTag class and all usage of it;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Dec 7, 2021
1 parent 1278363 commit f37bb93
Show file tree
Hide file tree
Showing 10 changed files with 5 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ protected final Source<T, NotUsed> createSource(final SudoStreamPids command) {
// resume from lower bound
final var pidWithSeqNr = entityUnmapper.apply(command.getLowerBound());
pidSource =
readJournal.getJournalPidsAbove(pidWithSeqNr.getPersistenceId(), batchSize,
materializer);
readJournal.getJournalPidsAbove(pidWithSeqNr.getPersistenceId(), batchSize, materializer);
} else {
// no lower bound; read from event journals with restart-source
pidSource = readJournal.getJournalPids(batchSize, maxIdleTime, materializer);
}

return pidSource.map(pid -> mapEntity(new PidWithSeqNr(pid, 0L))).log("pid-streaming", log);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingsModelFactory;
import org.eclipse.ditto.internal.models.streaming.BatchedEntityIdWithRevisions;
import org.eclipse.ditto.base.model.signals.JsonParsable;
import org.eclipse.ditto.internal.utils.cluster.GlobalMappingStrategies;
import org.eclipse.ditto.internal.utils.cluster.MappingStrategies;
import org.eclipse.ditto.internal.utils.cluster.MappingStrategiesBuilder;
import org.eclipse.ditto.base.model.signals.JsonParsable;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingsModelFactory;

/**
* {@link MappingStrategies} for the Things service containing all {@link Jsonifiable} types known to Things.
Expand Down Expand Up @@ -63,9 +62,6 @@ public static ThingsMappingStrategies getInstance() {
private static MappingStrategies getThingsMappingStrategies() {
return MappingStrategiesBuilder.newInstance()
.add(Thing.class, jsonObject -> ThingsModelFactory.newThing(jsonObject)) // do not replace with lambda!
.add(ThingTag.class, jsonObject -> ThingTag.fromJson(jsonObject)) // do not replace with lambda!
.add(BatchedEntityIdWithRevisions.typeOf(ThingTag.class),
BatchedEntityIdWithRevisions.deserializer(jsonObject -> ThingTag.fromJson(jsonObject)))
.putAll(GlobalMappingStrategies.getInstance())
.build();
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
import java.util.regex.Pattern;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.internal.utils.persistence.mongo.DefaultPersistenceStreamingActor;
import org.eclipse.ditto.internal.utils.persistence.mongo.SnapshotStreamingActor;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.things.api.ThingTag;
import org.eclipse.ditto.things.model.ThingId;

import akka.actor.ActorRef;
Expand All @@ -32,12 +28,6 @@
*/
public final class ThingsPersistenceStreamingActorCreator {

/**
* The name of the event streaming actor. Must agree with
* {@link org.eclipse.ditto.things.api.ThingsMessagingConstants#THINGS_STREAM_PROVIDER_ACTOR_PATH}.
*/
public static final String EVENT_STREAMING_ACTOR_NAME = "persistenceStreamingActor";

/**
* The name of the snapshot streaming actor. Must agree with
* {@link org.eclipse.ditto.things.api.ThingsMessagingConstants#THINGS_SNAPSHOT_STREAMING_ACTOR_PATH}.
Expand All @@ -50,19 +40,6 @@ private ThingsPersistenceStreamingActorCreator() {
throw new AssertionError();
}

/**
* Create an actor for streaming from the event journal.
*
* @param actorCreator function to create a named actor with.
* @return a reference of the created actor.
*/
public static ActorRef startEventStreamingActor(final BiFunction<String, Props, ActorRef> actorCreator) {
final var props = DefaultPersistenceStreamingActor.props(ThingTag.class,
ThingsPersistenceStreamingActorCreator::createElement,
ThingsPersistenceStreamingActorCreator::createPidWithSeqNr);
return actorCreator.apply(EVENT_STREAMING_ACTOR_NAME, props);
}

/**
* Create an actor that streams from the snapshot store.
*
Expand All @@ -76,14 +53,6 @@ public static ActorRef startSnapshotStreamingActor(final BiFunction<String, Prop
return actorCreator.apply(SNAPSHOT_STREAMING_ACTOR_NAME, props);
}

private static ThingTag createElement(final PidWithSeqNr pidWithSeqNr) {
return ThingTag.of(pid2EntityId(pidWithSeqNr.getPersistenceId()), pidWithSeqNr.getSequenceNr());
}

private static PidWithSeqNr createPidWithSeqNr(final EntityIdWithRevision<?> thingTag) {
return new PidWithSeqNr(entityId2Pid(thingTag.getEntityId()), thingTag.getRevision());
}

private static ThingId pid2EntityId(final String pid) {
final String id = PERSISTENCE_ID_PATTERN.matcher(pid).replaceFirst("");
return ThingId.of(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ private ThingsRootActor(final ThingsConfig thingsConfig,
final ActorRef healthCheckingActor = startChildActor(DefaultHealthCheckingActorFactory.ACTOR_NAME,
DefaultHealthCheckingActorFactory.props(healthCheckingActorOptions, MongoHealthChecker.props()));

final ActorRef eventStreamingActor =
ThingsPersistenceStreamingActorCreator.startEventStreamingActor(this::startChildActor);
final ActorRef snapshotStreamingActor =
ThingsPersistenceStreamingActorCreator.startSnapshotStreamingActor(this::startChildActor);

Expand All @@ -113,7 +111,6 @@ private ThingsRootActor(final ThingsConfig thingsConfig,
startChildActor(PersistenceCleanupActor.NAME, cleanupActorProps);

pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());
pubSubMediator.tell(DistPubSubAccess.put(eventStreamingActor), getSelf());
pubSubMediator.tell(DistPubSubAccess.put(snapshotStreamingActor), getSelf());

bindHttpStatusRoute(thingsConfig.getHttpConfig(), healthCheckingActor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.policies.api.PolicyReferenceTag;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.api.ThingTag;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
Expand Down Expand Up @@ -160,7 +159,6 @@ private Receive recoveredBehavior() {
return shutdownBehaviour.createReceive()
.match(ThingEvent.class, this::processThingEvent)
.match(AbstractWriteModel.class, this::onNextWriteModel)
.match(ThingTag.class, this::processThingTag)
.match(PolicyReferenceTag.class, this::processPolicyReferenceTag)
.match(UpdateThing.class, this::updateThing)
.match(UpdateThingResponse.class, this::processUpdateThingResponse)
Expand Down Expand Up @@ -261,22 +259,6 @@ private void enqueueMetadata(final Metadata metadata) {
changeQueueActor.tell(metadata.withOrigin(getSelf()), getSelf());
}

private void processThingTag(final ThingTag thingTag) {
log.debug("Received new Thing Tag for thing <{}> with revision <{}>: <{}>.",
thingId, thingRevision, thingTag.asIdentifierString());

if (thingTag.getRevision() > thingRevision) {
log.debug("The Thing Tag for the thing <{}> has the revision {} which is greater than the current actor's"
+ " sequence number <{}>.", thingId, thingTag.getRevision(), thingRevision);
thingRevision = thingTag.getRevision();
// TODO what triggers a ThingTag?
enqueueMetadata(UpdateReason.UNKNOWN);
} else {
log.debug("Dropping <{}> because my thingRevision=<{}>", thingTag, thingRevision);
}
acknowledge(thingTag);
}

private void updateThing(final UpdateThing updateThing) {
log.withCorrelationId(updateThing)
.info("Requested to update search index <{}> by <{}>", updateThing, getSender());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.ditto.internal.utils.pubsub.DistributedSub;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.policies.api.PolicyReferenceTag;
import org.eclipse.ditto.things.api.ThingTag;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
Expand Down Expand Up @@ -124,7 +123,6 @@ static Props props(final DistributedSub thingEventSub,
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ThingEvent.class, this::processThingEvent)
.match(ThingTag.class, this::processThingTag)
.match(PolicyReferenceTag.class, this::processPolicyReferenceTag)
.matchEquals(ShardRegion.getShardRegionStateInstance(), getShardRegionState ->
shardRegion.forward(getShardRegionState, getContext()))
Expand Down Expand Up @@ -163,13 +161,6 @@ private void handleRetrieveStatisticsDetails(final RetrieveStatisticsDetails com
.apply(command.getDittoHeaders()), getContext().dispatcher()).to(getSender());
}

private void processThingTag(final ThingTag thingTag) {
final String elementIdentifier = thingTag.asIdentifierString();
log.withCorrelationId("things-tags-sync-" + elementIdentifier)
.debug("Forwarding incoming ThingTag '{}'", elementIdentifier);
forwardJsonifiableToShardRegion(thingTag, ThingTag::getEntityId);
}

private void updateThings(final ThingsOutOfSync updateThings) {
// log all thing IDs because getting this command implies out-of-sync things.
log.withCorrelationId(updateThings)
Expand Down

0 comments on commit f37bb93

Please sign in to comment.