Skip to content

Commit

Permalink
Refactoring of ThingsSseRoute and WebsocketRoute:
Browse files Browse the repository at this point in the history
* `ThingsSseRoute` no longer extends `AbstractRoute` to get rid of actually unused fields and complexity.
* Adjusted code formatting and extracted methods of `ThingsSseRoute` to improve readability.
* Renamed `WebsocketRoute` to `WebSocketRoute` to comply to the correct notation for 'WebSocket'.
* Renamed `WebsocketRouteBuilder` to `WebSocketRouteBuilder` to comply to the correct notation for 'WebSocket'.
* Introduced static factory methods for `ThingsSseRoute` and `WebSocketRoute`.
* Introduced `WebSocketAuthorizationEnforcer` to optionally enforce authorization before establishing a WebSocket connection.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch-si.com>
  • Loading branch information
Juergen Fickel committed Oct 11, 2019
1 parent 735a181 commit e9783ba
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected AbstractRoute(final ActorRef proxyActor,
* @param fieldsString the fields as string.
* @return the Optional JsonFieldSelector
*/
protected static Optional<JsonFieldSelector> calculateSelectedFields(final Optional<String> fieldsString) {
public static Optional<JsonFieldSelector> calculateSelectedFields(final Optional<String> fieldsString) {
return fieldsString.map(fs -> JsonFactory.newFieldSelector(fs, JSON_FIELD_SELECTOR_PARSE_OPTIONS));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import org.eclipse.ditto.services.gateway.endpoints.routes.status.OverallStatusRoute;
import org.eclipse.ditto.services.gateway.endpoints.routes.things.ThingsRoute;
import org.eclipse.ditto.services.gateway.endpoints.routes.thingsearch.ThingSearchRoute;
import org.eclipse.ditto.services.gateway.endpoints.routes.websocket.WebsocketRouteBuilder;
import org.eclipse.ditto.services.gateway.endpoints.routes.websocket.WebSocketRouteBuilder;
import org.eclipse.ditto.services.gateway.endpoints.utils.DittoRejectionHandlerFactory;
import org.eclipse.ditto.services.utils.health.routes.StatusRoute;
import org.eclipse.ditto.services.utils.protocol.ProtocolAdapterProvider;
Expand Down Expand Up @@ -111,7 +111,7 @@ public final class RootRoute {
private final SseRouteBuilder sseThingsRouteBuilder;
private final ThingsRoute thingsRoute;
private final ThingSearchRoute thingSearchRoute;
private final WebsocketRouteBuilder websocketRouteBuilder;
private final WebSocketRouteBuilder websocketRouteBuilder;
private final StatsRoute statsRoute;

private final CustomApiRoutesProvider customApiRoutesProvider;
Expand Down Expand Up @@ -477,7 +477,7 @@ private static final class Builder implements RootRouteBuilder {
private SseRouteBuilder sseThingsRouteBuilder;
private ThingsRoute thingsRoute;
private ThingSearchRoute thingSearchRoute;
private WebsocketRouteBuilder websocketRouteBuilder;
private WebSocketRouteBuilder websocketRouteBuilder;
private StatsRoute statsRoute;

private CustomApiRoutesProvider customApiRoutesProvider;
Expand Down Expand Up @@ -545,7 +545,7 @@ public RootRouteBuilder thingSearchRoute(final ThingSearchRoute route) {
}

@Override
public RootRouteBuilder websocketRoute(final WebsocketRouteBuilder websocketRouteBuilder) {
public RootRouteBuilder websocketRoute(final WebSocketRouteBuilder websocketRouteBuilder) {
this.websocketRouteBuilder = websocketRouteBuilder;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.eclipse.ditto.services.gateway.endpoints.routes.status.OverallStatusRoute;
import org.eclipse.ditto.services.gateway.endpoints.routes.things.ThingsRoute;
import org.eclipse.ditto.services.gateway.endpoints.routes.thingsearch.ThingSearchRoute;
import org.eclipse.ditto.services.gateway.endpoints.routes.websocket.WebsocketRouteBuilder;
import org.eclipse.ditto.services.gateway.endpoints.routes.websocket.WebSocketRouteBuilder;
import org.eclipse.ditto.services.utils.health.routes.StatusRoute;
import org.eclipse.ditto.services.utils.protocol.ProtocolAdapterProvider;

Expand Down Expand Up @@ -108,7 +108,7 @@ public interface RootRouteBuilder {
* @param websocketRouteBuilder the route to set.
* @return the Builder to allow method chaining.
*/
RootRouteBuilder websocketRoute(WebsocketRouteBuilder websocketRouteBuilder);
RootRouteBuilder websocketRoute(WebSocketRouteBuilder websocketRouteBuilder);

/**
* Sets the stats sub-route.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
*/
package org.eclipse.ditto.services.gateway.endpoints.routes.things;

import static akka.http.javadsl.server.Directives.parameterOptional;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.services.gateway.endpoints.directives.CustomPathMatchers.mergeDoubleSlashes;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -24,6 +24,7 @@
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;
Expand All @@ -39,8 +40,7 @@
import org.eclipse.ditto.model.query.things.ModelBasedThingsFieldExpressionFactory;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.services.gateway.endpoints.config.HttpConfig;
import org.eclipse.ditto.services.gateway.endpoints.directives.CustomPathMatchers;
import org.eclipse.ditto.services.gateway.endpoints.routes.AbstractRoute;
import org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseRouteBuilder;
import org.eclipse.ditto.services.gateway.endpoints.utils.EventSniffer;
Expand All @@ -55,13 +55,13 @@

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.http.javadsl.marshalling.sse.EventStreamMarshalling;
import akka.http.javadsl.model.HttpHeader;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.MediaTypes;
import akka.http.javadsl.model.headers.Accept;
import akka.http.javadsl.model.sse.ServerSentEvent;
import akka.http.javadsl.server.Directives;
import akka.http.javadsl.server.RequestContext;
import akka.http.javadsl.server.Route;
import akka.japi.JavaPartialFunction;
Expand All @@ -72,7 +72,7 @@
/**
* Builder for creating Akka HTTP routes for SSE (Server Sent Events) {@code /things} routes.
*/
public final class ThingsSseRoute extends AbstractRoute implements SseRouteBuilder {
public final class ThingsSseRoute implements SseRouteBuilder {

private static final String PATH_THINGS = "things";

Expand All @@ -81,52 +81,37 @@ public final class ThingsSseRoute extends AbstractRoute implements SseRouteBuild
private static final String PARAM_FILTER = "filter";
private static final String PARAM_NAMESPACES = "namespaces";

private final HttpConfig httpConfig;
private final ActorRef streamingActor;
private final EventSniffer<ServerSentEvent> eventSniffer;
private final QueryFilterCriteriaFactory queryFilterCriteriaFactory;
private final HeaderTranslator headerTranslator;

private ThingsSseRoute(final ActorRef proxyActor,
final ActorSystem actorSystem,
final HttpConfig httpConfig,
final ActorRef streamingActor,
final EventSniffer<ServerSentEvent> eventSniffer,
final QueryFilterCriteriaFactory queryFilterCriteriaFactory,
final HeaderTranslator headerTranslator) {

super(proxyActor, actorSystem, httpConfig, headerTranslator);
this.httpConfig = httpConfig;

private ThingsSseRoute(final ActorRef streamingActor, final EventSniffer<ServerSentEvent> eventSniffer,
final QueryFilterCriteriaFactory queryFilterCriteriaFactory) {

this.streamingActor = streamingActor;
this.eventSniffer = checkNotNull(eventSniffer, "eventSniffer");
this.eventSniffer = eventSniffer;
this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
this.headerTranslator = headerTranslator;
}

/**
* Constructs the SSE - ServerSentEvents supporting {@code /things} route builder.
* Returns an instance of this class.
*
* @param proxyActor an actor selection of the command delegating actor.
* @param actorSystem the ActorSystem to use.
* @param httpConfig the configuration settings of the Gateway service's HTTP endpoint.
* @param headerTranslator translates headers from external sources or to external sources.
* @throws NullPointerException if any argument is {@code null}.
* @param streamingActor is used for actual event streaming.
* @return the instance.
* @throws NullPointerException if {@code streamingActor} is {@code null}.
*/
public ThingsSseRoute(final ActorRef proxyActor,
final ActorSystem actorSystem,
final HttpConfig httpConfig,
final ActorRef streamingActor,
final HeaderTranslator headerTranslator) {

this(proxyActor, actorSystem, httpConfig, streamingActor, EventSniffer.noOp(),
new QueryFilterCriteriaFactory(new CriteriaFactoryImpl(), new ModelBasedThingsFieldExpressionFactory()),
headerTranslator);
public static ThingsSseRoute getInstance(final ActorRef streamingActor) {
checkNotNull(streamingActor, "streamingActor");
final QueryFilterCriteriaFactory queryFilterCriteriaFactory =
new QueryFilterCriteriaFactory(new CriteriaFactoryImpl(), new ModelBasedThingsFieldExpressionFactory());

return new ThingsSseRoute(streamingActor, EventSniffer.noOp(), queryFilterCriteriaFactory);
}

@Override
public ThingsSseRoute withEventSniffer(final EventSniffer<ServerSentEvent> eventSniffer) {
return new ThingsSseRoute(proxyActor, actorSystem, httpConfig, streamingActor, eventSniffer,
queryFilterCriteriaFactory, headerTranslator);
return new ThingsSseRoute(streamingActor, checkNotNull(eventSniffer, "eventSniffer"),
queryFilterCriteriaFactory);
}

/**
Expand All @@ -137,44 +122,45 @@ public ThingsSseRoute withEventSniffer(final EventSniffer<ServerSentEvent> event
@SuppressWarnings("squid:S1172") // allow unused ctx-Param in order to have a consistent route-"interface"
@Override
public Route build(final RequestContext ctx, final Supplier<DittoHeaders> dittoHeadersSupplier) {
return rawPathPrefix(mergeDoubleSlashes().concat(PATH_THINGS), () ->
pathEndOrSingleSlash(() ->
get(() ->
headerValuePF(AcceptHeaderExtractor.INSTANCE, accept ->
doBuildThingsSseRoute(dittoHeadersSupplier.get())
)
)
)
);
return Directives.rawPathPrefix(CustomPathMatchers.mergeDoubleSlashes().concat(PATH_THINGS),
() -> Directives.pathEndOrSingleSlash(
() -> Directives.get(
() -> Directives.headerValuePF(AcceptHeaderExtractor.INSTANCE,
accept -> buildThingsSseRoute(ctx, dittoHeadersSupplier.get())))));
}

private Route doBuildThingsSseRoute(final DittoHeaders dittoHeaders) {
return parameterOptional(ThingsParameter.FIELDS.toString(), fieldsString ->
parameterOptional(ThingsParameter.IDS.toString(),
idsString -> // "ids" is optional for SSE
parameterOptional(PARAM_NAMESPACES, namespacesString ->
parameterOptional(PARAM_FILTER, filterString ->
createSseRoute(dittoHeaders,
calculateSelectedFields(
fieldsString).orElse(null),
idsString.map(ThingsSseRoute::splitThingIdString)
.orElseGet(Collections::emptyList),
namespacesString.map(str -> str.split(","))
.map(Arrays::asList)
.orElse(Collections.emptyList()),
filterString.orElse(null))
)
)
)
);
private Route buildThingsSseRoute(final RequestContext ctx, final DittoHeaders dittoHeaders) {
return parameterOptional(ThingsParameter.FIELDS.toString(),
fieldsString -> parameterOptional(ThingsParameter.IDS.toString(),
idsString -> parameterOptional(PARAM_NAMESPACES,
namespacesString -> parameterOptional(PARAM_FILTER,
filterString -> createSseRoute(ctx.getRequest(),
dittoHeaders,
getFieldSelector(fieldsString),
getThingIds(idsString),
getNamespaces(namespacesString),
filterString.orElse(null))))));
}

private static List<ThingId> splitThingIdString(final String thingIdString) {
return Arrays.stream(thingIdString.split(","))
@Nullable
private static JsonFieldSelector getFieldSelector(final Optional<String> fieldsString) {
return AbstractRoute.calculateSelectedFields(fieldsString).orElse(null);
}

private static List<ThingId> getThingIds(final Optional<String> thingIdString) {
return thingIdString.map(s -> s.split(","))
.map(Arrays::stream)
.orElseGet(Stream::empty)
.map(ThingId::of)
.collect(Collectors.toList());
}

private static List<String> getNamespaces(final Optional<String> namespacesString) {
return namespacesString.map(s -> s.split(","))
.map(Arrays::asList)
.orElseGet(Collections::emptyList);
}

@SuppressWarnings("squid:CommentedOutCodeLine")
// Javascript example e.g. in Chrome console:
/*
Expand All @@ -183,27 +169,17 @@ private static List<ThingId> splitThingIdString(final String thingIdString) {
console.log(e.data);
}, false);
*/
private Route createSseRoute(final DittoHeaders dittoHeaders,
final JsonFieldSelector fieldSelector,
final Collection<ThingId> targetThingIds,
final List<String> namespaces,
@Nullable final String filterString) {

return extractRequest(request ->
createSseRoute(request, dittoHeaders, fieldSelector, targetThingIds, namespaces, filterString));
}

private Route createSseRoute(final HttpRequest request,
final DittoHeaders dittoHeaders,
final JsonFieldSelector fieldSelector,
@Nullable final JsonFieldSelector fieldSelector,
final Collection<ThingId> targetThingIds,
final List<String> namespaces,
@Nullable final String filterString) {

final String connectionCorrelationId = dittoHeaders.getCorrelationId()
.orElseGet(() -> UUID.randomUUID().toString());
final JsonSchemaVersion jsonSchemaVersion =
dittoHeaders.getSchemaVersion().orElse(dittoHeaders.getImplementedSchemaVersion());
.orElseGet(() -> String.valueOf(UUID.randomUUID()));
final JsonSchemaVersion jsonSchemaVersion = dittoHeaders.getSchemaVersion()
.orElse(dittoHeaders.getImplementedSchemaVersion());

final Counter messageCounter = DittoMetrics.counter("streaming_messages")
.tag("type", "sse")
Expand Down Expand Up @@ -251,7 +227,7 @@ private Route createSseRoute(final HttpRequest request,
.viaMat(eventSniffer.toAsyncFlow(request), Keep.left()) // sniffer shouldn't sniff heartbeats
.keepAlive(Duration.ofSeconds(1), ServerSentEvent::heartbeat);

return completeOK(sseSource, EventStreamMarshalling.toEventStream());
return Directives.completeOK(sseSource, EventStreamMarshalling.toEventStream());
}

private static String namespaceFromId(final ThingEvent thingEvent) {
Expand All @@ -263,6 +239,7 @@ private static final class AcceptHeaderExtractor extends JavaPartialFunction<Htt
private static final AcceptHeaderExtractor INSTANCE = new AcceptHeaderExtractor();

private AcceptHeaderExtractor() {
super();
}

@Override
Expand All @@ -271,7 +248,7 @@ public Accept apply(final HttpHeader x, final boolean isCheck) {
if (isCheck) {
return null;
} else if (matchesTextEventStream((Accept) x)) {
return ((Accept) x);
return (Accept) x;
}
}
throw noMatch();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.gateway.endpoints.routes.websocket;

import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.headers.DittoHeaders;

import akka.http.javadsl.model.HttpRequest;

/**
* Enforces authorization in order to establish a WebSocket connection.
* If the authorization check is successful nothing will happen, else a
* {@link org.eclipse.ditto.model.base.exceptions.DittoRuntimeException DittoRuntimeException} is thrown.
*/
public interface WebSocketAuthorizationEnforcer {

/**
* Ensures that the establishment of a WebSocket connection is authorized for the given arguments.
*
* @param authorizationContext
* @param request
* @param dittoHeaders
* @throws NullPointerException if any argument is {@code null}.
* @throws org.eclipse.ditto.model.base.exceptions.DittoRuntimeException if
*/
void checkAuthorization(HttpRequest request, AuthorizationContext authorizationContext, DittoHeaders dittoHeaders);

}
Loading

0 comments on commit e9783ba

Please sign in to comment.