Skip to content

Commit

Permalink
Added validation for source address and target header mappings
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed May 17, 2021
1 parent f1eb522 commit 2b173bd
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,20 @@ public void validate(final Connection connection, final DittoHeaders dittoHeader
@Override
protected void validateSource(final Source source, final DittoHeaders dittoHeaders,
final Supplier<String> sourceDescription) {
// TODO: kafka source - Check if this validation is correct and sufficient. It is copied from AMQP 1.0 right now.
source.getEnforcement().ifPresent(enforcement -> {
validateTemplate(enforcement.getInput(), dittoHeaders, PlaceholderFactory.newHeadersPlaceholder());
enforcement.getFilters().forEach(filterTemplate ->
validateTemplate(filterTemplate, dittoHeaders, newThingPlaceholder(), newPolicyPlaceholder(),
newEntityPlaceholder(), newFeaturePlaceholder()));
});
validateHeaderMapping(source.getHeaderMapping(), dittoHeaders);

final String placeholderReplacement = UUID.randomUUID().toString();
source.getAddresses().forEach(address -> {
final String addressWithoutPlaceholders = validateTemplateAndReplace(address, dittoHeaders,
placeholderReplacement, Resolvers.getPlaceholders());
validateSourceAddress(addressWithoutPlaceholders, dittoHeaders, placeholderReplacement);
});
}

@Override
Expand All @@ -111,10 +117,16 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade
final String addressWithoutPlaceholders = validateTemplateAndReplace(target.getAddress(), dittoHeaders,
placeholderReplacement, Resolvers.getPlaceholders());

validateAddress(addressWithoutPlaceholders, dittoHeaders, placeholderReplacement);
validateTargetAddress(addressWithoutPlaceholders, dittoHeaders, placeholderReplacement);
validateHeaderMapping(target.getHeaderMapping(), dittoHeaders);
}

private static void validateSourceAddress(final String address, final DittoHeaders dittoHeaders,
final String placeholderReplacement) {
validateTopic(address, dittoHeaders, placeholderReplacement);
}

private static void validateAddress(final String address, final DittoHeaders dittoHeaders,
private static void validateTargetAddress(final String address, final DittoHeaders dittoHeaders,
final String placeholderReplacement) {

if (KafkaPublishTarget.containsKey(address)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.Topic;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import akka.actor.ActorSystem;
Expand Down Expand Up @@ -78,13 +76,27 @@ public void testImmutability() {
}

@Test
@Ignore
public void testSourcesAreInvalid() {
// TODO: kafka source - Test is no longer correct. Test validation instead.
final Source source = ConnectivityModelFactory.newSource(AUTHORIZATION_CONTEXT, "any");
public void testValidSourceAddress() {
final DittoHeaders emptyDittoHeaders = DittoHeaders.empty();
underTest.validate(getConnectionWithSource("events"), emptyDittoHeaders, actorSystem);
underTest.validate(getConnectionWithSource("events.with.dots"), emptyDittoHeaders, actorSystem);
underTest.validate(getConnectionWithSource("events_with_underscores"), emptyDittoHeaders, actorSystem);
underTest.validate(getConnectionWithSource("events-with-dashes"), emptyDittoHeaders, actorSystem);
}

assertThatExceptionOfType(ConnectionConfigurationInvalidException.class)
.isThrownBy(() -> underTest.validateSource(source, DittoHeaders.empty(), () -> ""));
@Test
public void testInvalidSourceAddress() {
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource(""));
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource("events/"));
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource("ditto#"));
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource("ditto#notANumber"));
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource("ditto*a"));
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource("ditto\\"));
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource("ditto/{{thing:id}}"));
verifyConnectionConfigurationInvalidExceptionIsThrown(
getConnectionWithSource("{{thing:namespace}}/{{thing:name}}"));
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource("events#{{topic:full}}"));
verifyConnectionConfigurationInvalidExceptionIsThrown(getConnectionWithSource("ditto/{{header:x}}"));
}

@Test
Expand Down Expand Up @@ -139,6 +151,18 @@ private static Connection getConnectionWithTarget(final String target) {
.build();
}

private static Connection getConnectionWithSource(final String sourceAddress) {
return ConnectivityModelFactory.newConnectionBuilder(CONNECTION_ID, ConnectionType.KAFKA,
ConnectivityStatus.OPEN, "tcp://localhost:1883")
.sources(singletonList(ConnectivityModelFactory.newSourceBuilder()
.address(sourceAddress)
.authorizationContext(AUTHORIZATION_CONTEXT)
.qos(1)
.build()))
.specificConfig(defaultSpecificConfig)
.build();
}

private static Connection getConnectionWithBootstrapServers(final String bootstrapServers) {
final Map<String, String> specificConfig = new HashMap<>();
specificConfig.put("bootstrapServers", bootstrapServers);
Expand Down

0 comments on commit 2b173bd

Please sign in to comment.