Skip to content

Commit

Permalink
Switched to generic implementation of MQTT with backpressure.
Browse files Browse the repository at this point in the history
* Legacy MQTT implementation still has to be removed after testing.
* Refactored `GenericMqttClientFactory` to be a utility class instead of an interface because it did not make sense to have an instance of this factory. This entailed some static mocking in `GenericMqttClientActorTest` and `ConnectionTesterTest`.
* Provide `MqttSpecificConfig` to `HiveMqttClientProperties` because this config is required in `GenericMqttActor` anyway. This prevents duplicate creation of `MqttSpecificConfig`.
* Implemented possibility of reconnecting of consumer client to `GenericMqttActor`.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed May 31, 2022
1 parent f967c0b commit 2253bd4
Show file tree
Hide file tree
Showing 25 changed files with 939 additions and 459 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.service.messaging.amqp.AmqpClientActor;
import org.eclipse.ditto.connectivity.service.messaging.httppush.HttpPushClientActor;
import org.eclipse.ditto.connectivity.service.messaging.kafka.KafkaClientActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.HiveMqtt3ClientActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.HiveMqtt5ClientActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.GenericMqttClientActor;
import org.eclipse.ditto.connectivity.service.messaging.rabbitmq.RabbitMQClientActor;

import com.typesafe.config.Config;
Expand Down Expand Up @@ -55,43 +53,41 @@ public static DefaultClientActorPropsFactory getInstance() {
}

