Skip to content

Commit

Permalink
handle UnknownHostException in ConnectionValidator, add tests to veri…
Browse files Browse the repository at this point in the history
…fy correct handling of unknown host in connection

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed May 6, 2020
1 parent 546d3d8 commit 1431142
Show file tree
Hide file tree
Showing 16 changed files with 332 additions and 103 deletions.
Expand Up @@ -12,11 +12,9 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.httppush;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
Expand All @@ -33,15 +31,13 @@
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.services.connectivity.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.http.javadsl.model.Host;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpEntity;
import akka.http.javadsl.model.HttpHeader;
Expand Down Expand Up @@ -80,7 +76,6 @@ final class HttpPublisherActor extends BasePublisherActor<HttpPublishTarget> {

private final ActorMaterializer materializer;
private final SourceQueue<Pair<HttpRequest, HttpPushContext>> sourceQueue;
private final Collection<InetAddress> blacklistedAddresses;

@SuppressWarnings("unused")
private HttpPublisherActor(final Connection connection, final HttpPushFactory factory) {
Expand All @@ -92,8 +87,6 @@ private HttpPublisherActor(final Connection connection, final HttpPushFactory fa
DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(system.settings().config()))
.getConnectionConfig();
config = connectionConfig.getHttpPushConfig();
blacklistedAddresses =
ConnectionValidator.calculateBlacklistedAddresses(connectionConfig.getBlacklistedHostnames(), log);

materializer = ActorMaterializer.create(getContext());
sourceQueue =
Expand Down Expand Up @@ -125,17 +118,9 @@ protected HttpPublishTarget toPublishTarget(final String address) {
@Override
protected void publishMessage(@Nullable final Target target, final HttpPublishTarget publishTarget,
final ExternalMessage message, final ConnectionMonitor publishedMonitor) {

final HttpRequest request = createRequest(publishTarget, message);
final Host requestHost = request.getUri().getHost();
if (ConnectionValidator.isHostForbidden(requestHost, blacklistedAddresses)) {
log.warning("Tried to publish HTTP message to forbidden host: <{}> - dropping!", requestHost);
responseDroppedMonitor.failure(message, "Message dropped as the target address <{0}> is blacklisted " +
"or otherwise forbidden and may not be used", requestHost);
} else {
sourceQueue.offer(Pair.create(request, new HttpPushContext(message, request.getUri())))
sourceQueue.offer(Pair.create(request, new HttpPushContext(message, request.getUri())))
.handle(handleQueueOfferResult(message));
}
}

@Override
Expand Down
Expand Up @@ -91,6 +91,7 @@
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionNotAccessibleException;
import org.eclipse.ditto.signals.commands.connectivity.modify.CheckConnectionLogsActive;
Expand Down Expand Up @@ -156,7 +157,7 @@ public final class ConnectionPersistenceActor
private final ActorRef conciergeForwarder;
private final ClientActorPropsFactory propsFactory;
private final int clientActorsPerNode;
private final Consumer<ConnectivityCommand<?>> commandValidator;
private final ConnectivityCommandInterceptor commandValidator;
private final ConnectionLogger connectionLogger;
private Instant connectionClosedAt = Instant.now();

Expand All @@ -179,7 +180,7 @@ public final class ConnectionPersistenceActor
final DittoProtocolSub dittoProtocolSub,
final ActorRef conciergeForwarder,
final ClientActorPropsFactory propsFactory,
@Nullable final Consumer<ConnectivityCommand<?>> customCommandValidator,
@Nullable final ConnectivityCommandInterceptor customCommandValidator,
final int clientActorsPerNode) {

super(connectionId, new ConnectionMongoSnapshotAdapter());
Expand Down Expand Up @@ -245,7 +246,7 @@ public static Props props(final ConnectionId connectionId,
final DittoProtocolSub dittoProtocolSub,
final ActorRef conciergeForwarder,
final ClientActorPropsFactory propsFactory,
@Nullable final Consumer<ConnectivityCommand<?>> commandValidator
@Nullable final ConnectivityCommandInterceptor commandValidator
) {
return Props.create(ConnectionPersistenceActor.class, connectionId, dittoProtocolSub, conciergeForwarder,
propsFactory, commandValidator, CLIENT_ACTORS_PER_NODE);
Expand Down
Expand Up @@ -12,11 +12,9 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.persistence.stages;

import java.util.function.Consumer;

import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;

/**
* Everything needed by connection strategies from the state of a connection actor.
Expand All @@ -25,10 +23,10 @@ public final class ConnectionState {

private final ConnectionId connectionId;
private final ConnectionLogger connectionLogger;
private final Consumer<ConnectivityCommand<?>> validator;
private final ConnectivityCommandInterceptor validator;

private ConnectionState(final ConnectionId connectionId,
final ConnectionLogger connectionLogger, final Consumer<ConnectivityCommand<?>> validator) {
final ConnectionLogger connectionLogger, final ConnectivityCommandInterceptor validator) {
this.connectionId = connectionId;
this.connectionLogger = connectionLogger;
this.validator = validator;
Expand All @@ -45,7 +43,7 @@ private ConnectionState(final ConnectionId connectionId,
public static ConnectionState of(
final ConnectionId connectionId,
final ConnectionLogger connectionLogger,
final Consumer<ConnectivityCommand<?>> validator) {
final ConnectivityCommandInterceptor validator) {

return new ConnectionState(connectionId, connectionLogger, validator);
}
Expand All @@ -67,7 +65,7 @@ public ConnectionLogger getConnectionLogger() {
/**
* @return the command validator.
*/
public Consumer<ConnectivityCommand<?>> getValidator() {
public ConnectivityCommandInterceptor getValidator() {
return validator;
}
}
Expand Up @@ -52,22 +52,37 @@ ConnectionNotAccessibleException notAccessible(final Context<ConnectionState> co
}

static Optional<DittoRuntimeException> validate(final Context<ConnectionState> context,
final ConnectivityCommand command) {
final ConnectivityCommand command, final Connection connection) {
try {
context.getState().getValidator().accept(command, () -> connection);
return Optional.empty();
} catch (final Exception error) {
return handleValidationException(context, command, error);
}
}

static Optional<DittoRuntimeException> validate(final Context<ConnectionState> context,
final ConnectivityCommand<?> command) {
try {
context.getState().getValidator().accept(command);
return Optional.empty();
} catch (final Exception error) {
final DittoRuntimeException dre =
toDittoRuntimeException(error, context.getState().id(), command.getDittoHeaders());
context.getLog().info("Operation <{}> failed due to <{}>", command, dre);
context.getState()
.getConnectionLogger()
.failure("Operation {0} failed due to {1}", command.getType(), dre.getMessage());
return Optional.of(dre);
return handleValidationException(context, command, error);
}
}

private static Optional<DittoRuntimeException> handleValidationException(
final Context<ConnectionState> context, final ConnectivityCommand<?> command,
final Exception error) {
final DittoRuntimeException dre =
toDittoRuntimeException(error, context.getState().id(), command.getDittoHeaders());
context.getLog().info("Operation <{}> failed due to <{}>", command, dre);
context.getState()
.getConnectionLogger()
.failure("Operation {0} failed due to {1}", command.getType(), dre.getMessage());
return Optional.of(dre);
}

private static DittoRuntimeException toDittoRuntimeException(final Throwable error, final ConnectionId id,
final DittoHeaders headers) {
return DittoRuntimeException.asDittoRuntimeException(error,
Expand Down
Expand Up @@ -16,13 +16,16 @@
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.PERSIST_AND_APPLY_EVENT;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.SEND_RESPONSE;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.UPDATE_SUBSCRIPTIONS;
import static org.eclipse.ditto.services.utils.persistentactors.results.ResultFactory.newErrorResult;
import static org.eclipse.ditto.services.utils.persistentactors.results.ResultFactory.newMutationResult;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction;
Expand All @@ -46,11 +49,16 @@ final class OpenConnectionStrategy extends AbstractConnectivityCommandStrategy<O
@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection connection, final long nextRevision, final OpenConnection command) {
final ConnectivityEvent event = ConnectionOpened.of(context.getState().id(), command.getDittoHeaders());
final WithDittoHeaders response =
OpenConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
final List<ConnectionAction> actions =
Arrays.asList(PERSIST_AND_APPLY_EVENT, OPEN_CONNECTION, UPDATE_SUBSCRIPTIONS, SEND_RESPONSE);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
final Optional<DittoRuntimeException> validationError = validate(context, command, connection);
if (validationError.isPresent()) {
return newErrorResult(validationError.get());
} else {
final ConnectivityEvent event = ConnectionOpened.of(context.getState().id(), command.getDittoHeaders());
final WithDittoHeaders response =
OpenConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
final List<ConnectionAction> actions =
Arrays.asList(PERSIST_AND_APPLY_EVENT, OPEN_CONNECTION, UPDATE_SUBSCRIPTIONS, SEND_RESPONSE);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
}
}
}
Expand Up @@ -14,8 +14,9 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;

Expand All @@ -24,15 +25,15 @@
*/
public final class CompoundConnectivityCommandInterceptor implements ConnectivityCommandInterceptor {

private final Collection<Consumer<ConnectivityCommand<?>>> validators;
private final Collection<ConnectivityCommandInterceptor> validators;

@SafeVarargs
public CompoundConnectivityCommandInterceptor(final Consumer<ConnectivityCommand<?>>... validators) {
public CompoundConnectivityCommandInterceptor(final ConnectivityCommandInterceptor... validators) {
this.validators = Arrays.asList(validators);
}

@Override
public void accept(final ConnectivityCommand<?> command) {
validators.forEach(c -> c.accept(command));
public void accept(final ConnectivityCommand<?> command, final Supplier<Connection> connectionSupplier) {
validators.forEach(c -> c.accept(command, connectionSupplier));
}
}
Expand Up @@ -150,7 +150,8 @@ public static boolean isHostForbidden(final Host host, final Collection<InetAddr
return false;
} else {
// Forbid blacklisted, private, loopback, multicast and wildcard IPs.
return StreamSupport.stream(host.getInetAddresses().spliterator(), false)
final Iterable<InetAddress> inetAddresses = getInetAddressesAndHandleUnknownHost(host);
return StreamSupport.stream(inetAddresses.spliterator(), false)
.anyMatch(requestAddress ->
requestAddress.isLoopbackAddress() ||
requestAddress.isSiteLocalAddress() ||
Expand All @@ -160,6 +161,26 @@ public static boolean isHostForbidden(final Host host, final Collection<InetAddr
}
}

private static Iterable<InetAddress> getInetAddressesAndHandleUnknownHost(final Host host) {
final Iterable<InetAddress> inetAddresses;
try {
inetAddresses = getInetAddresses(host);
} catch (UnknownHostException e) {
final String errorMessage = String.format("The configured host '%s' is invalid: %s", host, e.getMessage());
throw ConnectionConfigurationInvalidException
.newBuilder(errorMessage)
.description("The configured host could not be resolved, make sure the Connection URI is correct.")
.cause(e)
.build();
}
return inetAddresses;
}

@SuppressWarnings("RedundantThrows") // UnknownHostException is thrown by java.net.InetAddress#getAllByName
private static Iterable<InetAddress> getInetAddresses(final Host host) throws UnknownHostException {
return host.getInetAddresses();
}

/**
* Check if number of mappings are valid
*
Expand Down Expand Up @@ -235,6 +256,11 @@ private static void validateBlacklistedHostnames(final Connection connection, fi
final Collection<InetAddress> blacklisted =
calculateBlacklistedAddresses(configuredBlacklistedHostnames, actorSystem.log());

validateBlacklistedHostnames(connection, dittoHeaders, blacklisted);
}

public static void validateBlacklistedHostnames(final Connection connection, final DittoHeaders dittoHeaders,
final Collection<InetAddress> blacklisted) {
final Host connectionHost = Uri.create(connection.getUri()).getHost();
if (isHostForbidden(connectionHost, blacklisted)) {
final String errorMessage = String.format("The configured host '%s' may not be used for the connection.",
Expand Down
Expand Up @@ -12,12 +12,19 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.validation;

import java.util.Optional;
import java.util.function.Supplier;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.services.connectivity.messaging.ClientActorPropsFactory;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;
import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.OpenConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.TestConnection;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -48,21 +55,35 @@ public DittoConnectivityCommandValidator(
}

@Override
public void accept(final ConnectivityCommand<?> command) {
public void accept(final ConnectivityCommand<?> command, final Supplier<Connection> connectionSupplier) {
switch (command.getType()) {
case CreateConnection.TYPE:
case TestConnection.TYPE:
case ModifyConnection.TYPE:
final Connection connection = getConnectionFromCommand(command);
if (connection != null) {
connectionValidator.validate(connection, command.getDittoHeaders(), actorSystem);
propsFactory.getActorPropsForType(connection, conciergeForwarder, connectionActor);
} else {
// should never happen
throw new IllegalStateException("connection=null in " + command);
}
resolveConnection(connectionSupplier)
.ifPresentOrElse(connection -> {
connectionValidator.validate(connection, command.getDittoHeaders(), actorSystem);
propsFactory.getActorPropsForType(connection, conciergeForwarder, connectionActor);
},
// should never happen
handleNullConnection(command));
break;
default: //nothing to validate for other commands
case OpenConnection.TYPE:
resolveConnection(connectionSupplier).ifPresentOrElse(c -> connectionValidator.validate(c,
command.getDittoHeaders(), actorSystem), handleNullConnection(command));
break;
default: // nothing to validate for other commands
}
}

@Nonnull
private Runnable handleNullConnection(final ConnectivityCommand<?> command) {
return () -> {
throw new IllegalStateException("connection=null for " + command);
};
}

private Optional<Connection> resolveConnection(@Nullable Supplier<Connection> connectionSupplier) {
return Optional.ofNullable(connectionSupplier).map(Supplier::get);
}
}
Expand Up @@ -16,6 +16,7 @@
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.Collections;

import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -186,7 +187,7 @@ public void testTLSConnectionWithoutCertificateCheck() {
underTest.tell(TestConnection.of(insecureConnection, DittoHeaders.empty()), getRef());

// THEN: the test should succeed, or it should fail with a different reason than SSL validation
final Object response = expectMsgClass(Object.class);
final Object response = expectMsgClass(Duration.ofSeconds(5), Object.class);
if (response instanceof Status.Failure) {
final DittoRuntimeException error =
(DittoRuntimeException) getEventualCause(((Status.Failure) response).cause());
Expand Down

0 comments on commit 1431142

Please sign in to comment.