Skip to content

Commit

Permalink
move URI scheme validation to protocol level
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Aug 13, 2018
1 parent 403edce commit 32881a6
Show file tree
Hide file tree
Showing 12 changed files with 442 additions and 5 deletions.
Expand Up @@ -38,7 +38,7 @@ public final class ConnectionUriInvalidException extends DittoRuntimeException i
private static final String MESSAGE_TEMPLATE = "The Connection URI ''{0}'' has an invalid format.";

private static final String DEFAULT_DESCRIPTION =
"The accepted format is amqp(s)://{username}:{password}@{hostname}:{port}/{path}";
"The accepted format is {protocol}://{username}:{password}@{hostname}:{port}/{path}";

private static final long serialVersionUID = -3899791430534146626L;

Expand Down
Expand Up @@ -39,8 +39,12 @@
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.connectivity.Topic;
import org.eclipse.ditto.services.connectivity.messaging.amqp.AmqpValidator;
import org.eclipse.ditto.services.connectivity.messaging.mqtt.MqttValidator;
import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionMongoSnapshotAdapter;
import org.eclipse.ditto.services.connectivity.messaging.rabbitmq.RabbitMQValidator;
import org.eclipse.ditto.services.connectivity.messaging.validation.CompoundConnectivityCommandInterceptor;
import org.eclipse.ditto.services.connectivity.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.services.connectivity.messaging.validation.DittoConnectivityCommandValidator;
import org.eclipse.ditto.services.connectivity.util.ConfigKeys;
import org.eclipse.ditto.services.utils.akka.LogUtil;
Expand Down Expand Up @@ -128,6 +132,14 @@ public final class ConnectionActor extends AbstractPersistentActor {

private static final String PUB_SUB_GROUP_PREFIX = "connection:";

/**
* Validator of all supported connections.
*/
private static final ConnectionValidator CONNECTION_VALIDATOR = ConnectionValidator.of(
RabbitMQValidator.newInstance(),
AmqpValidator.newInstance(),
MqttValidator.newInstance());

private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);

