Skip to content

Commit

Permalink
enhanced Connection by "specificConfig" containing configuration for …
Browse files Browse the repository at this point in the history
…specific ConnectionTypes

* enhanced AMQP 1.0 and RabbitMQ configs to use those specificConfigs

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Apr 3, 2018
1 parent 0e8d42f commit b8196b1
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*/
package org.eclipse.ditto.model.connectivity;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -154,6 +155,13 @@ public interface Connection extends Jsonifiable.WithFieldSelectorAndPredicate<Js
*/
int getProcessorPoolSize();

/**
* Returns configuration which is only applicable for a specific {@link ConnectionType}.
*
* @return an arbitrary map of config keys to config values.
*/
Map<String, String> getSpecificConfig();

/**
* Returns all non hidden marked fields of this {@code Connection}.
*
Expand Down Expand Up @@ -258,6 +266,13 @@ final class JsonFields {
JsonFactory.newIntFieldDefinition("processorPoolSize", FieldType.REGULAR, JsonSchemaVersion.V_1,
JsonSchemaVersion.V_2);

/**
* JSON field containing the {@code Connection} {@link ConnectionType} specific config.
*/
public static final JsonFieldDefinition<JsonObject> SPECIFIC_CONFIG =
JsonFactory.newJsonObjectFieldDefinition("specificConfig", FieldType.REGULAR, JsonSchemaVersion.V_1,
JsonSchemaVersion.V_2);

