-
Notifications
You must be signed in to change notification settings - Fork 214
/
AbstractMessageMapper.java
161 lines (138 loc) · 6.15 KB
/
AbstractMessageMapper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.mapping;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.MessageMappingFailedException;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.TopicPath;
import com.typesafe.config.Config;
import org.apache.pekko.actor.ActorSystem;
/**
* Abstract implementation of {@link MessageMapper} which adds an id field and also its initialization from mapping
* configuration (id is not passed as constructor argument because the mappers are created by reflection).
*/
public abstract class AbstractMessageMapper implements MessageMapper {
protected final ActorSystem actorSystem;
protected final Config config;
private String id;
private Map<String, String> incomingConditions;
private Map<String, String> outgoingConditions;
private Collection<String> contentTypeBlocklist;
protected AbstractMessageMapper(final ActorSystem actorSystem, final Config config) {
this.actorSystem = actorSystem;
this.config = config;
}
protected AbstractMessageMapper(final AbstractMessageMapper copyFromMapper) {
this.actorSystem = copyFromMapper.actorSystem;
this.config = copyFromMapper.config;
this.id = copyFromMapper.getId();
this.incomingConditions = copyFromMapper.getIncomingConditions();
this.outgoingConditions = copyFromMapper.getOutgoingConditions();
this.contentTypeBlocklist = copyFromMapper.getContentTypeBlocklist();
}
@Override
public String getId() {
return id;
}
@Override
public Map<String, String> getIncomingConditions() {
return incomingConditions;
}
@Override
public Map<String, String> getOutgoingConditions() {
return outgoingConditions;
}
@Override
public Collection<String> getContentTypeBlocklist() {
return contentTypeBlocklist;
}
@Override
public final void configure(final Connection connection,
final ConnectivityConfig connectivityConfig,
final MessageMapperConfiguration configuration,
final ActorSystem actorSystem) {
this.id = configuration.getId();
this.incomingConditions = configuration.getIncomingConditions();
this.outgoingConditions = configuration.getOutgoingConditions();
this.contentTypeBlocklist = configuration.getContentTypeBlocklist();
final MappingConfig mappingConfig = connectivityConfig.getMappingConfig();
doConfigure(connection, mappingConfig, configuration);
}
/**
* Applies the mapper specific configuration.
*
* @param connection the connection to apply the mapping for.
* @param mappingConfig the service configuration for the mapping.
* @param configuration the mapper specific configuration configured in scope of a single connection.
*/
protected void doConfigure(final Connection connection, final MappingConfig mappingConfig,
final MessageMapperConfiguration configuration) {
// noop default
}
/**
* Extracts the payload of the passed in {@code message} as string.
*
* @param message the external message to extract the payload from.
* @return the payload of the passed in {@code message} as string
* @throws MessageMappingFailedException if no payload was present or if it was empty.
*/
protected static String extractPayloadAsString(final ExternalMessage message) {
final Optional<String> payload;
if (message.isTextMessage()) {
payload = message.getTextPayload();
} else if (message.isBytesMessage()) {
final Charset charset = determineCharset(message.getHeaders());
payload = message.getBytePayload().map(charset::decode).map(CharBuffer::toString);
} else {
payload = Optional.empty();
}
return payload.filter(s -> !s.isEmpty()).orElseThrow(() ->
MessageMappingFailedException.newBuilder(message.findContentType().orElse(""))
.description(
"As payload was absent or empty, please make sure to send payload in your messages.")
.dittoHeaders(DittoHeaders.of(message.getHeaders()))
.build());
}
protected static Charset determineCharset(final Map<String, String> messageHeaders) {
return CharsetDeterminer.getInstance().apply(messageHeaders.get(ExternalMessage.CONTENT_TYPE_HEADER));
}
protected static boolean isResponse(final Adaptable adaptable) {
final var payload = adaptable.getPayload();
final var httpStatus = payload.getHttpStatus();
return httpStatus.isPresent();
}
protected static boolean isError(final Adaptable adaptable) {
final var topicPath = adaptable.getTopicPath();
return topicPath.isCriterion(TopicPath.Criterion.ERRORS);
}
protected static boolean isLiveSignal(final Adaptable adaptable) {
return adaptable.getTopicPath().isChannel(TopicPath.Channel.LIVE);
}
@Override
public String toString() {
return "id=" + id +
", incomingConditions=" + incomingConditions +
", outgoingConditions=" + outgoingConditions +
", contentTypeBlocklist=" + contentTypeBlocklist;
}
}