Skip to content

Commit

Permalink
removed sharded EnforcerActor and replace with ClusterRoute with cons…
Browse files Browse the repository at this point in the history
…istent hashing group

* fixed cache invalidation in all clustered concierge nodes (before only the caches on the node where e.g. a policy change command was processed were invalidated correctly)

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Apr 16, 2019
1 parent fdd9cfd commit 54216f3
Show file tree
Hide file tree
Showing 35 changed files with 581 additions and 397 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,15 @@
import java.util.UUID;

import org.assertj.core.api.Assertions;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.things.Attributes;
import org.eclipse.ditto.model.things.Feature;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.services.utils.akka.JavaTestProbe;
import org.eclipse.ditto.services.utils.test.Retry;
import org.eclipse.ditto.signals.base.JsonParsableRegistry;
import org.eclipse.ditto.signals.base.ShardedMessageEnvelope;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.batch.ExecuteBatch;
import org.eclipse.ditto.signals.commands.batch.ExecuteBatchResponse;
import org.eclipse.ditto.signals.commands.batch.exceptions.BatchAlreadyExecutingException;
import org.eclipse.ditto.signals.commands.things.ThingCommandRegistry;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.commands.things.exceptions.FeatureNotModifiableException;
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttributes;
Expand All @@ -58,6 +53,7 @@
import akka.actor.Props;
import akka.event.Logging;
import akka.japi.pf.ReceiveBuilder;
import akka.routing.ConsistentHashingRouter;

/**
* Unit test for {@link BatchCoordinatorActor}.
Expand All @@ -79,9 +75,9 @@ public final class BatchCoordinatorActorTest {
@BeforeClass
public static void setUpActorSystem() {
actorSystem = ActorSystem.create("AkkaTestSystem", CONFIG);
final ActorRef sharedRegionProxy = actorSystem.actorOf(Props.create(SharedRegionProxyMock.class));
final ActorRef enforcerActorMock = actorSystem.actorOf(Props.create(EnforcerActorMock.class));
conciergeForwarder = actorSystem.actorOf(BatchSupervisorActorTest.ConciergeForwarderActorMock.props
(sharedRegionProxy), BatchSupervisorActorTest.ConciergeForwarderActorMock.ACTOR_NAME);
(enforcerActorMock), BatchSupervisorActorTest.ConciergeForwarderActorMock.ACTOR_NAME);
}

/** */
Expand Down Expand Up @@ -309,34 +305,31 @@ private static String randomBatchId() {
return "BatchCoordinatorActorTest-" + UUID.randomUUID().toString();
}

