/
DefaultEnforcerActorFactory.java
113 lines (95 loc) · 6.13 KB
/
DefaultEnforcerActorFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*/
package org.eclipse.ditto.services.concierge.starter.proxy;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.enforcers.Enforcer;
import org.eclipse.ditto.services.concierge.cache.AclEnforcerCacheLoader;
import org.eclipse.ditto.services.concierge.cache.CacheFactory;
import org.eclipse.ditto.services.concierge.cache.PolicyEnforcerCacheLoader;
import org.eclipse.ditto.services.concierge.cache.ThingEnforcementIdCacheLoader;
import org.eclipse.ditto.services.concierge.cache.update.PolicyCacheUpdateActor;
import org.eclipse.ditto.services.concierge.enforcement.EnforcementProvider;
import org.eclipse.ditto.services.concierge.enforcement.EnforcerActorCreator;
import org.eclipse.ditto.services.concierge.enforcement.LiveSignalEnforcement;
import org.eclipse.ditto.services.concierge.enforcement.PolicyCommandEnforcement;
import org.eclipse.ditto.services.concierge.enforcement.ThingCommandEnforcement;
import org.eclipse.ditto.services.concierge.enforcement.placeholders.PlaceholderSubstitution;
import org.eclipse.ditto.services.concierge.starter.actors.DispatcherActorCreator;
import org.eclipse.ditto.services.concierge.util.config.ConciergeConfigReader;
import org.eclipse.ditto.services.models.concierge.EntityId;
import org.eclipse.ditto.services.models.concierge.cache.Entry;
import org.eclipse.ditto.services.models.policies.PoliciesMessagingConstants;
import org.eclipse.ditto.services.models.things.ThingsMessagingConstants;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
/**
 * Default Ditto {@link AbstractEnforcerActorFactory}.
 * <p>
 * Wires shard region proxies, the ID and enforcer caches, the enforcement providers, the enforcer shard region,
 * the policy cache updater and the dispatcher actor together, all driven by a {@link ConciergeConfigReader}.
 * </p>
 */
public final class DefaultEnforcerActorFactory extends AbstractEnforcerActorFactory<ConciergeConfigReader> {

    /** Metric name prefix shared by the policy and ACL enforcer caches. */
    private static final String ENFORCER_CACHE_METRIC_NAME_PREFIX = "ditto_authorization_enforcer_cache_";

    /** Metric name prefix of the enforcement ID cache. */
    private static final String ID_CACHE_METRIC_NAME_PREFIX = "ditto_authorization_id_cache_";

    /**
     * Starts the enforcer shard region together with everything it depends on.
     *
     * @param context the actor context used to obtain the actor system and to start the cache updater and
     * dispatcher actors via {@code actorOf}.
     * @param configReader the concierge configuration (cluster, caches, enforcement, instance index).
     * @param pubSubMediator the pub-sub mediator handed to the enforcer, cache updater and dispatcher actors.
     * @return the reference of the started enforcer shard region.
     */
    @Override
    public ActorRef startEnforcerActor(final ActorContext context, final ConciergeConfigReader configReader,
            final ActorRef pubSubMediator) {

        final Duration cacheAskTimeout = configReader.caches().askTimeout();

        // proxies to the shard regions of the policies and things services
        final ActorRef policiesProxy = startProxy(context.system(), configReader.cluster().numberOfShards(),
                PoliciesMessagingConstants.SHARD_REGION, PoliciesMessagingConstants.CLUSTER_ROLE);
        final ActorRef thingsProxy = startProxy(context.system(), configReader.cluster().numberOfShards(),
                ThingsMessagingConstants.SHARD_REGION, ThingsMessagingConstants.CLUSTER_ROLE);

        // caches: thing enforcement IDs, policy enforcers and ACL enforcers
        final Cache<EntityId, Entry<EntityId>> idCache = CacheFactory.createCache(
                new ThingEnforcementIdCacheLoader(cacheAskTimeout, thingsProxy),
                configReader.caches().id(),
                ID_CACHE_METRIC_NAME_PREFIX + ThingCommand.RESOURCE_TYPE);
        final Cache<EntityId, Entry<Enforcer>> policyCache = CacheFactory.createCache(
                new PolicyEnforcerCacheLoader(cacheAskTimeout, policiesProxy),
                configReader.caches().enforcer(),
                ENFORCER_CACHE_METRIC_NAME_PREFIX + "policy");
        final Cache<EntityId, Entry<Enforcer>> aclCache = CacheFactory.createCache(
                new AclEnforcerCacheLoader(cacheAskTimeout, thingsProxy),
                configReader.caches().enforcer(),
                ENFORCER_CACHE_METRIC_NAME_PREFIX + "acl");

        final Set<EnforcementProvider<?>> providers =
                createEnforcementProviders(thingsProxy, policiesProxy, idCache, policyCache, aclCache);

        final Duration askTimeoutForEnforcement = configReader.enforcement().askTimeout();
        // the activity check interval is deliberately the same as the ID cache retention
        final Duration checkInterval = configReader.caches().id().expireAfterWrite();
        final Function<WithDittoHeaders, CompletionStage<WithDittoHeaders>> placeholderSubstitution =
                PlaceholderSubstitution.newInstance();

        final Props propsOfEnforcer = EnforcerActorCreator.props(pubSubMediator, providers,
                askTimeoutForEnforcement, placeholderSubstitution, checkInterval);
        final ActorRef shardRegion = startShardRegion(context.system(), configReader.cluster(), propsOfEnforcer);

        // cache updater for the policy enforcer cache
        context.actorOf(
                PolicyCacheUpdateActor.props(policyCache, pubSubMediator, configReader.instanceIndex()),
                PolicyCacheUpdateActor.ACTOR_NAME);

        // dispatcher in front of the enforcer shard region
        final Props propsOfDispatcher = DispatcherActorCreator.props(configReader, pubSubMediator, shardRegion);
        context.actorOf(propsOfDispatcher, DispatcherActorCreator.ACTOR_NAME);

        return shardRegion;
    }

    /**
     * Collects the enforcement providers for thing commands, policy commands and live signals.
     */
    private static Set<EnforcementProvider<?>> createEnforcementProviders(final ActorRef thingsProxy,
            final ActorRef policiesProxy,
            final Cache<EntityId, Entry<EntityId>> idCache,
            final Cache<EntityId, Entry<Enforcer>> policyCache,
            final Cache<EntityId, Entry<Enforcer>> aclCache) {

        final EnforcementProvider<?> forThingCommands =
                new ThingCommandEnforcement.Provider(thingsProxy, policiesProxy, idCache, policyCache, aclCache);
        final EnforcementProvider<?> forPolicyCommands =
                new PolicyCommandEnforcement.Provider(policiesProxy, policyCache);
        final EnforcementProvider<?> forLiveSignals =
                new LiveSignalEnforcement.Provider(idCache, policyCache, aclCache);

        final Set<EnforcementProvider<?>> result = new HashSet<>();
        result.add(forThingCommands);
        result.add(forPolicyCommands);
        result.add(forLiveSignals);
        return result;
    }

}