-
Notifications
You must be signed in to change notification settings - Fork 215
/
AbstractEnforcerActor.java
250 lines (229 loc) · 12.9 KB
/
AbstractEnforcerActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.api.commands.sudo.SudoCommand;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithStashWithTimers;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.instruments.trace.StartedTrace;
import org.eclipse.ditto.policies.enforcement.pre.PreEnforcerProvider;
import org.eclipse.ditto.policies.model.PolicyId;
import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
/**
* Abstract enforcer of commands performing authorization / enforcement of incoming signals.
*
* @param <I> the type of the EntityId this enforcer actor enforces commands for.
* @param <S> the type of the Signals this enforcer actor enforces.
* @param <R> the type of the CommandResponses this enforcer actor filters.
* @param <E> the type of the EnforcementReloaded this enforcer actor uses for doing command enforcements.
*/
public abstract class AbstractEnforcerActor<I extends EntityId, S extends Signal<?>, R extends CommandResponse<?>,
E extends EnforcementReloaded<S, R>>
extends AbstractActorWithStashWithTimers {
/**
* Timeout for local actor invocations - a small timeout should be more than sufficient as those are just method
* calls.
*/
protected static final Duration DEFAULT_LOCAL_ASK_TIMEOUT = Duration.ofSeconds(5);
protected final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
protected final I entityId;
protected final E enforcement;
protected final PreEnforcerProvider preEnforcer;
protected AbstractEnforcerActor(final I entityId, final E enforcement) {
this.entityId = entityId;
this.enforcement = enforcement;
final var system = getContext().getSystem();
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(system.settings().config());
preEnforcer = PreEnforcerProvider.get(system, dittoExtensionsConfig);
}
/**
* Provides the {@link PolicyId} to use for the policy enforcement.
* The implementation chooses the most efficient strategy to retrieve it.
*
* @return a successful CompletionStage of either the loaded {@link PolicyId} of the Policy which should be used
* for enforcement or a failed CompletionStage with the cause for the failure.
*/
protected abstract CompletionStage<PolicyId> providePolicyIdForEnforcement(final Signal<?> signal);
/**
* Provides the {@link PolicyEnforcer} instance (which holds a {@code Policy} + the built {@code Enforcer}) for the
* provided {@code policyId} asynchronously.
* The implementation chooses the most efficient strategy to retrieve it.
*
* @param policyId the {@link PolicyId} to retrieve the PolicyEnforcer for.
* @return a successful CompletionStage of either an optional holding the loaded {@link PolicyEnforcer} or an empty optional if the enforcer could not be loaded.
*/
protected abstract CompletionStage<Optional<PolicyEnforcer>> providePolicyEnforcer(@Nullable PolicyId policyId);
@SuppressWarnings("unchecked")
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(DistributedPubSubMediator.SubscribeAck.class, s -> log.debug("Got subscribeAck <{}>.", s))
.match(SudoCommand.class, sudoCommand -> log.withCorrelationId(sudoCommand)
.error("Received SudoCommand in enforcer which should never happen: <{}>", sudoCommand)
)
.match(CommandResponse.class, r -> filterResponse((R) r))
.match(Signal.class, s -> enforceSignal((S) s))
.matchAny(message ->
log.withCorrelationId(
message instanceof WithDittoHeaders withDittoHeaders ? withDittoHeaders : null)
.warning("Got unknown message: '{}'", message))
.build();
}
protected CompletionStage<Optional<PolicyEnforcer>> loadPolicyEnforcer(Signal<?> signal) {
return providePolicyIdForEnforcement(signal)
.thenCompose(this::providePolicyEnforcer);
}
/**
* Enforces the passed {@code signal} using the {@code enforcement} of this actor.
* Successfully enforced signals are sent back to the {@code getSender()} - which is our dear parent, the Supervisor.
* Our parent is responsible for then forwarding the signal to the actual responsible target.
*
* @param signal the {@code Signal} to enforce based in the {@code policyEnforcer}.
*/
private void enforceSignal(final S signal) {
doEnforceSignal(signal, getSender());
}
@SuppressWarnings("unchecked")
private void doEnforceSignal(final S signal, final ActorRef sender) {
final StartedTrace trace = DittoTracing
.trace(signal, "enforce")
.start();
final S tracedSignal = DittoTracing.propagateContext(trace.getContext(), signal);
final ActorRef self = getSelf();
try {
preEnforcer.apply(tracedSignal)
.thenApply(preEnforcedSignal -> (S) preEnforcedSignal)
.thenCompose(preEnforcedSignal -> {
trace.mark("pre_enforced");
return loadPolicyEnforcer(preEnforcedSignal).thenCompose(optionalPolicyEnforcer -> {
trace.mark("enforcer_loaded");
return optionalPolicyEnforcer
.map(policyEnforcer -> enforcement.authorizeSignal(preEnforcedSignal,
policyEnforcer))
.orElseGet(() -> enforcement.authorizeSignalWithMissingEnforcer(
preEnforcedSignal));
}
);
})
.whenComplete((authorizedSignal, throwable) -> {
if (null != authorizedSignal) {
trace.mark("enforce_success").finish();
log.withCorrelationId(authorizedSignal)
.info("Completed enforcement of message type <{}> with outcome 'success'",
authorizedSignal.getType());
sender.tell(authorizedSignal, self);
} else if (null != throwable) {
trace.mark("enforce_failed").fail(throwable).finish();
handleAuthorizationFailure(tracedSignal, throwable, sender);
} else {
trace.mark("enforce_error").fail("unknown-outcome").finish();
log.withCorrelationId(tracedSignal)
.warning(
"Neither authorizedSignal nor throwable were present during enforcement of signal: " +
"<{}>", tracedSignal);
}
});
} catch (final DittoRuntimeException dittoRuntimeException) {
trace.mark("enforce_failed").fail(dittoRuntimeException).finish();
handleAuthorizationFailure(tracedSignal, dittoRuntimeException, sender);
}
}
private void handleAuthorizationFailure(
final Signal<?> signal,
final Throwable throwable,
final ActorRef sender
) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
DittoInternalErrorException.newBuilder()
.cause(t)
.dittoHeaders(dittoHeaders)
.build()
);
log.withCorrelationId(dittoRuntimeException)
.info("Completed enforcement of message type <{}> with outcome 'failed' and headers: <{}>",
signal.getType(), dittoHeaders);
sender.tell(dittoRuntimeException, getSelf());
}
/**
* Filters the response payload of the passed {@code commandResponse} using the {@code enforcement} of this actor.
* Filtered command responses are sent back to the {@code getSender()} - which is our dear parent, the Supervisor.
* Our parent is responsible for then forwarding the command response to the original sender.
*
* @param commandResponse the {@code CommandResponse} to filter based in the {@code policyEnforcer}.
*/
private void filterResponse(final R commandResponse) {
final ActorRef sender = getSender();
if (enforcement.shouldFilterCommandResponse(commandResponse)) {
loadPolicyEnforcer(commandResponse)
.thenAccept(optionalPolicyEnforcer -> optionalPolicyEnforcer.ifPresentOrElse(
policyEnforcer -> doFilterResponse(commandResponse, policyEnforcer, sender),
() -> log.withCorrelationId(commandResponse)
.error("Could not filter command response because policyEnforcer was missing")));
} else {
sender.tell(commandResponse, getContext().parent());
}
}
private void doFilterResponse(final R commandResponse, final PolicyEnforcer policyEnforcer, final ActorRef sender) {
final ActorRef parent = getContext().parent();
try {
final CompletionStage<R> filteredResponseStage =
enforcement.filterResponse(commandResponse, policyEnforcer);
filteredResponseStage.whenComplete((filteredResponse, throwable) -> {
if (null != filteredResponse) {
log.withCorrelationId(filteredResponse)
.info("Completed filtering of command response type <{}>",
filteredResponse.getType());
sender.tell(filteredResponse, parent);
} else if (null != throwable) {
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
DittoInternalErrorException.newBuilder()
.cause(t)
.dittoHeaders(commandResponse.getDittoHeaders())
.build()
);
log.withCorrelationId(dittoRuntimeException)
.info("Exception during filtering of command response type <{}> and headers: <{}>",
commandResponse.getType(), commandResponse.getDittoHeaders());
sender.tell(dittoRuntimeException, parent);
} else {
log.withCorrelationId(commandResponse)
.error("Neither filteredResponse nor throwable were present during filtering of " +
"commandResponse: <{}>", commandResponse);
}
});
} catch (final DittoRuntimeException dittoRuntimeException) {
log.withCorrelationId(dittoRuntimeException)
.info("Exception during filtering of command response type <{}> and headers: <{}>",
commandResponse.getType(), commandResponse.getDittoHeaders());
sender.tell(dittoRuntimeException, parent);
}
}
}