Skip to content

Commit

Permalink
Switched to generic implementation of MQTT with backpressure.
Browse files Browse the repository at this point in the history
* Legacy MQTT implementation still has to be removed after testing.
* Refactored `GenericMqttClientFactory` to be a utility class instead of an interface because it did not make sense to have an instance of this factory. This entailed some static mocking in `GenericMqttClientActorTest` and `ConnectionTesterTest`.
* Provide `MqttSpecificConfig` to `HiveMqttClientProperties` because this config is required in `GenericMqttActor` anyway. This prevents duplicate creation of `MqttSpecificConfig`.
* Implemented possibility of reconnecting of consumer client to `GenericMqttActor`.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed May 31, 2022
1 parent f967c0b commit 2253bd4
Show file tree
Hide file tree
Showing 25 changed files with 939 additions and 459 deletions.
Expand Up @@ -17,12 +17,10 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.service.messaging.amqp.AmqpClientActor;
import org.eclipse.ditto.connectivity.service.messaging.httppush.HttpPushClientActor;
import org.eclipse.ditto.connectivity.service.messaging.kafka.KafkaClientActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.HiveMqtt3ClientActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.HiveMqtt5ClientActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.GenericMqttClientActor;
import org.eclipse.ditto.connectivity.service.messaging.rabbitmq.RabbitMQClientActor;

import com.typesafe.config.Config;
Expand Down Expand Up @@ -55,43 +53,41 @@ public static DefaultClientActorPropsFactory getInstance() {
}