private static final class SharedRegionProxyMock extends AbstractActor {
private static final class EnforcerActorMock extends AbstractActor {

private int modifyAttributesCount = 0;
private final JsonParsableRegistry<? extends Command> commandRegistry = ThingCommandRegistry.newInstance();

SharedRegionProxyMock() {
EnforcerActorMock() {
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ShardedMessageEnvelope.class, this::extractCommandFromEnvelop)
.match(ConsistentHashingRouter.ConsistentHashableEnvelope.class, this::extractCommandFromEnvelop)
.matchAny(this::unhandled)
.build();
}

private void extractCommandFromEnvelop(ShardedMessageEnvelope shardedMessageEnvelope) {
final String type = shardedMessageEnvelope.getType();
final JsonObject jsonCommand = shardedMessageEnvelope.getMessage();
final DittoHeaders dittoHeaders = shardedMessageEnvelope.getDittoHeaders();
private void extractCommandFromEnvelop(ConsistentHashingRouter.ConsistentHashableEnvelope envelope) {
final Object obj = envelope.message();

if (type.equals(ModifyThing.TYPE)) {
final ModifyThing command = (ModifyThing) commandRegistry.parse(jsonCommand, dittoHeaders);
if (obj instanceof ModifyThing) {
final ModifyThing command = (ModifyThing) obj;

getSender().tell(ModifyThingResponse.modified(command.getId(),
command.getDittoHeaders()), getSelf());
} else if (type.equals(ModifyFeature.TYPE)) {
final ModifyFeature command = (ModifyFeature) commandRegistry.parse(jsonCommand, dittoHeaders);
} else if (obj instanceof ModifyFeature) {
final ModifyFeature command = (ModifyFeature) obj;

final String thingId = command.getThingId();
final String featureId = command.getFeatureId();
Expand All @@ -362,8 +355,8 @@ private void extractCommandFromEnvelop(ShardedMessageEnvelope shardedMessageEnve
}
}

} else if (type.equals(ModifyAttributes.TYPE)) {
final ModifyAttributes command = (ModifyAttributes) commandRegistry.parse(jsonCommand, dittoHeaders);
} else if (obj instanceof ModifyAttributes) {
final ModifyAttributes command = (ModifyAttributes) obj;

if (command.getDittoHeaders().isDryRun()) {
respondToModifyAttributes(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
import java.util.Arrays;
import java.util.UUID;

import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.things.Feature;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.services.models.concierge.ConciergeWrapper;
import org.eclipse.ditto.services.utils.akka.JavaTestProbe;
import org.eclipse.ditto.services.utils.test.Retry;
import org.eclipse.ditto.signals.base.JsonParsableRegistry;
import org.eclipse.ditto.signals.base.ShardedMessageEnvelope;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.batch.ExecuteBatch;
Expand All @@ -53,6 +51,7 @@
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
import akka.routing.ConsistentHashingRouter;

/**
* Unit test for {@link BatchSupervisorActor}.
Expand Down Expand Up @@ -119,7 +118,7 @@ public void batchExecutionWorksAsExpected() {

/** */
@Test
public void batchExecutionResumesAfterRecovery() throws InterruptedException {
public void batchExecutionResumesAfterRecovery() {
new JavaTestProbe(actorSystem) {
{
subscribeToEvents(this, BatchExecutionStarted.TYPE, BatchExecutionFinished.TYPE);
Expand Down Expand Up @@ -220,24 +219,22 @@ private static final class SharedRegionProxyMock extends AbstractActor {
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ShardedMessageEnvelope.class, this::extractCommandFromEnvelop)
.match(ConsistentHashingRouter.ConsistentHashableEnvelope.class, this::extractCommandFromEnvelop)
.matchAny(this::unhandled)
.build();
}

private void extractCommandFromEnvelop(ShardedMessageEnvelope shardedMessageEnvelope) {
final String type = shardedMessageEnvelope.getType();
final JsonObject jsonCommand = shardedMessageEnvelope.getMessage();
final DittoHeaders dittoHeaders = shardedMessageEnvelope.getDittoHeaders();
private void extractCommandFromEnvelop(ConsistentHashingRouter.ConsistentHashableEnvelope envelope) {
final Object obj = envelope.message();

if (type.equals(ModifyThing.TYPE)) {
final ModifyThing command = (ModifyThing) commandRegistry.parse(jsonCommand, dittoHeaders);
if (obj instanceof ModifyThing) {
final ModifyThing command = (ModifyThing) obj;

getSender().tell(ModifyThingResponse.modified(command.getId(),
command.getDittoHeaders()), getSelf());
}
else if (type.equals(ModifyFeature.TYPE)) {
final ModifyFeature command = (ModifyFeature) commandRegistry.parse(jsonCommand, dittoHeaders);
else if (obj instanceof ModifyFeature) {
final ModifyFeature command = (ModifyFeature) obj;

if (command.getDittoHeaders().isDryRun()) {
respondToModifyFeature(command);
Expand All @@ -261,31 +258,31 @@ public static final class ConciergeForwarderActorMock extends AbstractActor {

public static final String ACTOR_NAME = "conciergeForwarder";

private final ActorRef enforcerShardRegion;
private final ActorRef enforcerActor;

private ConciergeForwarderActorMock(final ActorRef enforcerShardRegion) {
this.enforcerShardRegion = enforcerShardRegion;
private ConciergeForwarderActorMock(final ActorRef enforcerActor) {
this.enforcerActor = enforcerActor;
}


/**
* Creates Akka configuration object Props for this actor.
*
* @param enforcerShardRegion the ActorRef of the enforcerShardRegion.
* @param enforcerActor the ActorRef of the enforcerActor.
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef enforcerShardRegion) {
public static Props props(final ActorRef enforcerActor) {

return Props.create(ConciergeForwarderActorMock.class,
() -> new ConciergeForwarderActorMock(enforcerShardRegion));
() -> new ConciergeForwarderActorMock(enforcerActor));
}

@Override
public AbstractActor.Receive createReceive() {
return ReceiveBuilder.create()
.match(Signal.class, signal -> {
final ShardedMessageEnvelope msg = ConciergeWrapper.wrapForEnforcer(signal);
enforcerShardRegion.tell(msg, getSender());
final Object msg = ConciergeWrapper.wrapForEnforcer(signal);
enforcerActor.tell(msg, getSender());
})
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
*/
package org.eclipse.ditto.services.concierge.enforcement;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.enforcers.Enforcer;
import org.eclipse.ditto.services.models.concierge.EntityId;
import org.eclipse.ditto.services.models.concierge.cache.Entry;
import org.eclipse.ditto.services.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.CaffeineCache;
Expand All @@ -28,64 +30,98 @@

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

/**
* Extensible actor to execute enforcement behavior.
*/
public abstract class AbstractEnforcerActor extends AbstractGraphActor<Contextual<Object>> {
public abstract class AbstractEnforcerActor extends AbstractGraphActor<Contextual<WithDittoHeaders>> {

/**
* Contextual information about this actor.
*/
protected final Contextual<NotUsed> contextual;
protected final Contextual<WithDittoHeaders> contextual;

@Nullable
private final Cache<EntityId, Entry<EntityId>> thingIdCache;
@Nullable
private final Cache<EntityId, Entry<Enforcer>> aclEnforcerCache;
@Nullable
private final Cache<EntityId, Entry<Enforcer>> policyEnforcerCache;

/**
* Create an instance of this actor.
*
* @param pubSubMediator Akka pub-sub-mediator.
* @param pubSubMediator Akka pub-sub-mediator.
* @param conciergeForwarder the concierge forwarder.
* @param enforcerExecutor executor for enforcement steps.
* @param askTimeout how long to wait for entity actors.
* @param thingIdCache TODO TJ javadoc
* @param aclEnforcerCache
* @param policyEnforcerCache
*/
protected AbstractEnforcerActor(final ActorRef pubSubMediator,
final ActorRef conciergeForwarder,
final Executor enforcerExecutor,
final Duration askTimeout) {
final Duration askTimeout,
@Nullable final Cache<EntityId, Entry<EntityId>> thingIdCache,
@Nullable final Cache<EntityId, Entry<Enforcer>> aclEnforcerCache,
@Nullable final Cache<EntityId, Entry<Enforcer>> policyEnforcerCache) {

contextual = new Contextual<>(NotUsed.getInstance(), getSelf(), getContext().getSystem().deadLetters(),
pubSubMediator, conciergeForwarder, enforcerExecutor, askTimeout, log,
decodeEntityId(getSelf()),
this.thingIdCache = thingIdCache;
this.aclEnforcerCache = aclEnforcerCache;
this.policyEnforcerCache = policyEnforcerCache;

contextual = new Contextual<>(null, getSelf(), getContext().getSystem().deadLetters(),
pubSubMediator, conciergeForwarder, enforcerExecutor, askTimeout, log, null,
createResponseReceiversCache());

// register for sending messages via pub/sub to this enforcer
// used for receiving cache invalidations from brother concierge nodes
pubSubMediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
}

@Override
protected abstract Flow<Contextual<Object>, Contextual<Object>, NotUsed> getHandler();
protected void preEnhancement(final ReceiveBuilder receiveBuilder) {
receiveBuilder.match(InvalidateCacheEntry.class, invalidateCacheEntry -> {
log.debug("received <{}>", invalidateCacheEntry);
final EntityId entityId = invalidateCacheEntry.getEntityId();
invalidateCaches(entityId);
});
}

private void invalidateCaches(final EntityId entityId) {
if (thingIdCache != null) {
final boolean invalidated = thingIdCache.invalidate(entityId);
log.debug("thingId cache for entity id <{}> was invalidated: {}", entityId, invalidated);
}
if (aclEnforcerCache != null) {
final boolean invalidated = aclEnforcerCache.invalidate(entityId);
log.debug("acl enforcer cache for entity id <{}> was invalidated: {}", entityId, invalidated);
}
if (policyEnforcerCache != null) {
final boolean invalidated = policyEnforcerCache.invalidate(entityId);
log.debug("policy enforcer cache for entity id <{}> was invalidated: {}", entityId, invalidated);
}
}

@Override
protected abstract Flow<Contextual<WithDittoHeaders>, Contextual<WithDittoHeaders>, NotUsed> getHandler();

@Override
@SuppressWarnings("unchecked")
protected Class<Contextual<Object>> getMessageClass() {
protected Class<Contextual<WithDittoHeaders>> getMessageClass() {
// trick Java type system into accepting the cast without it understanding the covariance of Contextual<>
return (Class<Contextual<Object>>) (Object) Contextual.class;
return (Class<Contextual<WithDittoHeaders>>) (Object) Contextual.class;
}

@Override
protected Source<Contextual<Object>, NotUsed> mapMessage(final Object message) {
protected Source<Contextual<WithDittoHeaders>, NotUsed> mapMessage(final WithDittoHeaders<?> message) {
return Source.single(contextual.withReceivedMessage(message, getSender()));
}

private static EntityId decodeEntityId(final ActorRef self) {
final String name = self.path().name();
try {
final String typeWithPath = URLDecoder.decode(name, StandardCharsets.UTF_8.name());
return EntityId.readFrom(typeWithPath);
} catch (final UnsupportedEncodingException e) {
throw new IllegalStateException("Unsupported encoding", e);
}
}

private static Cache<String, ActorRef> createResponseReceiversCache() {
return CaffeineCache.of(Caffeine.newBuilder().expireAfterWrite(120, TimeUnit.SECONDS));
}
Expand Down
Loading

0 comments on commit 54216f3

Please sign in to comment.