Skip to content

Commit

Permalink
applied manual reconnect for MQTT connections
Browse files Browse the repository at this point in the history
* triggering a reconnect based on the situation, e.g. for reconenct because of redelivery, directly reconnect without any delay
* don't setup new hivemq clients for the manual reconnect, but rather reconnect the existing ones
* applied some simplifications caused by this new approach

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed May 19, 2021
1 parent bed5bb2 commit 3ecf151
Show file tree
Hide file tree
Showing 19 changed files with 140 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
* This class is the default implementation of {@link MqttConfig}.
Expand All @@ -30,13 +31,15 @@
public final class DefaultMqttConfig implements MqttConfig {

private static final String CONFIG_PATH = "mqtt";
private static final String RECONNECT_BACKOFF_PATH = "reconnect";

private final int sourceBufferSize;
private final int eventLoopThreads;
private final boolean cleanSession;
private final boolean reconnectForRedelivery;
private final boolean useSeparateClientForPublisher;
private final Duration reconnectForRedeliveryDelay;
private final BackOffConfig reconnectBackOffConfig;

private DefaultMqttConfig(final ScopedConfig config) {
sourceBufferSize = config.getInt(MqttConfigValue.SOURCE_BUFFER_SIZE.getConfigPath());
Expand All @@ -46,6 +49,9 @@ private DefaultMqttConfig(final ScopedConfig config) {
reconnectForRedeliveryDelay =
config.getDuration(MqttConfigValue.RECONNECT_FOR_REDELIVERY_DELAY.getConfigPath());
useSeparateClientForPublisher = config.getBoolean(MqttConfigValue.SEPARATE_PUBLISHER_CLIENT.getConfigPath());
reconnectBackOffConfig = DefaultBackOffConfig.of(config.hasPath(RECONNECT_BACKOFF_PATH)
? config.getConfig(RECONNECT_BACKOFF_PATH)
: ConfigFactory.parseString("backoff" + "={}"));
}

/**
Expand Down Expand Up @@ -89,6 +95,11 @@ public boolean shouldUseSeparatePublisherClient() {
return useSeparateClientForPublisher;
}

@Override
public BackOffConfig getReconnectBackOffConfig() {
return reconnectBackOffConfig;
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
Expand All @@ -103,13 +114,14 @@ public boolean equals(@Nullable final Object o) {
Objects.equals(cleanSession, that.cleanSession) &&
Objects.equals(reconnectForRedelivery, that.reconnectForRedelivery) &&
Objects.equals(reconnectForRedeliveryDelay, that.reconnectForRedeliveryDelay) &&
Objects.equals(useSeparateClientForPublisher, that.useSeparateClientForPublisher);
Objects.equals(useSeparateClientForPublisher, that.useSeparateClientForPublisher) &&
Objects.equals(reconnectBackOffConfig, that.reconnectBackOffConfig);
}

@Override
public int hashCode() {
return Objects.hash(sourceBufferSize, eventLoopThreads, cleanSession, reconnectForRedelivery,
reconnectForRedeliveryDelay, useSeparateClientForPublisher);
reconnectForRedeliveryDelay, useSeparateClientForPublisher, reconnectBackOffConfig);
}

@Override
Expand All @@ -121,6 +133,7 @@ public String toString() {
", reconnectForRedelivery=" + reconnectForRedelivery +
", reconnectForRedeliveryDelay=" + reconnectForRedeliveryDelay +
", useSeparateClientForPublisher=" + useSeparateClientForPublisher +
", reconnectBackOffConfig=" + reconnectBackOffConfig +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public interface MqttConfig {
*/
boolean shouldUseSeparatePublisherClient();

/**
* Returns the reconnect backoff configuration to apply when reconnecting failed MQTT connections.
*
* @return the reconnect backoff configuration to apply.
* @since 2.0.0
*/
BackOffConfig getReconnectBackOffConfig();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code MqttConfig}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
*/
final class DuplicationRetryTimeoutStrategy implements RetryTimeoutStrategy {

private static final Duration ZERO_MIN_DURATION_FIRST_INCREMENT = Duration.ofMillis(100);

private final Duration minTimeout;
private final Duration maxTimeout;
private Duration currentTimeout;
Expand Down Expand Up @@ -54,6 +56,9 @@ public void reset() {
public Duration getNextTimeout() {
final Duration timeout = this.currentTimeout;
this.increase();
if (timeout.isZero()) {
this.currentTimeout = ZERO_MIN_DURATION_FIRST_INCREMENT;
}
return timeout;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,24 @@

import java.time.Duration;

import org.eclipse.ditto.connectivity.service.config.TimeoutConfig;

/**
* Retry timeout strategy that provides increasing timeouts for retrying to connect.
*/
interface RetryTimeoutStrategy {
public interface RetryTimeoutStrategy {

/**
* Creates a new RetryTimeoutStrategy duplicating the {@link #getNextTimeout()} until the configured max timeout
* of the passed {@code timeoutConfig} is reached with the formula: {@code timeout = minTimeout * 2^x}
*
* @param timeoutConfig the timeout config to apply.
* @return the created RetryTimeoutStrategy
* @since 2.0.0
*/
static RetryTimeoutStrategy newDuplicationRetryTimeoutStrategy(final TimeoutConfig timeoutConfig) {
return DuplicationRetryTimeoutStrategy.fromConfig(timeoutConfig);
}

/**
* Resets the timeout and the current tries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public int hashCode() {
@Override
public String toString() {
return getClass().getSimpleName() + " [" +
", capacity=" + capacity +
"capacity=" + capacity +
", elements=" + elements +
"]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public int hashCode() {
@Override
public String toString() {
return getClass().getSimpleName() + " [" +
", connectionId=" + connectionId +
"connectionId=" + connectionId +
", delegate=" + delegate +
", active=" + active +
"]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public int hashCode() {
@Override
public String toString() {
return getClass().getSimpleName() + " [" +
", category=" + category +
"category=" + category +
", type=" + type +
", successLogs=" + successLogs +
", failureLogs=" + failureLogs +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ <T extends MqttClientBuilderBase<T>> T configureClientBuilder(
final T newBuilder,
final Connection connection,
final String identifier,
final boolean allowReconnect,
@Nullable final MqttClientConnectedListener connectedListener,
@Nullable final MqttClientDisconnectedListener disconnectedListener,
final ConnectionLogger connectionLogger,
Expand All @@ -135,10 +134,6 @@ <T extends MqttClientBuilderBase<T>> T configureClientBuilder(
.serverHost(uri.getHost())
.serverPort(uri.getPort());

if (allowReconnect && connection.isFailoverEnabled()) {
builder = builder.automaticReconnectWithDefaultConfig();
}

if (isSecuredConnection(connection.getProtocol())) {

// create DittoTrustManagerFactory to apply hostname verification
Expand Down
Loading

0 comments on commit 3ecf151

Please sign in to comment.