@Override
public Props getActorPropsForType(final Connection connection, final ActorRef proxyActor,
public Props getActorPropsForType(final Connection connection,
final ActorRef proxyActor,
final ActorRef connectionActor,
final ActorSystem actorSystem,
final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {

final ConnectionType connectionType = connection.getConnectionType();
final Props result;
switch (connectionType) {
case AMQP_091:
result = RabbitMQClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case AMQP_10:
result = AmqpClientActor.props(connection, proxyActor, connectionActor, connectivityConfigOverwrites,
actorSystem, dittoHeaders);
break;
case MQTT:
result = HiveMqtt3ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case MQTT_5:
result = HiveMqtt5ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case KAFKA:
result = KafkaClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case HTTP_PUSH:
result = HttpPushClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
default:
throw new IllegalArgumentException("ConnectionType <" + connectionType + "> is not supported.");
}
return result;
return switch (connection.getConnectionType()) {
case AMQP_091 -> RabbitMQClientActor.props(connection,
proxyActor,
connectionActor,
dittoHeaders,
connectivityConfigOverwrites);
case AMQP_10 -> AmqpClientActor.props(connection,
proxyActor,
connectionActor,
connectivityConfigOverwrites,
actorSystem,
dittoHeaders);
case MQTT, MQTT_5 -> GenericMqttClientActor.props(connection,
proxyActor,
connectionActor,
dittoHeaders,
connectivityConfigOverwrites);
case KAFKA -> KafkaClientActor.props(connection,
proxyActor,
connectionActor,
dittoHeaders,
connectivityConfigOverwrites);
case HTTP_PUSH -> HttpPushClientActor.props(connection,
proxyActor,
connectionActor,
dittoHeaders,
connectivityConfigOverwrites);
};
}

}
Expand Up @@ -109,10 +109,20 @@ public boolean separatePublisherClient() {
/**
* @return how long to wait before reconnect a consumer client for redelivery.
*/
// TODO jff delete as soon as unused.
public Duration getReconnectForDeliveryDelay() {
return specificConfig.getDuration(RECONNECT_FOR_REDELIVERY_DELAY);
}

/**
* Returns the delay how long to wait before reconnecting a consumer client for redelivery.
*
* @return the reconnect delay.
*/
public ReconnectDelay getReconnectForDeliveryDelayNg() {
return ReconnectDelay.ofOrLowerBoundary(specificConfig.getDuration(RECONNECT_FOR_REDELIVERY_DELAY));
}

/**
* @return the optional clientId which should be used by the MQTT client when connecting to the MQTT broker.
*/
Expand Down
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.common.ConditionChecker;

/**
* Delay how long to wait before reconnecting an MQTT client.
* This delay is guaranteed
*/
@Immutable
public final class ReconnectDelay implements Comparable<ReconnectDelay> {

/**
* The lower boundary a {@code ReconnectDelay} a never falls below.
*/
static final Duration LOWER_BOUNDARY = Duration.ofSeconds(1L);

private final Duration duration;

private ReconnectDelay(final Duration duration) {
this.duration = duration;
}

/**
* Returns an instance of {@code ReconnectDelay} for the specified {@code Duration} argument or
* {@link #LOWER_BOUNDARY} if the argument is less than the lower boundary.
*
* @param duration the duration of the returned {@code ReconnectDelay}.
* @return the instance.
* @throws NullPointerException if {@code duration} is {@code null}.
*/
static ReconnectDelay ofOrLowerBoundary(final Duration duration) {
final Duration durationWithinLowerBoundary;
if (0 < LOWER_BOUNDARY.compareTo(ConditionChecker.checkNotNull(duration, "duration"))) {
durationWithinLowerBoundary = LOWER_BOUNDARY;
} else {
durationWithinLowerBoundary = duration;
}
return new ReconnectDelay(durationWithinLowerBoundary);
}

public Duration getDuration() {
return duration;
}

@Override
public int compareTo(final ReconnectDelay o) {
ConditionChecker.checkNotNull(o, "o");
return duration.compareTo(o.getDuration());
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final var that = (ReconnectDelay) o;
return Objects.equals(duration, that.duration);
}

@Override
public int hashCode() {
return Objects.hash(duration);
}

/**
* @return the string representation of the wrapped {@code Duration}.
*/
@Override
public String toString() {
return duration.toString();
}

}
Expand Up @@ -31,18 +31,18 @@
import org.eclipse.ditto.connectivity.service.messaging.ChildActorNanny;
import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.KeepAliveInterval;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.AllSubscriptionsFailedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClient;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClientFactory;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.HiveMqttClientProperties;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SomeSubscriptionsFailedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SubscriptionStatus;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.MqttConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.connect.GenericMqttConnect;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing.MqttPublisherActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.AllSubscriptionsFailedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.MqttSubscriber;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SomeSubscriptionsFailedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.SubscribeResult;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SubscriptionStatus;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
Expand All @@ -61,7 +61,6 @@
@NotThreadSafe
final class ConnectionTester {

private final GenericMqttClientFactory genericMqttClientFactory;
private final HiveMqttClientProperties hiveMqttClientProperties;
private final Sink<Object, NotUsed> inboundMappingSink;
private final ConnectivityStatusResolver connectivityStatusResolver;
Expand All @@ -70,7 +69,6 @@ final class ConnectionTester {
private final ThreadSafeDittoLogger logger;

private ConnectionTester(final Builder builder) {
genericMqttClientFactory = checkNotNull(builder.genericMqttClientFactory, "genericMqttClientFactory");
hiveMqttClientProperties = checkNotNull(builder.hiveMqttClientProperties, "hiveMqttClientProperties");
inboundMappingSink = checkNotNull(builder.inboundMappingSink, "inboundMappingSink");
connectivityStatusResolver = checkNotNull(builder.connectivityStatusResolver, "connectivityStatusResolver");
Expand Down Expand Up @@ -110,7 +108,7 @@ CompletionStage<Status.Status> testConnection() {
private CompletionStage<GenericMqttClient> tryToGetGenericMqttClient() {
try {
return CompletableFuture.completedFuture(
genericMqttClientFactory.getGenericMqttClientForConnectionTesting(hiveMqttClientProperties)
GenericMqttClientFactory.getGenericMqttClientForConnectionTesting(hiveMqttClientProperties)
);
} catch (final Exception e) {
return CompletableFuture.failedFuture(e);
Expand Down Expand Up @@ -263,7 +261,6 @@ private static Throwable unwrapCauseIfCompletionException(final Throwable throwa
@NotThreadSafe
static final class Builder {

private GenericMqttClientFactory genericMqttClientFactory;
private HiveMqttClientProperties hiveMqttClientProperties;
private Sink<Object, NotUsed> inboundMappingSink;
private ConnectivityStatusResolver connectivityStatusResolver;
Expand All @@ -280,13 +277,6 @@ private Builder() {
correlationId = null;
}

Builder withGenericMqttClientFactory(
final GenericMqttClientFactory genericMqttClientFactory
) {
this.genericMqttClientFactory = checkNotNull(genericMqttClientFactory, "genericMqttClientFactory");
return this;
}

Builder withHiveMqttClientProperties(final HiveMqttClientProperties hiveMqttClientProperties) {
this.hiveMqttClientProperties = checkNotNull(hiveMqttClientProperties, "hiveMqttClientProperties");
return this;
Expand Down

0 comments on commit 2253bd4

Please sign in to comment.