/
AbstractEnforcement.java
373 lines (328 loc) · 15 KB
/
AbstractEnforcement.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.concierge.enforcement;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.enforcers.EffectedSubjects;
import org.eclipse.ditto.model.enforcers.Enforcer;
import org.eclipse.ditto.model.policies.ResourceKey;
import org.eclipse.ditto.model.things.ThingConstants;
import org.eclipse.ditto.services.models.policies.Permission;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cache.EntityIdWithResourceType;
import org.eclipse.ditto.services.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;
import akka.actor.ActorRef;
import akka.event.DiagnosticLoggingAdapter;
import akka.pattern.AskTimeoutException;
/**
* Contains self-type requirements for aspects of enforcer actor dealing with specific commands.
* Implementations only need to implement {@link #enforce()} in which they
* check if the passed in {@code signal} is authorized and forward it accordingly or respond with an error to the passed
* in {@code sender}.
*/
public abstract class AbstractEnforcement<T extends Signal<?>> {
/**
* Context of the enforcement step: sender, self, signal and so forth.
*/
protected final Contextual<T> context;
/**
* Create an enforcement step from its context.
*
* @param context the context of the enforcement step.
*/
protected AbstractEnforcement(final Contextual<T> context) {
this.context = context;
}
/**
* Performs authorization enforcement for the passed {@code signal}.
* If the signal is authorized, the implementation chooses to which target to forward. If it is not authorized, the
* passed {@code sender} will get an authorization error response.
* CAUTION: May deliver a failed future.
*
* @return future after enforcement was performed.
*/
public abstract CompletionStage<Contextual<WithDittoHeaders>> enforce();
/**
* Performs authorization enforcement for the passed {@code signal}.
* If the signal is authorized, the implementation chooses to which target to forward. If it is not authorized, the
* passed {@code sender} will get an authorization error response.
* The result future always succeeds.
*
* @return future after enforcement was performed.
*/
public CompletionStage<Contextual<WithDittoHeaders>> enforceSafely() {
return enforce().handle(handleEnforcementCompletion());
}
/**
* Handle error for a future message such that failed futures become a completed future with DittoRuntimeException.
*
* @param throwable error of the future on failure.
* @return the DittoRuntimeException.
*/
private DittoRuntimeException convertError(@Nullable final Throwable throwable) {
final Throwable error = throwable instanceof CompletionException
? throwable.getCause()
: throwable != null
? throwable
: new NullPointerException("Result and error are both null");
return reportError("Error thrown during enforcement", error);
}
private BiFunction<Contextual<WithDittoHeaders>, Throwable, Contextual<WithDittoHeaders>> handleEnforcementCompletion() {
return (result, throwable) -> {
context.getStartedTimer()
.map(startedTimer -> startedTimer.tag("outcome", throwable != null ? "fail" : "success"))
.ifPresent(StartedTimer::stop);
return Objects.requireNonNullElseGet(result,
() -> withMessageToReceiver(convertError(throwable), sender()));
};
}
/**
* Report unexpected error or unknown response.
*/
protected DittoRuntimeException reportUnexpectedErrorOrResponse(final String hint, final Object response,
@Nullable final Throwable error) {
if (error != null) {
return reportUnexpectedError(hint, error);
} else {
return reportUnknownResponse(hint, response);
}
}
/**
* Reports an error differently based on type of the error. If the error is of type
* {@link org.eclipse.ditto.model.base.exceptions.DittoRuntimeException}, it is send to the {@code sender}
* without modification, otherwise it is wrapped inside a {@link GatewayInternalErrorException}.
*
* @param hint hint about the nature of the error.
* @param error the error.
* @return DittoRuntimerException suitable for transmission of the error.
*/
protected DittoRuntimeException reportError(final String hint, final Throwable error) {
if (error instanceof DittoRuntimeException) {
log().info("{} - {}: {}", hint, error.getClass().getSimpleName(), error.getMessage());
return (DittoRuntimeException) error;
} else {
return reportUnexpectedError(hint, error);
}
}
/**
* Report unexpected error.
*/
protected DittoRuntimeException reportUnexpectedError(final String hint, final Throwable error) {
log().error(error, "Unexpected error {} - {}: {}", hint, error.getClass().getSimpleName(),
error.getMessage());
return mapToExternalException(error);
}
/**
* Report unknown response.
*/
protected GatewayInternalErrorException reportUnknownResponse(final String hint, final Object response) {
log().error("Unexpected response {}: <{}>", hint, response);
return GatewayInternalErrorException.newBuilder().dittoHeaders(dittoHeaders()).build();
}
private DittoRuntimeException mapToExternalException(final Throwable error) {
if (error instanceof GatewayInternalErrorException) {
return (GatewayInternalErrorException) error;
} else {
log().error(error, "Unexpected non-DittoRuntimeException error - responding with " +
"GatewayInternalErrorException - {} :{}", error.getClass().getSimpleName(), error.getMessage());
return GatewayInternalErrorException.newBuilder()
.cause(error)
.dittoHeaders(dittoHeaders())
.build();
}
}
/**
* Extend a signal by subject headers given with granted and revoked READ access.
* The subjects are provided by the given enforcer for the resource type {@link ThingConstants#ENTITY_TYPE}.
*
* @param signal the signal to extend.
* @param enforcer the enforcer.
* @return the extended signal.
*/
protected static <T extends Signal<T>> T addEffectedReadSubjectsToThingSignal(final Signal<T> signal,
final Enforcer enforcer) {
final ResourceKey resourceKey = ResourceKey.newInstance(ThingConstants.ENTITY_TYPE, signal.getResourcePath());
final EffectedSubjects effectedSubjects = enforcer.getSubjectsWithPermission(resourceKey, Permission.READ);
final DittoHeaders newHeaders = DittoHeaders.newBuilder(signal.getDittoHeaders())
.readGrantedSubjects(effectedSubjects.getGranted())
.readRevokedSubjects(effectedSubjects.getRevoked())
.build();
return signal.setDittoHeaders(newHeaders);
}
/**
* Check whether response or error from a future is {@code AskTimeoutException}.
*
* @param response response from a future.
* @param error error thrown in a future.
* @return whether either is {@code AskTimeoutException}.
*/
protected static boolean isAskTimeoutException(final Object response, @Nullable final Throwable error) {
return error instanceof AskTimeoutException || response instanceof AskTimeoutException;
}
/**
* @return Timeout duration for asking entity shard regions.
*/
protected Duration getAskTimeout() {
return context.getAskTimeout();
}
/**
* @return the entity ID.
*/
protected EntityIdWithResourceType entityId() {
return context.getEntityIdWithResourceType();
}
/**
* @param withPotentialDittoHeaders the object which potentially contains DittoHeaders from which a
* {@code correlation-id} can be extracted in order to enhance the returned DiagnosticLoggingAdapter
* @return the diagnostic logging adapter.
*/
protected DiagnosticLoggingAdapter log(final Object withPotentialDittoHeaders) {
if (withPotentialDittoHeaders instanceof WithDittoHeaders) {
return log(((WithDittoHeaders<?>) withPotentialDittoHeaders).getDittoHeaders());
}
if (withPotentialDittoHeaders instanceof DittoHeaders) {
LogUtil.enhanceLogWithCorrelationId(context.getLog(), (DittoHeaders) withPotentialDittoHeaders);
}
return context.getLog();
}
/**
* @return the diagnostic logging adapter.
*/
protected DiagnosticLoggingAdapter log() {
LogUtil.enhanceLogWithCorrelationId(context.getLog(), dittoHeaders());
return context.getLog();
}
/**
* @return Akka pubsub mediator.
*/
protected ActorRef pubSubMediator() {
return context.getPubSubMediator();
}
/**
* @return actor reference of the enforcer actor this object belongs to.
*/
protected ActorRef self() {
return context.getSelf();
}
/**
* @return the sender of the sent {@link #signal()}
*/
protected ActorRef sender() {
return context.getSender();
}
/**
* @return the sent Signal of subtype {@code <T>}
*/
protected T signal() {
return context.getMessage();
}
/**
* Inserts the passed {@code message} and {@code receiver} into the current {@link Contextual} {@link #context}.
*
* @param message the message to insert into the current context.
* @param receiver the ActorRef of the receiver which should get the message.
* @param <S> the message's type
* @return the adjusted context.
*/
protected <S extends WithDittoHeaders> Contextual<S> withMessageToReceiver(
@Nullable final S message,
@Nullable final ActorRef receiver) {
return context.withMessage(message).withReceiver(receiver);
}
/**
* Insert the passed {@code message} and {@code receiver} into the current {@link Contextual} {@link #context},
* then insert {@code askFutureWithoutErrorHandling} after appending error handling logic to it.
*
* @param message the message to log when executing the contextual; usually the message to ask.
* @param receiver the final receiver of the ask result.
* @param askFutureWithoutErrorHandling supplier of a future that performs an ask-operation with command-order
* guarantee, whose result is to be piped to {@code receiver}.
* @param <T> type of messages produced by the ask future.
* @return a copy of the context with message, receiver and ask-future including error handling.
*/
protected <T> Contextual<WithDittoHeaders> withMessageToReceiverViaAskFuture(final WithDittoHeaders message,
final ActorRef receiver,
final Supplier<CompletionStage<T>> askFutureWithoutErrorHandling) {
return withMessageToReceiver(message, receiver)
.withAskFuture(() -> askFutureWithoutErrorHandling.get()
.<Object>thenApply(x -> x)
.exceptionally(this::convertError)
);
}
/**
* Inserts the passed {@code message} and {@code receiver} into the current {@link Contextual} {@link #context}
* providing a function which shall be invoked prior to sending the {@code message} to the {@code receiver}.
*
* @param message the message to insert into the current context.
* @param receiver the ActorRef of the receiver which should get the message.
* @param wrapperFunction the function to apply prior to sending to the {@code receiver}.
* @param <S> the message's type
* @return the adjusted context.
*/
protected <S extends WithDittoHeaders> Contextual<S> withMessageToReceiver(final S message, final ActorRef receiver,
final Function<Object, Object> wrapperFunction) {
return context.withMessage(message).withReceiver(receiver).withReceiverWrapperFunction(wrapperFunction);
}
/**
* Sets the {@code null} receiver to the {@link #context} meaning that no message at all is emitted. Therefore
* the {@code message} may also stay {@code null}.
*
* @return the adjusted context.
*/
protected <S extends WithDittoHeaders> Contextual<S> withoutReceiver(final S message) {
return context.withMessage(message).withReceiver(null);
}
/**
* @return the DittoHeaders of the sent {@link #signal()}
*/
protected DittoHeaders dittoHeaders() {
return signal().getDittoHeaders();
}
/**
* @return the {@link org.eclipse.ditto.services.models.concierge.actors.ConciergeForwarderActor} reference
*/
protected ActorRef conciergeForwarder() {
return context.getConciergeForwarder();
}
/**
* Handle the passed {@code throwable} by sending it to the {@link #context}'s sender.
*
* @param throwable the occurred throwable (most likely a {@link DittoRuntimeException}) to send to the sender.
* @return the built contextual including the DittoRuntimeException.
*/
protected Contextual<WithDittoHeaders> handleExceptionally(final Throwable throwable) {
final Contextual<T> newContext = context.withReceiver(context.getSender());
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable,
cause -> {
LogUtil.enhanceLogWithCorrelationId(log(), context.getDittoHeaders());
log().error(cause, "Unexpected non-DittoRuntimeException");
return GatewayInternalErrorException.newBuilder()
.cause(cause)
.dittoHeaders(context.getDittoHeaders())
.build();
});
return newContext.withMessage(dittoRuntimeException);
}
}