Skip to content

Commit

Permalink
[#1228] Evaluate live-channel-condition-match in Things
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 30, 2021
1 parent 99b51c1 commit 3e57a94
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.LiveChannelTimeoutStrategy;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
Expand All @@ -27,6 +29,7 @@
import org.eclipse.ditto.internal.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThing;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingBuilder;
Expand All @@ -37,6 +40,7 @@
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.things.service.common.config.DittoThingsConfig;
import org.eclipse.ditto.things.service.common.config.ThingConfig;
Expand Down Expand Up @@ -115,6 +119,22 @@ public static Props props(final ThingId thingId, final DistributedPub<ThingEvent
return props(thingId, distributedPub, new ThingMongoSnapshotAdapter(pubSubMediator));
}

@Override
public void onQuery(final Command<?> command, final WithDittoHeaders response) {
if (response.getDittoHeaders().didLiveChannelConditionMatch()) {
final var liveChannelTimeoutStrategy = response.getDittoHeaders()
.getLiveChannelTimeoutStrategy()
.orElse(LiveChannelTimeoutStrategy.FAIL);
if (liveChannelTimeoutStrategy != LiveChannelTimeoutStrategy.USE_TWIN &&
response instanceof ThingQueryCommandResponse) {
final var queryResponse = (ThingQueryCommandResponse<?>) response;
super.onQuery(command, queryResponse.setEntity(JsonFactory.nullLiteral()));
return;
}
}
super.onQuery(command, response);
}

@Override
public String persistenceId() {
return entityId.getEntityType() + ":" + entityId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.signals.WithOptionalEntity;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.headers.conditional.ConditionalHeadersValidator;
Expand Down Expand Up @@ -70,13 +71,23 @@ public Result<ThingEvent<?>> apply(final Context<ThingId> context, @Nullable fin
final var thingConditionFailed = command.getDittoHeaders()
.getCondition()
.flatMap(condition -> ThingConditionValidator.validate(command, condition, entity));
final var liveChannelConditionPassed = command.getDittoHeaders()
.getLiveChannelCondition()
.map(condition -> ThingConditionValidator.validate(command, condition, entity).isEmpty())
.orElse(false);

final Result<ThingEvent<?>> result;
if (thingConditionFailed.isPresent()) {
final var conditionFailedException = thingConditionFailed.get();
loggerWithCorrelationId.debug("Validating condition failed with exception <{}>.",
conditionFailedException.getMessage());
result = ResultFactory.newErrorResult(conditionFailedException, command);
} else if (liveChannelConditionPassed) {
final var enhancedHeaders = command.getDittoHeaders()
.toBuilder()
.putHeader(DittoHeaderDefinition.LIVE_CHANNEL_CONDITION_MATCHED.getKey(), Boolean.TRUE.toString())
.build();
result = super.apply(context, entity, nextRevision, command.setDittoHeaders(enhancedHeaders));
} else {
result = super.apply(context, entity, nextRevision, command);
}
Expand Down Expand Up @@ -105,5 +116,4 @@ protected Optional<Metadata> calculateRelativeMetadata(@Nullable final Thing ent
public boolean isDefined(final C command) {
return command instanceof ThingCommand;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import static org.assertj.core.api.Assertions.fail;
import static org.eclipse.ditto.things.model.assertions.DittoThingsAssertions.assertThat;
import static org.eclipse.ditto.things.service.persistence.actors.ETagTestUtils.retrieveThingResponse;

import java.time.Instant;
import java.util.NoSuchElementException;
Expand All @@ -23,25 +22,32 @@

import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonParseOptions;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.ShutdownReasonFactory;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.entity.Revision;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.test.Retry;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonParseOptions;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.Attributes;
import org.eclipse.ditto.things.model.Feature;
import org.eclipse.ditto.things.model.FeatureDefinition;
import org.eclipse.ditto.things.model.Features;
import org.eclipse.ditto.things.model.PolicyIdMissingException;
import org.eclipse.ditto.things.model.Thing;
Expand All @@ -51,10 +57,6 @@
import org.eclipse.ditto.things.model.ThingRevision;
import org.eclipse.ditto.things.model.ThingTooLargeException;
import org.eclipse.ditto.things.model.ThingsModelFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.test.Retry;
import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.ShutdownReasonFactory;
import org.eclipse.ditto.things.model.signals.commands.TestConstants;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.exceptions.FeatureNotAccessibleException;
Expand All @@ -76,11 +78,14 @@
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveAttributeResponse;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveAttributes;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeature;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeatureDefinition;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeatureDefinitionResponse;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeatureProperty;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeaturePropertyResponse;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeatures;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThings;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.things.model.signals.events.ThingCreated;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.things.model.signals.events.ThingModified;
Expand Down Expand Up @@ -361,7 +366,7 @@ private void doTestModifyThingKeepsOverwritesExistingFirstLevelFieldsWhenExplici

expectMsgEquals(
ETagTestUtils.modifyThingResponse(thingWithFirstLevelFields, thingWithDifferentFirstLevelFields,
dittoHeaders, false));
dittoHeaders, false));

assertPublishEvent(ThingModified.of(thingWithDifferentFirstLevelFields, 2L, TIMESTAMP, dittoHeaders,
null));
Expand Down Expand Up @@ -412,7 +417,8 @@ private void doTestModifyThingKeepsAlreadyExistingFirstLevelFieldsWhenNotExplici
underTest.tell(modifyThingCommand, getRef());

expectMsgEquals(
ETagTestUtils.modifyThingResponse(thingWithFirstLevelFields, minimalThing, dittoHeaders, false));
ETagTestUtils.modifyThingResponse(thingWithFirstLevelFields, minimalThing, dittoHeaders,
false));

// we expect that in the Event the minimalThing was merged with thingWithFirstLevelFields:
assertPublishEvent(ThingModified.of(thingWithFirstLevelFields, 2L, TIMESTAMP, dittoHeaders,
Expand Down Expand Up @@ -602,12 +608,14 @@ public void modifyFeatures() {
ModifyFeatures.of(thingId, featuresToModify, headersMockWithOtherAuth);
underTest.tell(modifyFeatures, getRef());
expectMsgEquals(
ETagTestUtils.modifyFeaturesResponse(thingId, featuresToModify, headersMockWithOtherAuth, false));
ETagTestUtils.modifyFeaturesResponse(thingId, featuresToModify, headersMockWithOtherAuth,
false));

final RetrieveFeatures retrieveFeatures = RetrieveFeatures.of(thingId, headersMockWithOtherAuth);
underTest.tell(retrieveFeatures, getRef());
expectMsgEquals(ETagTestUtils.retrieveFeaturesResponse(thingId, featuresToModify, featuresToModify.toJson(),
headersMockWithOtherAuth));
expectMsgEquals(
ETagTestUtils.retrieveFeaturesResponse(thingId, featuresToModify, featuresToModify.toJson(),
headersMockWithOtherAuth));
}
};
}
Expand Down Expand Up @@ -646,7 +654,8 @@ public void modifyAttributes() {
ModifyAttributes.of(thingId, attributesToModify, headersMockWithOtherAuth);
underTest.tell(modifyAttributes, getRef());
expectMsgEquals(
ETagTestUtils.modifyAttributesResponse(thingId, attributesToModify, headersMockWithOtherAuth, false));
ETagTestUtils.modifyAttributesResponse(thingId, attributesToModify, headersMockWithOtherAuth,
false));

final RetrieveAttributes retrieveAttributes = RetrieveAttributes.of(thingId, headersMockWithOtherAuth);
underTest.tell(retrieveAttributes, getRef());
Expand Down Expand Up @@ -688,7 +697,8 @@ public void modifyAttribute() {
ModifyAttribute.of(thingId, attributeKey, newAttributeValue, dittoHeadersV2);
underTest.tell(authorizedCommand, getRef());
expectMsgEquals(
ETagTestUtils.modifyAttributeResponse(thingId, attributeKey, newAttributeValue, dittoHeadersV2, false));
ETagTestUtils.modifyAttributeResponse(thingId, attributeKey, newAttributeValue, dittoHeadersV2,
false));
}
};
}
Expand Down Expand Up @@ -893,8 +903,9 @@ public void ensureSequenceNumberCorrectness() {
.withSelectedFields(versionFieldSelector)
.build();
underTest.tell(retrieveThing, getRef());
expectMsgEquals(ETagTestUtils.retrieveThingResponse(thingExpected, thingExpected.toJson(versionFieldSelector),
dittoHeadersV2));
expectMsgEquals(
ETagTestUtils.retrieveThingResponse(thingExpected, thingExpected.toJson(versionFieldSelector),
dittoHeadersV2));
}
};
}
Expand Down Expand Up @@ -940,7 +951,8 @@ public void ensureSequenceNumberCorrectnessAfterRecovery() {

Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
underTestAfterRestart.tell(retrieveThing, getRef());
expectMsgEquals(ETagTestUtils.retrieveThingResponse(thingExpected, thingExpected.toJson(versionFieldSelector),
expectMsgEquals(ETagTestUtils.retrieveThingResponse(thingExpected,
thingExpected.toJson(versionFieldSelector),
dittoHeadersV2));
});
}
Expand Down Expand Up @@ -1142,7 +1154,75 @@ public void retrieveFeatureReturnsExpected() {
final RetrieveFeature retrieveFeatureCmd = RetrieveFeature.of(thingId, gyroscopeFeatureId, dittoHeadersV2);
thingPersistenceActor.tell(retrieveFeatureCmd, getRef());
expectMsgEquals(
ETagTestUtils.retrieveFeatureResponse(thingId, gyroscopeFeature, gyroscopeFeature.toJson(), dittoHeadersV2));
ETagTestUtils.retrieveFeatureResponse(thingId, gyroscopeFeature, gyroscopeFeature.toJson(),
dittoHeadersV2));
}};
}

