-
Notifications
You must be signed in to change notification settings - Fork 214
/
SupervisorInlinePolicyEnrichment.java
221 lines (197 loc) · 10.1 KB
/
SupervisorInlinePolicyEnrichment.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.things.service.persistence.actors;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.policies.enforcement.config.EnforcementConfig;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicy;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicyResponse;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThing;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThingResponse;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
/**
* Functionality used in {@link ThingSupervisorActor} for retrieving an inline {@code Policy} together with a
* {@link RetrieveThing} command.
*/
final class SupervisorInlinePolicyEnrichment {
private static final Duration DEFAULT_LOCAL_ASK_TIMEOUT = Duration.ofSeconds(5);
private final ActorSystem actorSystem;
private final ThreadSafeDittoLoggingAdapter log;
private final ThingId thingId;
private final ActorSelection thingPersistenceActor;
private final ActorRef policiesShardRegion;
private final EnforcementConfig enforcementConfig;
SupervisorInlinePolicyEnrichment(final ActorSystem actorSystem,
final ThreadSafeDittoLoggingAdapter log,
final ThingId thingId,
final ActorSelection thingPersistenceActor,
final ActorRef policiesShardRegion,
final EnforcementConfig enforcementConfig) {
this.actorSystem = actorSystem;
this.log = log;
this.thingId = thingId;
this.thingPersistenceActor = thingPersistenceActor;
this.policiesShardRegion = policiesShardRegion;
this.enforcementConfig = enforcementConfig;
}
/**
* Check if inlined policy should be retrieved together with the thing.
*
* @param retrieveThing the RetrieveThing command.
* @return whether it is necessary to retrieve the thing's policy.
*/
static boolean shouldRetrievePolicyWithThing(final RetrieveThing retrieveThing) {
return retrieveThing.getSelectedFields()
.filter(selector -> selector.getPointers()
.stream()
.anyMatch(jsonPointer -> jsonPointer.getRoot()
.filter(jsonKey -> Policy.INLINED_FIELD_NAME.equals(jsonKey.toString()))
.isPresent()))
.isPresent();
}
/**
* Based on the need to retrieve a policy together with a thing determined by
* {@link #shouldRetrievePolicyWithThing(RetrieveThing)} this method creates a {@code RetrieveThingResponse}
* containing the inlined policy as {@code "_policy"} field in the Thing JSON.
*
* @param retrieveThing the RetrieveThing command which contains a field selector with {@code "_policy"} field
* @param retrieveThingResponse the response from the thing persistence actor containing the Thing JSON to enrich
* with the policy.
* @return a Source of a single RetrieveThingResponse combining Thing JSON and inlined {@code "_policy"}
*/
Source<RetrieveThingResponse, NotUsed> enrichPolicy(final RetrieveThing retrieveThing,
final RetrieveThingResponse retrieveThingResponse) {
return retrievePolicyIdViaSudoRetrieveThing()
.map(SudoRetrieveThingResponse::getThing)
.map(Thing::getPolicyId)
.map(optionalPolicyId -> optionalPolicyId.orElseThrow(() -> {
log.withCorrelationId(retrieveThing)
.warning("Found thing without policy ID. This should never be possible. " +
"This is most likely a bug and should be fixed.");
return ThingNotAccessibleException.newBuilder(thingId)
.dittoHeaders(retrieveThing.getDittoHeaders())
.build();
}))
.map(policyId -> {
final var dittoHeadersWithoutPreconditionHeaders = retrieveThing.getDittoHeaders()
.toBuilder()
.removePreconditionHeaders()
.build();
return RetrievePolicy.of(policyId, dittoHeadersWithoutPreconditionHeaders);
})
.map(this::retrieveInlinedPolicyForThing)
.flatMapConcat(Source::completionStage)
.map(policyResponse -> {
if (policyResponse.isPresent()) {
final JsonObject inlinedPolicy = policyResponse.get()
.getPolicy()
.toInlinedJson(retrieveThing.getImplementedSchemaVersion(),
FieldType.notHidden());
final JsonObject thingWithInlinedPolicy = retrieveThingResponse.getEntity()
.asObject()
.toBuilder()
.setAll(inlinedPolicy)
.build();
return retrieveThingResponse.setEntity(thingWithInlinedPolicy);
} else {
return retrieveThingResponse;
}
});
}
private Source<SudoRetrieveThingResponse, NotUsed> retrievePolicyIdViaSudoRetrieveThing() {
final CompletionStage<Object> askForThing =
Patterns.ask(thingPersistenceActor, SudoRetrieveThing.of(thingId,
JsonFieldSelector.newInstance("policyId"),
DittoHeaders.newBuilder()
.correlationId("retrievePolicyIdViaSudoRetrieveThing-" + UUID.randomUUID())
.build()
), DEFAULT_LOCAL_ASK_TIMEOUT
);
return Source.completionStage(askForThing)
.map(response -> {
if (response instanceof DittoRuntimeException dre) {
throw dre;
}
return response;
})
.divertTo(Sink.foreach(unexpectedResponseType ->
log.warning("Unexpected response type. Expected <{}>, but got <{}>.",
SudoRetrieveThingResponse.class, unexpectedResponseType.getClass())),
response -> !(response instanceof SudoRetrieveThingResponse))
.map(SudoRetrieveThingResponse.class::cast);
}
/**
* Retrieve inlined policy after retrieving a thing. Do not report errors.
*
* @param retrievePolicy the command to retrieve the thing's policy.
* @return future response from policies-shard-region.
*/
private CompletionStage<Optional<RetrievePolicyResponse>> retrieveInlinedPolicyForThing(
final RetrievePolicy retrievePolicy) {
return AskWithRetry.askWithRetry(policiesShardRegion, retrievePolicy,
enforcementConfig.getAskWithRetryConfig(),
actorSystem,
response -> {
if (response instanceof RetrievePolicyResponse retrievePolicyResponse) {
return Optional.of(retrievePolicyResponse);
} else {
log.withCorrelationId(getCorrelationIdOrNull(response, retrievePolicy))
.info("No authorized response when retrieving inlined policy <{}> for thing <{}>: {}",
retrievePolicy.getEntityId(), thingId, response);
return Optional.<RetrievePolicyResponse>empty();
}
}
).exceptionally(error -> {
log.withCorrelationId(getCorrelationIdOrNull(error, retrievePolicy))
.error(error, "Retrieving inlined policy after RetrieveThing");
return Optional.empty();
});
}
@Nullable
private static CharSequence getCorrelationIdOrNull(final Object signal, final WithDittoHeaders fallBackSignal) {
final WithDittoHeaders withDittoHeaders;
if (isWithDittoHeaders(signal)) {
withDittoHeaders = (WithDittoHeaders) signal;
} else {
withDittoHeaders = fallBackSignal;
}
final var dittoHeaders = withDittoHeaders.getDittoHeaders();
return dittoHeaders.getCorrelationId().orElse(null);
}
private static boolean isWithDittoHeaders(final Object o) {
return o instanceof WithDittoHeaders;
}
}