-
Notifications
You must be signed in to change notification settings - Fork 214
/
AbstractAdapter.java
executable file
·184 lines (158 loc) · 7.62 KB
/
AbstractAdapter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.protocoladapter;
import static java.util.Objects.requireNonNull;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
import java.util.Map;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoJsonException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.model.messages.MessageHeaderDefinition;
import org.eclipse.ditto.model.things.ThingId;
/**
 * Abstract implementation of {@link Adapter} to provide common functionality.
 * <p>
 * {@link #fromAdaptable(Adaptable)} and {@link #toAdaptable(Jsonifiable.WithPredicate, TopicPath.Channel)} are final
 * and wrap the actual mapping with a header translation phase; subclasses supply the mapping itself via
 * {@link #getType(Adaptable)} and {@link #constructAdaptable(Jsonifiable.WithPredicate, TopicPath.Channel)}.
 *
 * @param <T> the type of the signals this adapter maps from and to {@link Adaptable}s.
 */
public abstract class AbstractAdapter<T extends Jsonifiable.WithPredicate<JsonObject, JsonField>>
        implements Adapter<T> {

    private final Map<String, JsonifiableMapper<T>> mappingStrategies;
    private final HeaderTranslator headerTranslator;
    protected final PathMatcher pathMatcher;

    /**
     * Constructs a new adapter.
     *
     * @param mappingStrategies the mappers to apply, looked up by the type returned from
     * {@link #getType(Adaptable)}.
     * @param headerTranslator translates between external headers and Ditto headers.
     * @param pathMatcher the path matcher made available to subclasses.
     * @throws NullPointerException if any argument is {@code null}.
     */
    protected AbstractAdapter(final Map<String, JsonifiableMapper<T>> mappingStrategies,
            final HeaderTranslator headerTranslator, final PathMatcher pathMatcher) {
        this.mappingStrategies = requireNonNull(mappingStrategies, "mappingStrategies");
        this.headerTranslator = requireNonNull(headerTranslator, "headerTranslator");
        this.pathMatcher = requireNonNull(pathMatcher, "pathMatcher");
    }

    /**
     * Indicates whether the status of the payload of the given Adaptable is {@link HttpStatusCode#CREATED}.
     *
     * @param adaptable the protocol message.
     * @return {@code true} if the payload status equals {@code CREATED}, {@code false} for any other status.
     * @throws JsonParseException if the payload does not contain a status at all.
     */
    protected static boolean isCreated(final Adaptable adaptable) {
        return adaptable.getPayload().getStatus()
                .map(HttpStatusCode.CREATED::equals)
                .orElseThrow(() -> JsonParseException.newBuilder().build());
    }

    /**
     * Reads Ditto headers from an Adaptable. CAUTION: Headers are taken as-is!
     *
     * @param adaptable the protocol message.
     * @return the headers of the message, or empty headers if the message has none.
     */
    protected static DittoHeaders dittoHeadersFrom(final Adaptable adaptable) {
        return adaptable.getHeaders().orElseGet(DittoHeaders::empty);
    }

    /**
     * Builds the thing ID from the namespace and the ID of the topic path of the given Adaptable.
     *
     * @param adaptable the protocol message.
     * @return the thing ID.
     */
    protected static ThingId thingIdFrom(final Adaptable adaptable) {
        final TopicPath topicPath = adaptable.getTopicPath();
        return ThingId.of(topicPath.getNamespace(), topicPath.getId());
    }

    /**
     * Reads the feature ID from the payload path of the given Adaptable.
     *
     * @param adaptable the protocol message.
     * @return the feature ID.
     * @throws JsonParseException if the payload path does not contain a feature ID.
     */
    protected static String featureIdForMessageFrom(final Adaptable adaptable) {
        return adaptable.getPayload().getPath()
                .getFeatureId()
                .orElseThrow(() -> JsonParseException.newBuilder().build());
    }

    /**
     * Reads the status code from the payload of the given Adaptable.
     *
     * @param adaptable the protocol message.
     * @return the status code or {@code null} if the payload does not contain a status.
     */
    protected static HttpStatusCode statusCodeFrom(final Adaptable adaptable) {
        return adaptable.getPayload().getStatus().orElse(null);
    }

    /**
     * Reads the namespace from the topic path of the given Adaptable.
     *
     * @param adaptable the protocol message.
     * @return the namespace or {@code null} if the topic path namespace is the literal {@code "_"}.
     */
    protected static String namespaceFrom(final Adaptable adaptable) {
        final String namespace = adaptable.getTopicPath().getNamespace();
        return "_".equals(namespace) ? null : namespace;
    }

    /**
     * Returns the string representation of the leaf of the given path.
     *
     * @param path the path to get the leaf of.
     * @return the leaf as string.
     * @throws UnknownPathException if the path has no leaf.
     */
    protected static String leafValue(final JsonPointer path) {
        return path.getLeaf().orElseThrow(() -> UnknownPathException.newBuilder(path).build()).toString();
    }

    /**
     * Returns the action of the given topic path.
     *
     * @param topicPath the topic path.
     * @return the action.
     * @throws NullPointerException if the topic path does not contain an action.
     */
    protected static TopicPath.Action getAction(final TopicPath topicPath) {
        return topicPath.getAction()
                .orElseThrow(() -> new NullPointerException("TopicPath did not contain an Action!"));
    }

    /**
     * Creates the Adaptable for the given signal. Called by {@link #toAdaptable} before the headers of the result
     * are translated to external headers.
     *
     * @param signal the signal to map.
     * @param channel the channel the Adaptable is constructed for.
     * @return the Adaptable, still carrying untranslated Ditto headers.
     */
    protected abstract Adaptable constructAdaptable(final T signal, final TopicPath.Channel channel);

    /**
     * Determines the key under which the mapper for the given Adaptable is looked up in the mapping strategies.
     *
     * @param adaptable the protocol message as received, i.e. with its external headers.
     * @return the type used as key into the mapping strategies.
     */
    protected abstract String getType(Adaptable adaptable);

    /*
     * Injects the header reading phase into the parsing of protocol messages: external headers are translated and
     * enriched with topic path information before the type-specific mapper is applied.
     */
    @Override
    public final T fromAdaptable(final Adaptable externalAdaptable) {
        checkNotNull(externalAdaptable, "Adaptable");

        // get type from external adaptable before header filtering in case some headers exist for external messages
        // but not internally in Ditto.
        final String type = getType(externalAdaptable);

        // filter headers by header translator, then inject any missing information from topic path
        final DittoHeaders externalHeaders = dittoHeadersFrom(externalAdaptable);
        final DittoHeaders filteredHeaders = addTopicPathInfo(
                headerTranslator.fromExternalHeaders(externalHeaders),
                externalAdaptable.getTopicPath());

        final JsonifiableMapper<T> jsonifiableMapper = mappingStrategies.get(type);
        if (null == jsonifiableMapper) {
            // no strategy registered for this type: report topic and path together with the translated headers
            throw UnknownTopicPathException.fromTopicAndPath(externalAdaptable.getTopicPath(),
                    externalAdaptable.getPayload().getPath(), filteredHeaders);
        }

        final Adaptable adaptable = externalAdaptable.setDittoHeaders(filteredHeaders);
        return DittoJsonException.wrapJsonRuntimeException(() -> jsonifiableMapper.map(adaptable));
    }

    /**
     * Add to headers any information that will be missing from topic path.
     *
     * @param filteredHeaders headers read from external headers.
     * @param topicPath topic path of an adaptable.
     * @return filteredHeaders with extra information from topicPath, or filteredHeaders itself if the topic path
     * contributes nothing.
     */
    private static DittoHeaders addTopicPathInfo(final DittoHeaders filteredHeaders, final TopicPath topicPath) {
        final DittoHeaders extraInfo = mapTopicPathToHeaders(topicPath);
        return extraInfo.isEmpty() ? filteredHeaders : filteredHeaders.toBuilder().putHeaders(extraInfo).build();
    }

    /**
     * Add any extra information in topic path as Ditto headers. Two pieces of information are mapped: the thing ID
     * (as {@code namespace:id}, only if both parts are present) and the channel (only if it is the live channel).
     *
     * @param topicPath the topic path to extract information from.
     * @return headers containing extra information from topic path; empty if there is none.
     */
    private static DittoHeaders mapTopicPathToHeaders(final TopicPath topicPath) {
        final DittoHeadersBuilder<?, ?> headersBuilder = DittoHeaders.newBuilder();
        final String namespace = topicPath.getNamespace();
        final String id = topicPath.getId();
        if (namespace != null && id != null) {
            // add thing ID for known topic-paths for error reporting.
            headersBuilder.putHeader(MessageHeaderDefinition.THING_ID.getKey(), namespace + ":" + id);
        }
        if (topicPath.getChannel() == TopicPath.Channel.LIVE) {
            headersBuilder.channel(TopicPath.Channel.LIVE.getName());
        }
        return headersBuilder.build();
    }

    /*
     * Injects the header publishing phase into the creation of protocol messages: the Ditto headers of the
     * constructed Adaptable are replaced by their external representation.
     */
    @Override
    public final Adaptable toAdaptable(final T signal, final TopicPath.Channel channel) {
        final Adaptable adaptable = constructAdaptable(signal, channel);
        final Map<String, String> externalHeaders = headerTranslator.toExternalHeaders(adaptable.getDittoHeaders());
        return adaptable.setDittoHeaders(DittoHeaders.of(externalHeaders));
    }

    /**
     * Returns the header translator this adapter was constructed with.
     *
     * @return the header translator.
     */
    protected final HeaderTranslator headerTranslator() {
        return headerTranslator;
    }

    /**
     * Returns the given String {@code s} with an upper case first letter.
     * <p>
     * The first <em>code point</em> is upper-cased, so a leading character outside the Basic Multilingual Plane
     * (encoded as a surrogate pair) is handled correctly. If the first code point has no distinct upper case
     * form, {@code s} is returned unchanged.
     *
     * @param s the String.
     * @return the String with its first letter in upper case.
     * @throws NullPointerException if {@code s} is {@code null}.
     */
    protected static String upperCaseFirst(final String s) {
        if (s.isEmpty()) {
            return s;
        }
        final int firstCodePoint = s.codePointAt(0);
        final int upperCased = Character.toUpperCase(firstCodePoint);
        if (upperCased == firstCodePoint) {
            // already upper case (or not a cased letter): nothing to rebuild
            return s;
        }
        // charCount skips both halves of a surrogate pair when copying the remainder
        return new StringBuilder(s.length())
                .appendCodePoint(upperCased)
                .append(s, Character.charCount(firstCodePoint), s.length())
                .toString();
    }

}