private final String connectionId;
Expand Down Expand Up @@ -162,8 +174,8 @@ private ConnectionActor(final String connectionId,
this.pubSubMediator = pubSubMediator;
this.conciergeForwarder = conciergeForwarder;
this.propsFactory = propsFactory;
final DittoConnectivityCommandValidator
dittoCommandValidator = new DittoConnectivityCommandValidator(propsFactory, conciergeForwarder);
final DittoConnectivityCommandValidator dittoCommandValidator =
new DittoConnectivityCommandValidator(propsFactory, conciergeForwarder, CONNECTION_VALIDATOR);
if (customCommandValidator != null) {
this.commandValidator =
new CompoundConnectivityCommandInterceptor(dittoCommandValidator, customCommandValidator);
Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*/
package org.eclipse.ditto.services.connectivity.messaging.amqp;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.services.connectivity.messaging.validation.ProtocolValidator;

/**
* Connection specification for Amqp protocol.
*/
@Immutable
public final class AmqpValidator implements ProtocolValidator {

private static final Collection<String> ACCEPTED_SCHEMES =
Collections.unmodifiableList(Arrays.asList("amqp", "amqps"));

/**
* Create a new {@code AmqpConnectionSpec}.
*
* @return a new instance.
*/
public static AmqpValidator newInstance() {
return new AmqpValidator();
}

@Override
public ConnectionType type() {
return ConnectionType.AMQP_10;
}

@Override
public void validate(final Connection connection, final DittoHeaders dittoHeaders) throws DittoRuntimeException {
ProtocolValidator.validateUriScheme(connection, dittoHeaders, ACCEPTED_SCHEMES, "AMQP 1.0");
}
}
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*/
package org.eclipse.ditto.services.connectivity.messaging.mqtt;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.services.connectivity.messaging.validation.ProtocolValidator;

/**
* Connection specification for Mqtt protocol.
*/
@Immutable
public final class MqttValidator implements ProtocolValidator {

private static final Collection<String> ACCEPTED_SCHEMES =
Collections.unmodifiableList(Arrays.asList("tcp", "ssl", "ws", "wss"));

/**
* Create a new {@code MqttConnectionSpec}.
*
* @return a new instance.
*/
public static MqttValidator newInstance() {
return new MqttValidator();
}

@Override
public ConnectionType type() {
return ConnectionType.MQTT;
}

@Override
public void validate(final Connection connection, final DittoHeaders dittoHeaders) throws DittoRuntimeException {
ProtocolValidator.validateUriScheme(connection, dittoHeaders, ACCEPTED_SCHEMES, "MQTT 3.1.1");
// TODO: check specificConfig of sources and targets
}
}
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*/
package org.eclipse.ditto.services.connectivity.messaging.rabbitmq;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.services.connectivity.messaging.validation.ProtocolValidator;

/**
* Connection specification for RabbitMQ protocol.
*/
@Immutable
public final class RabbitMQValidator implements ProtocolValidator {

private static final Collection<String> ACCEPTED_SCHEMES =
Collections.unmodifiableList(Arrays.asList("amqp", "amqps"));

/**
* Create a new {@code RabbitMQConnectionSpec}.
*
* @return a new instance.
*/
public static RabbitMQValidator newInstance() {
return new RabbitMQValidator();
}

@Override
public ConnectionType type() {
return ConnectionType.AMQP_091;
}

@Override
public void validate(final Connection connection, final DittoHeaders dittoHeaders) throws DittoRuntimeException {
ProtocolValidator.validateUriScheme(connection, dittoHeaders, ACCEPTED_SCHEMES, "AMQP 0.9.1");
}
}
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*
*/
package org.eclipse.ditto.services.connectivity.messaging.validation;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionType;

/**
* Validate a connection according to its type.
*/
@Immutable
public final class ConnectionValidator {

private final Map<ConnectionType, ProtocolValidator> specMap;

private ConnectionValidator(final ProtocolValidator... connectionSpecs) {
final Map<ConnectionType, ProtocolValidator> specMap = Arrays.stream(connectionSpecs)
.collect(Collectors.toMap(ProtocolValidator::type, Function.identity()));
this.specMap = Collections.unmodifiableMap(specMap);
}

/**
* Create a connection validator from connection specs.
*
* @param connectionSpecs specs of supported connection types.
* @return a connection validator.
*/
public static ConnectionValidator of(final ProtocolValidator... connectionSpecs) {
return new ConnectionValidator(connectionSpecs);
}

/**
* Check a connection for errors and throw them.
*
* @param connection the connection to validate.
* @param dittoHeaders headers of the command that triggered the connection validation.
* @throws org.eclipse.ditto.model.base.exceptions.DittoRuntimeException if the connection has errors.
* @throws java.lang.IllegalStateException if the connection type is not known.
*/
public void validate(final Connection connection, final DittoHeaders dittoHeaders)
throws DittoRuntimeException, IllegalStateException {
final ProtocolValidator spec = specMap.get(connection.getConnectionType());
if (spec != null) {
// throw error at validation site for clarity of stack trace
spec.validate(connection, dittoHeaders);
} else {
throw new IllegalStateException("Unknown connection type: " + connection);
}
}
}
Expand Up @@ -24,15 +24,18 @@
/**
* Checks if the given {@link ConnectivityCommand} is valid by trying to create the client actor props.
*/
public class DittoConnectivityCommandValidator implements ConnectivityCommandInterceptor {
public final class DittoConnectivityCommandValidator implements ConnectivityCommandInterceptor {

private final ClientActorPropsFactory propsFactory;
private final ActorRef conciergeForwarder;
private final ConnectionValidator connectionValidator;

public DittoConnectivityCommandValidator(
final ClientActorPropsFactory propsFactory, final ActorRef conciergeForwarder) {
final ClientActorPropsFactory propsFactory, final ActorRef conciergeForwarder,
final ConnectionValidator connectionValidator) {
this.propsFactory = propsFactory;
this.conciergeForwarder = conciergeForwarder;
this.connectionValidator = connectionValidator;
}

@Override
Expand All @@ -43,7 +46,11 @@ public void accept(final ConnectivityCommand<?> command) {
case ModifyConnection.TYPE:
final Connection connection = getConnectionFromCommand(command);
if (connection != null) {
connectionValidator.validate(connection, command.getDittoHeaders());
propsFactory.getActorPropsForType(connection, conciergeForwarder);
} else {
// should never happen
throw new IllegalStateException("connection=null in " + command);
}
break;
default: //nothing to validate for other commands
Expand Down
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*
*/
package org.eclipse.ditto.services.connectivity.messaging.validation;

import java.text.MessageFormat;
import java.util.Collection;
import java.util.stream.Collectors;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.model.connectivity.ConnectionUriInvalidException;

/**
* Protocol-specific specification for {@link org.eclipse.ditto.model.connectivity.Connection} objects.
*/
public interface ProtocolValidator {

/**
* Type of connection for which this spec applies.
*
* @return the connection type.
*/
ConnectionType type();

/**
* Check a connection of the declared type for errors and throw them if any exists.
*
* @param connection the connection to check for errors.
* @param dittoHeaders headers of the command that triggered the connection validation.
* @throws DittoRuntimeException if the connection has errors.
*/
void validate(final Connection connection, final DittoHeaders dittoHeaders) throws DittoRuntimeException;

/**
* Check whether the URI scheme of the connection belongs to an accepted scheme.
*
* @param connection the connection to check.
* @param dittoHeaders headers of the command that triggered the connection validation.
* @param acceptedSchemes valid URI schemes for the connection type.
* @param protocolName protocol name of the connection type.
* @throws DittoRuntimeException if the URI scheme is not accepted.
*/
static void validateUriScheme(final Connection connection,
final DittoHeaders dittoHeaders,
final Collection<String> acceptedSchemes,
final String protocolName) throws DittoRuntimeException {

if (!acceptedSchemes.contains(connection.getProtocol())) {
final String message =
MessageFormat.format("The URI scheme ''{0}'' is not valid for {1}.", connection.getProtocol(),
protocolName);
final String description =
MessageFormat.format("Accepted URI schemes are: {0}",
acceptedSchemes.stream().collect(Collectors.joining(", ")));
throw ConnectionUriInvalidException.newBuilder(connection.getUri())
.message(message)
.description(description)
.dittoHeaders(dittoHeaders)
.build();
}
}
}

0 comments on commit 32881a6

Please sign in to comment.