Skip to content

Commit

Permalink
fixed bugs in thing policy enrichment + "copy policy from"
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 20, 2022
1 parent ebff0be commit a48a0ef
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 51 deletions.
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.internal.utils.cacheloaders;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -40,7 +42,7 @@
import scala.compat.java8.FutureConverters;

/**
* Helper/Pattern class providing a "ask with retry" pattern based on a provided {@link AskWithRetryConfig}.
* Helper/Pattern class providing an "ask with retry" pattern based on a provided {@link AskWithRetryConfig}.
*/
public final class AskWithRetry {

Expand Down Expand Up @@ -110,6 +112,7 @@ public static <M, A> CompletionStage<A> askWithRetry(final ActorRef actorToAsk,
* @param <A> the type of the answer.
* @return a CompletionStage which is completed by applying the passed in {@code responseMapper} function on the
* response of the asked message or which is completed exceptionally with the Exception.
* @throws java.lang.NullPointerException if any of the passed arguments was {@code null}.
*/
public static <M, A> CompletionStage<A> askWithRetry(final ActorRef actorToAsk,
final M message,
Expand All @@ -118,6 +121,13 @@ public static <M, A> CompletionStage<A> askWithRetry(final ActorRef actorToAsk,
final Executor executor,
final Function<Object, A> responseMapper) {

checkNotNull(actorToAsk, "actorToAsk");
checkNotNull(message, "message");
checkNotNull(config, "config");
checkNotNull(scheduler, "scheduler");
checkNotNull(executor, "executor");
checkNotNull(responseMapper, "responseMapper");

final DittoHeaders dittoHeaders;
if (message instanceof WithDittoHeaders withDittoHeaders) {
dittoHeaders = withDittoHeaders.getDittoHeaders();
Expand Down
Expand Up @@ -23,15 +23,15 @@
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.placeholders.PlaceholderReferenceNotSupportedException;
import org.eclipse.ditto.placeholders.PlaceholderReferenceUnknownFieldException;
import org.eclipse.ditto.internal.utils.akka.logging.AutoCloseableSlf4jLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.placeholders.PlaceholderReferenceNotSupportedException;
import org.eclipse.ditto.placeholders.PlaceholderReferenceUnknownFieldException;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
Expand All @@ -48,16 +48,16 @@ public final class PolicyIdReferencePlaceholderResolver implements ReferencePlac

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(PolicyIdReferencePlaceholderResolver.class);

private final ActorRef commandForwarderActor;
private final ActorRef thingsShardRegion;
private final AskWithRetryConfig askWithRetryConfig;
private final ActorSystem actorSystem;
private final Map<ReferencePlaceholder.ReferencedEntityType, ResolveEntityReferenceStrategy>
supportedEntityTypesToActionMap = new EnumMap<>(ReferencePlaceholder.ReferencedEntityType.class);
private final Set<CharSequence> supportedEntityTypeNames;

private PolicyIdReferencePlaceholderResolver(final ActorRef commandForwarderActor,
private PolicyIdReferencePlaceholderResolver(final ActorRef thingsShardRegion,
final AskWithRetryConfig askWithRetryConfig, final ActorSystem actorSystem) {
this.commandForwarderActor = commandForwarderActor;
this.thingsShardRegion = thingsShardRegion;
this.askWithRetryConfig = askWithRetryConfig;
this.actorSystem = actorSystem;
initializeSupportedEntityTypeReferences();
Expand Down Expand Up @@ -105,7 +105,7 @@ private CompletionStage<String> handlePolicyIdReference(final ReferencePlacehold
.withSelectedFields(referencePlaceholder.getReferencedField().toFieldSelector())
.build();

return AskWithRetry.askWithRetry(commandForwarderActor, retrieveThingCommand, askWithRetryConfig, actorSystem,
return AskWithRetry.askWithRetry(thingsShardRegion, retrieveThingCommand, askWithRetryConfig, actorSystem,
response -> handleRetrieveThingResponse(response, referencePlaceholder, dittoHeaders)
);
}
Expand Down Expand Up @@ -168,16 +168,16 @@ private PlaceholderReferenceNotSupportedException notSupportedException(
* Creates a new {@link PolicyIdReferencePlaceholderResolver} responsible for resolving a policy id of a referenced
* entity.
*
* @param commandForwarderActor the ActorRef of the {@code EdgeCommandForwarderActor} which to ask for "retrieve"
* commands.
* @param thingsShardRegion the ActorRef of the things shard region to retrieve things from.
* @param askWithRetryConfig the configuration for the "ask with retry" pattern applied when asking for retrieves.
* @param actorSystem the actorSystem to load scheduler and dispatcher to use for the "ask with retry" pattern from.
* @return the created PolicyIdReferencePlaceholderResolver instance.
*/
public static PolicyIdReferencePlaceholderResolver of(final ActorRef commandForwarderActor,
final AskWithRetryConfig askWithRetryConfig, final ActorSystem actorSystem) {
public static PolicyIdReferencePlaceholderResolver of(final ActorRef thingsShardRegion,
final AskWithRetryConfig askWithRetryConfig,
final ActorSystem actorSystem) {

return new PolicyIdReferencePlaceholderResolver(commandForwarderActor, askWithRetryConfig, actorSystem);
return new PolicyIdReferencePlaceholderResolver(thingsShardRegion, askWithRetryConfig, actorSystem);
}

interface ResolveEntityReferenceStrategy {
Expand Down
Expand Up @@ -142,10 +142,12 @@ final class ThingCommandEnforcement
* @param actorSystem the actor system to load config, dispatchers from.
* @param policiesShardRegion the policies shard region to load policies from and to use in order to create new
* (inline) policies when creating new things.
* @param thingsShardRegion the things shard region to load things from (e.g. for "_copyPolicyFrom" feature)
* @param enforcementConfig the configuration to apply for this command enforcement implementation.
*/
public ThingCommandEnforcement(final ActorSystem actorSystem,
final ActorRef policiesShardRegion,
final ActorRef thingsShardRegion,
final EnforcementConfig enforcementConfig) {

this.actorSystem = actorSystem;
Expand All @@ -158,8 +160,8 @@ public ThingCommandEnforcement(final ActorSystem actorSystem,
DittoLoggerFactory.getThreadSafeLogger(ThingCommandEnforcement.class.getName() +
".namespace." + loggedNamespace)));

policyIdReferencePlaceholderResolver = PolicyIdReferencePlaceholderResolver.of(policiesShardRegion,
enforcementConfig.getAskWithRetryConfig(), actorSystem);
policyIdReferencePlaceholderResolver = PolicyIdReferencePlaceholderResolver.of(
thingsShardRegion, enforcementConfig.getAskWithRetryConfig(), actorSystem);
}

@Override
Expand Down Expand Up @@ -448,16 +450,9 @@ private CompletionStage<CreateThing> enforceCreateThingBySelf(final ThingCommand
});
} else {
// Other commands cannot be authorized by policy contained in self.
final DittoRuntimeException error;
if (thingCommand instanceof ThingModifyCommand) {
error = ThingNotModifiableException.newBuilder(thingCommand.getEntityId())
.dittoHeaders(thingCommand.getDittoHeaders())
.build();
} else {
error = ThingNotAccessibleException.newBuilder(thingCommand.getEntityId())
.dittoHeaders(thingCommand.getDittoHeaders())
.build();
}
final DittoRuntimeException error = ThingNotAccessibleException.newBuilder(thingCommand.getEntityId())
.dittoHeaders(thingCommand.getDittoHeaders())
.build();
LOGGER.withCorrelationId(command)
.info("Enforcer was not existing for Thing <{}> and no auth info was inlined, responding with: {} - {}",
thingCommand.getEntityId(), error.getClass().getSimpleName(), error.getMessage());
Expand Down
Expand Up @@ -38,13 +38,15 @@ public final class ThingEnforcement extends AbstractEnforcementReloaded<Signal<?

public ThingEnforcement(final ActorSystem actorSystem,
final ActorRef policiesShardRegion,
final ActorRef thingsShardRegion,
final EnforcementConfig enforcementConfig) {

enforcementStrategies = List.of(
new LiveSignalEnforcement(),
new ThingCommandEnforcement(
actorSystem,
policiesShardRegion,
thingsShardRegion,
enforcementConfig
)
);
Expand Down
Expand Up @@ -197,7 +197,7 @@ private CompletionStage<Optional<RetrievePolicyResponse>> retrieveInlinedPolicyF
}
).exceptionally(error -> {
log.withCorrelationId(getCorrelationIdOrNull(error, retrievePolicy))
.error("Retrieving inlined policy after RetrieveThing", error);
.error(error, "Retrieving inlined policy after RetrieveThing");
return Optional.empty();
});
}
Expand Down
Expand Up @@ -27,13 +27,16 @@
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.persistentactors.AbstractPersistenceSupervisor;
import org.eclipse.ditto.internal.utils.persistentactors.TargetActorWithMessage;
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.internal.utils.pubsub.LiveSignalPub;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException;
Expand Down Expand Up @@ -66,6 +69,7 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th

private final ActorRef pubSubMediator;
private final ActorRef policiesShardRegion;
private final ActorRef thingsShardRegion;
private final DistributedPub<ThingEvent<?>> distributedPubThingEventsForTwin;
@Nullable private final ThingPersistenceActorPropsFactory thingPersistenceActorPropsFactory;
private final DefaultEnforcementConfig enforcementConfig;
Expand All @@ -78,7 +82,7 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th

@SuppressWarnings("unused")
private ThingSupervisorActor(final ActorRef pubSubMediator,
final ActorRef policiesShardRegion,
@Nullable final ActorRef policiesShardRegion,
final DistributedPub<ThingEvent<?>> distributedPubThingEventsForTwin,
final LiveSignalPub liveSignalPub,
@Nullable final ThingPersistenceActorPropsFactory thingPersistenceActorPropsFactory,
Expand All @@ -88,13 +92,14 @@ private ThingSupervisorActor(final ActorRef pubSubMediator,
super(blockedNamespaces);

this.pubSubMediator = pubSubMediator;
this.policiesShardRegion = policiesShardRegion;
this.distributedPubThingEventsForTwin = distributedPubThingEventsForTwin;
this.thingPersistenceActorPropsFactory = thingPersistenceActorPropsFactory;
persistenceActorChild = thingPersistenceActorRef;
enforcementConfig = DefaultEnforcementConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
);
final DefaultScopedConfig dittoScoped =
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config());
enforcementConfig = DefaultEnforcementConfig.of(dittoScoped);
final DittoThingsConfig thingsConfig = DittoThingsConfig.of(dittoScoped);

materializer = Materializer.createMaterializer(getContext());
responseReceiverCache = ResponseReceiverCache.lookup(getContext().getSystem());

Expand All @@ -108,9 +113,25 @@ private ThingSupervisorActor(final ActorRef pubSubMediator,
thingPersistenceActorSelection = getContext().actorSelection(PERSISTENCE_ACTOR_NAME);
}

final ShardRegionProxyActorFactory shardRegionProxyActorFactory =
ShardRegionProxyActorFactory.newInstance(getContext().getSystem(), thingsConfig.getClusterConfig());

if (null != policiesShardRegion) {
this.policiesShardRegion = policiesShardRegion;
} else {
this.policiesShardRegion = shardRegionProxyActorFactory.getShardRegionProxyActor(
PoliciesMessagingConstants.CLUSTER_ROLE,
PoliciesMessagingConstants.SHARD_REGION
);
}
thingsShardRegion = shardRegionProxyActorFactory.getShardRegionProxyActor(
ThingsMessagingConstants.CLUSTER_ROLE,
ThingsMessagingConstants.SHARD_REGION
);

try {
inlinePolicyEnrichment = new SupervisorInlinePolicyEnrichment(getContext().getSystem(), log, getEntityId(),
thingPersistenceActorSelection, policiesShardRegion, enforcementConfig);
thingPersistenceActorSelection, this.policiesShardRegion, enforcementConfig);
} catch (final Exception e) {
throw new IllegalStateException("Entity Id could not be retrieved", e);
}
Expand All @@ -128,13 +149,25 @@ private ThingSupervisorActor(final ActorRef pubSubMediator,
* </p>
*
* @param pubSubMediator the pub/sub mediator ActorRef to required for the creation of the ThingEnforcerActor.
* @param policiesShardRegion the shard region of the "policies" shard in order to e.g. load policies.
* @param distributedPubThingEventsForTwin distributed-pub access for publishing thing events on "twin" channel.
* @param liveSignalPub distributed-pub access for "live" channel.
* @param propsFactory factory for creating Props to be used for creating
* @param blockedNamespaces the blocked namespaces functionality to retrieve/subscribe for blocked namespaces.
* @return the {@link Props} to create this actor.
*/
public static Props props(final ActorRef pubSubMediator,
final DistributedPub<ThingEvent<?>> distributedPubThingEventsForTwin,
final LiveSignalPub liveSignalPub,
final ThingPersistenceActorPropsFactory propsFactory,
@Nullable final BlockedNamespaces blockedNamespaces) {

return Props.create(ThingSupervisorActor.class, pubSubMediator, null,
distributedPubThingEventsForTwin, liveSignalPub, propsFactory, null, blockedNamespaces);
}

/**
* Props for creating a {@code ThingSupervisorActor} inside of unit tests.
*/
public static Props props(final ActorRef pubSubMediator,
final ActorRef policiesShardRegion,
final DistributedPub<ThingEvent<?>> distributedPubThingEventsForTwin,
Expand Down Expand Up @@ -270,6 +303,7 @@ protected Props getPersistenceEnforcerProps(final ThingId entityId) {
final ThingEnforcement thingEnforcement = new ThingEnforcement(
actorSystem,
policiesShardRegion,
thingsShardRegion,
enforcementConfig
);

Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.cluster.RetrieveStatisticsDetailsResponseSupplier;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.health.DefaultHealthCheckingActorFactory;
import org.eclipse.ditto.internal.utils.health.HealthCheckingActorOptions;
Expand All @@ -39,7 +38,6 @@
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.internal.utils.pubsub.LiveSignalPub;
import org.eclipse.ditto.internal.utils.pubsub.ThingEventPubSubFactory;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.things.service.common.config.ThingsConfig;
Expand Down Expand Up @@ -87,22 +85,13 @@ private ThingsRootActor(final ThingsConfig thingsConfig,
final DistributedPub<ThingEvent<?>> distributedPubThingEventsForTwin = pubSubFactory.startDistributedPub();
final LiveSignalPub liveSignalPub = LiveSignalPub.of(getContext(), distributedAcks);

final ShardRegionProxyActorFactory shardRegionProxyActorFactory =
ShardRegionProxyActorFactory.newInstance(actorSystem, clusterConfig);

final ActorRef policiesShardRegion = shardRegionProxyActorFactory.getShardRegionProxyActor(
PoliciesMessagingConstants.CLUSTER_ROLE,
PoliciesMessagingConstants.SHARD_REGION
);

final BlockedNamespaces blockedNamespaces = BlockedNamespaces.of(actorSystem);
// start cluster singleton that writes to the distributed cache of blocked namespaces
final Props blockedNamespacesUpdaterProps = BlockedNamespacesUpdater.props(blockedNamespaces, pubSubMediator);
ClusterUtil.startSingleton(actorSystem, getContext(), CLUSTER_ROLE,
BlockedNamespacesUpdater.ACTOR_NAME, blockedNamespacesUpdaterProps);

final Props thingSupervisorActorProps = getThingSupervisorActorProps(pubSubMediator,
policiesShardRegion,
distributedPubThingEventsForTwin,
liveSignalPub,
propsFactory,
Expand Down Expand Up @@ -184,13 +173,12 @@ private void handleRetrieveStatisticsDetails(final RetrieveStatisticsDetails com
}

private static Props getThingSupervisorActorProps(final ActorRef pubSubMediator,
final ActorRef policiesShardRegion,
final DistributedPub<ThingEvent<?>> distributedPubThingEventsForTwin,
final LiveSignalPub liveSignalPub,
final ThingPersistenceActorPropsFactory propsFactory,
final BlockedNamespaces blockedNamespaces) {

return ThingSupervisorActor.props(pubSubMediator, policiesShardRegion, distributedPubThingEventsForTwin,
return ThingSupervisorActor.props(pubSubMediator, distributedPubThingEventsForTwin,
liveSignalPub, propsFactory, blockedNamespaces);
}

Expand Down

0 comments on commit a48a0ef

Please sign in to comment.