Skip to content

Commit

Permalink
Change defaults for reconnectForRedelivery and separatePublisherClien…
Browse files Browse the repository at this point in the history
…t in

MqttSpecificConfig

* Changed both default values to false and made them configurable via
  HOCON config

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Apr 20, 2021
1 parent a47d0d4 commit 98bc6f7
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 90 deletions.
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.services.connectivity.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.Nullable;
Expand All @@ -31,9 +32,16 @@ public final class DefaultMqttConfig implements MqttConfig {
private static final String CONFIG_PATH = "mqtt";

private final int sourceBufferSize;
private final boolean reconnectForRedelivery;
private final boolean useSeparateClientForPublisher;
private final Duration reconnectForRedeliveryDelay;

private DefaultMqttConfig(final ScopedConfig config) {
sourceBufferSize = config.getInt(MqttConfigValue.SOURCE_BUFFER_SIZE.getConfigPath());
reconnectForRedelivery = config.getBoolean(MqttConfigValue.RECONNECT_FOR_REDELIVERY.getConfigPath());
reconnectForRedeliveryDelay =
config.getDuration(MqttConfigValue.RECONNECT_FOR_REDELIVERY_DELAY.getConfigPath());
useSeparateClientForPublisher = config.getBoolean(MqttConfigValue.SEPARATE_PUBLISHER_CLIENT.getConfigPath());
}

/**
Expand All @@ -52,6 +60,21 @@ public int getSourceBufferSize() {
return sourceBufferSize;
}

@Override
public boolean shouldReconnectForRedelivery() {
return reconnectForRedelivery;
}

@Override
public Duration getReconnectForRedeliveryDelay() {
return reconnectForRedeliveryDelay;
}

@Override
public boolean shouldUseSeparatePublisherClient() {
return useSeparateClientForPublisher;
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
Expand All @@ -61,18 +84,25 @@ public boolean equals(@Nullable final Object o) {
return false;
}
final DefaultMqttConfig that = (DefaultMqttConfig) o;
return sourceBufferSize == that.sourceBufferSize;
return Objects.equals(sourceBufferSize, that.sourceBufferSize) &&
Objects.equals(reconnectForRedelivery, that.reconnectForRedelivery) &&
Objects.equals(reconnectForRedeliveryDelay, that.reconnectForRedeliveryDelay) &&
Objects.equals(useSeparateClientForPublisher, that.useSeparateClientForPublisher);
}

@Override
public int hashCode() {
return Objects.hash(sourceBufferSize);
return Objects.hash(sourceBufferSize, reconnectForRedelivery, reconnectForRedeliveryDelay,
useSeparateClientForPublisher);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"sourceBufferSize=" + sourceBufferSize +
", reconnectForRedelivery=" + reconnectForRedelivery +
", reconnectForRedeliveryDelay=" + reconnectForRedeliveryDelay +
", useSeparateClientForPublisher=" + useSeparateClientForPublisher +
"]";
}

Expand Down
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.services.connectivity.config;

import java.time.Duration;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.services.utils.config.KnownConfigValue;
Expand All @@ -29,6 +31,26 @@ public interface MqttConfig {
*/
int getSourceBufferSize();

/**
* Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
*
* @return true if the client should reconnect, false if not.
*/
boolean shouldReconnectForRedelivery();

/**
* @return the amount of time that a reconnect will be delayed after a failed acknowledgement.
*/
Duration getReconnectForRedeliveryDelay();

/**
* Indicates whether a separate client should be used for publishing. This could be useful when
* {@link #shouldReconnectForRedelivery()} returns true to avoid that the publisher has downtimes.
*
* @return true if a separate client should be used, false if not.
*/
boolean shouldUseSeparatePublisherClient();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code MqttConfig}.
Expand All @@ -38,7 +60,23 @@ enum MqttConfigValue implements KnownConfigValue {
/**
* The maximum number of buffered messages for each MQTT source.
*/
SOURCE_BUFFER_SIZE("source-buffer-size", 8);
SOURCE_BUFFER_SIZE("source-buffer-size", 8),

/**
* Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
*/
RECONNECT_FOR_REDELIVERY("reconnect-for-redelivery", false),

/**
* The amount of time that a reconnect will be delayed after a failed acknowledgement
*/
RECONNECT_FOR_REDELIVERY_DELAY("reconnect-for-redelivery-delay", Duration.ofSeconds(2)),

/**
* Indicates whether a separate client should be used for publishing. This could be useful when
* {@link #shouldReconnectForRedelivery()} returns true to avoid that the publisher has downtimes.
*/
SEPARATE_PUBLISHER_CLIENT("separate-publisher-client", false);

private final String path;
private final Object defaultValue;
Expand Down
13 changes: 13 additions & 0 deletions services/connectivity/config/src/main/resources/connectivity.conf
Expand Up @@ -174,6 +174,19 @@ ditto {
# maximum mumber of MQTT messages to buffer in a source (presumably for at-least-once and exactly-once delivery)
source-buffer-size = 8
source-buffer-size = ${?CONNECTIVITY_MQTT_SOURCE_BUFFER_SIZE}

# Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
reconnect-for-redelivery = false
reconnect-for-redelivery = ${?CONNECTIVITY_MQTT_RECONNECT_FOR_REDELIVERY}

#The amount of time that a reconnect will be delayed after a failed acknowledgement
reconnect-for-redelivery-delay = 2s
reconnect-for-redelivery-delay = ${?CONNECTIVITY_MQTT_RECONNECT_FOR_REDELIVERY_DELAY}

# Indicates whether a separate client should be used for publishing. This could be useful when
# reconnect-for-redelivery is set to true to avoid that the publisher has downtimes.
separate-publisher-client = false
separate-publisher-client = ${?CONNECTIVITY_MQTT_SEPARATE_PUBLISHER_CLIENT}
}

http-push {
Expand Down
Expand Up @@ -12,19 +12,18 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.mqtt;

import static org.eclipse.ditto.services.models.placeholders.PlaceholderFactory.newHeadersPlaceholder;
import static org.eclipse.ditto.services.models.connectivity.placeholders.ConnectivityPlaceholders.newEntityPlaceholder;
import static org.eclipse.ditto.services.models.connectivity.placeholders.ConnectivityPlaceholders.newFeaturePlaceholder;
import static org.eclipse.ditto.services.models.connectivity.placeholders.ConnectivityPlaceholders.newPolicyPlaceholder;
import static org.eclipse.ditto.services.models.connectivity.placeholders.ConnectivityPlaceholders.newSourceAddressPlaceholder;
import static org.eclipse.ditto.services.models.connectivity.placeholders.ConnectivityPlaceholders.newThingPlaceholder;
import static org.eclipse.ditto.services.models.connectivity.placeholders.ConnectivityPlaceholders.newTopicPathPlaceholder;
import static org.eclipse.ditto.services.models.placeholders.PlaceholderFactory.newHeadersPlaceholder;

import java.text.MessageFormat;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -46,16 +45,17 @@
import org.eclipse.ditto.model.connectivity.Enforcement;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.services.models.placeholders.ExpressionResolver;
import org.eclipse.ditto.services.models.placeholders.Placeholder;
import org.eclipse.ditto.services.models.placeholders.PlaceholderFactory;
import org.eclipse.ditto.services.models.placeholders.PlaceholderFilter;
import org.eclipse.ditto.services.models.placeholders.UnresolvedPlaceholderException;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.connectivity.config.MqttConfig;
import org.eclipse.ditto.services.connectivity.messaging.validation.AbstractProtocolValidator;
import org.eclipse.ditto.services.models.connectivity.EnforcementFactoryFactory;
import org.eclipse.ditto.services.models.connectivity.placeholders.SourceAddressPlaceholder;
import org.eclipse.ditto.services.models.connectivity.placeholders.ThingPlaceholder;
import org.eclipse.ditto.services.models.placeholders.ExpressionResolver;
import org.eclipse.ditto.services.models.placeholders.Placeholder;
import org.eclipse.ditto.services.models.placeholders.PlaceholderFactory;
import org.eclipse.ditto.services.models.placeholders.PlaceholderFilter;
import org.eclipse.ditto.services.models.placeholders.UnresolvedPlaceholderException;

import com.hivemq.client.internal.util.UnsignedDataTypes;
import com.hivemq.client.mqtt.datatypes.MqttQos;
Expand All @@ -72,12 +72,17 @@ public abstract class AbstractMqttValidator extends AbstractProtocolValidator {
private static final String QOS = "qos";

protected static final Collection<String> ACCEPTED_SCHEMES = List.of("tcp", "ssl");
protected static final Collection<String> SECURE_SCHEMES = Collections.unmodifiableList(
Collections.singletonList("ssl"));
protected static final Collection<String> SECURE_SCHEMES = List.of("ssl");

private static final String ERROR_DESCRIPTION = "''{0}'' is not a valid value for MQTT enforcement. Valid" +
" values are: ''{1}''.";

private final MqttConfig mqttConfig;

protected AbstractMqttValidator(final MqttConfig mqttConfig) {
this.mqttConfig = mqttConfig;
}

@Override
protected void validateSource(final Source source, final DittoHeaders dittoHeaders,
final Supplier<String> sourceDescription) {
Expand Down Expand Up @@ -215,12 +220,13 @@ protected static void validateAddresses(final Connection connection, final Ditto
}

protected void validateSpecificConfig(final Connection connection, final DittoHeaders dittoHeaders) {
final MqttSpecificConfig mqttSpecificConfig = MqttSpecificConfig.fromConnection(connection);
final MqttSpecificConfig mqttSpecificConfig = MqttSpecificConfig.fromConnection(connection, mqttConfig);
mqttSpecificConfig.getKeepAliveInterval().ifPresent(keepAlive -> {
final long seconds = keepAlive.toSeconds();
if (!UnsignedDataTypes.isUnsignedShort(seconds)) {
throw ConnectionConfigurationInvalidException
.newBuilder("Keep alive interval '"+seconds+"' is not within the allowed range of [0, 65535] seconds.")
.newBuilder("Keep alive interval '" + seconds +
"' is not within the allowed range of [0, 65535] seconds.")
.description("Please adjust the interval to be within the allowed range.")
.dittoHeaders(dittoHeaders)
.build();
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.ditto.model.connectivity.HeaderMapping;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.services.connectivity.config.MqttConfig;

import com.hivemq.client.internal.util.UnsignedDataTypes;

Expand All @@ -35,13 +36,18 @@
@Immutable
public final class Mqtt3Validator extends AbstractMqttValidator {

private Mqtt3Validator(final MqttConfig mqttConfig) {
super(mqttConfig);
}

/**
* Create a new {@code Mqtt3Validator}.
*
* @param mqttConfig used to create the fallback specific config
* @return a new instance.
*/
public static Mqtt3Validator newInstance() {
return new Mqtt3Validator();
public static Mqtt3Validator newInstance(final MqttConfig mqttConfig) {
return new Mqtt3Validator(mqttConfig);
}

@Override
Expand Down
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.services.connectivity.config.MqttConfig;

import akka.actor.ActorSystem;

Expand All @@ -26,13 +27,18 @@
@Immutable
public final class Mqtt5Validator extends AbstractMqttValidator {

private Mqtt5Validator(final MqttConfig mqttConfig) {
super(mqttConfig);
}

/**
* Create a new {@code Mqtt5Validator}.
*
* @param mqttConfig used to create the fallback specific config
* @return a new instance.
*/
public static Mqtt5Validator newInstance() {
return new Mqtt5Validator();
public static Mqtt5Validator newInstance(final MqttConfig mqttConfig) {
return new Mqtt5Validator(mqttConfig);
}

@Override
Expand Down
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.services.connectivity.config.MqttConfig;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
Expand All @@ -41,29 +42,34 @@ public final class MqttSpecificConfig {
private static final String KEEP_ALIVE_INTERVAL = "keepAlive";
private static final String RECONNECT_FOR_REDELIVERY_DELAY = "reconnectForRedeliveryDelay";

private static final boolean DEFAULT_RECONNECT_FOR_REDELIVERY = true;
private static final Duration DEFAULT_RECONNECT_DURATION = Duration.ofSeconds(2L);

private final Config specificConfig;

MqttSpecificConfig(final Map<String, String> specificConfig) {
final Map<String, Object> defaultMap = new HashMap<>();
defaultMap.put(RECONNECT_FOR_REDELIVERY, DEFAULT_RECONNECT_FOR_REDELIVERY);
defaultMap.put(SEPARATE_PUBLISHER_CLIENT, DEFAULT_RECONNECT_FOR_REDELIVERY);
defaultMap.put(RECONNECT_FOR_REDELIVERY_DELAY, DEFAULT_RECONNECT_DURATION);
this.specificConfig = ConfigFactory.parseMap(specificConfig)
.withFallback(ConfigFactory.parseMap(defaultMap));
private MqttSpecificConfig(final Config specificConfig) {
this.specificConfig = specificConfig;
}

/**
* Creates a new instance of MqttSpecificConfig based on the {@code specificConfig} of the passed
* {@code connection}.
*
* @param connection the Connection to extract the {@code specificConfig} map from.
* @param mqttConfig the mqtt config to create the default config from.
* @return the new MqttSpecificConfig instance
*/
public static MqttSpecificConfig fromConnection(final Connection connection) {
return new MqttSpecificConfig(connection.getSpecificConfig());
public static MqttSpecificConfig fromConnection(final Connection connection, final MqttConfig mqttConfig) {
final Map<String, Object> defaultConfig = toDefaultConfig(mqttConfig);
final Config config = ConfigFactory.parseMap(connection.getSpecificConfig())
.withFallback(ConfigFactory.parseMap(defaultConfig));

return new MqttSpecificConfig(config);
}

private static Map<String, Object> toDefaultConfig(final MqttConfig mqttConfig) {
final Map<String, Object> defaultMap = new HashMap<>();
defaultMap.put(RECONNECT_FOR_REDELIVERY, mqttConfig.shouldReconnectForRedelivery());
defaultMap.put(SEPARATE_PUBLISHER_CLIENT, mqttConfig.shouldUseSeparatePublisherClient());
defaultMap.put(RECONNECT_FOR_REDELIVERY_DELAY, mqttConfig.getReconnectForRedeliveryDelay());
return defaultMap;
}

/**
Expand All @@ -83,15 +89,15 @@ public boolean cleanSession() {
* @return whether reconnect-for-redelivery behavior is activated.
*/
public boolean reconnectForRedelivery() {
return getSafely(() -> specificConfig.getBoolean(RECONNECT_FOR_REDELIVERY), DEFAULT_RECONNECT_FOR_REDELIVERY);
return specificConfig.getBoolean(RECONNECT_FOR_REDELIVERY);
}

/**
* @return whether to use a separate client for publisher actors so that reconnect-for-redelivery
* does not disrupt the publisher.
*/
public boolean separatePublisherClient() {
return getSafely(() -> specificConfig.getBoolean(SEPARATE_PUBLISHER_CLIENT), DEFAULT_RECONNECT_FOR_REDELIVERY);
return specificConfig.getBoolean(SEPARATE_PUBLISHER_CLIENT);
}

/**
Expand Down Expand Up @@ -153,6 +159,7 @@ private Optional<String> getStringOptional(final String key) {
return Optional.empty();
}
}

private Optional<Duration> getDurationOptional(final String key) {
if (specificConfig.hasPath(key)) {
return Optional.of(specificConfig.getDuration(key));
Expand Down

0 comments on commit 98bc6f7

Please sign in to comment.