Skip to content

Commit

Permalink
added hostname blacklisting for HTTP endpoints which should not be in…
Browse files Browse the repository at this point in the history
…voked (e.g. localhost)

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Oct 14, 2019
1 parent c9cbcce commit e816083
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 32 deletions.
Expand Up @@ -12,9 +12,11 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.config;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.services.base.config.http.DefaultHttpProxyConfig;
Expand All @@ -35,11 +37,14 @@ final class DefaultHttpPushConfig implements HttpPushConfig {
private final int maxParallelism;
private final int maxQueueSize;
private final HttpProxyConfig httpProxyConfig;
private final Collection<String> blacklistedHostnames;

private DefaultHttpPushConfig(final ScopedConfig config) {
maxParallelism = config.getInt(ConfigValue.MAX_PARALLELISM.getConfigPath());
maxQueueSize = config.getInt(ConfigValue.MAX_QUEUE_SIZE.getConfigPath());
httpProxyConfig = DefaultHttpProxyConfig.ofProxy(config);
final String blacklistedHostnamesStr = config.getString(ConfigValue.BLACKLISTED_HOSTNAMES.getConfigPath());
blacklistedHostnames = Collections.unmodifiableCollection(Arrays.asList(blacklistedHostnamesStr.split(",")));
}

static DefaultHttpPushConfig of(final Config config) {
Expand All @@ -62,7 +67,12 @@ public HttpProxyConfig getHttpProxyConfig() {
}

@Override
public boolean equals(@Nullable final Object o) {
public Collection<String> getBlacklistedHostnames() {
return blacklistedHostnames;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
Expand All @@ -71,20 +81,23 @@ public boolean equals(@Nullable final Object o) {
}
final DefaultHttpPushConfig that = (DefaultHttpPushConfig) o;
return maxParallelism == that.maxParallelism &&
maxQueueSize == that.maxQueueSize;
maxQueueSize == that.maxQueueSize &&
Objects.equals(httpProxyConfig, that.httpProxyConfig) &&
Objects.equals(blacklistedHostnames, that.blacklistedHostnames);
}

@Override
public int hashCode() {
return Objects.hash(maxParallelism, maxQueueSize);
return Objects.hash(maxParallelism, maxQueueSize, httpProxyConfig, blacklistedHostnames);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"maxParallelism=" + maxParallelism +
", maxQueueSize=" + maxQueueSize +
", httpProxyConfig=" + httpProxyConfig +
", blacklistedHostnames=" + blacklistedHostnames +
"]";
}

}
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.config;

import java.util.Collection;

import org.eclipse.ditto.services.base.config.http.HttpProxyConfig;
import org.eclipse.ditto.services.utils.config.KnownConfigValue;

Expand All @@ -35,6 +37,11 @@ public interface HttpPushConfig {
*/
HttpProxyConfig getHttpProxyConfig();

/**
* @return the list of blacklisted HTTP hostnames to which sending out data will be prevented.
*/
Collection<String> getBlacklistedHostnames();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code HttpPushConfig}.
Expand All @@ -49,7 +56,12 @@ enum ConfigValue implements KnownConfigValue {
/**
* How many messages to buffer in the publisher actor before dropping them. Each takes up to 100 KB heap space.
*/
MAX_QUEUE_SIZE("max-queue-size", 10);
MAX_QUEUE_SIZE("max-queue-size", 10),

/**
* A comma separated list of blacklisted hostnames to which not http requests will be send out.
*/
BLACKLISTED_HOSTNAMES("blacklisted-hostnames", "");

private final String path;
private final Object defaultValue;
Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.services.base.config.http.HttpProxyConfig;
import org.eclipse.ditto.services.connectivity.messaging.config.HttpPushConfig;
import org.eclipse.ditto.services.connectivity.messaging.internal.ssl.SSLContextCreator;

import akka.actor.ActorSystem;
Expand Down Expand Up @@ -59,24 +59,24 @@ final class DefaultHttpPushFactory implements HttpPushFactory {
private final ClientTransport clientTransport;

private DefaultHttpPushFactory(final ConnectionId connectionId, final Uri baseUri, final int parallelism,
final SSLContextCreator sslContextCreator, @Nullable final HttpProxyConfig httpProxyConfig) {
final SSLContextCreator sslContextCreator, final HttpPushConfig httpPushConfig) {
this.connectionId = connectionId;
this.baseUri = baseUri;
this.parallelism = parallelism;
this.sslContextCreator = sslContextCreator;
if (httpProxyConfig == null || !httpProxyConfig.isEnabled()) {
if (!httpPushConfig.getHttpProxyConfig().isEnabled()) {
clientTransport = null;
} else {
clientTransport = httpProxyConfig.toClientTransport();
clientTransport = httpPushConfig.getHttpProxyConfig().toClientTransport();
}
}

static HttpPushFactory of(final Connection connection, @Nullable final HttpProxyConfig httpProxyConfig) {
static HttpPushFactory of(final Connection connection, final HttpPushConfig httpPushConfig) {
final ConnectionId connectionId = connection.getId();
final Uri baseUri = Uri.create(connection.getUri());
final int parallelism = parseParallelism(connection.getSpecificConfig());
final SSLContextCreator sslContextCreator = SSLContextCreator.fromConnection(connection, DittoHeaders.empty());
return new DefaultHttpPushFactory(connectionId, baseUri, parallelism, sslContextCreator, httpProxyConfig);
return new DefaultHttpPushFactory(connectionId, baseUri, parallelism, sslContextCreator, httpPushConfig);
}

@Override
Expand Down
Expand Up @@ -12,11 +12,17 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.httppush;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

Expand All @@ -38,6 +44,7 @@
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.event.LoggingAdapter;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpEntity;
import akka.http.javadsl.model.HttpHeader;
Expand Down Expand Up @@ -76,6 +83,7 @@ final class HttpPublisherActor extends BasePublisherActor<HttpPublishTarget> {

private final ActorMaterializer materializer;
private final SourceQueue<Pair<HttpRequest, HttpPushContext>> sourceQueue;
private final Collection<String> blacklistedHostsAndIps;

@SuppressWarnings("unused")
private HttpPublisherActor(final ConnectionId connectionId, final List<Target> targets,
Expand All @@ -87,6 +95,7 @@ private HttpPublisherActor(final ConnectionId connectionId, final List<Target> t
config = DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(system.settings().config()))
.getConnectionConfig()
.getHttpPushConfig();
blacklistedHostsAndIps = calculateBlacklistedHostnames(config.getBlacklistedHostnames(), log);

materializer = ActorMaterializer.create(getContext());
sourceQueue =
Expand All @@ -96,6 +105,26 @@ private HttpPublisherActor(final ConnectionId connectionId, final List<Target> t
.run(materializer);
}

static Collection<String> calculateBlacklistedHostnames(final Collection<String> configuredBlacklistedHostnames,
final LoggingAdapter log) {
final Set<String> blacklistedHostsAndIps = new HashSet<>(configuredBlacklistedHostnames);
configuredBlacklistedHostnames.stream()
.filter(host -> !host.isEmpty())
.forEach(host -> {
try {
Arrays.asList(InetAddress.getAllByName(host)).forEach(inetAddress -> {
blacklistedHostsAndIps.add(inetAddress.getHostName());
blacklistedHostsAndIps.add(inetAddress.getHostAddress());
});
} catch (final UnknownHostException e) {
log.info("Could not resolve hostname during building blacklisted hostnames set: <{}>",
host);
}
}
);
return blacklistedHostsAndIps;
}

static Props props(final ConnectionId connectionId, final List<Target> targets, final HttpPushFactory factory) {
return Props.create(HttpPublisherActor.class, connectionId, targets, factory);
}
Expand Down Expand Up @@ -125,8 +154,15 @@ protected void publishMessage(@Nullable final Target target, final HttpPublishTa
final ExternalMessage message, final ConnectionMonitor publishedMonitor) {

final HttpRequest request = createRequest(publishTarget, message);
sourceQueue.offer(Pair.create(request, new HttpPushContext(message, request.getUri())))
.handle(handleQueueOfferResult(message));
final String requestAddress = request.getUri().host().address();
if (blacklistedHostsAndIps.contains(requestAddress)) {
log.warning("Tried to publish HTTP message to blacklisted host: <{}> - dropping!", requestAddress);
responseDroppedMonitor.failure(message, "Message dropped as the target address <{0}> is blacklisted " +
"and may not be used", requestAddress);
} else {
sourceQueue.offer(Pair.create(request, new HttpPushContext(message, request.getUri())))
.handle(handleQueueOfferResult(message));
}
}

@Override
Expand Down Expand Up @@ -181,7 +217,7 @@ private BiFunction<QueueOfferResult, Throwable, Void> handleQueueOfferResult(fin
} else if (queueOfferResult == QueueOfferResult.dropped()) {
log.debug("HTTP request dropped due to full queue");
responseDroppedMonitor.failure(message,
"Message dropped because the number of ongoing requests exceeded {0}.",
"Message dropped because the number of ongoing requests exceeded <{0}>",
config.getMaxQueueSize());
}
return null;
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.ditto.services.base.config.http.HttpProxyConfig;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientActor;
import org.eclipse.ditto.services.connectivity.messaging.config.DittoConnectivityConfig;
import org.eclipse.ditto.services.connectivity.messaging.config.HttpPushConfig;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientConnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ssl.SSLContextCreator;
Expand All @@ -57,19 +58,18 @@ public final class HttpPushClientActor extends BaseClientActor {

@Nullable
private ActorRef httpPublisherActor;
private final HttpProxyConfig httpProxyConfig;
private final HttpPushConfig httpPushConfig;

@SuppressWarnings("unused")
private HttpPushClientActor(final Connection connection, final ConnectivityStatus desiredConnectionStatus) {
super(connection, desiredConnectionStatus, ActorRef.noSender());

httpProxyConfig = DittoConnectivityConfig.of(
httpPushConfig = DittoConnectivityConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
)
.getConnectionConfig()
.getHttpPushConfig()
.getHttpProxyConfig();
factory = HttpPushFactory.of(connection, httpProxyConfig);
.getHttpPushConfig();
factory = HttpPushFactory.of(connection, httpPushConfig);
}

/**
Expand All @@ -84,7 +84,7 @@ public static Props props(final Connection connection) {

@Override
protected boolean canConnectViaSocket(final Connection connection) {
if (httpProxyConfig.isEnabled()) {
if (httpPushConfig.getHttpProxyConfig().isEnabled()) {
return connectViaProxy(connection.getHostname(), connection.getPort())
.handle((status, throwable) -> status instanceof Status.Success)
.toCompletableFuture()
Expand Down Expand Up @@ -145,7 +145,7 @@ protected CompletionStage<Status.Status> startPublisherActor() {

private CompletionStage<Status.Status> testSSL(final Connection connection, final String hostWithoutLookup,
final int port) {
if (httpProxyConfig.isEnabled()) {
if (httpPushConfig.getHttpProxyConfig().isEnabled()) {
// don't do a second proxy check
return statusSuccessFuture("TLS connection to '%s:%d' via Http proxy established successfully.",
hostWithoutLookup, port);
Expand All @@ -164,6 +164,7 @@ private CompletionStage<Status.Status> testSSL(final Connection connection, fina
}

private CompletionStage<Status.Status> connectViaProxy(final String hostWithoutLookup, final int port) {
final HttpProxyConfig httpProxyConfig = this.httpPushConfig.getHttpProxyConfig();
try (final Socket proxySocket = new Socket(httpProxyConfig.getHostname(), httpProxyConfig.getPort())) {
String proxyConnect = "CONNECT " + hostWithoutLookup + ":" + port + " HTTP/1.1\n";
proxyConnect += "Host: " + hostWithoutLookup + ":" + port;
Expand Down
Expand Up @@ -12,10 +12,8 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.httppush;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.services.base.config.http.HttpProxyConfig;
import org.eclipse.ditto.services.connectivity.messaging.config.HttpPushConfig;

import akka.actor.ActorSystem;
import akka.event.LoggingAdapter;
Expand Down Expand Up @@ -59,10 +57,10 @@ public interface HttpPushFactory {
* with undefined behavior if the connection is not valid.
*
* @param connection the connection.
* @param httpProxyConfig proxy configuration, or null if no proxy is needed.
* @param httpPushConfig configuration of Http connections.
* @return the HTTP-push-factory.
*/
static HttpPushFactory of(final Connection connection, @Nullable final HttpProxyConfig httpProxyConfig) {
return DefaultHttpPushFactory.of(connection, httpProxyConfig);
static HttpPushFactory of(final Connection connection, final HttpPushConfig httpPushConfig) {
return DefaultHttpPushFactory.of(connection, httpPushConfig);
}
}
Expand Up @@ -40,6 +40,7 @@
import akka.actor.ActorSystem;
import akka.http.javadsl.model.HttpMethod;
import akka.http.javadsl.model.HttpMethods;
import akka.http.javadsl.model.Uri;

/**
* Validation of http-push connections.
Expand Down Expand Up @@ -75,12 +76,35 @@ public ConnectionType type() {
@Override
public void validate(final Connection connection, final DittoHeaders dittoHeaders, final ActorSystem actorSystem) {
validateUriScheme(connection, dittoHeaders, ACCEPTED_SCHEMES, SECURE_SCHEMES, "HTTP");
validateBlacklistedHostnames(connection, dittoHeaders, actorSystem);
validateSourceConfigs(connection, dittoHeaders);
validateTargetConfigs(connection, dittoHeaders);
validateMappingContext(connection, actorSystem, dittoHeaders);
validateParallelism(connection.getSpecificConfig(), actorSystem, dittoHeaders);
}

private static void validateBlacklistedHostnames(final Connection connection, final DittoHeaders dittoHeaders,
final ActorSystem actorSystem) {

final Collection<String> configuredBlacklistedHostnames = DittoConnectivityConfig.of(
DefaultScopedConfig.dittoScoped(actorSystem.settings().config())
).getConnectionConfig()
.getHttpPushConfig()
.getBlacklistedHostnames();
final Collection<String> blacklisted =
HttpPublisherActor.calculateBlacklistedHostnames(configuredBlacklistedHostnames, actorSystem.log());

final String connectionHostAddress = Uri.create(connection.getUri()).getHost().address();
if (blacklisted.contains(connectionHostAddress)) {
final String errorMessage = String.format("The configured host '%s' may not be used for the connection.",
connectionHostAddress);
throw ConnectionConfigurationInvalidException.newBuilder(errorMessage)
.description("It is a blacklisted hostname which may not be used.")
.dittoHeaders(dittoHeaders)
.build();
}
}

@Override
protected void validateSource(final Source source, final DittoHeaders dittoHeaders,
final Supplier<String> sourceDescription) {
Expand Down

0 comments on commit e816083

Please sign in to comment.