-
Notifications
You must be signed in to change notification settings - Fork 215
/
AbstractEnforcement.java
331 lines (291 loc) · 13.3 KB
/
AbstractEnforcement.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithType;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.policies.api.Permission;
import org.eclipse.ditto.policies.model.ResourceKey;
import org.eclipse.ditto.policies.model.enforcers.Enforcer;
import org.eclipse.ditto.things.model.ThingConstants;
import akka.actor.ActorRef;
import akka.pattern.AskTimeoutException;
/**
* Contains self-type requirements for aspects of enforcer actor dealing with specific commands.
* Implementations only need to implement {@link #enforce()} in which they
* check if the passed in {@code signal} is authorized and forward it accordingly or respond with an error to the passed
* in {@code sender}.
*
* TODO CR-11297 candidate for removal - all implementations should in the end extends from AbstractEnforcementReloaded instead
*/
public abstract class AbstractEnforcement<C extends Signal<?>> {
/**
* Context of the enforcement step: sender, self, signal and so forth.
*/
protected final Contextual<C> context;
/**
* Create an enforcement step from its context.
*
* @param context the context of the enforcement step.
*/
protected AbstractEnforcement(final Contextual<C> context) {
this.context = context;
}
/**
* Performs authorization enforcement for the passed {@code signal}.
* If the signal is authorized, the implementation chooses to which target to forward. If it is not authorized, the
* passed {@code sender} will get an authorization error response.
* CAUTION: May deliver a failed future.
*
* @return future after enforcement was performed.
*/
public abstract CompletionStage<Contextual<WithDittoHeaders>> enforce();
/**
* Performs authorization enforcement for the passed {@code signal}.
* If the signal is authorized, the implementation chooses to which target to forward. If it is not authorized, the
* passed {@code sender} will get an authorization error response.
* The result future always succeeds.
*
* @return future after enforcement was performed.
*/
public CompletionStage<Contextual<WithDittoHeaders>> enforceSafely() {
return enforce().handle(handleEnforcementCompletion());
}
private BiFunction<Contextual<WithDittoHeaders>, Throwable, Contextual<WithDittoHeaders>> handleEnforcementCompletion() {
return (result, throwable) -> {
if (null != result) {
final ThreadSafeDittoLoggingAdapter l = result.getLog().withCorrelationId(result);
final String typeHint = result.getMessageOptional()
.filter(WithType.class::isInstance)
.map(msg -> ((WithType) msg).getType())
.orElse("?");
l.info("Completed enforcement of contextual message type <{}> with outcome 'success'",
typeHint);
l.debug("Completed enforcement of contextual message type <{}> with outcome 'success' " +
"and headers: <{}>", typeHint, result.getDittoHeaders());
} else {
log().info("Completed enforcement of contextual message with outcome 'failed' and headers: " +
"<{}>", dittoHeaders());
}
return Objects.requireNonNullElseGet(result,
() -> withMessageToReceiver(reportError("Error thrown during enforcement", throwable), sender()));
};
}
/**
* Report unexpected error or unknown response.
*/
protected DittoRuntimeException reportErrorOrResponse(final String hint, @Nullable final Object response,
@Nullable final Throwable error) {
if (error != null) {
return reportError(hint, error);
} else if (response instanceof Throwable throwable) {
return reportError(hint, throwable);
} else if (response != null) {
return reportUnknownResponse(hint, response);
} else {
return reportError(hint, new NullPointerException("Response and error were null."));
}
}
/**
* Reports an error differently based on type of the error. If the error is of type
* {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException}, it is returned as is
* (without modification), otherwise it is wrapped inside a
* {@link org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException}.
*
* @param hint hint about the nature of the error.
* @param throwable the error.
* @return DittoRuntimeException suitable for transmission of the error.
*/
protected DittoRuntimeException reportError(final String hint, @Nullable final Throwable throwable) {
final Throwable error = throwable == null
? new NullPointerException("Result and error are both null")
: throwable;
final var dre = DittoRuntimeException.asDittoRuntimeException(
error, cause -> reportUnexpectedError(hint, cause));
log().info("{} - {}: {}", hint, dre.getClass().getSimpleName(), dre.getMessage());
return dre;
}
/**
* Report unexpected error.
*/
private DittoRuntimeException reportUnexpectedError(final String hint, final Throwable error) {
log().error(error, "Unexpected error {} - {}: {}", hint, error.getClass().getSimpleName(),
error.getMessage());
return DittoInternalErrorException.newBuilder()
.cause(error)
.dittoHeaders(dittoHeaders())
.build();
}
/**
* Report unknown response.
*/
protected DittoInternalErrorException reportUnknownResponse(final String hint, final Object response) {
log().error("Unexpected response {}: <{}>", hint, response);
return DittoInternalErrorException.newBuilder().dittoHeaders(dittoHeaders()).build();
}
/**
* Extend a signal by subject headers given with granted and revoked READ access.
* The subjects are provided by the given enforcer for the resource type {@link ThingConstants#ENTITY_TYPE}.
*
* @param signal the signal to extend.
* @param enforcer the enforcer.
* @return the extended signal.
*/
protected static <T extends Signal<T>> T addEffectedReadSubjectsToThingSignal(final Signal<T> signal,
final Enforcer enforcer) {
final var resourceKey = ResourceKey.newInstance(ThingConstants.ENTITY_TYPE, signal.getResourcePath());
final var authorizationSubjects = enforcer.getSubjectsWithUnrestrictedPermission(resourceKey, Permission.READ);
final var newHeaders = DittoHeaders.newBuilder(signal.getDittoHeaders())
.readGrantedSubjects(authorizationSubjects)
.build();
return signal.setDittoHeaders(newHeaders);
}
/**
* Check whether response or error from a future is {@code AskTimeoutException}.
*
* @param response response from a future.
* @param error error thrown in a future.
* @return whether either is {@code AskTimeoutException}.
*/
protected static boolean isAskTimeoutException(final Object response, @Nullable final Throwable error) {
return error instanceof AskTimeoutException || response instanceof AskTimeoutException;
}
/**
* @return the configuration of "ask with retry" pattern during enforcement.
*/
protected AskWithRetryConfig getAskWithRetryConfig() {
return context.getAskWithRetryConfig();
}
/**
* @return the entity ID.
*/
protected EnforcementCacheKey entityId() {
return context.getCacheKey();
}
/**
* @param withPotentialDittoHeaders the object which potentially contains DittoHeaders from which a
* {@code correlation-id} can be extracted in order to enhance the returned DiagnosticLoggingAdapter
* @return the diagnostic logging adapter.
*/
protected ThreadSafeDittoLoggingAdapter log(final Object withPotentialDittoHeaders) {
if (withPotentialDittoHeaders instanceof WithDittoHeaders withDittoHeaders) {
return context.getLog().withCorrelationId(withDittoHeaders);
}
if (withPotentialDittoHeaders instanceof DittoHeaders dittoHeaders) {
return context.getLog().withCorrelationId(dittoHeaders);
}
return context.getLog();
}
/**
* @return the diagnostic logging adapter.
*/
protected ThreadSafeDittoLoggingAdapter log() {
return context.getLog().withCorrelationId(dittoHeaders());
}
/**
* @return Akka pubsub mediator.
*/
protected ActorRef pubSubMediator() {
return context.getPubSubMediator();
}
/**
* @return actor reference of the enforcer actor this object belongs to.
*/
protected ActorRef self() {
return context.getSelf();
}
/**
* @return the sender of the sent {@link #signal()}
*/
protected ActorRef sender() {
return context.getSender();
}
/**
* @return the sent Signal of subtype {@code <T>}
*/
protected C signal() {
return context.getMessage();
}
/**
* Inserts the passed {@code message} and {@code receiver} into the current {@link Contextual} {@link #context}.
*
* @param message the message to insert into the current context.
* @param receiver the ActorRef of the receiver which should get the message.
* @param <S> the message's type
* @return the adjusted context.
*/
protected <S extends WithDittoHeaders> Contextual<S> withMessageToReceiver(
@Nullable final S message,
@Nullable final ActorRef receiver) {
return context.setMessage(message).withReceiver(receiver);
}
/**
* Insert the passed {@code message} and {@code receiver} into the current {@link Contextual} {@link #context},
* then insert {@code askFutureWithoutErrorHandling} after appending error handling logic to it.
*
* @param message the message to log when executing the contextual; usually the message to ask.
* @param receiver the final receiver of the ask result.
* @param askFutureWithoutErrorHandling supplier of a future that performs an ask-operation with command-order
* guarantee, whose result is to be piped to {@code receiver}.
* @param <T> type of results of the ask future.
* @return a copy of the context with message, receiver and ask-future including error handling.
*/
protected <T> Contextual<WithDittoHeaders> withMessageToReceiverViaAskFuture(final WithDittoHeaders message,
final ActorRef receiver,
final Supplier<CompletionStage<T>> askFutureWithoutErrorHandling) {
return this.withMessageToReceiver(message, receiver)
.withAskFuture(() -> askFutureWithoutErrorHandling.get()
.<Object>thenApply(x -> x)
.exceptionally(error -> this.reportError("Error thrown during enforcement", error))
);
}
/**
* Inserts the passed {@code message} and {@code receiver} into the current {@link Contextual} {@link #context}
* providing a function which shall be invoked prior to sending the {@code message} to the {@code receiver}.
*
* @param message the message to insert into the current context.
* @param receiver the ActorRef of the receiver which should get the message.
* @param wrapperFunction the function to apply prior to sending to the {@code receiver}.
* @param <S> the message's type
* @return the adjusted context.
*/
protected <S extends WithDittoHeaders> Contextual<S> withMessageToReceiver(final S message,
final ActorRef receiver,
final UnaryOperator<Object> wrapperFunction) {
return context.setMessage(message).withReceiver(receiver).withReceiverWrapperFunction(wrapperFunction);
}
/**
* @return the DittoHeaders of the sent {@link #signal()}
*/
protected DittoHeaders dittoHeaders() {
return signal().getDittoHeaders();
}
/**
* @return the {@code ConciergeForwarderActor} reference
*/
protected ActorRef commandForwarder() {
return context.getCommandForwarder();
}
}