Skip to content

Commit

Permalink
Allow custom configuration for the StreamingAuthorizationEnforcer ext…
Browse files Browse the repository at this point in the history
…ension

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 12, 2022
1 parent b396ba8 commit 1a9064b
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

import com.typesafe.config.Config;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
Expand Down Expand Up @@ -151,9 +153,11 @@ private ThingsSseRouteBuilder(final ActorSystem actorSystem,
this.streamingConfig = streamingConfig;
this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
this.pubSubMediator = pubSubMediator;
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
final Config config = actorSystem.settings().config();
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(config);
eventSniffer = SseEventSniffer.get(actorSystem, dittoExtensionsConfig);
sseAuthorizationEnforcer = StreamingAuthorizationEnforcer.sse(actorSystem);
final var sseConfig = ScopedConfig.getOrEmpty(config, "ditto.gateway.streaming.sse");
sseAuthorizationEnforcer = StreamingAuthorizationEnforcer.get(actorSystem, sseConfig);
sseConnectionSupervisor = SseConnectionSupervisor.get(actorSystem);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
import org.eclipse.ditto.thingsearch.model.signals.commands.SearchErrorResponse;
import org.slf4j.Logger;

import com.typesafe.config.Config;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -186,10 +188,12 @@ private WebSocketRoute(final ActorSystem actorSystem,
this.streamingActor = checkNotNull(streamingActor, "streamingActor");
this.streamingConfig = streamingConfig;

final var dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
final var config = actorSystem.settings().config();
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(config);
incomingMessageSniffer = IncomingWebSocketEventSniffer.get(actorSystem);
outgoingMessageSniffer = OutgoingWebSocketEventSniffer.get(actorSystem);
authorizationEnforcer = StreamingAuthorizationEnforcer.ws(actorSystem);
final var websocketConfig = ScopedConfig.getOrEmpty(config, "ditto.gateway.streaming.websocket");
authorizationEnforcer = StreamingAuthorizationEnforcer.get(actorSystem, websocketConfig);
webSocketSupervisor = WebSocketSupervisor.get(actorSystem);
webSocketConfigProvider = WebSocketConfigProvider.get(actorSystem, dittoExtensionsConfig);
signalEnrichmentProvider = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.base.service.RootChildActorStarter;

import com.typesafe.config.Config;

import akka.actor.ActorSystem;
import akka.http.javadsl.server.RequestContext;
Expand All @@ -39,20 +43,6 @@ public interface StreamingAuthorizationEnforcer extends DittoExtensionPoint {
*/
CompletionStage<DittoHeaders> checkAuthorization(RequestContext requestContext, DittoHeaders dittoHeaders);

/**
* Loads the implementation of {@code SseAuthorizationEnforcer} which is configured for the
* {@code ActorSystem}.
*
* @param actorSystem the actorSystem in which the {@code SseAuthorizationEnforcer} should be loaded.
* @return the {@code SseAuthorizationEnforcer} implementation.
* @throws NullPointerException if {@code actorSystem} is {@code null}.
* @since 3.0.0
*/
static StreamingAuthorizationEnforcer sse(final ActorSystem actorSystem) {
checkNotNull(actorSystem, "actorSystem");
return SseExtensionId.INSTANCE.get(actorSystem);
}

/**
* Loads the implementation of {@code WebSocketAuthorizationEnforcer} which is configured for the
* {@code ActorSystem}.
Expand All @@ -62,34 +52,26 @@ static StreamingAuthorizationEnforcer sse(final ActorSystem actorSystem) {
* @throws NullPointerException if {@code actorSystem} is {@code null}.
* @since 3.0.0
*/
static StreamingAuthorizationEnforcer ws(final ActorSystem actorSystem) {
static StreamingAuthorizationEnforcer get(final ActorSystem actorSystem, final Config config) {
checkNotNull(actorSystem, "actorSystem");
return WsExtensionId.INSTANCE.get(actorSystem);
checkNotNull(config, "config");
final var extensionIdConfig = ExtensionId.computeConfig(config);
return DittoExtensionIds.get(actorSystem)
.computeIfAbsent(extensionIdConfig, ExtensionId::new)
.get(actorSystem);
}

final class WsExtensionId extends DittoExtensionPoint.ExtensionId<StreamingAuthorizationEnforcer> {
final class ExtensionId extends DittoExtensionPoint.ExtensionId<StreamingAuthorizationEnforcer> {

private static final String CONFIG_PATH = "ditto.gateway.streaming.websocket.authorization-enforcer";
private static final WsExtensionId INSTANCE = new WsExtensionId(StreamingAuthorizationEnforcer.class);
private static final String CONFIG_KEY = "streaming-authorization-enforcer";
private static final String CONFIG_PATH = "ditto.extensions." + CONFIG_KEY;

private WsExtensionId(final Class<StreamingAuthorizationEnforcer> parentClass) {
super(parentClass);
private ExtensionId(final ExtensionIdConfig<StreamingAuthorizationEnforcer> extensionIdConfig) {
super(extensionIdConfig);
}

@Override
protected String getConfigPath() {
return CONFIG_PATH;
}

}

final class SseExtensionId extends DittoExtensionPoint.ExtensionId<StreamingAuthorizationEnforcer> {

private static final String CONFIG_PATH = "ditto.gateway.streaming.sse.authorization-enforcer";
private static final SseExtensionId INSTANCE = new SseExtensionId(StreamingAuthorizationEnforcer.class);

private SseExtensionId(final Class<StreamingAuthorizationEnforcer> parentClass) {
super(parentClass);
static ExtensionIdConfig<StreamingAuthorizationEnforcer> computeConfig(final Config config) {
return ExtensionIdConfig.of(StreamingAuthorizationEnforcer.class, config, CONFIG_KEY);
}

@Override
Expand Down
7 changes: 3 additions & 4 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ditto {
http-request-actor-props-factory = org.eclipse.ditto.gateway.service.endpoints.actors.DefaultHttpRequestActorPropsFactory
# The listener for SSE events
sse-event-sniffer = org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseEventSniffer
streaming-authorization-enforcer = org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer
}

service-name = "gateway"
Expand Down Expand Up @@ -157,8 +158,7 @@ ditto {
}

# The provider enforcer for WebSocket connections
authorization-enforcer = "org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer"
authorization-enforcer = ${?GATEWAY_WEBSOCKET_AUTHORIZATION_ENFORCER}
streaming-authorization-enforcer = "org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer"

# The supervisor for WebSocket connections
connection-supervisor = "org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpWebSocketSupervisor"
Expand Down Expand Up @@ -189,8 +189,7 @@ ditto {
}

# The provider enforcer for SSE connections
authorization-enforcer = "org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer"
authorization-enforcer = ${?GATEWAY_SSE_AUTHORIZATION_ENFORCER}
streaming-authorization-enforcer = "org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer"

# The supervisor for SSE connections
connection-supervisor = "org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseConnectionSupervisor"
Expand Down
10 changes: 5 additions & 5 deletions gateway/service/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ ditto {

streaming {
sse {
authorization-enforcer = "org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer"
streaming-authorization-enforcer = org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer
connection-supervisor = "org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseConnectionSupervisor"
}
websocket {
Expand All @@ -169,10 +169,10 @@ ditto {
# additionally CommandResponses and Events are dropped if this size is reached
publisher.backpressure-buffer-size = 200
# The provider enforcer for WebSocket connections
authorization-enforcer = "org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer"
connection-supervisor = "org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpWebSocketSupervisor"
incoming-event-sniffer = "org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpIncomingWebSocketEventSniffer"
outgoing-event-sniffer = "org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpOutgoingWebSocketEventSniffer"
streaming-authorization-enforcer = org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer
connection-supervisor = org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpWebSocketSupervisor
incoming-event-sniffer = org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpIncomingWebSocketEventSniffer
outgoing-event-sniffer = org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpOutgoingWebSocketEventSniffer
}
}

Expand Down

0 comments on commit 1a9064b

Please sign in to comment.