Skip to content
Permalink
Browse files

fixed blocking the thread in ThingCommandEnforcement

* don't instantiate ConciergeForwarderActor twice

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Apr 18, 2019
1 parent 8ee6b09 commit 15423e6e098a67760fdf61183aa428bb82c88627
@@ -22,10 +22,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;

@@ -78,7 +76,6 @@
import org.eclipse.ditto.services.utils.cache.entry.Entry;
import org.eclipse.ditto.signals.commands.base.CommandToExceptionRegistry;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayServiceTimeoutException;
import org.eclipse.ditto.signals.commands.policies.PolicyCommand;
import org.eclipse.ditto.signals.commands.policies.PolicyErrorResponse;
import org.eclipse.ditto.signals.commands.policies.exceptions.PolicyConflictException;
@@ -197,8 +194,28 @@ private void enforceThingCommandByNonexistentEnforcer(final Entry<EntityId> enfo
replyToSender(error);
} else {
// Without prior enforcer in cache, enforce CreateThing by self.
enforceCreateThingBySelf().ifPresent(pair ->
handleInitialCreateThing(pair.createThing, pair.enforcer));
enforceCreateThingBySelf().whenCompleteAsync((pair, throwable) -> {
if (throwable != null) {
Throwable cause = throwable;
if (throwable instanceof CompletionException) {
cause = throwable.getCause();
}

if (cause instanceof DittoRuntimeException) {
LOGGER.debug("DittoRuntimeException during enforceThingCommandByNonexistentEnforcer - {}: {}",
cause.getClass().getSimpleName(), cause.getMessage());
replyToSender(cause);
} else {
LOGGER.warn("Error during thing by itself enforcement - {}: {}",
cause.getClass().getSimpleName(), cause.getMessage());
replyToSender(GatewayInternalErrorException.newBuilder()
.cause(cause)
.build());
}
} else if (pair != null) {
handleInitialCreateThing(pair.createThing, pair.enforcer);
}
}, getEnforcementExecutor());
}
}

