-
Notifications
You must be signed in to change notification settings - Fork 215
/
MqttConsumerActor.java
349 lines (306 loc) · 17 KB
/
MqttConsumerActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MqttConfig;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.BaseConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.AcknowledgementUnsupportedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.GenericMqttPublish;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.ManualAcknowledgementDisabledException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.MessageAlreadyAcknowledgedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.MqttPublishTransformationException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.TransformationResult;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import akka.NotUsed;
import akka.actor.Props;
import akka.japi.function.Predicate;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
/**
 * Actor that receives a stream of subscribed MQTT Publish messages, transforms them to {@link ExternalMessage}s and
 * forwards them to {@link org.eclipse.ditto.connectivity.service.messaging.InboundMappingSink}.
 * <p>
 * The actor operates in one of two modes which is determined by the factory method that was used for obtaining its
 * {@code Props}:
 * <ul>
 *     <li>{@link #propsDryRun}: received MQTT publish messages are only logged and then dropped,</li>
 *     <li>{@link #propsProcessing}: received MQTT publish messages are transformed and forwarded.</li>
 * </ul>
 * The stream is started in {@link #preStart()} and terminated via a {@link KillSwitch} when the actor receives a
 * {@code GracefulStop} message or is stopped for any other reason.
 */
public final class MqttConsumerActor extends BaseConsumerActor {

    private final OperationMode operationMode;
    private final MqttConfig mqttConfig;
    private final ThreadSafeDittoLoggingAdapter logger;
    private final MqttSpecificConfig mqttSpecificConfig;

    // Handle for terminating the running stream; null as long as no stream is running.
    private KillSwitch killSwitch;

    // Not final because it gets replaced by a throttled source if consumer throttling is enabled.
    private Source<GenericMqttPublish, NotUsed> mqttPublishSource;

