Skip to content

Commit

Permalink
[#1078] split request signing factories into different factories for …
Browse files Browse the repository at this point in the history
…http push and amqp

Signed-off-by: Florian Fendt <Florian.Fendt@bosch.io>
  • Loading branch information
ffendt committed Jun 9, 2021
1 parent 7ef33cb commit 584183c
Show file tree
Hide file tree
Showing 28 changed files with 531 additions and 189 deletions.
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Map;

import javax.annotation.concurrent.Immutable;

Expand Down Expand Up @@ -150,6 +151,11 @@ default int getConsumerThrottlingLimit() {
*/
int getGlobalPrefetchPolicyAllCount();

/**
* @return configuration of HMAC request-signing algorithms.
*/
Map<String, String> getHmacAlgorithms();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code Amqp10Config}.
Expand Down Expand Up @@ -204,7 +210,12 @@ enum Amqp10ConfigValue implements KnownConfigValue {
/**
* How many message producers to cache per client actor.
*/
GLOBAL_PREFETCH_POLICY_ALL_COUNT("global-prefetch-policy-all-count", 10);
GLOBAL_PREFETCH_POLICY_ALL_COUNT("global-prefetch-policy-all-count", 10),

/**
* HMAC request-signing algorithms.
*/
HMAC_ALGORITHMS("hmac-algorithms", Map.of());

private final String path;
private final Object defaultValue;
Expand Down
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;
Expand All @@ -28,7 +29,7 @@
* This class is the default implementation of {@link Amqp10Config}.
*/
@Immutable
public final class DefaultAmqp10Config implements Amqp10Config {
public final class DefaultAmqp10Config implements Amqp10Config, WithStringMapDecoding {

private static final String CONFIG_PATH = "amqp10";
private static final String CONSUMER_PATH = "consumer";
Expand All @@ -46,6 +47,7 @@ public final class DefaultAmqp10Config implements Amqp10Config {
private final Duration globalSendTimeout;
private final Duration globalRequestTimeout;
private final int globalPrefetchPolicyAllCount;
private final Map<String, String> hmacAlgorithms;

private DefaultAmqp10Config(final ScopedConfig config) {
consumerRateLimitEnabled = config.getBoolean(Amqp10ConfigValue.CONSUMER_RATE_LIMIT_ENABLED.getConfigPath());
Expand All @@ -66,6 +68,7 @@ private DefaultAmqp10Config(final ScopedConfig config) {
globalRequestTimeout = config.getDuration(Amqp10ConfigValue.GLOBAL_REQUEST_TIMEOUT.getConfigPath());
globalPrefetchPolicyAllCount =
config.getInt(Amqp10ConfigValue.GLOBAL_PREFETCH_POLICY_ALL_COUNT.getConfigPath());
hmacAlgorithms = asStringMap(config, HttpPushConfig.ConfigValue.HMAC_ALGORITHMS.getConfigPath());
}

/**
Expand Down Expand Up @@ -139,6 +142,11 @@ public int getGlobalPrefetchPolicyAllCount() {
return globalPrefetchPolicyAllCount;
}

@Override
public Map<String, String> getHmacAlgorithms() {
return hmacAlgorithms;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -159,15 +167,16 @@ public boolean equals(final Object o) {
Objects.equals(consumerThrottlingConfig, that.consumerThrottlingConfig) &&
Objects.equals(globalConnectTimeout, that.globalConnectTimeout) &&
Objects.equals(globalSendTimeout, that.globalSendTimeout) &&
Objects.equals(globalRequestTimeout, that.globalRequestTimeout);
Objects.equals(globalRequestTimeout, that.globalRequestTimeout) &&
Objects.equals(hmacAlgorithms, that.hmacAlgorithms);
}

@Override
public int hashCode() {
return Objects.hash(consumerRateLimitEnabled, consumerMaxInFlight, consumerRedeliveryExpectationTimeout,
producerCacheSize, backOffConfig, consumerThrottlingConfig, maxQueueSize,
messagePublishingParallelism, globalConnectTimeout, globalSendTimeout, globalRequestTimeout,
globalPrefetchPolicyAllCount);
globalPrefetchPolicyAllCount, hmacAlgorithms);
}

@Override
Expand All @@ -185,6 +194,8 @@ public String toString() {
", globalSendTimeout=" + globalSendTimeout +
", globalRequestTimeout=" + globalRequestTimeout +
", globalPrefetchPolicyAllCount=" + globalPrefetchPolicyAllCount +
", hmacAlgorithms=" + hmacAlgorithms +
"]";
}

}
Expand Up @@ -13,10 +13,8 @@
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import javax.annotation.concurrent.Immutable;

Expand All @@ -32,7 +30,7 @@
* This class is the default implementation of {@link HttpPushConfig}.
*/
@Immutable
final class DefaultHttpPushConfig implements HttpPushConfig {
final class DefaultHttpPushConfig implements HttpPushConfig, WithStringMapDecoding {

private static final String CONFIG_PATH = "http-push";

Expand All @@ -48,7 +46,7 @@ private DefaultHttpPushConfig(final ScopedConfig config) {
throw new DittoConfigError("Request timeout must be greater than 0");
}
httpProxyConfig = DefaultHttpProxyConfig.ofProxy(config);
hmacAlgorithms = asStringMap(config.getConfig(ConfigValue.HMAC_ALGORITHMS.getConfigPath()));
hmacAlgorithms = asStringMap(config, ConfigValue.HMAC_ALGORITHMS.getConfigPath());
}

static DefaultHttpPushConfig of(final Config config) {
Expand Down Expand Up @@ -105,13 +103,4 @@ public String toString() {
"]";
}

private static Map<String, String> asStringMap(final Config config) {
try {
final Map<String, String> map = config.root().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> (String) entry.getValue().unwrapped()));
return Collections.unmodifiableMap(map);
} catch (final ClassCastException e) {
throw new DittoConfigError("In HttpPushConfig, hmac-algorithms must be a map from string to string.");
}
}
}
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.connectivity.service.config;

import java.text.MessageFormat;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import org.eclipse.ditto.internal.utils.config.DittoConfigError;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;

/**
* Interface providing functionality to decode a config entry to a {@code Map<String, String>}.
*/
public interface WithStringMapDecoding {

/**
* Decode the value at {@code path} in {@code config} to a {@link Map}.
*
* @param config the config.
* @param path the path at which the Map should be located.
* @return the {@link Map} containing the values.
* @throws DittoConfigError if the value at {@code path} was missing or no Map from string to string.
*/
default Map<String, String> asStringMap(final ScopedConfig config, final String path) {
try {
final Config stringMapConfig = config.getConfig(path);
final Map<String, String> map = stringMapConfig.root().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> (String) entry.getValue().unwrapped()));
return Collections.unmodifiableMap(map);
} catch (final ClassCastException | ConfigException e) {
final String errorMessage =
MessageFormat.format("Key <{0}> in config <{1}> must contain a map from string to string",
path, config.getConfigPath());
throw new DittoConfigError(errorMessage);
}
}

}
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.amqp;

import java.time.Instant;
import java.util.Optional;

import org.eclipse.ditto.connectivity.model.UserPasswordCredentials;
import org.eclipse.ditto.connectivity.service.messaging.signing.Signing;

/**
* Functional interface for preparing signing information for AMQP connection.
*/
@FunctionalInterface
public interface AmqpConnectionSigning extends Signing {

/**
* Creates signed credentials for a connection if applicable.
*
* @return an {@link Optional} containing the signed credentials if applicable, otherwise an empty {@link Optional}.
*/
default Optional<UserPasswordCredentials> createSignedCredentials() {
return createSignedCredentials(Instant.now());
}

/**
* Creates signed credentials for a connection if applicable.
*
* @param timestamp Timestamp to include in the signature.
* @return an {@link Optional} containing the signed credentials if applicable, otherwise an empty {@link Optional}.
*/
Optional<UserPasswordCredentials> createSignedCredentials(Instant timestamp);

}
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.amqp;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import org.eclipse.ditto.connectivity.model.ClientCertificateCredentials;
import org.eclipse.ditto.connectivity.model.CredentialsVisitor;
import org.eclipse.ditto.connectivity.model.HmacCredentials;
import org.eclipse.ditto.connectivity.model.MessageSendingFailedException;
import org.eclipse.ditto.connectivity.model.SshPublicKeyCredentials;
import org.eclipse.ditto.connectivity.model.UserPasswordCredentials;
import org.eclipse.ditto.connectivity.service.config.Amqp10Config;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.signing.NoOpSigning;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;

import com.typesafe.config.Config;

import akka.actor.AbstractExtensionId;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import scala.collection.immutable.List$;
import scala.reflect.ClassTag;

/**
* Actor system extension to load the configured connection signing algorithms.
*/
public final class AmqpConnectionSigningExtension implements Extension, CredentialsVisitor<AmqpConnectionSigning> {

private static final Id ID = new Id();

private final Map<String, AmqpConnectionSigningFactory> registry;

private AmqpConnectionSigningExtension(final Map<String, AmqpConnectionSigningFactory> registry) {
this.registry = registry;
}

/**
* Lookup this actor system extension.
*
* @param system the actor system.
* @return this extension.
*/
public static AmqpConnectionSigningExtension get(final ActorSystem system) {
return ID.get(system);
}

@Override
public AmqpConnectionSigning clientCertificate(final ClientCertificateCredentials credentials) {
return NoOpSigning.INSTANCE;
}

@Override
public AmqpConnectionSigning usernamePassword(final UserPasswordCredentials credentials) {
return NoOpSigning.INSTANCE;
}

@Override
public AmqpConnectionSigning sshPublicKeyAuthentication(final SshPublicKeyCredentials credentials) {
return NoOpSigning.INSTANCE;
}

@Override
public AmqpConnectionSigning hmac(final HmacCredentials credentials) {
final AmqpConnectionSigningFactory factory = registry.get(credentials.getAlgorithm());
if (factory != null) {
return factory.createAmqpConnectionSigning(credentials);
} else {
throw MessageSendingFailedException.newBuilder()
.message("Failed to sign AMQP 1.0 connection.")
.description(String.format("The configured algorithm '%s' does not exist. Fix this by adding an" +
" implementation for it in the '%s' section of the amqp config.",
credentials.getAlgorithm(), Amqp10Config.Amqp10ConfigValue.HMAC_ALGORITHMS.getConfigPath()))
.build();
}
}

/**
* The extension ID.
*/
public static final class Id extends AbstractExtensionId<AmqpConnectionSigningExtension> {

private static AmqpConnectionSigningFactory instantiate(final ExtendedActorSystem system, final String className) {
final ClassTag<AmqpConnectionSigningFactory> tag =
scala.reflect.ClassTag$.MODULE$.apply(AmqpConnectionSigningFactory.class);
return system.dynamicAccess().createInstanceFor(className, List$.MODULE$.empty(), tag).get();
}

@Override
public AmqpConnectionSigningExtension createExtension(final ExtendedActorSystem system) {
final Config config = system.settings().config();
final Map<String, String> hmacAlgorithms =
DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(config))
.getConnectionConfig()
.getAmqp10Config()
.getHmacAlgorithms();
final Map<String, AmqpConnectionSigningFactory> factoryMap = hmacAlgorithms.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> instantiate(system, entry.getValue())));
return new AmqpConnectionSigningExtension(Collections.unmodifiableMap(factoryMap));
}
}

}

0 comments on commit 584183c

Please sign in to comment.