-
Notifications
You must be signed in to change notification settings - Fork 214
/
MqttSubscriber.java
172 lines (153 loc) · 8.37 KB
/
MqttSubscriber.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.text.MessageFormat;
import java.util.List;
import java.util.Optional;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttSubscribingClient;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.GenericMqttPublish;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.subscribe.GenericMqttSubscribe;
import com.hivemq.client.mqtt.datatypes.MqttTopicFilter;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.Source;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
/**
* This utility class facilitates subscribing for topics derived from the addresses of connection
* {@link org.eclipse.ditto.connectivity.model.Source}s at the MQTT broker.
*/
public final class MqttSubscriber {
private final GenericMqttSubscribingClient subscribingClient;
private MqttSubscriber(final GenericMqttSubscribingClient subscribingClient) {
this.subscribingClient = subscribingClient;
}
/**
* Returns a new instance of {@code MqttSubscriber} for the specified {@code GenericMqttSubscribingClient}.
*
* @param genericMqttSubscribingClient the client to be used for subscribing to topics at the broker and for
* consuming incoming Publish message for the subscribed topics.
* @return the instance.
* @throws NullPointerException if {@code genericMqttSubscribingClient} is {@code null}.
*/
public static MqttSubscriber newInstance(final GenericMqttSubscribingClient genericMqttSubscribingClient) {
return new MqttSubscriber(checkNotNull(genericMqttSubscribingClient, "genericMqttSubscribingClient"));
}
/**
* Subscribes the specified {@code GenericMqttSubscribingClient} for the addresses of the specified connection
* sources.
* For each connection source an MQTT Subscribe message is created and sent to the broker by the client.
* The MQTT Subscribe message contains an MQTT Subscription for each address of the connection source where the
* address is regarded as MQTT filter topic and the MQTT QoS is taken from the connection source as provided.
* The returned Akka stream contains the results of the client's subscribing for each connection source.
* If a connection source does not provide any addresses then no Subscribe message is created for that source –
* thus, there is no {@code SubscribeResult} in the returned Akka stream for that connection source.
* A connection source address might not be a valid MQTT filter topic.
* In this case the SubscribeResult for the associated connection source is a failure.
* <p>
* A {@code SubscribeResult} is only then successful if all subscriptions to its connection source addresses
* succeeded.
*
* @param connectionSources the connection sources to subscribe for.
* @return an Akka stream containing the client subscribing results with their associated connection sources.
* @throws NullPointerException if any argument is {@code null}.
*/
public Source<SubscribeResult, NotUsed> subscribeForConnectionSources(
final List<org.eclipse.ditto.connectivity.model.Source> connectionSources
) {
checkNotNull(connectionSources, "connectionSources");
// Use Pairs to carry along associated connection Source.
return Source.fromIterator(connectionSources::iterator)
.map(MqttSubscriber::tryToGetGenericMqttSubscribe)
.map(optionalTryPair -> Pair.create(
optionalTryPair.first(),
optionalTryPair.second()
.map(optionalSubscribeMsg -> Source.fromJavaStream(optionalSubscribeMsg::stream))
.map(subscribeMsgSource -> subscribeMsgSource.flatMapConcat(
subscribeMsg -> subscribe(subscribeMsg, optionalTryPair.first()))
)
))
.flatMapConcat(pair -> pair.second()
.fold(
error -> getSubscribeFailureSource(pair.first(), error),
sourceSubscribeResultSource -> sourceSubscribeResultSource
));
}
private static Pair<org.eclipse.ditto.connectivity.model.Source, Try<Optional<GenericMqttSubscribe>>> tryToGetGenericMqttSubscribe(
final org.eclipse.ditto.connectivity.model.Source connectionSource
) {
try {
return Pair.create(
connectionSource,
new Success<>(GenericMqttSubscribeFactory.getGenericSourceSubscribeMessage(connectionSource))
);
} catch (final InvalidMqttTopicFilterStringException e) {
return Pair.create(connectionSource, new Failure<>(e));
}
}
private Source<SubscribeResult, NotUsed> subscribe(final GenericMqttSubscribe genericMqttSubscribe,
final org.eclipse.ditto.connectivity.model.Source connectionSource) {
return Source.fromPublisher(subscribingClient.subscribe(genericMqttSubscribe) // <- there
.map(unused -> consumeIncomingPublishesForSubscribedTopics(connectionSource))
.onErrorReturn(error -> getSubscribeFailureResult(connectionSource, error))
.toFlowable());
}
private SubscribeResult consumeIncomingPublishesForSubscribedTopics(
final org.eclipse.ditto.connectivity.model.Source connectionSource
) {
final List<MqttTopicFilter> topicFilters =
connectionSource.getAddresses().stream().map(MqttTopicFilter::of).toList();
return SubscribeSuccess.newInstance(connectionSource,
Source.fromPublisher(subscribingClient.consumeSubscribedPublishesWithManualAcknowledgement()
.filter(publish -> messageHasRightTopicPath(publish, topicFilters))));
}
/**
* Filters out messages which don't match any of the given topic filters. This is done because the HiveMQ API makes
* it hard to consume only messages which match specific topics in the first place.
*
* @param genericMqttPublish a consumed MQTT message.
* @param topicFilters the topic filters applied to consumed messages.
* @return whether the message matches any of the given topic filters.
*/
private boolean messageHasRightTopicPath(final GenericMqttPublish genericMqttPublish,
final List<MqttTopicFilter> topicFilters) {
return topicFilters.stream().anyMatch(filter -> filter.matches(genericMqttPublish.getTopic()));
}
private static SubscribeResult getSubscribeFailureResult(
final org.eclipse.ditto.connectivity.model.Source connectionSource,
final Throwable failure
) {
final SubscribeFailure result;
if (failure instanceof MqttSubscribeException mqttSubscribeException) {
result = SubscribeFailure.newInstance(connectionSource, mqttSubscribeException);
} else {
result = SubscribeFailure.newInstance(connectionSource, new MqttSubscribeException(failure));
}
return result;
}
private static Source<SubscribeResult, NotUsed> getSubscribeFailureSource(
final org.eclipse.ditto.connectivity.model.Source connectionSource,
final Throwable error
) {
return Source.single(
SubscribeFailure.newInstance(connectionSource, new MqttSubscribeException(
MessageFormat.format("Failed to instantiate {0}: {1}",
GenericMqttSubscribe.class.getSimpleName(),
error.getMessage()),
error
)));
}
}