Skip to content

Commit

Permalink
Allow custom configuration for the SseConnectionSupervisor extension
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 12, 2022
1 parent 8945fc7 commit 3894dc6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.base.service.RootChildActorStarter;
import org.eclipse.ditto.gateway.service.streaming.actors.StreamSupervisor;

import com.typesafe.config.Config;

import akka.actor.ActorSystem;

/**
Expand All @@ -33,18 +37,26 @@ public interface SseConnectionSupervisor extends DittoExtensionPoint, StreamSupe
* @throws NullPointerException if {@code actorSystem} is {@code null}.
* @since 3.0.0
*/
static SseConnectionSupervisor get(final ActorSystem actorSystem) {
static SseConnectionSupervisor get(final ActorSystem actorSystem, final Config config) {
checkNotNull(actorSystem, "actorSystem");
return ExtensionId.INSTANCE.get(actorSystem);
checkNotNull(config, "config");
final var extensionIdConfig = ExtensionId.computeConfig(config);
return DittoExtensionIds.get(actorSystem)
.computeIfAbsent(extensionIdConfig, ExtensionId::new)
.get(actorSystem);
}

final class ExtensionId extends DittoExtensionPoint.ExtensionId<SseConnectionSupervisor> {

private static final String CONFIG_PATH = "ditto.gateway.streaming.sse.connection-supervisor";
private static final ExtensionId INSTANCE = new ExtensionId(SseConnectionSupervisor.class);
private static final String CONFIG_KEY = "sse-connection-supervisor";
private static final String CONFIG_PATH = "ditto.extensions." + CONFIG_KEY;

private ExtensionId(final ExtensionIdConfig<SseConnectionSupervisor> extensionIdConfig) {
super(extensionIdConfig);
}

private ExtensionId(final Class<SseConnectionSupervisor> parentClass) {
super(parentClass);
static ExtensionIdConfig<SseConnectionSupervisor> computeConfig(final Config config) {
return ExtensionIdConfig.of(SseConnectionSupervisor.class, config, CONFIG_KEY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ private ThingsSseRouteBuilder(final ActorSystem actorSystem,
this.streamingConfig = streamingConfig;
this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
this.pubSubMediator = pubSubMediator;

final Config config = actorSystem.settings().config();
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(config);
eventSniffer = SseEventSniffer.get(actorSystem, dittoExtensionsConfig);
sseConnectionSupervisor = SseConnectionSupervisor.get(actorSystem, dittoExtensionsConfig);

final var sseConfig = ScopedConfig.getOrEmpty(config, "ditto.gateway.streaming.sse");
sseAuthorizationEnforcer = StreamingAuthorizationEnforcer.get(actorSystem, sseConfig);
sseConnectionSupervisor = SseConnectionSupervisor.get(actorSystem);
}

/**
Expand Down
6 changes: 2 additions & 4 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ ditto {
outgoing-websocket-event-sniffer = org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpOutgoingWebSocketEventSniffer
# The provider for custom API routes.
custom-api-routes-provider = org.eclipse.ditto.gateway.service.endpoints.routes.NoopCustomApiRoutesProvider
# The supervisor for SSE connections
sse-connection-supervisor = org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseConnectionSupervisor
}

service-name = "gateway"
Expand Down Expand Up @@ -185,10 +187,6 @@ ditto {

# The provider enforcer for SSE connections
streaming-authorization-enforcer = "org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer"

# The supervisor for SSE connections
connection-supervisor = "org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseConnectionSupervisor"
connection-supervisor = ${?GATEWAY_SSE_CONNECTION_SUPERVISOR}
}
}

Expand Down
2 changes: 1 addition & 1 deletion gateway/service/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ ditto {
incoming-websocket-event-sniffer = org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpIncomingWebSocketEventSniffer
outgoing-websocket-event-sniffer = org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpOutgoingWebSocketEventSniffer
custom-api-routes-provider = org.eclipse.ditto.gateway.service.endpoints.routes.NoopCustomApiRoutesProvider
sse-connection-supervisor = org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseConnectionSupervisor
}

mapping-strategy.implementation = "org.eclipse.ditto.gateway.service.util.GatewayMappingStrategies"
Expand All @@ -162,7 +163,6 @@ ditto {
streaming {
sse {
streaming-authorization-enforcer = org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer
connection-supervisor = "org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseConnectionSupervisor"
}
websocket {
# the max queue size of how many inflight Commands a single Websocket client can have
Expand Down

0 comments on commit 3894dc6

Please sign in to comment.