Skip to content

Commit

Permalink
Rework consuming messages from MQTT broker
Browse files Browse the repository at this point in the history
  • Loading branch information
dimabarbul committed Nov 17, 2023
1 parent afeecbc commit ccee5d1
Show file tree
Hide file tree
Showing 11 changed files with 568 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.NoMqttConnectionException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.MqttConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.ReconnectConsumerClient;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.GenericMqttPublish;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing.MqttPublisherActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.MqttSubscriber;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.SubscribeResult;
Expand All @@ -72,6 +73,8 @@

import scala.concurrent.ExecutionContextExecutor;

import io.reactivex.disposables.Disposable;

/**
* Actor for handling connection to an MQTT broker for protocol versions 3 or 5.
*/
Expand All @@ -85,6 +88,7 @@ public final class MqttClientActor extends BaseClientActor {
private final AtomicBoolean automaticReconnect;
@Nullable private ActorRef publishingActorRef;
private final List<ActorRef> mqttConsumerActorRefs;
@Nullable private Disposable unsolicitedPublishesAutoAckSubscription;

@SuppressWarnings("java:S1144") // called by reflection
private MqttClientActor(final Connection connection,
Expand Down Expand Up @@ -230,11 +234,18 @@ protected CompletionStage<Void> stopConsuming() {
protected void cleanupResourcesForConnection() {
mqttConsumerActorRefs.forEach(this::stopChildActor);
stopChildActor(publishingActorRef);
if (unsolicitedPublishesAutoAckSubscription != null) {
unsolicitedPublishesAutoAckSubscription.dispose();
}
if (null != genericMqttClient) {
disableAutomaticReconnect();
genericMqttClient.disconnect();
if (genericMqttClient instanceof Disposable) {
((Disposable)genericMqttClient).dispose();
}
}

unsolicitedPublishesAutoAckSubscription = null;
genericMqttClient = null;
publishingActorRef = null;
mqttConsumerActorRefs.clear();
Expand Down Expand Up @@ -439,6 +450,13 @@ protected CompletionStage<Status.Status> startPublisherActor() {
protected CompletionStage<Status.Status> startConsumerActors(@Nullable final ClientConnected clientConnected) {
return subscribe()
.thenCompose(this::handleSourceSubscribeResults)
.thenApply(actors -> {
if (null != genericMqttClient) {
subscribeToAcknowledgeUnsolicitedPublishes();
genericMqttClient.stopBufferingPublishes();
}
return actors;
})
.thenApply(actorRefs -> {
mqttConsumerActorRefs.addAll(actorRefs);
return DONE;
Expand All @@ -460,6 +478,23 @@ private CompletionStage<Source<SubscribeResult, NotUsed>> subscribe() {
return result;
}

private void subscribeToAcknowledgeUnsolicitedPublishes() {

final var subscribedTopics = connection().getSources()
.stream().flatMap(s -> s.getAddresses().stream().map(MqttTopicFilter::of))
.toList();

unsolicitedPublishesAutoAckSubscription = genericMqttClient.consumeSubscribedPublishesWithManualAcknowledgement()
.filter(p -> messageHasNoMatchingSubscription(p, subscribedTopics))
.subscribe(GenericMqttPublish::acknowledge,
p -> logger.info("Failed to read unsolicited publish: <{}>", p));
}

private boolean messageHasNoMatchingSubscription(final GenericMqttPublish genericMqttPublish,
final List<MqttTopicFilter> topicFilters) {
return topicFilters.stream().noneMatch(filter -> filter.matches(genericMqttPublish.getTopic()));
}

private CompletionStage<List<ActorRef>> handleSourceSubscribeResults(
final Source<SubscribeResult, NotUsed> sourceSubscribeResults
) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.GenericMqttPublish;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;

import io.reactivex.Flowable;

/**
* Base implementation of {@link GenericMqttConsumingClient}.
*/
abstract class BaseGenericMqttConsumingClient<C extends MqttClient> implements GenericMqttConsumingClient {

private final C mqttClient;

private BaseGenericMqttConsumingClient(final C mqttClient) {
this.mqttClient = mqttClient;
}

static BaseGenericMqttConsumingClient<Mqtt3RxClient> ofMqtt3RxClient(
final Mqtt3RxClient mqtt3RxClient
) {
return new Mqtt3ConsumingClient(mqtt3RxClient);
}

static BaseGenericMqttConsumingClient<Mqtt5RxClient> ofMqtt5RxClient(
final Mqtt5RxClient mqtt5RxClient
) {
return new Mqtt5ConsumingClient(mqtt5RxClient);
}

@Override
public String toString() {
final var mqttClientConfig = mqttClient.getConfig();
final var clientIdentifier = mqttClientConfig.getClientIdentifier();
return clientIdentifier.toString();
}

private static final class Mqtt3ConsumingClient extends BaseGenericMqttConsumingClient<Mqtt3RxClient> {

private final BufferingFlowableWrapper<Mqtt3Publish> bufferingFlowableWrapper;
private boolean isDisposed = false;

private Mqtt3ConsumingClient(final Mqtt3RxClient mqtt3RxClient) {
super(ConditionChecker.checkNotNull(mqtt3RxClient, "mqtt3RxClient"));

bufferingFlowableWrapper = BufferingFlowableWrapper.of(mqtt3RxClient.publishes(MqttGlobalPublishFilter.ALL, true));
}

@Override
public Flowable<GenericMqttPublish> consumePublishes() {
return bufferingFlowableWrapper.toFlowable().map(GenericMqttPublish::ofMqtt3Publish);
}

@Override
public void stopBufferingPublishes() {
bufferingFlowableWrapper.stopBuffering();
}

@Override
public void dispose() {
bufferingFlowableWrapper.dispose();
isDisposed = true;
}

@Override
public boolean isDisposed() {
return isDisposed;
}

}

private static final class Mqtt5ConsumingClient extends BaseGenericMqttConsumingClient<Mqtt5RxClient> {

private final BufferingFlowableWrapper<Mqtt5Publish> bufferingFlowableWrapper;
private boolean isDisposed = false;

private Mqtt5ConsumingClient(final Mqtt5RxClient mqtt5RxClient) {
super(checkNotNull(mqtt5RxClient, "mqtt5RxClient"));

bufferingFlowableWrapper = BufferingFlowableWrapper.of(mqtt5RxClient.publishes(MqttGlobalPublishFilter.ALL, true));
}

@Override
public Flowable<GenericMqttPublish> consumePublishes() {
return bufferingFlowableWrapper.toFlowable().map(GenericMqttPublish::ofMqtt5Publish);
}

@Override
public void stopBufferingPublishes() {
bufferingFlowableWrapper.stopBuffering();
}

@Override
public void dispose() {
bufferingFlowableWrapper.dispose();
isDisposed = true;
}

@Override
public boolean isDisposed() {
return isDisposed;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.subscribe.GenericMqttSubscription;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttTopicFilter;
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient;
import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3SubAckException;
Expand All @@ -38,24 +37,29 @@
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.SingleTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.SingleSubject;

/**
* Generic client for subscribing to topics at the MQTT broker.
*/
abstract class BaseGenericMqttSubscribingClient<C extends MqttClient>
implements GenericMqttConnectableClient, GenericMqttSubscribingClient {
implements GenericMqttConnectableClient, GenericMqttSubscribingClient, Disposable {

private final C mqttClient;
private final GenericMqttConnectableClient connectingClient;
private final GenericMqttConsumingClient consumingClient;
private final ClientRole clientRole;
private boolean isDisposed = false;

private BaseGenericMqttSubscribingClient(final C mqttClient,
final GenericMqttConnectableClient connectingClient,
final GenericMqttConsumingClient consumingClient,
final ClientRole clientRole) {

this.mqttClient = mqttClient;
this.connectingClient = connectingClient;
this.consumingClient = consumingClient;
this.clientRole = checkNotNull(clientRole, "clientRole");
}

Expand All @@ -73,6 +77,7 @@ static BaseGenericMqttSubscribingClient<Mqtt3RxClient> ofMqtt3RxClient(final Mqt
checkNotNull(mqtt3RxClient, "mqtt3RxClient");
return new Mqtt3RxSubscribingClient(mqtt3RxClient,
BaseGenericMqttConnectableClient.ofMqtt3AsyncClient(mqtt3RxClient.toAsync()),
BaseGenericMqttConsumingClient.ofMqtt3RxClient(mqtt3RxClient),
clientRole);
}

Expand All @@ -90,6 +95,7 @@ static BaseGenericMqttSubscribingClient<Mqtt5RxClient> ofMqtt5RxClient(final Mqt
checkNotNull(mqtt5RxClient, "mqtt5RxClient");
return new Mqtt5RxSubscribingClient(mqtt5RxClient,
BaseGenericMqttConnectableClient.ofMqtt5AsyncClient(mqtt5RxClient.toAsync()),
BaseGenericMqttConsumingClient.ofMqtt5RxClient(mqtt5RxClient),
clientRole);
}

Expand Down Expand Up @@ -167,12 +173,13 @@ private static List<SubscriptionStatus> getFailedSubscriptionStatuses(

@Override
public Flowable<GenericMqttPublish> consumeSubscribedPublishesWithManualAcknowledgement() {
return consumeIncomingPublishes(mqttClient, MqttGlobalPublishFilter.SUBSCRIBED, true);
return consumingClient.consumePublishes();
}

protected abstract Flowable<GenericMqttPublish> consumeIncomingPublishes(C mqttClient,
MqttGlobalPublishFilter filter,
boolean manualAcknowledgement);
@Override
public void stopBufferingPublishes() {
consumingClient.stopBufferingPublishes();
}

@Override
public CompletionStage<Void> connect(final GenericMqttConnect genericMqttConnect) {
Expand All @@ -184,6 +191,17 @@ public CompletionStage<Void> disconnect() {
return connectingClient.disconnect();
}

@Override
public void dispose() {
consumingClient.dispose();
isDisposed = true;
}

@Override
public boolean isDisposed() {
return isDisposed;
}

@Override
public String toString() {
final var mqttClientConfig = mqttClient.getConfig();
Expand All @@ -195,9 +213,10 @@ private static final class Mqtt3RxSubscribingClient extends BaseGenericMqttSubsc

private Mqtt3RxSubscribingClient(final Mqtt3RxClient mqtt3RxClient,
final GenericMqttConnectableClient genericMqttConnectingClient,
final GenericMqttConsumingClient consumingClient,
final ClientRole clientRole) {

super(mqtt3RxClient, genericMqttConnectingClient, clientRole);
super(mqtt3RxClient, genericMqttConnectingClient, consumingClient, clientRole);
}

@Override
Expand Down Expand Up @@ -235,23 +254,16 @@ Completable sendUnsubscribe(final Mqtt3RxClient mqtt3RxClient, final MqttTopicFi
}
}

@Override
protected Flowable<GenericMqttPublish> consumeIncomingPublishes(final Mqtt3RxClient mqtt3RxClient,
final MqttGlobalPublishFilter filter,
final boolean manualAcknowledgement) {

return mqtt3RxClient.publishes(filter, manualAcknowledgement).map(GenericMqttPublish::ofMqtt3Publish);
}

}

private static final class Mqtt5RxSubscribingClient extends BaseGenericMqttSubscribingClient<Mqtt5RxClient> {

private Mqtt5RxSubscribingClient(final Mqtt5RxClient mqtt5RxClient,
final GenericMqttConnectableClient genericMqttConnectingClient,
final GenericMqttConsumingClient consumingClient,
final ClientRole clientRole) {

super(mqtt5RxClient, genericMqttConnectingClient, clientRole);
super(mqtt5RxClient, genericMqttConnectingClient, consumingClient, clientRole);
}

@Override
Expand Down Expand Up @@ -284,14 +296,6 @@ Completable sendUnsubscribe(final Mqtt5RxClient mqtt5RxClient, final MqttTopicFi
return mqtt5RxClient.unsubscribe(unsubscribe).ignoreElement();
}

@Override
protected Flowable<GenericMqttPublish> consumeIncomingPublishes(final Mqtt5RxClient mqtt5RxClient,
final MqttGlobalPublishFilter filter,
final boolean manualAcknowledgement) {

return mqtt5RxClient.publishes(filter, manualAcknowledgement).map(GenericMqttPublish::ofMqtt5Publish);
}

}

}
Loading

0 comments on commit ccee5d1

Please sign in to comment.