@Override
public Props getActorPropsForType(final Connection connection, final ActorRef proxyActor,
public Props getActorPropsForType(final Connection connection,
final ActorRef proxyActor,
final ActorRef connectionActor,
final ActorSystem actorSystem,
final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {

final ConnectionType connectionType = connection.getConnectionType();
final Props result;
switch (connectionType) {
case AMQP_091:
result = RabbitMQClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case AMQP_10:
result = AmqpClientActor.props(connection, proxyActor, connectionActor, connectivityConfigOverwrites,
actorSystem, dittoHeaders);
break;
case MQTT:
result = HiveMqtt3ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case MQTT_5:
result = HiveMqtt5ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case KAFKA:
result = KafkaClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case HTTP_PUSH:
result = HttpPushClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
default:
throw new IllegalArgumentException("ConnectionType <" + connectionType + "> is not supported.");
}
return result;
return switch (connection.getConnectionType()) {
case AMQP_091 -> RabbitMQClientActor.props(connection,
proxyActor,
connectionActor,
dittoHeaders,
connectivityConfigOverwrites);
case AMQP_10 -> AmqpClientActor.props(connection,
proxyActor,
connectionActor,
connectivityConfigOverwrites,
actorSystem,
dittoHeaders);
case MQTT, MQTT_5 -> GenericMqttClientActor.props(connection,
proxyActor,
connectionActor,
dittoHeaders,
connectivityConfigOverwrites);
case KAFKA -> KafkaClientActor.props(connection,
proxyActor,
connectionActor,
dittoHeaders,
connectivityConfigOverwrites);
case HTTP_PUSH -> HttpPushClientActor.props(connection,
proxyActor,
connectionActor,
dittoHeaders,
connectivityConfigOverwrites);
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,20 @@ public boolean separatePublisherClient() {
/**
* @return how long to wait before reconnect a consumer client for redelivery.
*/
// TODO jff delete as soon as unused.
public Duration getReconnectForDeliveryDelay() {
return specificConfig.getDuration(RECONNECT_FOR_REDELIVERY_DELAY);
}

/**
* Returns the delay how long to wait before reconnecting a consumer client for redelivery.
*
* @return the reconnect delay.
*/
public ReconnectDelay getReconnectForDeliveryDelayNg() {
return ReconnectDelay.ofOrLowerBoundary(specificConfig.getDuration(RECONNECT_FOR_REDELIVERY_DELAY));
}

/**
* @return the optional clientId which should be used by the MQTT client when connecting to the MQTT broker.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.common.ConditionChecker;

/**
* Delay how long to wait before reconnecting an MQTT client.
* This delay is guaranteed
*/
@Immutable
public final class ReconnectDelay implements Comparable<ReconnectDelay> {

/**
* The lower boundary a {@code ReconnectDelay} a never falls below.
*/
static final Duration LOWER_BOUNDARY = Duration.ofSeconds(1L);

private final Duration duration;

private ReconnectDelay(final Duration duration) {
this.duration = duration;
}

/**
* Returns an instance of {@code ReconnectDelay} for the specified {@code Duration} argument or
* {@link #LOWER_BOUNDARY} if the argument is less than the lower boundary.
*
* @param duration the duration of the returned {@code ReconnectDelay}.
* @return the instance.
* @throws NullPointerException if {@code duration} is {@code null}.
*/
static ReconnectDelay ofOrLowerBoundary(final Duration duration) {
final Duration durationWithinLowerBoundary;
if (0 < LOWER_BOUNDARY.compareTo(ConditionChecker.checkNotNull(duration, "duration"))) {
durationWithinLowerBoundary = LOWER_BOUNDARY;
} else {
durationWithinLowerBoundary = duration;
}
return new ReconnectDelay(durationWithinLowerBoundary);
}

public Duration getDuration() {
return duration;
}

@Override
public int compareTo(final ReconnectDelay o) {
ConditionChecker.checkNotNull(o, "o");
return duration.compareTo(o.getDuration());
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final var that = (ReconnectDelay) o;
return Objects.equals(duration, that.duration);
}

@Override
public int hashCode() {
return Objects.hash(duration);
}

/**
* @return the string representation of the wrapped {@code Duration}.
*/
@Override
public String toString() {
return duration.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@
import org.eclipse.ditto.connectivity.service.messaging.ChildActorNanny;
import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.KeepAliveInterval;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.AllSubscriptionsFailedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClient;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClientFactory;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.HiveMqttClientProperties;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SomeSubscriptionsFailedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SubscriptionStatus;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.MqttConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.connect.GenericMqttConnect;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing.MqttPublisherActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.AllSubscriptionsFailedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.MqttSubscriber;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SomeSubscriptionsFailedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.SubscribeResult;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SubscriptionStatus;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
Expand All @@ -61,7 +61,6 @@
@NotThreadSafe
final class ConnectionTester {

private final GenericMqttClientFactory genericMqttClientFactory;
private final HiveMqttClientProperties hiveMqttClientProperties;
private final Sink<Object, NotUsed> inboundMappingSink;
private final ConnectivityStatusResolver connectivityStatusResolver;
Expand All @@ -70,7 +69,6 @@ final class ConnectionTester {
private final ThreadSafeDittoLogger logger;

private ConnectionTester(final Builder builder) {
genericMqttClientFactory = checkNotNull(builder.genericMqttClientFactory, "genericMqttClientFactory");
hiveMqttClientProperties = checkNotNull(builder.hiveMqttClientProperties, "hiveMqttClientProperties");
inboundMappingSink = checkNotNull(builder.inboundMappingSink, "inboundMappingSink");
connectivityStatusResolver = checkNotNull(builder.connectivityStatusResolver, "connectivityStatusResolver");
Expand Down Expand Up @@ -110,7 +108,7 @@ CompletionStage<Status.Status> testConnection() {
private CompletionStage<GenericMqttClient> tryToGetGenericMqttClient() {
try {
return CompletableFuture.completedFuture(
genericMqttClientFactory.getGenericMqttClientForConnectionTesting(hiveMqttClientProperties)
GenericMqttClientFactory.getGenericMqttClientForConnectionTesting(hiveMqttClientProperties)
);
} catch (final Exception e) {
return CompletableFuture.failedFuture(e);
Expand Down Expand Up @@ -263,7 +261,6 @@ private static Throwable unwrapCauseIfCompletionException(final Throwable throwa
@NotThreadSafe
static final class Builder {

private GenericMqttClientFactory genericMqttClientFactory;
private HiveMqttClientProperties hiveMqttClientProperties;
private Sink<Object, NotUsed> inboundMappingSink;
private ConnectivityStatusResolver connectivityStatusResolver;
Expand All @@ -280,13 +277,6 @@ private Builder() {
correlationId = null;
}

Builder withGenericMqttClientFactory(
final GenericMqttClientFactory genericMqttClientFactory
) {
this.genericMqttClientFactory = checkNotNull(genericMqttClientFactory, "genericMqttClientFactory");
return this;
}

Builder withHiveMqttClientProperties(final HiveMqttClientProperties hiveMqttClientProperties) {
this.hiveMqttClientProperties = checkNotNull(hiveMqttClientProperties, "hiveMqttClientProperties");
return this;
Expand Down
Loading

0 comments on commit 2253bd4

Please sign in to comment.