Skip to content

Commit

Permalink
Validate extra field declaration when subscribing via SSE/Websocket
Browse files Browse the repository at this point in the history
or when a Connection is created

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Mar 11, 2022
1 parent b142cd8 commit 9b92da0
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 17 deletions.
Expand Up @@ -71,6 +71,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade
final Supplier<String> targetDescription) {
validateHeaderMapping(target.getHeaderMapping(), dittoHeaders);
validateTemplate(target.getAddress(), dittoHeaders, Resolvers.getPlaceholders());
validateExtraFields(target);
}

@Override
Expand Down
Expand Up @@ -106,6 +106,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade
validateHeaderMapping(target.getHeaderMapping(), dittoHeaders);
validateTemplate(target.getAddress(), dittoHeaders, Resolvers.getPlaceholders());
validateTargetAddress(target.getAddress(), dittoHeaders, targetDescription);
validateExtraFields(target);
}

private static void validateTargetAddress(final String targetAddress,
Expand Down
Expand Up @@ -126,6 +126,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade

validateTargetAddress(addressWithoutPlaceholders, dittoHeaders, placeholderReplacement);
validateHeaderMapping(target.getHeaderMapping(), dittoHeaders);
validateExtraFields(target);
}

private static void validateSourceAddress(final String address, final DittoHeaders dittoHeaders,
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.Enforcement;
import org.eclipse.ditto.connectivity.model.FilteredTopic;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.service.config.MqttConfig;
Expand Down Expand Up @@ -94,6 +95,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade

validateTargetQoS(qos.get(), dittoHeaders, targetDescription);
validateTemplate(target.getAddress(), dittoHeaders, Resolvers.getPlaceholders());
validateExtraFields(target);
}