private JsonFields() {
throw new AssertionError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*/
package org.eclipse.ditto.model.connectivity;

import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -77,6 +78,14 @@ public interface ConnectionBuilder {
*/
ConnectionBuilder clientCount(int clientCount);

/**
* Adds configuration which is only applicable for a specific {@link ConnectionType}.
*
* @param specificConfig the ConnectionType specific configuration to set
* @return this builder to allow method chaining.
*/
ConnectionBuilder specificConfig(Map<String, String> specificConfig);

/**
* Builds a new {@link Connection}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
package org.eclipse.ditto.model.connectivity;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -65,6 +67,7 @@ final class ImmutableConnection implements Connection {
private final boolean validateCertificate;
private final int throttle;
private final int processorPoolSize;
private final Map<String, String> specificConfig;

ImmutableConnection(final ImmutableConnectionBuilder builder) {
this.id = builder.id;
Expand All @@ -79,6 +82,8 @@ final class ImmutableConnection implements Connection {
this.validateCertificate = builder.validateCertificate;
this.throttle = builder.throttle;
this.processorPoolSize = builder.processorPoolSize;
this.specificConfig = Collections.unmodifiableMap(new HashMap<>(
builder.specificConfig));

final Matcher matcher = URI_REGEX_PATTERN.matcher(uri);

Expand Down Expand Up @@ -155,6 +160,15 @@ public static Connection fromJson(final JsonObject jsonObject) {
final Optional<Boolean> readValidateCertificates = jsonObject.getValue(JsonFields.VALIDATE_CERTIFICATES);
final Optional<Integer> readThrottle = jsonObject.getValue(JsonFields.THROTTLE);
final Optional<Integer> readProcessorPoolSize = jsonObject.getValue(JsonFields.PROCESSOR_POOL_SIZE);
final Map<String, String> readConnectionTypeSpecificConfiguration = jsonObject
.getValue(JsonFields.SPECIFIC_CONFIG)
.filter(JsonValue::isObject)
.map(JsonValue::asObject)
.map(JsonObject::stream)
.map(jsonFields -> jsonFields
.collect(Collectors.toMap(JsonField::getKeyName, f -> f.getValue().isString() ?
f.getValue().asString() : f.getValue().toString())))
.orElse(Collections.emptyMap());

final ConnectionBuilder builder =
ImmutableConnectionBuilder.of(readId, readConnectionType, readUri, readAuthorizationContext);
Expand All @@ -166,6 +180,7 @@ public static Connection fromJson(final JsonObject jsonObject) {
readFailoverEnabled.ifPresent(builder::failoverEnabled);
readValidateCertificates.ifPresent(builder::validateCertificate);
readProcessorPoolSize.ifPresent(builder::processorPoolSize);
builder.specificConfig(readConnectionTypeSpecificConfiguration);
return builder.build();
}

Expand Down Expand Up @@ -254,6 +269,11 @@ public int getProcessorPoolSize() {
return processorPoolSize;
}

@Override
public Map<String, String> getSpecificConfig() {
return specificConfig;
}

@Override
public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<JsonField> thePredicate) {
final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
Expand All @@ -278,6 +298,11 @@ public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<
jsonObjectBuilder.set(JsonFields.VALIDATE_CERTIFICATES, validateCertificate, predicate);
jsonObjectBuilder.set(JsonFields.THROTTLE, throttle, predicate);
jsonObjectBuilder.set(JsonFields.PROCESSOR_POOL_SIZE, processorPoolSize, predicate);
if (!specificConfig.isEmpty()) {
jsonObjectBuilder.set(JsonFields.SPECIFIC_CONFIG, specificConfig.entrySet().stream()
.map(entry -> JsonField.newInstance(entry.getKey(), JsonValue.of(entry.getValue())))
.collect(JsonCollectors.fieldsToObject()), predicate);
}
return jsonObjectBuilder.build();
}

Expand All @@ -303,14 +328,15 @@ public boolean equals(@Nullable final Object o) {
Objects.equals(path, that.path) &&
Objects.equals(throttle, that.throttle) &&
Objects.equals(processorPoolSize, that.processorPoolSize) &&
Objects.equals(validateCertificate, that.validateCertificate);
Objects.equals(validateCertificate, that.validateCertificate) &&
Objects.equals(specificConfig, that.specificConfig);
}

@Override
public int hashCode() {
return Objects.hash(id, connectionType, authorizationContext, sources, targets, clientCount,
failoverEnabled, uri, protocol, username, password, hostname, path, port, validateCertificate, throttle,
processorPoolSize);
processorPoolSize, specificConfig);
}

@Override
Expand All @@ -333,6 +359,7 @@ public String toString() {
", validateCertificate=" + validateCertificate +
", throttle=" + throttle +
", processorPoolSize=" + processorPoolSize +
", specificConfig=" + specificConfig +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkArgument;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.eclipse.ditto.model.base.auth.AuthorizationContext;
Expand All @@ -35,6 +37,7 @@ class ImmutableConnectionBuilder implements ConnectionBuilder {
int clientCount = 1;
int throttle = -1;
int processorPoolSize = 5;
final Map<String, String> specificConfig = new HashMap<>();

private ImmutableConnectionBuilder(final String id, final ConnectionType connectionType,
final String uri, final AuthorizationContext authorizationContext) {
Expand Down Expand Up @@ -102,6 +105,13 @@ public ConnectionBuilder clientCount(final int clientCount) {
return this;
}

@Override
public ConnectionBuilder specificConfig(final Map<String, String> specificConfig) {
checkNotNull(specificConfig, "Specific Config");
this.specificConfig.putAll(specificConfig);
return this;
}

@Override
public Connection build() {
return new ImmutableConnection(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.amqp;

import static org.apache.qpid.jms.provider.failover.FailoverProviderFactory.FAILOVER_OPTION_PREFIX;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.io.UnsupportedEncodingException;
Expand All @@ -19,9 +20,10 @@
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -84,19 +86,22 @@ private Context createContext(final Connection connection) throws NamingExceptio
final int port = connection.getPort();
final boolean failoverEnabled = connection.isFailoverEnabled();

final Map<String, String> specificConfig = connection.getSpecificConfig();

final String baseUri = formatUri(protocol, hostname, port);

final List<String> parameters = new ArrayList<>(getAmqpParameters(username == null || password == null));
if (!connection.isValidateCertificates() && SECURE_AMQP_SCHEME.equalsIgnoreCase(protocol)) {
// these setting can only be applied for amqps connections:
parameters.addAll(getTransportParameters());
}
final List<String> parameters =
new ArrayList<>(getAmqpParameters(username == null || password == null, specificConfig));
final boolean securedConnection =
!connection.isValidateCertificates() && SECURE_AMQP_SCHEME.equalsIgnoreCase(protocol);
parameters.addAll(getTransportParameters(securedConnection, specificConfig));
final String nestedUri = baseUri + parameters.stream().collect(Collectors.joining("&", "?", ""));

final List<String> globalParameters = new ArrayList<>(getJmsParameters(id, username, password));
final List<String> globalParameters =
new ArrayList<>(getJmsParameters(id, username, password, specificConfig));
final String connectionUri;
if (failoverEnabled) {
globalParameters.addAll(getFailoverParameters());
globalParameters.addAll(getFailoverParameters(specificConfig));
connectionUri =
wrapWithFailOver(nestedUri) + globalParameters.stream().collect(Collectors.joining("&", "?", ""));
} else {
Expand All @@ -117,42 +122,80 @@ private static String formatUri(final String protocol, final String hostname, fi

@SuppressWarnings("squid:S2068")
private static List<String> getJmsParameters(final String id, @Nullable final String username,
@Nullable final String password) {
@Nullable final String password, final Map<String, String> specificConfig) {
String encodedId;
try {
encodedId = URLEncoder.encode(id, StandardCharsets.UTF_8.displayName());
} catch (final UnsupportedEncodingException e) {
LOGGER.info("Enconding not supported: {}", e.getMessage());
LOGGER.info("Encoding not supported: {}", e.getMessage());
//fallback: replace special characters
encodedId = id.replaceAll("[^a-zA-Z0-9]+", "");
}
final List<String> parameters = new ArrayList<>();
parameters.add("jms.clientID=" + encodedId);
final List<String> jmsParams = specificConfig.entrySet().stream()
.filter(e -> e.getKey().startsWith("jms"))
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.toList());

jmsParams.add("jms.clientID=" + encodedId);
if (username != null && password != null) {
parameters.add("jms.username=" + username);
parameters.add("jms.password=" + password);
jmsParams.add("jms.username=" + username);
jmsParams.add("jms.password=" + password);
}
return parameters;
return jmsParams;
}

private static List<String> getAmqpParameters(final boolean anonymous) {
return anonymous ?
Collections.singletonList("amqp.saslMechanisms=ANONYMOUS") :
Collections.singletonList("amqp.saslMechanisms=PLAIN");
private static List<String> getAmqpParameters(final boolean anonymous,
final Map<String, String> specificConfig) {

final List<String> amqpParams = specificConfig.entrySet().stream()
.filter(e -> e.getKey().startsWith("amqp"))
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.toList());

if (anonymous) {
amqpParams.add("amqp.saslMechanisms=ANONYMOUS");
} else {
amqpParams.add("amqp.saslMechanisms=PLAIN");
}

return amqpParams;
}

private static List<String> getTransportParameters() {
return Arrays.asList("transport.trustAll=true",
"transport.verifyHost=false");
private static List<String> getTransportParameters(final boolean securedConnectionWithAcceptInvalidCertificates,
final Map<String, String> specificConfig) {

final List<String> transportParams = specificConfig.entrySet().stream()
.filter(e -> e.getKey().startsWith("transport"))
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.toList());

if (securedConnectionWithAcceptInvalidCertificates) {
// these setting can only be applied for amqps connections:
transportParams.add("transport.trustAll=true");
transportParams.add("transport.verifyHost=false");
}
return transportParams;
}

private static List<String> getFailoverParameters() {
return Arrays.asList("initialReconnectDelay=10s",
"failover.startupMaxReconnectAttempts=1", // important, we cannot interrupt connection initiation
"reconnectDelay=1s",
"maxReconnectDelay=1h",
"failover.useReconnectBackOff=true",
"reconnectBackOffMultiplier=1m");
private static List<String> getFailoverParameters(
final Map<String, String> specificConfig) {

final List<String> failoverParams = specificConfig.entrySet().stream()
.filter(e -> e.getKey().startsWith(FAILOVER_OPTION_PREFIX))
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.toList());

final List<String> defaultFailoverParams =
Arrays.asList(FAILOVER_OPTION_PREFIX + "initialReconnectDelay=" + TimeUnit.SECONDS.toMillis(10),
FAILOVER_OPTION_PREFIX + "startupMaxReconnectAttempts=1",
// important, we cannot interrupt connection initiation
FAILOVER_OPTION_PREFIX + "reconnectDelay=" + TimeUnit.SECONDS.toMillis(1),
FAILOVER_OPTION_PREFIX + "maxReconnectDelay=" + TimeUnit.MINUTES.toMillis(60),
FAILOVER_OPTION_PREFIX + "useReconnectBackOff=true",
FAILOVER_OPTION_PREFIX + "reconnectBackOffMultiplier=1.0");

defaultFailoverParams.addAll(failoverParams);
return defaultFailoverParams;
}

private static String wrapWithFailOver(final String uri) {
Expand Down
Loading

0 comments on commit b8196b1

Please sign in to comment.