Skip to content

Commit

Permalink
some cleanups in ThingCommandEnforcement
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed May 4, 2022
1 parent cf24bb9 commit 409d7f0
Showing 1 changed file with 31 additions and 33 deletions.
Expand Up @@ -161,14 +161,11 @@ public final class ThingCommandEnforcement
* Creates a new instance of the thing command enforcer.
*
* @param actorSystem TODO TJ doc
* @param ackReceiverActor
* @param policiesShardRegion the policies shard region to load policies from and to use in order to create new
* (inline) policies when creating new things.
* @param creationRestrictionEnforcer the CreationRestrictionEnforcer to apply in order to enforce creation of new
* things based on its config.
* @param enforcementConfig the configuration to apply for this command enforcement implementation.
* @param preEnforcer
* @param liveSignalPub
*/
public ThingCommandEnforcement(final ActorSystem actorSystem,
final ActorRef ackReceiverActor,
Expand All @@ -193,7 +190,8 @@ public ThingCommandEnforcement(final ActorSystem actorSystem,
this.preEnforcer = preEnforcer;
policyIdReferencePlaceholderResolver =
PolicyIdReferencePlaceholderResolver.of(policiesShardRegion, enforcementConfig.getAskWithRetryConfig(),
actorSystem.getScheduler(), actorSystem.getDispatcher()); // TODO TJ need to configure different executor?
actorSystem.getScheduler(),
actorSystem.getDispatcher()); // TODO TJ need to configure different executor?
this.liveSignalPub = liveSignalPub;
responseReceiverCache = ResponseReceiverCache.lookup(actorSystem);
}
Expand Down Expand Up @@ -300,11 +298,6 @@ private boolean isWotTdRequestingThingQueryCommand(final ThingCommand<?> thingCo

/**
* TODO TJ move where? and call from where and when?
* @param command
* @param response
* @param startTime
* @param policyEnforcer
* @return
*/
private CompletionStage<ThingQueryCommandResponse<?>> doSmartChannelSelection(final ThingQueryCommand<?> command,
final ThingQueryCommandResponse<?> response, final Instant startTime, final PolicyEnforcer policyEnforcer) {
Expand Down Expand Up @@ -365,10 +358,8 @@ private Function<Object, CompletionStage<ThingQueryCommandResponse<?>>> getFallb
} else if (response instanceof AskException || response instanceof AskTimeoutException) {
return applyTimeoutStrategy(liveCommand, twinResponse);
} else {
final var errorToReport = reportErrorOrResponse(
"before building JsonView for live response via smart channel selection",
throw reportErrorOrResponse("before building JsonView for live response via smart channel selection",
response, null, liveCommand.getDittoHeaders());
throw errorToReport;
}
};
}
Expand All @@ -390,6 +381,7 @@ static boolean isLiveQueryCommandWithTimeoutStrategy(final Signal<?> command) {
Signal.isChannelLive(command);
}

