Skip to content

Commit

Permalink
adapt enforcement of live events and responses;
Browse files Browse the repository at this point in the history
live responses are filtered based on the policy of the thing;
extend tests in LiveSignalEnforcementTest;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Sep 28, 2021
1 parent f148112 commit c880317
Show file tree
Hide file tree
Showing 15 changed files with 271 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ enum EnforcementConfigValue implements KnownConfigValue {
/**
* Whether to enable dispatching live responses from channels other than the subscribers.
*/
GLOBAL_LIVE_RESPONSE_DISPATCHING("global-live-response-dispatching", false);
GLOBAL_LIVE_RESPONSE_DISPATCHING("global-live-response-dispatching", true);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.exceptions.EventSendNotAllowedException;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -149,7 +150,7 @@ private CompletionStage<Contextual<WithDittoHeaders>> doEnforce(final SignalWith
final SendClaimMessage<?> sendClaimMessage = (SendClaimMessage<?>) liveSignal;
return publishMessageCommand(sendClaimMessage, enforcer);
} else if (liveSignal instanceof CommandResponse) {
return enforceLiveCommandResponse(liveSignal, correlationIdOpt.get());
return enforceLiveCommandResponse((CommandResponse<?>) liveSignal, correlationIdOpt.get(), enforcer);
} else {
final Optional<StreamingType> streamingType = StreamingType.fromSignal(liveSignal);
if (streamingType.isPresent()) {
Expand All @@ -163,50 +164,63 @@ private CompletionStage<Contextual<WithDittoHeaders>> doEnforce(final SignalWith
}
} else {
// drop live command to nonexistent things and respond with error.
log(liveSignal).info(
"Command of type <{}> with ID <{}> could not be dispatched as no enforcer could be" +
" looked up! Answering with ThingNotAccessibleException.", liveSignal.getType(),
log(liveSignal).info("Command of type <{}> with ID <{}> could not be dispatched as no enforcer " +
"could be looked up! Answering with ThingNotAccessibleException.", liveSignal.getType(),
liveSignal.getEntityId());
throw ThingNotAccessibleException.newBuilder(ThingId.of(entityId().getId()))
.dittoHeaders(liveSignal.getDittoHeaders())
.build();
}
}

private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse(final Signal<?> liveSignal,
final String correlationId) {
private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse(
final CommandResponse<?> liveResponse, final String correlationId, final Enforcer enforcer) {

final Optional<Cache<String, ActorRef>> responseReceiversOptional = context.getResponseReceivers();
if (responseReceiversOptional.isPresent()) {
final Cache<String, ActorRef> responseReceivers = responseReceiversOptional.get();
return responseReceivers.get(correlationId).thenApply(responseReceiverEntry -> {
if (responseReceiverEntry.isPresent()) {
responseReceivers.invalidate(correlationId);
final ActorRef responseReceiver = responseReceiverEntry.get();
log().debug("Scheduling CommandResponse <{}> to original sender <{}>", liveSignal,
responseReceiver);
return withMessageToReceiver(liveSignal, responseReceiverEntry.get());
} else {
if (log().isDebugEnabled()) {
log().debug("Got <{}> with unknown correlation ID: <{}>", liveSignal.getType(), liveSignal);
} else {
log().info("Got <{}> with unknown correlation ID: <{}>", liveSignal.getType(), correlationId);
}
return withMessageToReceiver(null, null);
}
});
return returnWithMessageToReceiver(responseReceivers, liveResponse, correlationId, enforcer);
} else {
if (log().isDebugEnabled()) {
log().debug("Got live response when global dispatching is inactive: <{}>", liveSignal);
log().debug("Got live response when global dispatching is inactive: <{}>", liveResponse);
} else {
log().info("Got live response when global dispatching is inactive: <{}> with correlation ID <{}>",
liveSignal.getType(),
liveSignal.getDittoHeaders().getCorrelationId().orElse(""));
liveResponse.getType(),
liveResponse.getDittoHeaders().getCorrelationId().orElse(""));
}
return CompletableFuture.completedFuture(withMessageToReceiver(null, null));
}
}

private CompletionStage<Contextual<WithDittoHeaders>> returnWithMessageToReceiver(
final Cache<String, ActorRef> responseReceivers, final CommandResponse<?> liveResponse,
final String correlationId, final Enforcer enforcer) {
return responseReceivers.get(correlationId).thenApply(responseReceiverEntry -> {
if (responseReceiverEntry.isPresent()) {
responseReceivers.invalidate(correlationId);
final ActorRef responseReceiver = responseReceiverEntry.get();
final CommandResponse<?> response;
if (liveResponse instanceof ThingQueryCommandResponse) {
response = ThingCommandEnforcement.buildJsonViewForThingQueryCommandResponse(
(ThingQueryCommandResponse<?>) liveResponse, enforcer);
} else {
response = liveResponse;
}

log().debug("Scheduling CommandResponse <{}> to original sender <{}>", liveResponse,
responseReceiver);
return withMessageToReceiver(response, responseReceiver);
} else {
if (log().isDebugEnabled()) {
log().debug("Got <{}> with unknown correlation ID: <{}>", liveResponse.getType(), liveResponse);
} else {
log().info("Got <{}> with unknown correlation ID: <{}>", liveResponse.getType(), correlationId);
}
return withMessageToReceiver(null, null);
}
});
}

private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final StreamingType streamingType,
final Signal<?> liveSignal, final Enforcer enforcer) {

Expand Down Expand Up @@ -234,12 +248,9 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final St
private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveEvent(final Signal<?> liveSignal,
final Enforcer enforcer) {

// enforce Live Events
final boolean authorized = enforcer.hasUnrestrictedPermissions(
// only check access to root resource for now
PoliciesResourceType.thingResource("/"),
liveSignal.getDittoHeaders().getAuthorizationContext(),
WRITE);
PoliciesResourceType.thingResource(liveSignal.getResourcePath()),
liveSignal.getDittoHeaders().getAuthorizationContext(), WRITE);

if (authorized) {
log(liveSignal).info("Live Event was authorized: <{}>", liveSignal);
Expand Down Expand Up @@ -326,10 +337,8 @@ private CompletionStage<Signal<?>> addToResponseReceiver(final Signal<?> signal)
}

private static boolean isAuthorized(final MessageCommand<?, ?> command, final Enforcer enforcer) {
return enforcer.hasUnrestrictedPermissions(
extractMessageResourceKey(command),
command.getDittoHeaders().getAuthorizationContext(),
WRITE);
return enforcer.hasUnrestrictedPermissions(extractMessageResourceKey(command),
command.getDittoHeaders().getAuthorizationContext(), WRITE);
}

private static ResourceKey extractMessageResourceKey(final MessageCommand<?, ?> command) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private static boolean hasUnrestrictedWritePermission(final Enforcer enforcer, f
*
* @param response the response.
* @param enforcer the enforcer.
* @return response with view on entity restricted by enforcer..
* @return response with view on entity restricted by enforcer.
*/
public static <T extends PolicyQueryCommandResponse<T>> T buildJsonViewForPolicyQueryCommandResponse(
final PolicyQueryCommandResponse<T> response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public final class ThingCommandEnforcement
/**
* Json fields that are always shown regardless of authorization.
*/
private static final JsonFieldSelector THING_QUERY_COMMAND_RESPONSE_ALLOWLIST =
public static final JsonFieldSelector THING_QUERY_COMMAND_RESPONSE_ALLOWLIST =
JsonFactory.newFieldSelector(Thing.JsonFields.ID);

private final ActorRef thingsShardRegion;
Expand Down Expand Up @@ -496,7 +496,7 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceCreateThingForNonex
* @param enforcer the enforcer.
* @return response with view on entity restricted by enforcer.
*/
private static <T extends ThingQueryCommandResponse<T>> T buildJsonViewForThingQueryCommandResponse(
public static <T extends ThingQueryCommandResponse<T>> T buildJsonViewForThingQueryCommandResponse(
final ThingQueryCommandResponse<T> response, final Enforcer enforcer) {

final JsonValue entity = response.getEntity();
Expand Down

0 comments on commit c880317

Please sign in to comment.