-
Notifications
You must be signed in to change notification settings - Fork 214
/
DefaultPolicyEnforcerProvider.java
151 lines (131 loc) · 7.28 KB
/
DefaultPolicyEnforcerProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
import org.eclipse.ditto.internal.utils.cache.config.DefaultCacheConfig;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.cluster.config.DefaultClusterConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.model.PolicyId;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.pubsub.DistributedPubSub;
import akka.dispatch.MessageDispatcher;
/**
 * Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a
 * {@link PolicyEnforcer}.
 * <p>
 * Use {@link #getInstance(ActorSystem)} to obtain an instance; depending on the
 * {@code ditto.policies-enforcer-cache.enabled} setting the returned provider is either this plain provider or a
 * {@link CachingPolicyEnforcerProvider} that was handed this provider.
 */
public final class DefaultPolicyEnforcerProvider implements PolicyEnforcerProvider {

    private static final ThreadSafeDittoLogger LOGGER =
            DittoLoggerFactory.getThreadSafeLogger(DefaultPolicyEnforcerProvider.class);

    /**
     * Config path of the policy enforcer cache settings; the {@code enabled} flag lives directly below it.
     */
    private static final String POLICY_ENFORCER_CACHE_CONFIG_PATH = "ditto.policies-enforcer-cache";

    private final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader;
    private final MessageDispatcher enforcementCacheDispatcher;

    /**
     * Creates a provider whose cache loader and dispatcher are both derived from the passed actor system.
     *
     * @param actorSystem the actor system providing configuration, scheduler, shard region proxy and dispatcher.
     */
    DefaultPolicyEnforcerProvider(final ActorSystem actorSystem) {
        this(policyEnforcerCacheLoader(actorSystem), enforcementCacheDispatcher(actorSystem));
    }

    /**
     * Creates a provider from explicitly supplied collaborators.
     * NOTE(review): package-private, presumably so tests can inject a stubbed loader - confirm against callers.
     *
     * @param policyEnforcerCacheLoader the loader asked for the policy enforcer of a policy ID.
     * @param enforcementCacheDispatcher the dispatcher passed to the loader as executor for each load.
     */
    DefaultPolicyEnforcerProvider(
            final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader,
            final MessageDispatcher enforcementCacheDispatcher) {

        this.policyEnforcerCacheLoader = policyEnforcerCacheLoader;
        this.enforcementCacheDispatcher = enforcementCacheDispatcher;
    }

    /**
     * Creates a new instance of this policy enforcer provider based on the configuration in the actor system.
     * <p>
     * If the boolean setting {@code ditto.policies-enforcer-cache.enabled} is {@code true}, the result of
     * {@code withCaching(ActorSystem)} is returned, otherwise the plain provider.
     *
     * @param actorSystem used to initialize all dependencies of the policy enforcer provider.
     * @return the new instance.
     */
    public static PolicyEnforcerProvider getInstance(final ActorSystem actorSystem) {
        // The provider is built before the flag is read; keep this order so that initialisation happens
        // exactly as before even if reading the flag fails.
        final DefaultPolicyEnforcerProvider shardRegionPolicyEnforcerProvider =
                new DefaultPolicyEnforcerProvider(actorSystem);
        final boolean withCaching =
                actorSystem.settings().config().getBoolean(POLICY_ENFORCER_CACHE_CONFIG_PATH + ".enabled");
        if (withCaching) {
            return shardRegionPolicyEnforcerProvider.withCaching(actorSystem);
        } else {
            return shardRegionPolicyEnforcerProvider;
        }
    }

    /**
     * Reads the ask-with-retry settings from the Ditto-scoped enforcement config of the actor system.
     */
    private static AskWithRetryConfig askWithRetryConfig(final ActorSystem actorSystem) {
        final DefaultScopedConfig dittoScoped = DefaultScopedConfig.dittoScoped(actorSystem.settings().config());

        return DefaultEnforcementConfig.of(dittoScoped).getAskWithRetryConfig();
    }