/**
Expand Down
Expand Up @@ -71,6 +71,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade
final Supplier<String> targetDescription) {
validateHeaderMapping(target.getHeaderMapping(), dittoHeaders);
validateTemplate(target.getAddress(), dittoHeaders, Resolvers.getPlaceholders());
validateExtraFields(target);
}

@Override
Expand Down
Expand Up @@ -17,13 +17,17 @@
import java.util.List;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.common.Placeholders;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectionUriInvalidException;
import org.eclipse.ditto.connectivity.model.FilteredTopic;
import org.eclipse.ditto.connectivity.model.HeaderMapping;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.Target;
Expand All @@ -33,6 +37,7 @@
import org.eclipse.ditto.connectivity.service.mapping.MessageMapperFactory;
import org.eclipse.ditto.connectivity.service.mapping.MessageMapperRegistry;
import org.eclipse.ditto.connectivity.service.messaging.Resolvers;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.placeholders.Placeholder;
import org.eclipse.ditto.placeholders.PlaceholderFilter;

Expand Down Expand Up @@ -188,6 +193,21 @@ protected void validateHeaderMapping(final HeaderMapping headerMapping, final Di
-> validateTemplate(value, dittoHeaders, Resolvers.getPlaceholders()));
}

protected static void validateExtraFields(final Target target) {
target.getTopics().stream().map(FilteredTopic::getExtraFields)
.forEach(extraFields -> extraFields.ifPresent(AbstractProtocolValidator::validateExtraFields));
}

private static void validateExtraFields(@Nullable final JsonFieldSelector extraFields) {
if (extraFields == null) {
return;
}
final String fieldSelector = extraFields.toString();
if (Placeholders.containsAnyPlaceholder(fieldSelector)) {
PlaceholderFilter.validate(fieldSelector, Resolvers.getPlaceholders());
}
}

/**
* Validates the passed in {@code target} e.g. by validating its {@code address} and {@code headerMapping}
* for valid placeholder usage.
Expand Down
Expand Up @@ -447,8 +447,12 @@ private Flow<Message, String, NotUsed> getStrictifyFlow(final HttpRequest reques

return Filter.multiplexByEither(
cmdString -> {
final Optional<StreamControlMessage> streamControlMessage =
protocolMessageExtractor.apply(cmdString);
final Optional<StreamControlMessage> streamControlMessage;
try {
streamControlMessage = protocolMessageExtractor.apply(cmdString);
} catch (final DittoRuntimeException dre) {
return Left.apply(dre);
}
Either<DittoRuntimeException, Either<StreamControlMessage, Signal<?>>> result;
if (streamControlMessage.isPresent()) {
result = Right.apply(Left.apply(streamControlMessage.get()));
Expand Down Expand Up @@ -540,13 +544,13 @@ private Pair<Connect, Flow<DittoRuntimeException, Message, NotUsed>> createOutgo
SupervisedStream.sourceQueue(websocketConfig.getPublisherBackpressureBufferSize());

final Source<SessionedJsonifiable, Connect> sourceToPreMaterialize = publisherSource.mapMaterializedValue(
withQueue -> {
webSocketSupervisor.supervise(withQueue.getSupervisedStream(), connectionCorrelationId,
additionalHeaders);
return new Connect(withQueue.getSourceQueue(), connectionCorrelationId, STREAMING_TYPE_WS, version,
optJsonWebToken.map(JsonWebToken::getExpirationTime).orElse(null),
readDeclaredAcknowledgementLabels(additionalHeaders), connectionAuthContext);
})
withQueue -> {
webSocketSupervisor.supervise(withQueue.getSupervisedStream(), connectionCorrelationId,
additionalHeaders);
return new Connect(withQueue.getSourceQueue(), connectionCorrelationId, STREAMING_TYPE_WS, version,
optJsonWebToken.map(JsonWebToken::getExpirationTime).orElse(null),
readDeclaredAcknowledgementLabels(additionalHeaders), connectionAuthContext);
})
.recoverWithRetries(1, new PFBuilder<Throwable, Source<SessionedJsonifiable, NotUsed>>()
.match(GatewayWebsocketSessionExpiredException.class,
ex -> {
Expand Down Expand Up @@ -750,10 +754,10 @@ private void issuePotentialWeakAcknowledgements(final SessionedJsonifiable sessi
final ActorRef streamingSessionActor = session.getStreamingSessionActor();
WithEntityId.getEntityIdOfType(EntityId.class, jsonifiable).ifPresent(entityId ->
dittoHeaders.getAcknowledgementRequests()
.stream()
.map(request -> weakAck(request.getLabel(), entityId, dittoHeaders))
.map(IncomingSignal::of)
.forEach(weakAck -> streamingSessionActor.tell(weakAck, ActorRef.noSender()))
.stream()
.map(request -> weakAck(request.getLabel(), entityId, dittoHeaders))
.map(IncomingSignal::of)
.forEach(weakAck -> streamingSessionActor.tell(weakAck, ActorRef.noSender()))
);
});
}
Expand Down Expand Up @@ -808,9 +812,9 @@ private static boolean matchesFilter(final SessionedJsonifiable sessionedJsonifi
return sessionedJsonifiable.getSession()
.filter(session -> jsonifiable instanceof Signal)
.map(session -> {
// evaluate to false if filter is present but does not match or has insufficient info to match
final Signal<?> signal = (Signal<?>) jsonifiable;
return session.matchesFilter(session.mergeThingWithExtra(signal, extra), signal);
// evaluate to false if filter is present but does not match or has insufficient info to match
final Signal<?> signal = (Signal<?>) jsonifiable;
return session.matchesFilter(session.mergeThingWithExtra(signal, extra), signal);
})
.orElse(true);
}
Expand Down
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.gateway.service.streaming;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.placeholders.PlaceholderFactory.newHeadersPlaceholder;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -24,7 +25,16 @@
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.Placeholders;
import org.eclipse.ditto.edge.api.placeholders.EntityIdPlaceholder;
import org.eclipse.ditto.edge.api.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.api.placeholders.RequestPlaceholder;
import org.eclipse.ditto.edge.api.placeholders.ThingPlaceholder;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.placeholders.PlaceholderFilter;
import org.eclipse.ditto.placeholders.TimePlaceholder;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.things.model.ThingFieldSelector;

/**
Expand All @@ -47,10 +57,30 @@ private StartStreaming(final StartStreamingBuilder builder) {
@Nullable final Collection<String> namespacesFromBuilder = builder.namespaces;
namespaces = null != namespacesFromBuilder ? List.copyOf(namespacesFromBuilder) : Collections.emptyList();
filter = Objects.toString(builder.filter, null);
extraFields = builder.extraFields;
extraFields = validateExtraFields(builder.extraFields);
correlationId = builder.correlationId;
}

@Nullable
private static ThingFieldSelector validateExtraFields(@Nullable final ThingFieldSelector extraFields) {
if (extraFields == null) {
return null;
}
final String fieldSelector = extraFields.toString();
if (Placeholders.containsAnyPlaceholder(fieldSelector)) {
PlaceholderFilter.validate(fieldSelector,
newHeadersPlaceholder(),
EntityIdPlaceholder.getInstance(),
ThingPlaceholder.getInstance(),
FeaturePlaceholder.getInstance(),
TopicPathPlaceholder.getInstance(),
ResourcePlaceholder.getInstance(),
TimePlaceholder.getInstance(),
RequestPlaceholder.getInstance());
}
return extraFields;
}

/**
* Returns a mutable builder with a fluent API for creating an instance of StartStreaming.
*
Expand Down

0 comments on commit 9b92da0

Please sign in to comment.