@@ -671,21 +688,24 @@ private static DittoRuntimeException errorForThingCommand(final ThingCommand thi
*
* @return optionally the authorized command extended by read subjects.
*/
private Optional<CreateThingWithEnforcer> enforceCreateThingBySelf() {
private CompletionStage<CreateThingWithEnforcer> enforceCreateThingBySelf() {

final ThingCommand thingCommand = transformModifyThingToCreateThing(signal());
final Optional<CreateThingWithEnforcer> result;
if (thingCommand instanceof CreateThing) {
final CreateThing createThing = replaceInitialPolicyWithCopiedPolicyIfPresent((CreateThing) thingCommand);
final Optional<JsonObject> initialPolicyOptional = createThing.getInitialPolicy();
if (initialPolicyOptional.isPresent()) {
result = enforceCreateThingByOwnInlinedPolicy(createThing, initialPolicyOptional.get());
} else {
final Optional<AccessControlList> aclOptional =
createThing.getThing().getAccessControlList().filter(acl -> !acl.isEmpty());
result = aclOptional.map(aclEntries -> enforceCreateThingByOwnAcl(createThing, aclEntries))
.orElseGet(() -> enforceCreateThingByAuthorizationContext(createThing));
}
final CompletionStage<CreateThing> createThingFuture = replaceInitialPolicyWithCopiedPolicyIfPresent((CreateThing) thingCommand);
return createThingFuture.thenApply(createThing -> {
final Optional<JsonObject> initialPolicyOptional = createThing.getInitialPolicy();
if (initialPolicyOptional.isPresent()) {
return enforceCreateThingByOwnInlinedPolicy(createThing, initialPolicyOptional.get())
.orElse(null);
} else {
final Optional<AccessControlList> aclOptional =
createThing.getThing().getAccessControlList().filter(acl -> !acl.isEmpty());
return aclOptional.map(aclEntries -> enforceCreateThingByOwnAcl(createThing, aclEntries))
.orElseGet(() -> enforceCreateThingByAuthorizationContext(createThing))
.orElse(null);
}
});
} else {
// Other commands cannot be authorized by ACL or policy contained in self.
final DittoRuntimeException error =
@@ -695,66 +715,43 @@ private static DittoRuntimeException errorForThingCommand(final ThingCommand thi
log(thingCommand).info("Enforcer was not existing for Thing <{}> and no auth info was inlined, " +
"responding with: {}", thingCommand.getThingId(), error);
replyToSender(error);
result = Optional.empty();
return CompletableFuture.completedFuture(null);
}
return result;
}

private CreateThing replaceInitialPolicyWithCopiedPolicyIfPresent(final CreateThing createThing) {
private CompletionStage<CreateThing> replaceInitialPolicyWithCopiedPolicyIfPresent(final CreateThing createThing) {

final JsonObject initialPolicyOrCopiedPolicy = getInitialPolicyOrCopiedPolicy(createThing).orElse(null);
return CreateThing.of(createThing.getThing(), initialPolicyOrCopiedPolicy, createThing.getDittoHeaders());
return getInitialPolicyOrCopiedPolicy(createThing).thenApply(initialPolicyOrCopiedPolicy ->
CreateThing.of(createThing.getThing(), initialPolicyOrCopiedPolicy, createThing.getDittoHeaders())
);
}

private Optional<JsonObject> getInitialPolicyOrCopiedPolicy(final CreateThing createThing) {
private CompletionStage<JsonObject> getInitialPolicyOrCopiedPolicy(final CreateThing createThing) {

final Optional<String> policyId = createThing.getPolicyIdOrPlaceholder()
return createThing.getPolicyIdOrPlaceholder()
.flatMap(ReferencePlaceholder::fromCharSequence)
.map(referencePlaceholder -> {
log(createThing).debug(
"CreateThing command contains a reference placeholder for the policy it wants to copy: {}",
referencePlaceholder);
return policyIdReferencePlaceholderResolver.resolve(referencePlaceholder, dittoHeaders());
})
.map(policyIdCompletionStage -> awaitPolicyIdCompletionStage(policyIdCompletionStage, createThing))
.map(Optional::of)
.orElse(createThing.getPolicyIdOrPlaceholder());

if (policyId.isPresent()) {
log().debug("CreateThing command wants to use a copy of Policy <{}>", policyId.get());
final Policy policy = retrievePolicyWithEnforcement(policyId.get());
final JsonObject jsonPolicyWithoutId = policy.toJson(JsonSchemaVersion.V_2).remove("policyId");
return Optional.of(jsonPolicyWithoutId);
}

log().debug("CreateThing command did not contain a policy that should be copied.");
return createThing.getInitialPolicy();
}

private String awaitPolicyIdCompletionStage(final CompletionStage<String> policyIdCompletionStage,
final CreateThing createThing) {

try {
return policyIdCompletionStage.toCompletableFuture().get(getAskTimeout().toMillis(), TimeUnit.MILLISECONDS);
} catch (final InterruptedException | TimeoutException e) {
log(createThing).error(e, "An error occurred when trying to resolve policy id.");
throw GatewayServiceTimeoutException.newBuilder().dittoHeaders(createThing.getDittoHeaders()).build();
} catch (ExecutionException e) {
if (e.getCause() instanceof DittoRuntimeException) {
throw (DittoRuntimeException) e.getCause();
} else {
throw GatewayInternalErrorException.newBuilder()
.dittoHeaders(createThing.getDittoHeaders())
.cause(e.getCause())
.build();
}
}
.orElseGet(() -> CompletableFuture.completedFuture(createThing.getPolicyIdOrPlaceholder().orElse(null)))
.thenCompose(policyId -> {
if (policyId != null) {
log().debug("CreateThing command wants to use a copy of Policy <{}>", policyId);
return retrievePolicyWithEnforcement(policyId)
.thenApply(policy -> policy.toJson(JsonSchemaVersion.V_2).remove("policyId"));
} else {
log().debug("CreateThing command did not contain a policy that should be copied.");
return CompletableFuture.completedFuture(createThing.getInitialPolicy().orElse(null));
}
});
}

private Policy retrievePolicyWithEnforcement(final String policyId) {
private CompletionStage<Policy> retrievePolicyWithEnforcement(final String policyId) {

final CompletionStage<Policy> policyCompletionStage =
Patterns.ask(conciergeForwarder(), RetrievePolicy.of(policyId, dittoHeaders()), getAskTimeout())
return Patterns.ask(conciergeForwarder(), RetrievePolicy.of(policyId, dittoHeaders()), getAskTimeout())
.thenApplyAsync(response -> {
if (response instanceof RetrievePolicyResponse) {
return ((RetrievePolicyResponse) response).getPolicy();
@@ -770,23 +767,6 @@ private Policy retrievePolicyWithEnforcement(final String policyId) {
}
}, getEnforcementExecutor());

return awaitPolicyCompletionStage(policyCompletionStage);

}

private Policy awaitPolicyCompletionStage(final CompletionStage<Policy> policyCompletionStage) {
try {
return policyCompletionStage.toCompletableFuture().get(getAskTimeout().toMillis(), TimeUnit.MILLISECONDS);
} catch (final InterruptedException | TimeoutException e) {
log().error(e, "An error occurred when trying to retrieve policy.");
throw GatewayServiceTimeoutException.newBuilder().cause(e).build();
} catch (ExecutionException e) {
if (e.getCause() instanceof DittoRuntimeException) {
throw (DittoRuntimeException) e.getCause();
} else {
throw GatewayInternalErrorException.newBuilder().cause(e.getCause()).build();
}
}
}

private Optional<CreateThingWithEnforcer> enforceCreateThingByAuthorizationContext(final CreateThing createThing) {
@@ -36,7 +36,7 @@
import org.slf4j.LoggerFactory;

import akka.actor.ActorRef;
import akka.pattern.PatternsCS;
import akka.pattern.Patterns;

/**
* Responsible for resolving a policy id of a referenced entity.
@@ -67,8 +67,8 @@ private void initializeSupportedEntityTypeReferences() {
}

/**
* Resolves the policy id of the entity of type {@link ReferencePlaceholder#referencedEntityType} with id
* {@link ReferencePlaceholder#referencedEntityId}.
* Resolves the policy id of the entity of type {@link ReferencePlaceholder#getReferencedEntityType()} with id
* {@link ReferencePlaceholder#getReferencedEntityId()}.
*
* @param referencePlaceholder The placeholder holding the information about the referenced entity id.
* @param dittoHeaders The ditto headers.
@@ -102,7 +102,7 @@ private void initializeSupportedEntityTypeReferences() {
.withSelectedFields(referencePlaceholder.getReferencedField().toFieldSelector())
.build();

return PatternsCS.ask(conciergeForwarderActor, retrieveThingCommand, retrieveEntityTimeoutDuration)
return Patterns.ask(conciergeForwarderActor, retrieveThingCommand, retrieveEntityTimeoutDuration)
.thenApply(response -> this.handleRetrieveThingResponse(response, referencePlaceholder, dittoHeaders));
}

@@ -133,9 +133,9 @@ private String handleRetrieveThingResponse(final Object response,
// ignore warning that second argument isn't used. Runtime exceptions will have their stacktrace printed
// in the logs according to https://www.slf4j.org/faq.html#paramException
LogUtil.logWithCorrelationId(LOGGER, dittoHeaders, log -> log.info(
"Got Exception when waiting on RetrieveThingResponse when resolving policy id placeholder reference <{}>",
"Got Exception when waiting on RetrieveThingResponse when resolving policy id placeholder reference <{}> - {}: {}",
referencePlaceholder,
response));
response.getClass().getSimpleName(), ((DittoRuntimeException) response).getMessage()));
throw (DittoRuntimeException) response;
} else {
LogUtil.logWithCorrelationId(LOGGER, dittoHeaders, log -> log.error(
@@ -114,7 +114,7 @@ public int hashCode() {
@Override
public String toString() {
return getClass().getSimpleName() + " [" +
", referencedEntityType=" + referencedEntityType +
"referencedEntityType=" + referencedEntityType +
", referencedEntityId=" + referencedEntityId +
", referencedField=" + referencedField +
"]";
@@ -134,14 +134,11 @@

final ActorContext context = getContext();

final ActorRef enforcerActor =
authorizationProxyPropsFactory.startEnforcerActor(context, configReader, pubSubMediator);

final ActorRef conciergeForwarder = startChildActor(context, ConciergeForwarderActor.ACTOR_NAME,
ConciergeForwarderActor.props(pubSubMediator, enforcerActor));

authorizationProxyPropsFactory.startEnforcerActor(context, configReader, pubSubMediator);
pubSubMediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
pubSubMediator.tell(new DistributedPubSubMediator.Put(conciergeForwarder), getSelf());

final ActorRef conciergeForwarder = getContext().findChild(ConciergeForwarderActor.ACTOR_NAME).orElseThrow(() ->
new IllegalStateException("ConciergeForwarder could not be found"));

startClusterSingletonActor(context, BatchSupervisorActor.ACTOR_NAME,
BatchSupervisorActor.props(pubSubMediator, conciergeForwarder));
@@ -61,7 +61,7 @@ public static ActorRef createConciergeEnforcerClusterRouter(final ActorContext c
new ClusterRouterGroup(
new ConsistentHashingGroup(routeesPaths),
new ClusterRouterGroupSettings(numberOfRoutees, routeesPaths,
false, clusterRoles))
true, clusterRoles))
.props(),
CONCIERGE_ENFORCER_ROUTER_ACTORNAME);
}
@@ -16,7 +16,6 @@

import java.util.function.Function;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.services.models.concierge.ConciergeWrapper;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
@@ -90,7 +89,7 @@ public ConciergeForwarderActor create() {
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(Signal.class, signal -> forward(signal, getSender()))
.match(Signal.class, signal -> forward(signal, getContext()))
.match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck ->
log.debug("Successfully subscribed to distributed pub/sub on topic '{}'",
subscribeAck.subscribe().topic())
@@ -104,9 +103,9 @@ public Receive createReceive() {
* the {@code conciergeShardRegion}.
*
* @param signal the Signal to forward
* @param sender the ActorRef to use as sender
* @param ctx the ActorRef to use as sender
*/
private void forward(final Signal<?> signal, final ActorRef sender) {
private void forward(final Signal<?> signal, final ActorContext ctx) {

final Signal<?> transformedSignal = signalTransformer.apply(signal);

@@ -118,24 +117,13 @@ private void forward(final Signal<?> signal, final ActorRef sender) {
log.debug("Sending signal without ID and type <{}> to concierge-dispatcherActor: <{}>", signalType,
transformedSignal);
final DistributedPubSubMediator.Send msg = wrapForPubSub(transformedSignal);
log.debug("Sending message to concierge-dispatcherActor: <{}>.", msg);
pubSubMediator.tell(msg, sender);
log.debug("Forwarding message to concierge-dispatcherActor via pub/sub: <{}>.", msg);
pubSubMediator.forward(msg, ctx);
} else {
log.info("Sending signal with ID <{}> and type <{}> to concierge-shard-region",
signalId, signalType);
log.debug("Sending signal with ID <{}> and type <{}> to concierge-shard-region: <{}>",
signalId, signalType, transformedSignal);
final Object msg;
try {
msg = ConciergeWrapper.wrapForEnforcer(transformedSignal);
} catch (final DittoRuntimeException e) {
log.warning("Got DittoRuntimeException when wrapping signal for enforcer: {}: <{}>",
e.getClass().getSimpleName(), e.getMessage());
sender.tell(e, getSelf());
return;
}
log.debug("Sending message to concierge-shard-region: <{}>", msg);
conciergeEnforcer.tell(msg, sender);
log.info("Forwarding signal with ID <{}> and type <{}> to concierge enforcer", signalId, signalType);
final Object msg = ConciergeWrapper.wrapForEnforcer(transformedSignal);
log.debug("Forwarding message to concierge enforcer: <{}>", msg);
conciergeEnforcer.forward(msg, ctx);
}
}

0 comments on commit 15423e6

Please sign in to comment.
You can’t perform that action at this time.