    /**
     * Obtains the shard region proxy actor for the policies cluster role and shard region.
     */
    private static ActorRef policiesShardRegion(final ActorSystem actorSystem) {
        final var dittoScopedConfig = DefaultScopedConfig.dittoScoped(actorSystem.settings().config());
        final ClusterConfig clusterConfig = DefaultClusterConfig.of(dittoScopedConfig);
        final ShardRegionProxyActorFactory shardRegionProxyActorFactory =
                ShardRegionProxyActorFactory.newInstance(actorSystem, clusterConfig);

        return shardRegionProxyActorFactory.getShardRegionProxyActor(
                PoliciesMessagingConstants.CLUSTER_ROLE,
                PoliciesMessagingConstants.SHARD_REGION
        );
    }

    /**
     * Builds the {@link PolicyEnforcerCacheLoader} from the ask-with-retry config, the scheduler of the actor
     * system and the policies shard region proxy.
     */
    private static AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader(
            final ActorSystem actorSystem) {

        final AskWithRetryConfig askWithRetryConfig = askWithRetryConfig(actorSystem);
        final ActorRef policiesShardRegion = policiesShardRegion(actorSystem);

        return new PolicyEnforcerCacheLoader(askWithRetryConfig, actorSystem.getScheduler(), policiesShardRegion);
    }

    /**
     * Looks up the dispatcher registered under {@link PolicyEnforcerCacheLoader#ENFORCEMENT_CACHE_DISPATCHER}.
     */
    private static MessageDispatcher enforcementCacheDispatcher(final ActorSystem actorSystem) {
        return actorSystem.dispatchers().lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
    }

    /**
     * Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a
     * {@link PolicyEnforcer}.
     * <p>
     * This method never completes exceptionally and never throws: a {@code null} policy ID, a failure raised
     * while starting the load and a failure of the asynchronous load itself all yield an empty Optional. Both
     * kinds of failure are logged at warn level.
     *
     * @param policyId the ID of the policy that should be loaded.
     * @return A completion stage completing with an Optional holding the PolicyEnforcer in case it could be
     * retrieved or an empty optional if not.
     */
    @Override
    public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable final PolicyId policyId) {
        if (null == policyId) {
            return CompletableFuture.completedStage(Optional.empty());
        } else {
            try {
                return policyEnforcerCacheLoader.asyncLoad(EnforcementCacheKey.of(policyId),
                                enforcementCacheDispatcher)
                        .thenApply(Entry::get)
                        .exceptionally(error -> {
                            // Do not swallow silently: without this log a failed load cannot be told apart
                            // from a policy that simply does not exist.
                            LOGGER.warn("Loading the policy enforcer for policy <" + policyId +
                                    "> via cache loader failed.", error);

                            return Optional.empty();
                        });
            } catch (final Exception e) {
                // Covers failures thrown synchronously by the loader before a future is returned.
                LOGGER.warn(
                        "Got exception when trying to load the policy enforcer via cache loader. This is " +
                                "unexpected", e
                );

                return CompletableFuture.completedStage(Optional.empty());
            }
        }
    }

    /**
     * Creates a {@link CachingPolicyEnforcerProvider} that is handed this provider together with a cache named
     * {@code policy_enforcer_cache}, the blocked namespaces and the distributed pub/sub mediator.
     *
     * @param actorSystem the actor system providing cache config, dispatcher, pub/sub and blocked namespaces.
     * @return the caching provider.
     */
    private PolicyEnforcerProvider withCaching(final ActorSystem actorSystem) {
        final var dispatchers = actorSystem.dispatchers();
        final var cacheConfig =
                DefaultCacheConfig.of(actorSystem.settings().config(), POLICY_ENFORCER_CACHE_CONFIG_PATH);
        final var cacheDispatcher = dispatchers.lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
        final var policyEnforcerCache =
                CacheFactory.<PolicyId, Entry<PolicyEnforcer>>createCache(cacheConfig, "policy_enforcer_cache",
                        cacheDispatcher);
        final var pubSubMediator = DistributedPubSub.get(actorSystem).mediator();
        final var blockedNamespaces = BlockedNamespaces.of(actorSystem);

        return new CachingPolicyEnforcerProvider(actorSystem, policyEnforcerCache, this, blockedNamespaces,
                pubSubMediator);
    }

}