@SuppressWarnings("unchecked")
private static <T extends DittoHeadersSettable<?>> T setAdditionalHeaders(final T settable,
final DittoHeaders commandHeaders) {
final DittoHeaders dittoHeaders = settable.getDittoHeaders();
Expand Down Expand Up @@ -488,22 +480,25 @@ static <T extends ThingQueryCommandResponse<T>> T buildJsonViewForThingQueryComm
* @param authorizedThingCommand command to prepare.
* @return the passed in authorizedThingCommand.
*/
private ThingCommand<?> prepareThingCommandBeforeSendingToPersistence(final ThingCommand<?> authorizedThingCommand) {
private ThingCommand<?> prepareThingCommandBeforeSendingToPersistence(
final ThingCommand<?> authorizedThingCommand) {

if (NAMESPACE_INSPECTION_LOGGERS.containsKey(authorizedThingCommand.getEntityId().getNamespace())) {
final ThreadSafeDittoLogger namespaceLogger = NAMESPACE_INSPECTION_LOGGERS
.get(authorizedThingCommand.getEntityId().getNamespace()).withCorrelationId(authorizedThingCommand);
if (authorizedThingCommand instanceof ThingModifyCommand) {
final JsonValue value = ((ThingModifyCommand<?>) authorizedThingCommand).getEntity().orElse(null);
if (null != value) {
final Set<ResourceKey> resourceKeys = calculateLeaves(authorizedThingCommand.getResourcePath(), value);
final Set<ResourceKey> resourceKeys =
calculateLeaves(authorizedThingCommand.getResourcePath(), value);
namespaceLogger.info("Forwarding modify command type <{}> with resourceKeys <{}>",
authorizedThingCommand.getType(),
resourceKeys);
}
}
namespaceLogger
.debug("Forwarding command type <{}>: <{}>", authorizedThingCommand.getType(), authorizedThingCommand);
.debug("Forwarding command type <{}>: <{}>", authorizedThingCommand.getType(),
authorizedThingCommand);
}
return authorizedThingCommand;
}
Expand Down Expand Up @@ -664,7 +659,8 @@ private CompletionStage<Policy> retrievePolicyWithEnforcement(final DittoHeaders
.build();

return AskWithRetry.askWithRetry(policiesShardRegion, RetrievePolicy.of(policyId, adjustedHeaders),
enforcementConfig.getAskWithRetryConfig(), actorSystem.getScheduler(), actorSystem.getDispatcher(), // TODO TJ need to make scheduler and dispatcher configurable?
enforcementConfig.getAskWithRetryConfig(), actorSystem.getScheduler(), actorSystem.getDispatcher(),
// TODO TJ need to make scheduler and dispatcher configurable?
response -> {
if (response instanceof RetrievePolicyResponse rpr) {
return rpr.getPolicy();
Expand Down Expand Up @@ -757,11 +753,11 @@ private static ThingCommand<?> transformModifyThingToCreateThing(final ThingComm
* Authorize a thing-command by a policy enforcer.
*
* @param <T> type of the thing-command.
* @param policyEnforcer the policy enforcer.
* @param enforcer the enforcer.
* @param command the command to authorize.
* @return optionally the authorized command extended by read subjects.
*/
static <T extends ThingCommand<T>> T authorizeByPolicyOrThrow(final Enforcer policyEnforcer,
static <T extends ThingCommand<T>> T authorizeByPolicyOrThrow(final Enforcer enforcer,
final ThingCommand<T> command) {

final var thingResourceKey = PoliciesResourceType.thingResource(command.getResourcePath());
Expand All @@ -770,29 +766,28 @@ static <T extends ThingCommand<T>> T authorizeByPolicyOrThrow(final Enforcer pol

final boolean commandAuthorized;
if (command instanceof MergeThing mergeThing) {
commandAuthorized = enforceMergeThingCommand(policyEnforcer, mergeThing, thingResourceKey,
authorizationContext);
commandAuthorized = enforceMergeThingCommand(enforcer, mergeThing, thingResourceKey, authorizationContext);
} else if (command instanceof ThingModifyCommand) {
commandAuthorized = policyEnforcer.hasUnrestrictedPermissions(thingResourceKey, authorizationContext,
commandAuthorized = enforcer.hasUnrestrictedPermissions(thingResourceKey, authorizationContext,
Permission.WRITE);
} else {
commandAuthorized =
policyEnforcer.hasPartialPermissions(thingResourceKey, authorizationContext, Permission.READ);
enforcer.hasPartialPermissions(thingResourceKey, authorizationContext, Permission.READ);
}

final var condition = dittoHeaders.getCondition();
if (!(command instanceof CreateThing) && condition.isPresent()) {
enforceReadPermissionOnCondition(condition.get(), policyEnforcer, dittoHeaders, () ->
enforceReadPermissionOnCondition(condition.get(), enforcer, dittoHeaders, () ->
ThingConditionFailedException.newBuilderForInsufficientPermission(dittoHeaders).build());
}
final var liveChannelCondition = dittoHeaders.getLiveChannelCondition();
if ((command instanceof ThingQueryCommand) && liveChannelCondition.isPresent()) {
enforceReadPermissionOnCondition(liveChannelCondition.get(), policyEnforcer, dittoHeaders, () ->
enforceReadPermissionOnCondition(liveChannelCondition.get(), enforcer, dittoHeaders, () ->
ThingConditionFailedException.newBuilderForInsufficientLiveChannelPermission(dittoHeaders).build());
}

if (commandAuthorized) {
return addEffectedReadSubjectsToThingSignal(command, policyEnforcer);
return addEffectedReadSubjectsToThingSignal(command, enforcer);
} else {
throw errorForThingCommand(command);
}
Expand All @@ -819,15 +814,15 @@ static <T extends Signal<T>> T addEffectedReadSubjectsToThingSignal(final Signal
}

private static void enforceReadPermissionOnCondition(final String condition,
final Enforcer policyEnforcer,
final Enforcer enforcer,
final DittoHeaders dittoHeaders,
final Supplier<DittoRuntimeException> exceptionSupplier) {

final var authorizationContext = dittoHeaders.getAuthorizationContext();
final var rootNode = tryParseRqlCondition(condition, dittoHeaders);
final var resourceKeys = determineResourceKeys(rootNode, dittoHeaders);

if (!policyEnforcer.hasUnrestrictedPermissions(resourceKeys, authorizationContext, Permission.READ)) {
if (!enforcer.hasUnrestrictedPermissions(resourceKeys, authorizationContext, Permission.READ)) {
throw exceptionSupplier.get();
}
}
Expand Down Expand Up @@ -862,17 +857,19 @@ private static ResourceKey tryGetResourceKey(final String fieldName, final Ditto
}
}

private static boolean enforceMergeThingCommand(final Enforcer policyEnforcer,
final MergeThing command, final ResourceKey thingResourceKey,
private static boolean enforceMergeThingCommand(final Enforcer enforcer,
final MergeThing command,
final ResourceKey thingResourceKey,
final AuthorizationContext authorizationContext) {
if (policyEnforcer.hasUnrestrictedPermissions(thingResourceKey, authorizationContext, Permission.WRITE)) {

if (enforcer.hasUnrestrictedPermissions(thingResourceKey, authorizationContext, Permission.WRITE)) {
// unrestricted permissions at thingResourceKey level
return true;
} else if (policyEnforcer.hasPartialPermissions(thingResourceKey, authorizationContext, Permission.WRITE)) {
} else if (enforcer.hasPartialPermissions(thingResourceKey, authorizationContext, Permission.WRITE)) {
// in case of partial permissions at thingResourceKey level check all leaves of merge patch for
// unrestricted permissions
final Set<ResourceKey> resourceKeys = calculateLeaves(command.getPath(), command.getValue());
return policyEnforcer.hasUnrestrictedPermissions(resourceKeys, authorizationContext, Permission.WRITE);
return enforcer.hasUnrestrictedPermissions(resourceKeys, authorizationContext, Permission.WRITE);
} else {
// not even partial permission
return false;
Expand Down Expand Up @@ -983,7 +980,8 @@ private CompletionStage<CreateThing> createPolicyAndReturnCreateThing(final Crea
return preEnforcer.apply(createPolicy)
.thenCompose(msg -> Patterns.ask(policiesShardRegion, msg,
enforcementConfig.getAskWithRetryConfig().getAskTimeout()
.multipliedBy(5L)) // don't retry creating policy (not idempotent!) - but increase default timeout for doing so
.multipliedBy(
5L)) // don't retry creating policy (not idempotent!) - but increase default timeout for doing so
.thenApply(policyResponse -> {
handlePolicyResponseForCreateThing(createPolicy, createThing, policyResponse);
return createThing;
Expand Down

0 comments on commit 409d7f0

Please sign in to comment.