-
Notifications
You must be signed in to change notification settings - Fork 215
/
ImplicitThingCreationMessageMapper.java
283 lines (246 loc) · 13.6 KB
/
ImplicitThingCreationMessageMapper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.mapping;
import static org.eclipse.ditto.base.model.exceptions.DittoJsonException.wrapJsonRuntimeException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.Placeholders;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityIdInvalidException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTagMatcher;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTagMatchers;
import org.eclipse.ditto.base.model.signals.GlobalErrorRegistry;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.edge.service.placeholders.RequestPlaceholder;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.HeadersPlaceholder;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderFilter;
import org.eclipse.ditto.placeholders.TimePlaceholder;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.ThingsModelFactory;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
/**
* This mapper creates a {@link org.eclipse.ditto.things.model.signals.commands.modify.CreateThing} command from
* a given thing template and may substitutes placeholders by given headers which can be {@code device_id},
* {@code entity_id} or {@code gateway_id}.
* The thingId must be set in the mapping configuration. It can either be a fixed Thing ID
* or it can be resolved from the message headers by using a placeholder e.g. {@code {{ header:device_id }}}.
* The policyId is not required to be set in the mapping configuration. If not set, the policyId will be the same as
* the thingId.
*
* @since 1.3.0
*/
@PayloadMapper(
alias = "ImplicitThingCreation",
requiresMandatoryConfiguration = true // "thing" is mandatory configuration
)
public final class ImplicitThingCreationMessageMapper extends AbstractMessageMapper {
private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(ImplicitThingCreationMessageMapper.class);
private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder();
private static final RequestPlaceholder REQUEST_PLACEHOLDER = ConnectivityPlaceholders.newRequestPlaceholder();
private static final String THING_TEMPLATE = "thing";
private static final String ALLOW_POLICY_LOCKOUT_OPTION = "allowPolicyLockout";
private static final String COMMAND_HEADERS = "commandHeaders";
private static final String THING_ID = "thingId";
private static final String THING_ID_CONFIGURATION_PROPERTY = THING_TEMPLATE + "/" + THING_ID;
private static final String POLICY_ID = "policyId";
private static final String POLICY_ID_CONFIGURATION_PROPERTY = THING_TEMPLATE + "/" + POLICY_ID;
public static final EntityTagMatcher ASTERISK = EntityTagMatcher.asterisk();
private final BiFunction<Map<String, String>, AuthorizationContext, ExpressionResolver> resolverFactory;
private String thingTemplate;
private Map<String, String> commandHeaders;
private boolean allowPolicyLockout;
/**
* Default constructor that supports headers placeholders in mapped headers and templates.
*/
@SuppressWarnings("unused")
public ImplicitThingCreationMessageMapper() {
this(ImplicitThingCreationMessageMapper::getExpressionResolver);
}
/**
* Constructor with customizable expression resolver.
*
* @param resolverFactory the creator of expression resolver.
*/
public ImplicitThingCreationMessageMapper(
final BiFunction<Map<String, String>, AuthorizationContext, ExpressionResolver> resolverFactory) {
this.resolverFactory = resolverFactory;
}
@Override
protected void doConfigure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) {
thingTemplate = configuration.findProperty(THING_TEMPLATE).orElseThrow(
() -> MessageMapperConfigurationInvalidException.newBuilder(THING_TEMPLATE).build());
commandHeaders = configuration.findProperty(COMMAND_HEADERS, JsonValue::isObject, JsonValue::asObject)
.filter(configuredHeaders -> !configuredHeaders.isEmpty())
.map(configuredHeaders -> {
final Map<String, String> newCommandHeaders = new LinkedHashMap<>();
newCommandHeaders.put(DittoHeaderDefinition.IF_NONE_MATCH.getKey(), ASTERISK.toString());
for (final JsonField field : configuredHeaders) {
newCommandHeaders.put(field.getKeyName(), field.getValue().formatAsString());
}
return Collections.unmodifiableMap(newCommandHeaders);
})
.orElseGet(() -> DittoHeaders.newBuilder()
.ifNoneMatch(EntityTagMatchers.fromList(Collections.singletonList(ASTERISK)))
.build());
final JsonObject thingJson = JsonObject.of(thingTemplate);
thingJson.getValue(THING_ID)
.map(JsonValue::asString)
.ifPresentOrElse(ImplicitThingCreationMessageMapper::validateThingEntityId, () -> {
throw MessageMapperConfigurationInvalidException.newBuilder(THING_ID_CONFIGURATION_PROPERTY)
.build();
});
// PolicyId is not required in mapping config. Still needs to be valid if present.
thingJson.getValue(POLICY_ID)
.map(JsonValue::asString)
.ifPresent(ImplicitThingCreationMessageMapper::validatePolicyEntityId);
LOGGER.debug("Configured with Thing template: {}", thingTemplate);
allowPolicyLockout = configuration.findProperty(ALLOW_POLICY_LOCKOUT_OPTION).map(Boolean::valueOf).orElse(true);
}
private static void validateThingEntityId(final String thingId) {
try {
if (!Placeholders.containsAnyPlaceholder(thingId)) {
ThingId.of(thingId);
}
} catch (final NamespacedEntityIdInvalidException e) {
throw MessageMapperConfigurationInvalidException.newBuilder(THING_ID_CONFIGURATION_PROPERTY)
.message(e.getMessage())
.description(e.getDescription().orElse("Make sure to use a valid Thing ID."))
.build();
}
}
private static void validatePolicyEntityId(final String policyId) {
try {
if (!Placeholders.containsAnyPlaceholder(policyId)) {
PolicyId.of(policyId);
}
} catch (final NamespacedEntityIdInvalidException e) {
throw MessageMapperConfigurationInvalidException.newBuilder(POLICY_ID_CONFIGURATION_PROPERTY)
.message(e.getMessage())
.description(e.getDescription().orElse("Make sure to use a valid Policy ID."))
.build();
}
}
@Override
public List<Adaptable> map(final ExternalMessage message) {
LOGGER.withCorrelationId(message.getInternalHeaders()).debug("Received ExternalMessage: {}", message);
final Map<String, String> externalHeaders = message.getHeaders();
final ExpressionResolver expressionResolver = resolverFactory.apply(externalHeaders,
message.getAuthorizationContext().orElse(null));
final String resolvedTemplate;
if (Placeholders.containsAnyPlaceholder(thingTemplate)) {
resolvedTemplate = PlaceholderFilter.apply(thingTemplate, expressionResolver);
} else {
resolvedTemplate = thingTemplate;
}
commandHeaders = resolveCommandHeaders(expressionResolver, commandHeaders);
final Signal<CreateThing> createThing = getCreateThingSignal(message, resolvedTemplate);
final Adaptable adaptable = DITTO_PROTOCOL_ADAPTER.toAdaptable(createThing);
// we cannot set the header on CreateThing directly because it is filtered when mapped to an adaptable
final DittoHeaders modifiedHeaders = DittoHeaders.of(commandHeaders).toBuilder()
.allowPolicyLockout(allowPolicyLockout)
.build();
final Adaptable adaptableWithModifiedHeaders = adaptable.setDittoHeaders(modifiedHeaders);
LOGGER.withCorrelationId(message.getInternalHeaders())
.debug("Mapped ExternalMessage to Adaptable: {}", adaptableWithModifiedHeaders);
return Collections.singletonList(adaptableWithModifiedHeaders);
}
@Override
public DittoHeaders getAdditionalInboundHeaders(final ExternalMessage message) {
return DittoHeaders.empty();
}
private static ExpressionResolver getExpressionResolver(final Map<String, String> headers,
@Nullable final AuthorizationContext authorizationContext) {
return PlaceholderFactory.newExpressionResolver(
PlaceholderFactory.newPlaceholderResolver(TIME_PLACEHOLDER, new Object()),
PlaceholderFactory.newPlaceholderResolver(HEADERS_PLACEHOLDER, headers),
PlaceholderFactory.newPlaceholderResolver(REQUEST_PLACEHOLDER, authorizationContext)
);
}
private Signal<CreateThing> getCreateThingSignal(final ExternalMessage message, final String template) {
final JsonObject thingJson = wrapJsonRuntimeException(() -> JsonFactory.newObject(template));
final Thing newThing = ThingsModelFactory.newThing(thingJson);
final JsonObject inlinePolicyJson = createInlinePolicyJson(thingJson);
final String copyPolicyFrom = getCopyPolicyFrom(thingJson);
final DittoHeaders dittoHeaders = message.getInternalHeaders().toBuilder()
.putHeaders(commandHeaders)
.build();
return CreateThing.of(newThing, inlinePolicyJson, copyPolicyFrom, dittoHeaders);
}
private static Map<String, String> resolveCommandHeaders(final ExpressionResolver resolver,
final Map<String, String> unresolvedHeaders) {
final Map<String, String> resolvedHeaders = new LinkedHashMap<>();
unresolvedHeaders.forEach((key, value) ->
resolver.resolve(value).findFirst().ifPresent(resolvedHeaderValue ->
resolvedHeaders.put(key, resolvedHeaderValue)
)
);
return resolvedHeaders;
}
@Nullable
private static JsonObject createInlinePolicyJson(final JsonObject thingJson) {
return thingJson.getValue(Policy.INLINED_FIELD_NAME)
.map(jsonValue -> wrapJsonRuntimeException(jsonValue::asObject))
.orElse(null);
}
@Nullable
private static String getCopyPolicyFrom(final JsonObject thingJson) {
return thingJson.getValue(CreateThing.JSON_COPY_POLICY_FROM)
.orElse(null);
}
@Override
public List<ExternalMessage> map(final Adaptable adaptable) {
if (isErrorByTopicPath(adaptable)) {
final var adaptablePayload = adaptable.getPayload();
adaptablePayload.getValue()
.filter(JsonValue::isObject)
.map(JsonValue::asObject)
.ifPresentOrElse(jsonObject -> {
final var globalErrorRegistry = GlobalErrorRegistry.getInstance();
throw globalErrorRegistry.parse(jsonObject, adaptable.getDittoHeaders());
}, () -> LOGGER.withCorrelationId(adaptable.getDittoHeaders())
.warn("Unexpected error adaptable. Expected value of type JsonObject in payload but got " +
"<{}>.", adaptablePayload));
}
return Collections.emptyList();
}
private static boolean isErrorByTopicPath(final Adaptable adaptable) {
final var topicPath = adaptable.getTopicPath();
return topicPath.isCriterion(TopicPath.Criterion.ERRORS);
}
}