/
EnforcerActorCreator.java
111 lines (96 loc) · 4.88 KB
/
EnforcerActorCreator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/*
* Copyright (c) 2017-2018 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.concierge.enforcement;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.controlflow.GraphActor;
import org.eclipse.ditto.services.utils.akka.controlflow.Pipe;
import org.eclipse.ditto.services.utils.akka.controlflow.WithSender;
import org.eclipse.ditto.services.utils.akka.controlflow.components.ActivityChecker;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.stream.javadsl.Flow;
/**
* Creator for Actors that enforce authorization.
*/
public final class EnforcerActorCreator {
private EnforcerActorCreator() {
throw new AssertionError();
}
/**
* Creates Akka configuration object Props for this EnforcerActor. Caution: The actor does not terminate itself
* after a period of inactivity.
*
* @param pubSubMediator Akka pub sub mediator.
* @param enforcementProviders a set of {@link EnforcementProvider}s.
* @param askTimeout the ask timeout duration: the duration to wait for entity shard regions.
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef pubSubMediator,
final Set<EnforcementProvider<?>> enforcementProviders,
final Duration askTimeout) {
return props(pubSubMediator, enforcementProviders, askTimeout, null, null);
}
/**
* Creates Akka configuration object Props for this EnforcerActor.
*
* @param pubSubMediator Akka pub sub mediator.
* @param enforcementProviders a set of {@link EnforcementProvider}s.
* @param askTimeout the ask timeout duration: the duration to wait for entity shard regions.
* @param preEnforcer a function executed before actual enforcement, may be {@code null}.
* @param activityCheckInterval how often to check for actor activity for termination after an idle period.
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef pubSubMediator,
final Set<EnforcementProvider<?>> enforcementProviders,
final Duration askTimeout,
@Nullable final Function<WithDittoHeaders, CompletionStage<WithDittoHeaders>> preEnforcer,
@Nullable final Duration activityCheckInterval) {
return props(pubSubMediator, enforcementProviders, askTimeout, preEnforcer, activityCheckInterval, null);
}
/**
* Creates Akka configuration object Props for this EnforcerActor.
*
* @param pubSubMediator Akka pub sub mediator.
* @param enforcementProviders a set of {@link EnforcementProvider}s.
* @param askTimeout the ask timeout duration: the duration to wait for entity shard regions.
* @param preEnforcer a function executed before actual enforcement, may be {@code null}.
* @param activityCheckInterval how often to check for actor activity for termination after an idle period.
* @param conciergeForwarderActor the config reader.
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef pubSubMediator,
final Set<EnforcementProvider<?>> enforcementProviders,
final Duration askTimeout,
@Nullable final Function<WithDittoHeaders, CompletionStage<WithDittoHeaders>> preEnforcer,
@Nullable final Duration activityCheckInterval,
final ActorRef conciergeForwarderActor) {
final Function<WithDittoHeaders, CompletionStage<WithDittoHeaders>> preEnforcerFunction =
preEnforcer != null ? preEnforcer : CompletableFuture::completedFuture;
return GraphActor.partial((actorContext, log) -> {
final AbstractEnforcement.Context enforcementContext =
new AbstractEnforcement.Context(pubSubMediator, askTimeout, conciergeForwarderActor)
.with(actorContext, log);
return Flow.<WithSender>create()
.via(ActivityChecker.ofNullable(activityCheckInterval, actorContext.self()))
.via(PreEnforcer.fromFunction(actorContext.self(), preEnforcerFunction))
.via(Pipe.joinFlows(enforcementProviders.stream()
.map(provider -> provider.toGraph(enforcementContext))
.collect(Collectors.toList())));
});
}
}