    // The constructor is invoked reflectively via Props.create, hence the suppressed "unused private member" warning.
    @SuppressWarnings("java:S1144")
    private MqttConsumerActor(final Connection connection,
            final Sink<Object, ?> inboundMappingSink,
            final org.eclipse.ditto.connectivity.model.Source source,
            final ConnectivityStatusResolver connectivityStatusResolver,
            final ConnectivityConfig connectivityConfig,
            final Source<GenericMqttPublish, NotUsed> mqttPublishSource,
            final OperationMode operationMode) {

        super(connection,
                String.join(";", source.getAddresses()),
                inboundMappingSink,
                source,
                connectivityStatusResolver,
                connectivityConfig);

        this.operationMode = operationMode;

        final var connectionConfig = connectivityConfig.getConnectionConfig();
        mqttConfig = connectionConfig.getMqttConfig();

        // withMdcEntry is designed for chaining: the adapter it returns is the one that carries the entry.
        // Hence the chained result has to be stored, calling it on the field and discarding the return value
        // would risk losing the MDC entries.
        logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
                .withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId())
                .withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_TYPE, connection.getConnectionType());

        mqttSpecificConfig = MqttSpecificConfig.fromConnection(connection, mqttConfig);
        killSwitch = null;
        this.mqttPublishSource = mqttPublishSource;
    }

    /**
     * Returns the {@code Props} for creating an {@code MqttConsumerActor} with the specified arguments.
     * The once created consumer actor operates in 'dry-run' mode, i.e. it drops all MQTT publish messages it receives.
     * The dropped messages will be logged.
     *
     * @param connection the connection the consumer actor belongs to.
     * @param inboundMappingSink the mapping sink where received messages are forwarded to.
     * @param connectionSource the connection source of the consumer actor.
     * @param connectivityStatusResolver resolves occurred exceptions to a connectivity status.
     * @param connectivityConfig the config of Connectivity service with potential overwrites.
     * @param mqttPublishSource stream of received MQTT publish messages the consumer actor logs and drops.
     * @return the Props.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public static Props propsDryRun(final Connection connection,
            final Sink<Object, ?> inboundMappingSink,
            final org.eclipse.ditto.connectivity.model.Source connectionSource,
            final ConnectivityStatusResolver connectivityStatusResolver,
            final ConnectivityConfig connectivityConfig,
            final Source<GenericMqttPublish, NotUsed> mqttPublishSource) {

        return Props.create(
                MqttConsumerActor.class,
                ConditionChecker.checkNotNull(connection, "connection"),
                ConditionChecker.checkNotNull(inboundMappingSink, "inboundMappingSink"),
                ConditionChecker.checkNotNull(connectionSource, "connectionSource"),
                ConditionChecker.checkNotNull(connectivityStatusResolver, "connectivityStatusResolver"),
                ConditionChecker.checkNotNull(connectivityConfig, "connectivityConfig"),
                ConditionChecker.checkNotNull(mqttPublishSource, "mqttPublishSource"),
                OperationMode.DRY_RUN
        );
    }

    /**
     * Returns the {@code Props} for creating an {@code MqttConsumerActor} with the specified arguments.
     * The once created consumer actor operates in 'processing' mode, i.e. it actually processes all MQTT publish
     * messages it receives.
     *
     * @param connection the connection the consumer actor belongs to.
     * @param inboundMappingSink the mapping sink where received messages are forwarded to.
     * @param connectionSource the connection source of the consumer actor.
     * @param connectivityStatusResolver resolves occurred exceptions to a connectivity status.
     * @param connectivityConfig the config of Connectivity service with potential overwrites.
     * @param mqttPublishSource stream of received MQTT publish messages the consumer actor processes.
     * @return the Props.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public static Props propsProcessing(final Connection connection,
            final Sink<Object, ?> inboundMappingSink,
            final org.eclipse.ditto.connectivity.model.Source connectionSource,
            final ConnectivityStatusResolver connectivityStatusResolver,
            final ConnectivityConfig connectivityConfig,
            final Source<GenericMqttPublish, NotUsed> mqttPublishSource) {

        return Props.create(
                MqttConsumerActor.class,
                ConditionChecker.checkNotNull(connection, "connection"),
                ConditionChecker.checkNotNull(inboundMappingSink, "inboundMappingSink"),
                ConditionChecker.checkNotNull(connectionSource, "connectionSource"),
                ConditionChecker.checkNotNull(connectivityStatusResolver, "connectivityStatusResolver"),
                ConditionChecker.checkNotNull(connectivityConfig, "connectivityConfig"),
                ConditionChecker.checkNotNull(mqttPublishSource, "mqttPublishSource"),
                OperationMode.PROCESSING
        );
    }

    /**
     * Applies throttling to the MQTT publish source if enabled and starts the stream which matches the operation
     * mode of this actor. The kill switch of the started stream is remembered for later termination.
     */
    @Override
    public void preStart() throws Exception {
        throttleMqttPublishSourceIfThrottlingEnabled();
        if (OperationMode.DRY_RUN == operationMode) {
            killSwitch = dropAndLogMqttPublishes();
        } else {
            killSwitch = processMqttPublishes();
        }
    }

    /**
     * Terminates the stream of MQTT publish messages when this actor stops.
     * The stream is run on the actor system and thus would not end together with this actor on its own.
     * Doing the termination here covers stops which were not triggered by a {@code GracefulStop} message as well.
     */
    @Override
    public void postStop() throws Exception {
        shutdownStream();
        super.postStop();
    }

    /**
     * Replaces the MQTT publish source by a throttled one using limit and interval of the consumer throttling
     * config, provided that throttling is enabled. Otherwise the source remains untouched.
     */
    private void throttleMqttPublishSourceIfThrottlingEnabled() {
        final var throttlingConfig = mqttConfig.getConsumerThrottlingConfig();
        if (throttlingConfig.isEnabled()) {
            mqttPublishSource = mqttPublishSource.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval());
        }
    }

    /**
     * Runs a stream which logs each received MQTT publish message on INFO level and does nothing else with it.
     *
     * @return the kill switch for terminating the stream.
     */
    private KillSwitch dropAndLogMqttPublishes() {
        return mqttPublishSource.viaMat(KillSwitches.single(), Keep.right())
                .to(Sink.foreach(publish -> logger.info("Operating in 'dry-run' mode, thus dropping <{}>.", publish)))
                .run(getContext().getSystem());
    }

    /**
     * Runs a stream which transforms each received MQTT publish message. Failed transformations are diverted to the
     * failure sink, successful ones go to the success sink.
     *
     * @return the kill switch for terminating the stream.
     */
    private KillSwitch processMqttPublishes() {
        final var mqttPublishTransformer = MqttPublishToExternalMessageTransformer.newInstance(sourceAddress, source);

        // TODO jff backpressure should work this way, right?
        return mqttPublishSource.viaMat(KillSwitches.single(), Keep.right())
                .map(mqttPublishTransformer::transform)
                .divertTo(getTransformationFailureSink(), TransformationResult::isFailure)
                .to(getTransformationSuccessSink())
                .run(getContext().getSystem());
    }

    /**
     * Returns the sink for failed transformation results.
     * If the failure was caused by a {@link DittoRuntimeException}, that exception gets enriched with the headers
     * of the MQTT publish and is handed to the DittoRuntimeException sink.
     * Any other failure is recorded by the inbound monitor.
     */
    private <T extends TransformationResult<GenericMqttPublish, ExternalMessage>> Sink<T, ?> getTransformationFailureSink() {
        final Predicate<MqttPublishTransformationException> isCausedByDittoRuntimeException = exception -> {
            final var cause = exception.getCause();
            return cause instanceof DittoRuntimeException;
        };

        return Flow.<T, MqttPublishTransformationException>fromFunction(TransformationResult::getErrorOrThrow)
                .divertTo(Flow.fromFunction(MqttConsumerActor::appendMqttPublishHeadersToDittoRuntimeException)
                                .to(getDittoRuntimeExceptionSink()),
                        isCausedByDittoRuntimeException)
                .to(Sink.foreach(this::recordTransformationException));
    }

    /**
     * Returns the {@code DittoRuntimeException} which caused the specified transformation exception with the
     * headers of the MQTT publish put to its DittoHeaders.
     * The cause is cast without a check, i.e. callers have to ensure that it is a {@code DittoRuntimeException}.
     */
    private static DittoRuntimeException appendMqttPublishHeadersToDittoRuntimeException(
            final MqttPublishTransformationException transformationException
    ) {
        final var dittoRuntimeException = (DittoRuntimeException) transformationException.getCause();
        return dittoRuntimeException.setDittoHeaders(dittoRuntimeException.getDittoHeaders()
                .toBuilder()
                .putHeaders(transformationException.getMqttPublishHeaders())
                .build());
    }

    /**
     * Reports the cause of the specified transformation exception to the inbound monitor.
     * The headers of the MQTT publish are passed along only if there are any.
     */
    private void recordTransformationException(final MqttPublishTransformationException transformationException) {
        final var mqttPublishHeaders = transformationException.getMqttPublishHeaders();
        if (mqttPublishHeaders.isEmpty()) {
            inboundMonitor.exception(transformationException.getCause());
        } else {
            inboundMonitor.exception(mqttPublishHeaders, transformationException.getCause());
        }
    }

    /**
     * Returns the sink for successful transformation results.
     * Each external message is reported as success to the inbound monitor; additionally the result is wrapped in an
     * {@link AcknowledgeableMessage} which is handed to the message mapping sink.
     */
    private <T extends TransformationResult<GenericMqttPublish, ExternalMessage>> Sink<T, ?> getTransformationSuccessSink() {
        return Flow.<T>create()
                .alsoTo(Flow.<T, ExternalMessage>fromFunction(TransformationResult::getSuccessValueOrThrow)
                        .to(Sink.foreach(inboundMonitor::success)))
                .map(this::getAcknowledgeableMessageForTransformationResult)
                .to(getMessageMappingSink());
    }

    /**
     * Wraps the external message of the specified successful transformation result in an
     * {@code AcknowledgeableMessage}. The callbacks of the returned message acknowledge respectively reject the
     * MQTT publish the external message originates from.
     */
    private AcknowledgeableMessage getAcknowledgeableMessageForTransformationResult(
            final TransformationResult<GenericMqttPublish, ExternalMessage> transformationResult
    ) {
        final var externalMessage = transformationResult.getSuccessValueOrThrow();
        final var genericMqttPublish = transformationResult.getTransformationInput();

        return AcknowledgeableMessage.of(externalMessage,
                () -> tryToAcknowledgePublish(genericMqttPublish, externalMessage),
                shouldRedeliver -> rejectIncomingMessage(shouldRedeliver, externalMessage, genericMqttPublish));
    }

    /**
     * Acknowledges the specified MQTT publish. A failed acknowledgement does not throw, instead it is reported to
     * the inbound acknowledged monitor with a message stating the reason.
     */
    private void tryToAcknowledgePublish(final GenericMqttPublish mqttPublish, final ExternalMessage externalMessage) {
        try {
            acknowledgePublish(mqttPublish, externalMessage);
        } catch (final ManualAcknowledgementDisabledException e) {
            // NOTE(review): if the monitor formats its messages with java.text.MessageFormat, the single
            // apostrophe in "source's" would be swallowed and would need to be doubled - confirm.
            inboundAcknowledgedMonitor.exception(externalMessage, """
                    Manual acknowledgement of incoming message at topic <{0}> failed because manual acknowledgement \
                    is disabled for this source's MQTT subscription.\
                    """, mqttPublish.getTopic());
        } catch (final MessageAlreadyAcknowledgedException e) {
            inboundAcknowledgedMonitor.exception(externalMessage, """
                    Acknowledgement of incoming message at topic <{0}> failed because it was acknowledged already by \
                    another source.\
                    """, mqttPublish.getTopic());
        } catch (final AcknowledgementUnsupportedException e) {
            inboundAcknowledgedMonitor.exception(externalMessage,
                    "Manual acknowledgement of incoming message at topic <{0}> failed: {1}",
                    mqttPublish.getTopic(),
                    e.getMessage());
        }
    }

    /**
     * Acknowledges the specified MQTT publish and reports the success to the inbound acknowledged monitor.
     * The success is only reported if acknowledging did not throw.
     *
     * @throws ManualAcknowledgementDisabledException propagated from {@code GenericMqttPublish#acknowledge()}.
     * @throws MessageAlreadyAcknowledgedException propagated from {@code GenericMqttPublish#acknowledge()}.
     * @throws AcknowledgementUnsupportedException propagated from {@code GenericMqttPublish#acknowledge()}.
     */
    private void acknowledgePublish(final GenericMqttPublish mqttPublish, final ExternalMessage externalMessage)
            throws ManualAcknowledgementDisabledException, MessageAlreadyAcknowledgedException,
            AcknowledgementUnsupportedException {

        mqttPublish.acknowledge();
        inboundAcknowledgedMonitor.success(externalMessage, "Sending success acknowledgement.");
    }

    /**
     * Handles the rejection of an incoming message.
     * If redelivery is requested and {@code reconnectForRedelivery} is configured, the parent is asked to reconnect
     * the consumer client. In all other cases the MQTT publish gets acknowledged nevertheless.
     * Every case is reported as exception to the inbound acknowledged monitor.
     */
    private void rejectIncomingMessage(final boolean shouldRedeliver,
            final ExternalMessage externalMessage,
            final GenericMqttPublish mqttPublish) {

        if (shouldRedeliver) {
            if (mqttSpecificConfig.reconnectForRedelivery()) {
                inboundAcknowledgedMonitor.exception(externalMessage, "Unfulfilled acknowledgements are present," +
                        " restarting consumer client in order to get redeliveries.");
                reconnectConsumerClientForRedelivery();
            } else {

                /*
                 * Strictly speaking one should not acknowledge a message for which
                 * a redelivery was asked for.
                 * The MQTT spec, however, does not define that a MQTT broker
                 * should redeliver messages if an acknowledgement was not
                 * received – *unless* the client reconnects – see option
                 * `reconnectForRedelivery` for getting reconnects.
                 */
                tryToAcknowledgePublish(mqttPublish, externalMessage);
                inboundAcknowledgedMonitor.exception(externalMessage, """
                        Unfulfilled acknowledgements are present. \
                        Acknowledging the MQTT message despite redelivery was requested because MQTT broker would not \
                        redeliver the message without a reconnect from the client.\
                        """);
            }
        } else {

            /*
             * Acknowledge messages for which redelivery does not make sense
             * (e.g. 400 bad request or 403 forbidden) as redelivering them
             * will not solve any problem.
             */
            tryToAcknowledgePublish(mqttPublish, externalMessage);
            inboundAcknowledgedMonitor.exception(externalMessage, """
                    Unfulfilled acknowledgements are present. \
                    Acknowledging the MQTT message because redelivery was not requested.\
                    """);
        }
    }

    /**
     * Tells the parent of this actor to reconnect the consumer client after the configured delay.
     */
    private void reconnectConsumerClientForRedelivery() {
        final var context = getContext();
        final var parent = context.getParent();
        parent.tell(ReconnectConsumerClient.of(mqttSpecificConfig.getReconnectForDeliveryDelay()), getSelf());
    }

    /**
     * Answers {@link RetrieveAddressStatus} with the current source status and stops this actor on
     * {@code GracefulStop}.
     */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(RetrieveAddressStatus.class, msg -> getSender().tell(getCurrentSourceStatus(), getSelf()))
                .match(GracefulStop.class, gracefulStop -> shutdown())
                .build();
    }

    /**
     * Terminates the stream of MQTT publish messages and stops this actor.
     */
    private void shutdown() {
        shutdownStream();
        final var context = getContext();
        context.stop(getSelf());
    }

    /**
     * Triggers the kill switch of the running stream, if any.
     * The kill switch is cleared afterwards, thus calling this method repeatedly is harmless.
     */
    private void shutdownStream() {
        if (null != killSwitch) {
            killSwitch.shutdown();
            killSwitch = null;
        }
    }

    /**
     * Logs the unhandled message on INFO level before delegating to the default handling.
     */
    @Override
    public void unhandled(final Object message) {
        logger.info("Received unhandled message <{}>.", message);
        super.unhandled(message);
    }

    @Override
    protected ThreadSafeDittoLoggingAdapter log() {
        return logger;
    }

    /**
     * Determines what this actor does with the MQTT publish messages it receives.
     */
    private enum OperationMode {

        /** Received MQTT publish messages are logged and dropped. */
        DRY_RUN,

        /** Received MQTT publish messages are transformed and forwarded. */
        PROCESSING

    }

}