@Test
public void retrieveFeaturePropertyWithLiveChannelCondition() {
final ThingId thingId = ThingId.of("org.eclipse.ditto", "thing1");
final String gyroscopeFeatureId = "Gyroscope.0";
final Feature gyroscopeFeature = ThingsModelFactory.newFeatureBuilder()
.definition(FeatureDefinition.fromJson("[\"a:b:c\"]"))
.properties(ThingsModelFactory.newFeaturePropertiesBuilder()
.set("status", JsonFactory.newObjectBuilder()
.set("minRangeValue", -2000)
.set("xValue", -0.05071427300572395)
.set("units", "Deg/s")
.set("yValue", -0.4192921817302704)
.set("zValue", 0.20766231417655945)
.set("maxRangeValue", 2000)
.build())
.build())
.withId(gyroscopeFeatureId)
.build();
final Thing thing = ThingsModelFactory.newThingBuilder()
.setId(thingId)
.setPolicyId(POLICY_ID)
.setFeature(gyroscopeFeature)
.build();

new TestKit(actorSystem) {{
final ActorRef thingPersistenceActor = createPersistenceActorFor(thing);

// create Thing
final CreateThing createThing = CreateThing.of(thing, null, dittoHeadersV2);
thingPersistenceActor.tell(createThing, getRef());
expectMsgClass(CreateThingResponse.class);

// query: live channel condition matched with no twin fallback
final var matchingHeaders = dittoHeadersV2.toBuilder()
.liveChannelCondition(String.format("exists(/features/%s)", gyroscopeFeatureId))
.build();
final var command1 =
RetrieveFeatureDefinition.of(thingId, gyroscopeFeatureId, matchingHeaders);
thingPersistenceActor.tell(command1, getRef());
final var response1 = expectMsgClass(RetrieveFeatureDefinitionResponse.class);
assertThat(response1.getDittoHeaders().didLiveChannelConditionMatch()).isTrue();
assertThat(response1.getDefinition()).isEqualTo(FeatureDefinition.fromJson(JsonFactory.nullArray()));

// query: live channel condition matched with twin fallback
final var propertiesPointer = JsonPointer.of("/status/minRangeValue");
final var withTwinFallback = matchingHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.ON_LIVE_CHANNEL_TIMEOUT.getKey(), "use-twin")
.build();
final var command2 =
RetrieveFeatureProperty.of(thingId, gyroscopeFeatureId, propertiesPointer, withTwinFallback);
thingPersistenceActor.tell(command2, getRef());
final var response2 = expectMsgClass(RetrieveFeaturePropertyResponse.class);
assertThat(response2.getDittoHeaders().didLiveChannelConditionMatch()).isTrue();
assertThat(response2.getPropertyValue()).isEqualTo(JsonValue.of(-2000));

// query: live channel condition does not match
final var mismatchingHeaders = dittoHeadersV2.toBuilder()
.liveChannelCondition("exists(/features/nonexistentFeature)")
.build();
final var command3 = RetrieveThing.of(thingId, mismatchingHeaders);
thingPersistenceActor.tell(command3, getRef());
final var response3 = expectMsgClass(RetrieveThingResponse.class);
assertThat(response3.getDittoHeaders().didLiveChannelConditionMatch()).isFalse();
assertThat(response3.getEntity().asObject()).isNotEmpty();
}};
}

Expand All @@ -1158,7 +1238,8 @@ public void createThingInV2AndUpdateWithV2AndChangedPolicyId() {
JsonSchemaVersion.V_2,
thing_2,
JsonSchemaVersion.V_2,
modifyThing -> ETagTestUtils.modifyThingResponse(thing, thing_2, modifyThing.getDittoHeaders(),
modifyThing -> ETagTestUtils.modifyThingResponse(thing, thing_2,
modifyThing.getDittoHeaders(),
false));
assertPublishEvent(ThingModified.of(thing_2, 2L, TIMESTAMP, headersUsed, null));
}};
Expand Down

0 comments on commit 3e57a94

Please sign in to comment.