From 7bdc0257c7bfeab96719fb51c84ad172bef5df41 Mon Sep 17 00:00:00 2001 From: Dominik Guggemos Date: Tue, 3 Jul 2018 10:26:00 +0200 Subject: [PATCH] lazy load strategies that handle thing commands, move handling of thing events to separate class Signed-off-by: Dominik Guggemos --- .../actors/ThingPersistenceActor.java | 559 ++++++------------ 1 file changed, 191 insertions(+), 368 deletions(-) diff --git a/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.java b/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.java index c1617bd617..7e771ef435 100755 --- a/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.java +++ b/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.java @@ -15,10 +15,14 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; @@ -215,6 +219,8 @@ public final class ThingPersistenceActor extends AbstractPersistentActor impleme private final DiagnosticLoggingAdapter log = LogUtil.obtain(this); + private static final ThingEventHandlers thingEventHandlers = new ThingEventHandlers(); + private final String thingId; private final ActorRef pubSubMediator; private final java.time.Duration activityCheckInterval; @@ -224,7 +230,7 @@ public final class ThingPersistenceActor extends AbstractPersistentActor impleme private final long snapshotThreshold; private long accessCounter; private Cancellable activityChecker; - private Thing thing; + private final AtomicReference thing = new AtomicReference<>(); ThingPersistenceActor(final String thingId, final ActorRef pubSubMediator, @@ -253,251 +259,20 @@ public final class ThingPersistenceActor extends AbstractPersistentActor impleme thingSnapshotterCreate.apply(this, pubSubMediator, snapshotDeleteOld, eventsDeleteOld, log, snapshotInterval); - handleThingEvents = ReceiveBuilder.create() - // # Thing Creation - .match(ThingCreated.class, tc -> thing = tc.getThing().toBuilder() - .setLifecycle(ThingLifecycle.ACTIVE) - .setRevision(getRevisionNumber()) - .setModified(tc.getTimestamp().orElse(null)) - .build()) - - // # Thing Modification - .match(ThingModified.class, tm -> { - // we need to use the current thing as base otherwise we would loose its state - final ThingBuilder.FromCopy copyBuilder = thing.toBuilder().setLifecycle(ThingLifecycle.ACTIVE) - .setRevision(getRevisionNumber()) - .setModified(tm.getTimestamp().orElse(null)); - - mergeThingModifications(tm.getThing(), copyBuilder); - - thing = copyBuilder.build(); - }) - - // # Thing Deletion - .match(ThingDeleted.class, td -> { - if (thing != null) { - thing = thing.toBuilder() - .setLifecycle(ThingLifecycle.DELETED) - .setRevision(getRevisionNumber()) - .setModified(td.getTimestamp().orElse(null)) - .build(); - } else { - log.warning("Thing was null when 'ThingDeleted' event should have been applied on recovery."); - } - }) - - // # ACL Modification - .match(AclModified.class, tam -> thing = thing.toBuilder() - .removeAllPermissions() - .setPermissions(tam.getAccessControlList()) - .setRevision(getRevisionNumber()) - .setModified(tam.getTimestamp().orElse(null)) - .build()) - - // # ACL Entry Creation - .match(AclEntryCreated.class, aec -> thing = thing.toBuilder() - .setPermissions(aec.getAclEntry()) - .setRevision(getRevisionNumber()) - .setModified(aec.getTimestamp().orElse(null)) - .build()) - - // # ACL Entry Modification - .match(AclEntryModified.class, taem -> thing = thing.toBuilder() - .setPermissions(taem.getAclEntry()) - .setRevision(getRevisionNumber()) - .setModified(taem.getTimestamp().orElse(null)) - .build()) - - // # ACL Entry Deletion - .match(AclEntryDeleted.class, taed -> thing = thing.toBuilder() - .removePermissionsOf(taed.getAuthorizationSubject()) - .setRevision(getRevisionNumber()) - .setModified(taed.getTimestamp().orElse(null)) - .build()) - - // # Attributes Creation - .match(AttributesCreated.class, ac -> thing = thing.toBuilder() - .setAttributes(ac.getCreatedAttributes()) - .setRevision(getRevisionNumber()) - .setModified(ac.getTimestamp().orElse(null)) - .build()) - - // # Attributes Modification - .match(AttributesModified.class, tasm -> thing = thing.toBuilder() - .setAttributes(tasm.getModifiedAttributes()) - .setRevision(getRevisionNumber()) - .setModified(tasm.getTimestamp().orElse(null)) - .build()) - - // # Attribute Modification - .match(AttributeModified.class, tam -> thing = thing.toBuilder() - .setAttribute(tam.getAttributePointer(), tam.getAttributeValue()) - .setRevision(getRevisionNumber()) - .setModified(tam.getTimestamp().orElse(null)) - .build()) - - // # Attribute Creation - .match(AttributeCreated.class, ac -> thing = thing.toBuilder() - .setAttribute(ac.getAttributePointer(), ac.getAttributeValue()) - .setRevision(getRevisionNumber()) - .setModified(ac.getTimestamp().orElse(null)) - .build()) - - // # Attributes Deletion - .match(AttributesDeleted.class, tasd -> thing = thing.toBuilder() - .removeAllAttributes() - .setRevision(getRevisionNumber()) - .setModified(tasd.getTimestamp().orElse(null)) - .build()) - - // # Attribute Deletion - .match(AttributeDeleted.class, tad -> thing = thing.toBuilder() - .removeAttribute(tad.getAttributePointer()) - .setRevision(getRevisionNumber()) - .setModified(tad.getTimestamp().orElse(null)) - .build()) - - // # Features Modification - .match(FeaturesModified.class, fm -> thing = thing.toBuilder() - .removeAllFeatures() - .setFeatures(fm.getFeatures()) - .setRevision(getRevisionNumber()) - .setModified(fm.getTimestamp().orElse(null)) - .build()) - - // # Features Creation - .match(FeaturesCreated.class, fc -> thing = thing.toBuilder() - .setFeatures(fc.getFeatures()) - .setRevision(getRevisionNumber()) - .setModified(fc.getTimestamp().orElse(null)) - .build()) - - // # Features Deletion - .match(FeaturesDeleted.class, fd -> thing = thing.toBuilder() - .removeAllFeatures() - .setRevision(getRevisionNumber()) - .setModified(fd.getTimestamp().orElse(null)) - .build()) - - // # Feature Creation - .match(FeatureCreated.class, fc -> thing = thing.toBuilder() - .setFeature(fc.getFeature()) - .setRevision(getRevisionNumber()) - .setModified(fc.getTimestamp().orElse(null)) - .build()) - - // # Feature Modification - .match(FeatureModified.class, fm -> thing = thing.toBuilder() - .setFeature(fm.getFeature()) - .setRevision(getRevisionNumber()) - .setModified(fm.getTimestamp().orElse(null)) - .build()) - - // # Feature Deletion - .match(FeatureDeleted.class, fd -> thing = thing.toBuilder() - .removeFeature(fd.getFeatureId()) - .setRevision(getRevisionNumber()) - .setModified(fd.getTimestamp().orElse(null)) - .build()) - - // # Feature Definition Creation - .match(FeatureDefinitionCreated.class, fdc -> thing = thing.toBuilder() - .setFeatureDefinition(fdc.getFeatureId(), fdc.getDefinition()) - .setRevision(getRevisionNumber()) - .setModified(fdc.getTimestamp().orElse(null)) - .build()) - - // # Feature Definition Modification - .match(FeatureDefinitionModified.class, fdm -> thing = thing.toBuilder() - .setFeatureDefinition(fdm.getFeatureId(), fdm.getDefinition()) - .setRevision(getRevisionNumber()) - .setModified(fdm.getTimestamp().orElse(null)) - .build()) - - // # Feature Definition Deletion - .match(FeatureDefinitionDeleted.class, fdd -> thing = thing.toBuilder() - .removeFeatureDefinition(fdd.getFeatureId()) - .setRevision(getRevisionNumber()) - .setModified(fdd.getTimestamp().orElse(null)) - .build()) - - // # Feature Properties Creation - .match(FeaturePropertiesCreated.class, fpc -> thing = thing.toBuilder() - .setFeatureProperties(fpc.getFeatureId(), fpc.getProperties()) - .setRevision(getRevisionNumber()) - .setModified(fpc.getTimestamp().orElse(null)) - .build()) - - // # Feature Properties Modification - .match(FeaturePropertiesModified.class, fpm -> thing = thing.toBuilder() - .setFeatureProperties(fpm.getFeatureId(), fpm.getProperties()) - .setRevision(getRevisionNumber()) - .setModified(fpm.getTimestamp().orElse(null)) - .build()) - - // # Feature Properties Deletion - .match(FeaturePropertiesDeleted.class, fpd -> thing = thing.toBuilder() - .removeFeatureProperties(fpd.getFeatureId()) - .setRevision(getRevisionNumber()) - .setModified(fpd.getTimestamp().orElse(null)) - .build()) - - // # Feature Property Creation - .match(FeaturePropertyCreated.class, fpc -> thing = thing.toBuilder() - .setFeatureProperty(fpc.getFeatureId(), fpc.getPropertyPointer(), fpc.getPropertyValue()) - .setRevision(getRevisionNumber()) - .setModified(fpc.getTimestamp().orElse(null)) - .build()) - - // # Feature Property Modification - .match(FeaturePropertyModified.class, fpm -> thing = thing.toBuilder() - .setFeatureProperty(fpm.getFeatureId(), fpm.getPropertyPointer(), fpm.getPropertyValue()) - .setRevision(getRevisionNumber()) - .setModified(fpm.getTimestamp().orElse(null)) - .build()) - - // # Feature Property Deletion - .match(FeaturePropertyDeleted.class, fpd -> thing = thing.toBuilder() - .removeFeatureProperty(fpd.getFeatureId(), fpd.getPropertyPointer()) - .setRevision(getRevisionNumber()) - .setModified(fpd.getTimestamp().orElse(null)) - .build()) - - // # Policy ID Creation - .match(PolicyIdCreated.class, pic -> { - final ThingBuilder.FromCopy thingBuilder = thing.toBuilder(); - thingBuilder.setPolicyId(pic.getPolicyId()); - thing = thingBuilder.setRevision(getRevisionNumber()) - .setModified(pic.getTimestamp().orElse(null)) - .build(); - }) - - // # Policy ID Modification - .match(PolicyIdModified.class, pim -> { - final ThingBuilder.FromCopy thingBuilder = thing.toBuilder(); - thingBuilder.setPolicyId(pim.getPolicyId()); - thing = thingBuilder.setRevision(getRevisionNumber()) - .setModified(pim.getTimestamp().orElse(null)) - .build(); - }) + handleThingEvents = ReceiveBuilder.create().matchAny(event -> { + final Thing modified = thingEventHandlers.handle(event, thing(), getRevisionNumber()); + if (modified != null) { + thing.set(modified); + } + }).build(); + } - .build(); + private Thing thing() { + return thing.get(); } - /** - * Merges the modifications from {@code thingWithModifications} to {@code builder}. Merge is implemented very - * simple: All first level fields of {@code thingWithModifications} overwrite the first level fields of {@code - * builder}. If a field does not exist in {@code thingWithModifications}, a maybe existing field in {@code - * builder} remains unchanged. - * - * @param thingWithModifications the thing containing the modifications. - * @param builder the builder to be modified. - */ - private void mergeThingModifications(final Thing thingWithModifications, final ThingBuilder.FromCopy builder) { - thingWithModifications.getPolicyId().ifPresent(builder::setPolicyId); - thingWithModifications.getAccessControlList().ifPresent(builder::setPermissions); - thingWithModifications.getAttributes().ifPresent(builder::setAttributes); - thingWithModifications.getFeatures().ifPresent(builder::setFeatures); + private void setThing(final Thing thing) { + this.thing.set(thing); } /** @@ -572,7 +347,7 @@ private long getRevisionNumber() { @Nonnull @Override public Thing getThing() { - return thing; + return thing(); } @Nonnull @@ -635,18 +410,18 @@ public Receive createReceive() { @Override public Receive createReceiveRecover() { // defines how state is updated during recovery - return handleThingEvents.orElse(ReceiveBuilder.create() + return ReceiveBuilder.create() // # Snapshot handling .match(SnapshotOffer.class, ss -> { log.debug("Got SnapshotOffer: {}", ss); - thing = thingSnapshotter.recoverThingFromSnapshotOffer(ss); + setThing(thingSnapshotter.recoverThingFromSnapshotOffer(ss)); }) // # Recovery handling .match(RecoveryCompleted.class, rc -> { - if (thing != null) { - thing = enhanceThingWithLifecycle(thing); + if (thing() != null) { + setThing(enhanceThingWithLifecycle(thing())); log.debug("Thing <{}> was recovered.", thingId); if (isThingActive()) { @@ -655,7 +430,7 @@ public Receive createReceiveRecover() { // expect life cycle to be DELETED. if it's not, then act as if this thing is deleted. if (!isThingDeleted()) { // life cycle isn't known, act as - log.error("Unknown lifecycle state <{}> for Thing <{}>.", thing.getLifecycle(), + log.error("Unknown lifecycle state <{}> for Thing <{}>.", thing().getLifecycle(), thingId); } becomeThingDeletedHandler(); @@ -664,8 +439,10 @@ public Receive createReceiveRecover() { } }) - // # Handle unknown - .matchAny(m -> log.warning("Unknown recover message: {}", m)).build()); +// // # Handle unknown +// .matchAny(m -> log.warning("Unknown recover message: {}", m)) + + .build().orElse(handleThingEvents); } /* @@ -673,13 +450,14 @@ public Receive createReceiveRecover() { * be activated. In return the strategy for the CreateThing command is not needed anymore. */ private void becomeThingCreatedHandler() { - final Collection> thingCreatedStrategies = initThingCreatedStrategies(); - final Receive receive = new StrategyAwareReceiveBuilder() - .matchEach(thingCreatedStrategies) - .matchAny(new MatchAnyAfterInitializeStrategy()) - .setPeekConsumer(getIncomingMessagesLoggerOrNull()) - .build(); - +// final Collection> thingCreatedStrategies = initThingCreatedStrategies(); +// final Receive receive = new StrategyAwareReceiveBuilder() +// .matchEach(thingCreatedStrategies) +// .matchAny(new MatchAnyAfterInitializeStrategy()) +// .setPeekConsumer(getIncomingMessagesLoggerOrNull()) +// .build(); + final LazyStrategyLoader lazyStrategyLoader = new LazyStrategyLoader(); + final Receive receive = ReceiveBuilder.create().matchAny(lazyStrategyLoader::apply).build(); getContext().become(receive, true); getContext().getParent().tell(ThingSupervisorActor.ManualReset.INSTANCE, getSelf()); @@ -687,67 +465,6 @@ private void becomeThingCreatedHandler() { thingSnapshotter.startMaintenanceSnapshots(); } - private Collection> initThingCreatedStrategies() { - final Collection> result = new ArrayList<>(); - - // Thing level - result.add(new ThingConflictStrategy()); - result.add(new ModifyThingStrategy()); - result.add(new RetrieveThingStrategy()); - result.add(new DeleteThingStrategy()); - - // Policy ID - result.add(new RetrievePolicyIdStrategy()); - result.add(new ModifyPolicyIdStrategy()); - - // ACL - result.add(new ModifyAclStrategy()); - result.add(new RetrieveAclStrategy()); - result.add(new ModifyAclEntryStrategy()); - result.add(new RetrieveAclEntryStrategy()); - result.add(new DeleteAclEntryStrategy()); - - // Attributes - result.add(new ModifyAttributesStrategy()); - result.add(new ModifyAttributeStrategy()); - result.add(new RetrieveAttributesStrategy()); - result.add(new RetrieveAttributeStrategy()); - result.add(new DeleteAttributesStrategy()); - result.add(new DeleteAttributeStrategy()); - - // Features - result.add(new ModifyFeaturesStrategy()); - result.add(new ModifyFeatureStrategy()); - result.add(new RetrieveFeaturesStrategy()); - result.add(new RetrieveFeatureStrategy()); - result.add(new DeleteFeaturesStrategy()); - result.add(new DeleteFeatureStrategy()); - - // Feature Definition - result.add(new ModifyFeatureDefinitionStrategy()); - result.add(new RetrieveFeatureDefinitionStrategy()); - result.add(new DeleteFeatureDefinitionStrategy()); - - // Feature Properties - result.add(new ModifyFeaturePropertiesStrategy()); - result.add(new ModifyFeaturePropertyStrategy()); - result.add(new RetrieveFeaturePropertiesStrategy()); - result.add(new RetrieveFeaturePropertyStrategy()); - result.add(new DeleteFeaturePropertiesStrategy()); - result.add(new DeleteFeaturePropertyStrategy()); - - // sudo - result.add(new SudoRetrieveThingStrategy()); - - // TakeSnapshot - result.addAll(thingSnapshotter.strategies()); - - // Persistence specific - result.add(new CheckForActivityStrategy()); - - return result; - } - private void becomeThingDeletedHandler() { final Collection> thingDeletedStrategies = initThingDeletedStrategies(); final Receive receive = new StrategyAwareReceiveBuilder() @@ -783,10 +500,10 @@ private Collection> initThingDeletedStrategies() { private void persistAndApplyEvent(final A event, final Consumer handler) { final A modifiedEvent; - if (thing != null) { + if (thing() != null) { // set version of event to the version of the thing final DittoHeaders newHeaders = event.getDittoHeaders().toBuilder() - .schemaVersion(thing.getImplementedSchemaVersion()) + .schemaVersion(thing().getImplementedSchemaVersion()) .build(); modifiedEvent = (A) event.setDittoHeaders(newHeaders); } else { @@ -848,12 +565,12 @@ private long nextRevision() { * @return Whether the lifecycle of the Thing is active. */ public boolean isThingActive() { - return thing.hasLifecycle(ThingLifecycle.ACTIVE); + return thing().hasLifecycle(ThingLifecycle.ACTIVE); } @Override public boolean isThingDeleted() { - return null == thing || thing.hasLifecycle(ThingLifecycle.DELETED); + return null == thing() || thing().hasLifecycle(ThingLifecycle.DELETED); } private void notifySubscribers(final ThingEvent event) { @@ -969,7 +686,7 @@ static final class CheckForActivity { * @param currentSequenceNr the current {@code lastSequenceNr()} of the ThingPersistenceActor. * @param currentAccessCounter the current {@code accessCounter} of the ThingPersistenceActor. */ - CheckForActivity(final long currentSequenceNr, final long currentAccessCounter) { + public CheckForActivity(final long currentSequenceNr, final long currentAccessCounter) { this.currentSequenceNr = currentSequenceNr; this.currentAccessCounter = currentAccessCounter; } @@ -979,7 +696,7 @@ static final class CheckForActivity { * * @return the current {@code ThingsModelFactory.lastSequenceNr()} of the ThingPersistenceActor. */ - long getCurrentSequenceNr() { + public long getCurrentSequenceNr() { return currentSequenceNr; } @@ -988,7 +705,7 @@ long getCurrentSequenceNr() { * * @return the current {@code accessCounter} of the ThingPersistenceActor. */ - long getCurrentAccessCounter() { + public long getCurrentAccessCounter() { return currentAccessCounter; } @@ -1018,6 +735,95 @@ public String toString() { } + private final class LazyStrategyLoader { + + final Map strategies = new HashMap<>(); + final Map> supplier = new HashMap<>(); + final ReceiveStrategy unhandledStrategy = new MatchAnyAfterInitializeStrategy(); + + private LazyStrategyLoader() { + // Thing level + supplier.put(CreateThing.class, () -> new ThingConflictStrategy()); + supplier.put(ModifyThing.class, () -> new ModifyThingStrategy()); + supplier.put(RetrieveThing.class, () -> new RetrieveThingStrategy()); + supplier.put(DeleteThing.class, () -> new DeleteThingStrategy()); + + // Policy ID + supplier.put(RetrievePolicyId.class, () -> new RetrievePolicyIdStrategy()); + supplier.put(ModifyPolicyId.class, () -> new ModifyPolicyIdStrategy()); + + // ACL + supplier.put(ModifyAcl.class, () -> new ModifyAclStrategy()); + supplier.put(RetrieveAcl.class, () -> new RetrieveAclStrategy()); + supplier.put(ModifyAclEntry.class, () -> new ModifyAclEntryStrategy()); + supplier.put(RetrieveAclEntry.class, () -> new RetrieveAclEntryStrategy()); + supplier.put(DeleteAclEntry.class, () -> new DeleteAclEntryStrategy()); + + // Attributes + supplier.put(ModifyAttributes.class, () -> new ModifyAttributesStrategy()); + supplier.put(ModifyAttribute.class, () -> new ModifyAttributeStrategy()); + supplier.put(RetrieveAttributes.class, () -> new RetrieveAttributesStrategy()); + supplier.put(RetrieveAttribute.class, () -> new RetrieveAttributeStrategy()); + supplier.put(DeleteAttributes.class, () -> new DeleteAttributesStrategy()); + supplier.put(DeleteAttribute.class, () -> new DeleteAttributeStrategy()); + + // Features + supplier.put(ModifyFeatures.class, () -> new ModifyFeaturesStrategy()); + supplier.put(ModifyFeature.class, () -> new ModifyFeatureStrategy()); + supplier.put(RetrieveFeatures.class, () -> new RetrieveFeaturesStrategy()); + supplier.put(RetrieveFeature.class, () -> new RetrieveFeatureStrategy()); + supplier.put(DeleteFeatures.class, () -> new DeleteFeaturesStrategy()); + supplier.put(DeleteFeature.class, () -> new DeleteFeatureStrategy()); + + // Feature Definition + supplier.put(ModifyFeatureDefinition.class, () -> new ModifyFeatureDefinitionStrategy()); + supplier.put(RetrieveFeatureDefinition.class, () -> new RetrieveFeatureDefinitionStrategy()); + supplier.put(DeleteFeatureDefinition.class, () -> new DeleteFeatureDefinitionStrategy()); + + // Feature Properties + supplier.put(ModifyFeatureProperties.class, () -> new ModifyFeaturePropertiesStrategy()); + supplier.put(ModifyFeatureProperty.class, () -> new ModifyFeaturePropertyStrategy()); + supplier.put(RetrieveFeatureProperties.class, () -> new RetrieveFeaturePropertiesStrategy()); + supplier.put(RetrieveFeatureProperty.class, () -> new RetrieveFeaturePropertyStrategy()); + supplier.put(DeleteFeatureProperties.class, () -> new DeleteFeaturePropertiesStrategy()); + supplier.put(DeleteFeatureProperty.class, () -> new DeleteFeaturePropertyStrategy()); + + // sudo + supplier.put(SudoRetrieveThing.class, () -> new SudoRetrieveThingStrategy()); + + // Persistence specific + supplier.put(CheckForActivity.class, () -> new CheckForActivityStrategy()); + + // TakeSnapshot, pre load + thingSnapshotter.strategies().forEach(s -> strategies.put(s.getMatchingClass(), s)); + } + + protected void apply(final Object message) throws Exception { + log.info("Loading strategy for class: {}", message.getClass()); + + final ReceiveStrategy strategy = strategies.computeIfAbsent(message.getClass(), cls -> { + final Supplier receiveStrategySupplier = supplier.get(message.getClass()); + if (receiveStrategySupplier != null) { + log.info("Strategy for {} does not exists, creating.", cls); + return receiveStrategySupplier.get(); + } else { + log.info("Message of type {} cannot be handled.", message.getClass()); + return null; + } + }); + + if (strategy != null) { + if (strategy.getPredicate().defined(message)) { + strategy.getApplyFunction().apply(message); + } else { + strategy.getUnhandledFunction().apply(message); + } + } else { + unhandledStrategy.getApplyFunction().apply(message); + } + } + } + /** * This strategy handles the {@link CreateThing} command. */ @@ -1067,7 +873,7 @@ protected void doApply(final CreateThing command) { } persistAndApplyEvent(thingCreated, event -> { - notifySender(CreateThingResponse.of(thing, thingCreated.getDittoHeaders())); + notifySender(CreateThingResponse.of(thing(), thingCreated.getDittoHeaders())); log.debug("Created new Thing with ID <{}>.", thingId); becomeThingCreatedHandler(); }); @@ -1208,7 +1014,7 @@ protected void doApply(final ModifyThing command) { } private void handleModifyExistingWithV1Command(final ModifyThing command) { - if (JsonSchemaVersion.V_1.equals(thing.getImplementedSchemaVersion())) { + if (JsonSchemaVersion.V_1.equals(thing().getImplementedSchemaVersion())) { handleModifyExistingV1WithV1Command(command); } else { handleModifyExistingV2WithV1Command(command); @@ -1225,7 +1031,7 @@ private void handleModifyExistingV1WithV1Command(final ModifyThing command) { applyModifyCommand(command); } else { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional existingAccessControlList = thing.getAccessControlList(); + final Optional existingAccessControlList = thing().getAccessControlList(); if (existingAccessControlList.isPresent()) { // special apply - take the ACL of the persisted thing instead of the new one in the command: final Thing modifiedThingWithOldAcl = ThingsModelFactory.newThingBuilder(command.getThing()) @@ -1250,7 +1056,7 @@ private void handleModifyExistingV1WithV1Command(final ModifyThing command) { private void handleModifyExistingV2WithV1Command(final ModifyThing command) { // remove any acl information from command and add the current policy Id - final Thing thingWithoutAcl = removeACL(copyPolicyId(thing, command.getThing())); + final Thing thingWithoutAcl = removeACL(copyPolicyId(thing(), command.getThing())); final ThingModified thingModified = ThingModified.of(thingWithoutAcl, nextRevision(), eventTimestamp(), command.getDittoHeaders()); persistAndApplyEvent(thingModified, @@ -1258,7 +1064,7 @@ private void handleModifyExistingV2WithV1Command(final ModifyThing command) { } private void handleModifyExistingWithV2Command(final ModifyThing command) { - if (JsonSchemaVersion.V_1.equals(thing.getImplementedSchemaVersion())) { + if (JsonSchemaVersion.V_1.equals(thing().getImplementedSchemaVersion())) { handleModifyExistingV1WithV2Command(command); } else { handleModifyExistingV2WithV2Command(command); @@ -1285,7 +1091,7 @@ private void handleModifyExistingV1WithV2Command(final ModifyThing command) { private void handleModifyExistingV2WithV2Command(final ModifyThing command) { // ensure the Thing contains a policy ID final Thing thingWithPolicyId = - containsPolicyId(command) ? command.getThing() : copyPolicyId(thing, command.getThing()); + containsPolicyId(command) ? command.getThing() : copyPolicyId(thing(), command.getThing()); applyModifyCommand(ModifyThing.of(command.getThingId(), thingWithPolicyId, null, @@ -1298,7 +1104,7 @@ private void applyModifyCommand(final ModifyThing command) { // make sure that the ThingModified-Event contains all data contained in the resulting thing (this is // required e.g. for updating the search-index) final long nextRevision = nextRevision(); - final ThingBuilder.FromCopy modifiedThingBuilder = thing.toBuilder() + final ThingBuilder.FromCopy modifiedThingBuilder = thing().toBuilder() .setRevision(nextRevision) .setModified(null); mergeThingModifications(command.getThing(), modifiedThingBuilder); @@ -1309,6 +1115,22 @@ private void applyModifyCommand(final ModifyThing command) { event -> notifySender(ModifyThingResponse.modified(thingId, dittoHeaders))); } + /** + * Merges the modifications from {@code thingWithModifications} to {@code builder}. Merge is implemented very + * simple: All first level fields of {@code thingWithModifications} overwrite the first level fields of {@code + * builder}. If a field does not exist in {@code thingWithModifications}, a maybe existing field in {@code + * builder} remains unchanged. + * + * @param thingWithModifications the thing containing the modifications. + * @param builder the builder to be modified. + */ + private void mergeThingModifications(final Thing thingWithModifications, final ThingBuilder.FromCopy builder) { + thingWithModifications.getPolicyId().ifPresent(builder::setPolicyId); + thingWithModifications.getAccessControlList().ifPresent(builder::setPermissions); + thingWithModifications.getAttributes().ifPresent(builder::setAttributes); + thingWithModifications.getFeatures().ifPresent(builder::setFeatures); + } + private boolean containsPolicy(final ModifyThing command) { return containsInitialPolicy(command) || containsPolicyId(command); } @@ -1358,7 +1180,7 @@ public RetrieveThingStrategy() { @Override public FI.TypedPredicate getPredicate() { - return command -> Objects.equals(thingId, command.getId()) && null != thing && !isThingDeleted(); + return command -> Objects.equals(thingId, command.getId()) && null != thing() && !isThingDeleted(); } @Override @@ -1368,8 +1190,8 @@ protected void doApply(final RetrieveThing command) { loadSnapshot(command, snapshotRevisionOptional.get(), getSender()); } else { final JsonObject thingJson = command.getSelectedFields() - .map(sf -> thing.toJson(command.getImplementedSchemaVersion(), sf)) - .orElseGet(() -> thing.toJson(command.getImplementedSchemaVersion())); + .map(sf -> thing().toJson(command.getImplementedSchemaVersion(), sf)) + .orElseGet(() -> thing().toJson(command.getImplementedSchemaVersion())); notifySender(RetrieveThingResponse.of(thingId, thingJson, command.getDittoHeaders())); } @@ -1422,7 +1244,7 @@ public SudoRetrieveThingStrategy() { @Override public FI.TypedPredicate getPredicate() { - return command -> Objects.equals(thingId, command.getId()) && null != thing && !isThingDeleted(); + return command -> Objects.equals(thingId, command.getId()) && null != thing() && !isThingDeleted(); } @Override @@ -1430,8 +1252,8 @@ protected void doApply(final SudoRetrieveThing command) { final Optional selectedFields = command.getSelectedFields(); final JsonSchemaVersion versionToUse = determineSchemaVersion(command); final JsonObject thingJson = selectedFields - .map(sf -> thing.toJson(versionToUse, sf, FieldType.regularOrSpecial())) - .orElseGet(() -> thing.toJson(versionToUse, FieldType.regularOrSpecial())); + .map(sf -> thing().toJson(versionToUse, sf, FieldType.regularOrSpecial())) + .orElseGet(() -> thing().toJson(versionToUse, FieldType.regularOrSpecial())); notifySender(SudoRetrieveThingResponse.of(thingJson, command.getDittoHeaders())); } @@ -1443,7 +1265,7 @@ private JsonSchemaVersion determineSchemaVersion(final SudoRetrieveThing command } private JsonSchemaVersion getOriginalSchemaVersion() { - return null != thing ? thing.getImplementedSchemaVersion() : JsonSchemaVersion.LATEST; + return null != thing() ? thing().getImplementedSchemaVersion() : JsonSchemaVersion.LATEST; } @Override @@ -1495,7 +1317,7 @@ public RetrievePolicyIdStrategy() { @Override protected void doApply(final RetrievePolicyId command) { - final Optional optPolicyId = thing.getPolicyId(); + final Optional optPolicyId = thing().getPolicyId(); if (optPolicyId.isPresent()) { final String policyId = optPolicyId.get(); notifySender(RetrievePolicyIdResponse.of(thingId, policyId, command.getDittoHeaders())); @@ -1526,7 +1348,7 @@ protected void doApply(final ModifyPolicyId command) { final ThingModifiedEvent eventToPersist; final ThingModifyCommandResponse response; - if (thing.getPolicyId().isPresent()) { + if (thing().getPolicyId().isPresent()) { eventToPersist = PolicyIdModified.of(thingId, command.getPolicyId(), nextRevision(), eventTimestamp(), command.getDittoHeaders()); response = ModifyPolicyIdResponse.modified(thingId, command.getDittoHeaders()); @@ -1588,7 +1410,7 @@ public ModifyAclEntryStrategy() { @Override protected void doApply(final ModifyAclEntry command) { - final AccessControlList acl = thing.getAccessControlList().orElseGet(ThingsModelFactory::emptyAcl); + final AccessControlList acl = thing().getAccessControlList().orElseGet(ThingsModelFactory::emptyAcl); final AclEntry modifiedAclEntry = command.getAclEntry(); final Validator aclValidator = AclValidator.newInstance(acl.setEntry(modifiedAclEntry), Thing.MIN_REQUIRED_PERMISSIONS); @@ -1630,7 +1452,7 @@ public DeleteAclEntryStrategy() { @Override protected void doApply(final DeleteAclEntry command) { - final AccessControlList acl = thing.getAccessControlList().orElseGet(ThingsModelFactory::emptyAcl); + final AccessControlList acl = thing().getAccessControlList().orElseGet(ThingsModelFactory::emptyAcl); final AuthorizationSubject authorizationSubject = command.getAuthorizationSubject(); final DittoHeaders dittoHeaders = command.getDittoHeaders(); @@ -1673,7 +1495,7 @@ public RetrieveAclStrategy() { @Override protected void doApply(final RetrieveAcl command) { - final AccessControlList acl = thing.getAccessControlList().orElseGet(ThingsModelFactory::emptyAcl); + final AccessControlList acl = thing().getAccessControlList().orElseGet(ThingsModelFactory::emptyAcl); final JsonObject aclJson = acl.toJson(command.getImplementedSchemaVersion()); notifySender(RetrieveAclResponse.of(thingId, aclJson, command.getDittoHeaders())); } @@ -1695,7 +1517,7 @@ public RetrieveAclEntryStrategy() { @Override protected void doApply(final RetrieveAclEntry command) { - final AccessControlList acl = thing.getAccessControlList().orElseGet(ThingsModelFactory::emptyAcl); + final AccessControlList acl = thing().getAccessControlList().orElseGet(ThingsModelFactory::emptyAcl); final AuthorizationSubject authorizationSubject = command.getAuthorizationSubject(); final DittoHeaders dittoHeaders = command.getDittoHeaders(); if (acl.contains(authorizationSubject)) { @@ -1729,7 +1551,7 @@ protected void doApply(final ModifyAttributes command) { final ThingModifiedEvent eventToPersist; final ThingModifyCommandResponse response; - if (thing.getAttributes().isPresent()) { + if (thing().getAttributes().isPresent()) { eventToPersist = AttributesModified.of(thingId, command.getAttributes(), nextRevision(), eventTimestamp(), dittoHeaders); response = ModifyAttributesResponse.modified(thingId, dittoHeaders); @@ -1760,7 +1582,7 @@ public ModifyAttributeStrategy() { @Override protected void doApply(final ModifyAttribute command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional optionalAttributes = thing.getAttributes(); + final Optional optionalAttributes = thing().getAttributes(); final ThingModifiedEvent eventToPersist; final ThingModifyCommandResponse response; @@ -1799,7 +1621,7 @@ public DeleteAttributesStrategy() { protected void doApply(final DeleteAttributes command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - if (thing.getAttributes().isPresent()) { + if (thing().getAttributes().isPresent()) { final AttributesDeleted attributesDeleted = AttributesDeleted.of(command.getThingId(), nextRevision(), eventTimestamp(), dittoHeaders); persistAndApplyEvent(attributesDeleted, @@ -1828,7 +1650,7 @@ public DeleteAttributeStrategy() { protected void doApply(final DeleteAttribute command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); final JsonPointer attributeJsonPointer = command.getAttributePointer(); - final Optional attributesOptional = thing.getAttributes(); + final Optional attributesOptional = thing().getAttributes(); if (attributesOptional.isPresent()) { final Attributes attributes = attributesOptional.get(); if (attributes.contains(attributeJsonPointer)) { @@ -1862,7 +1684,7 @@ public RetrieveAttributesStrategy() { @Override protected void doApply(final RetrieveAttributes command) { - final Optional optionalAttributes = thing.getAttributes(); + final Optional optionalAttributes = thing().getAttributes(); final DittoHeaders dittoHeaders = command.getDittoHeaders(); if (optionalAttributes.isPresent()) { @@ -1894,7 +1716,7 @@ public RetrieveAttributeStrategy() { @Override protected void doApply(final RetrieveAttribute command) { - final Optional optionalAttributes = thing.getAttributes(); + final Optional optionalAttributes = thing().getAttributes(); final DittoHeaders dittoHeaders = command.getDittoHeaders(); if (optionalAttributes.isPresent()) { @@ -1932,7 +1754,7 @@ protected void doApply(final ModifyFeatures command) { final ThingModifiedEvent eventToPersist; final ThingModifyCommandResponse response; - if (thing.getFeatures().isPresent()) { + if (thing().getFeatures().isPresent()) { eventToPersist = FeaturesModified.of(command.getId(), command.getFeatures(), nextRevision(), eventTimestamp(), dittoHeaders); response = ModifyFeaturesResponse.modified(thingId, dittoHeaders); @@ -1963,7 +1785,7 @@ public ModifyFeatureStrategy() { @Override protected void doApply(final ModifyFeature command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional features = thing.getFeatures(); + final Optional features = thing().getFeatures(); final ThingModifiedEvent eventToPersist; final ThingModifyCommandResponse response; @@ -1997,7 +1819,7 @@ public DeleteFeaturesStrategy() { @Override protected void doApply(final DeleteFeatures command) { - if (thing.getFeatures().isPresent()) { + if (thing().getFeatures().isPresent()) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); final FeaturesDeleted featuresDeleted = FeaturesDeleted.of(thingId, nextRevision(), eventTimestamp(), dittoHeaders); @@ -2027,7 +1849,7 @@ public DeleteFeatureStrategy() { @Override protected void doApply(final DeleteFeature command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional featureIdOptional = thing.getFeatures() + final Optional featureIdOptional = thing().getFeatures() .flatMap(features -> features.getFeature(command.getFeatureId())) .map(Feature::getId); @@ -2058,7 +1880,7 @@ public RetrieveFeaturesStrategy() { @Override protected void doApply(final RetrieveFeatures command) { - final Optional optionalFeatures = thing.getFeatures(); + final Optional optionalFeatures = thing().getFeatures(); if (optionalFeatures.isPresent()) { final Features features = optionalFeatures.get(); final Optional selectedFields = command.getSelectedFields(); @@ -2089,7 +1911,8 @@ public RetrieveFeatureStrategy() { @Override protected void doApply(final RetrieveFeature command) { - final Optional feature = thing.getFeatures().flatMap(fs -> fs.getFeature(command.getFeatureId())); + final Optional feature = + thing().getFeatures().flatMap(fs -> fs.getFeature(command.getFeatureId())); if (feature.isPresent()) { final Feature f = feature.get(); final Optional selectedFields = command.getSelectedFields(); @@ -2121,7 +1944,7 @@ public ModifyFeatureDefinitionStrategy() { @Override protected void doApply(final ModifyFeatureDefinition command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional features = thing.getFeatures(); + final Optional features = thing().getFeatures(); if (features.isPresent()) { final Optional feature = features.get().getFeature(command.getFeatureId()); @@ -2169,7 +1992,7 @@ public DeleteFeatureDefinitionStrategy() { @Override protected void doApply(final DeleteFeatureDefinition command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional features = thing.getFeatures(); + final Optional features = thing().getFeatures(); if (features.isPresent()) { final Optional feature = features.get().getFeature(command.getFeatureId()); @@ -2210,7 +2033,7 @@ public RetrieveFeatureDefinitionStrategy() { @Override protected void doApply(final RetrieveFeatureDefinition command) { - final Optional optionalFeatures = thing.getFeatures(); + final Optional optionalFeatures = thing().getFeatures(); if (optionalFeatures.isPresent()) { final Optional optionalDefinition = optionalFeatures.flatMap(features -> features @@ -2246,7 +2069,7 @@ public ModifyFeaturePropertiesStrategy() { @Override protected void doApply(final ModifyFeatureProperties command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional features = thing.getFeatures(); + final Optional features = thing().getFeatures(); final String featureId = command.getFeatureId(); if (features.isPresent()) { @@ -2295,7 +2118,7 @@ public ModifyFeaturePropertyStrategy() { @Override protected void doApply(final ModifyFeatureProperty command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional features = thing.getFeatures(); + final Optional features = thing().getFeatures(); final String featureId = command.getFeatureId(); if (features.isPresent()) { @@ -2347,7 +2170,7 @@ public DeleteFeaturePropertiesStrategy() { @Override protected void doApply(final DeleteFeatureProperties command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional features = thing.getFeatures(); + final Optional features = thing().getFeatures(); final String featureId = command.getFeatureId(); if (features.isPresent()) { @@ -2389,7 +2212,7 @@ public DeleteFeaturePropertyStrategy() { @Override protected void doApply(final DeleteFeatureProperty command) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); - final Optional featuresOptional = thing.getFeatures(); + final Optional featuresOptional = thing().getFeatures(); final String featureId = command.getFeatureId(); if (featuresOptional.isPresent()) { @@ -2439,7 +2262,7 @@ public RetrieveFeaturePropertiesStrategy() { @Override protected void doApply(final RetrieveFeatureProperties command) { - final Optional optionalFeatures = thing.getFeatures(); + final Optional optionalFeatures = thing().getFeatures(); final String featureId = command.getFeatureId(); final DittoHeaders dittoHeaders = command.getDittoHeaders(); @@ -2480,7 +2303,7 @@ public RetrieveFeaturePropertyStrategy() { @Override protected void doApply(final RetrieveFeatureProperty command) { - final Optional featureOptional = thing.getFeatures() + final Optional featureOptional = thing().getFeatures() .flatMap(features -> features.getFeature(command.getFeatureId())); if (featureOptional.isPresent()) { final DittoHeaders dittoHeaders = command.getDittoHeaders(); @@ -2646,7 +2469,7 @@ abstract class AbstractThingCommandStrategy extends AbstractR @Override public FI.TypedPredicate getPredicate() { - return command -> null != thing && thing.getId() + return command -> null != thing() && thing().getId() .filter(command.getId()::equals) .isPresent(); }