Skip to content

Commit

Permalink
Allow custom configuration for the SseEventSniffer extension
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 12, 2022
1 parent db95bd2 commit b396ba8
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Expand Up @@ -14,7 +14,11 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.base.service.RootChildActorStarter;

import com.typesafe.config.Config;

import akka.NotUsed;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -44,18 +48,26 @@ public interface SseEventSniffer extends DittoExtensionPoint {
* @throws NullPointerException if {@code actorSystem} is {@code null}.
* @since 3.0.0
*/
static SseEventSniffer get(final ActorSystem actorSystem) {
static SseEventSniffer get(final ActorSystem actorSystem, final Config config) {
checkNotNull(actorSystem, "actorSystem");
return ExtensionId.INSTANCE.get(actorSystem);
checkNotNull(config, "config");
final var extensionIdConfig = ExtensionId.computeConfig(config);
return DittoExtensionIds.get(actorSystem)
.computeIfAbsent(extensionIdConfig, ExtensionId::new)
.get(actorSystem);
}

final class ExtensionId extends DittoExtensionPoint.ExtensionId<SseEventSniffer> {

private static final String CONFIG_PATH = "ditto.gateway.streaming.sse.event-sniffer";
private static final ExtensionId INSTANCE = new ExtensionId(SseEventSniffer.class);
private static final String CONFIG_KEY = "sse-event-sniffer";
private static final String CONFIG_PATH = "ditto.extensions." + CONFIG_KEY;

private ExtensionId(final ExtensionIdConfig<SseEventSniffer> extensionIdConfig) {
super(extensionIdConfig);
}

private ExtensionId(final Class<SseEventSniffer> parentClass) {
super(parentClass);
static ExtensionIdConfig<SseEventSniffer> computeConfig(final Config config) {
return ExtensionIdConfig.of(SseEventSniffer.class, config, CONFIG_KEY);
}

@Override
Expand Down
Expand Up @@ -48,13 +48,14 @@
import org.eclipse.ditto.gateway.service.endpoints.routes.things.ThingsParameter;
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.gateway.service.streaming.StreamingAuthorizationEnforcer;
import org.eclipse.ditto.gateway.service.streaming.signals.Connect;
import org.eclipse.ditto.gateway.service.streaming.signals.StartStreaming;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.gateway.service.streaming.actors.StreamingSession;
import org.eclipse.ditto.gateway.service.streaming.actors.SupervisedStream;
import org.eclipse.ditto.gateway.service.streaming.signals.Connect;
import org.eclipse.ditto.gateway.service.streaming.signals.StartStreaming;
import org.eclipse.ditto.gateway.service.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
Expand Down Expand Up @@ -150,7 +151,8 @@ private ThingsSseRouteBuilder(final ActorSystem actorSystem,
this.streamingConfig = streamingConfig;
this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
this.pubSubMediator = pubSubMediator;
eventSniffer = SseEventSniffer.get(actorSystem);
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
eventSniffer = SseEventSniffer.get(actorSystem, dittoExtensionsConfig);
sseAuthorizationEnforcer = StreamingAuthorizationEnforcer.sse(actorSystem);
sseConnectionSupervisor = SseConnectionSupervisor.get(actorSystem);
}
Expand Down
6 changes: 2 additions & 4 deletions gateway/service/src/main/resources/gateway.conf
Expand Up @@ -46,6 +46,8 @@ ditto {
gateway-authentication-directive-factory = org.eclipse.ditto.gateway.service.endpoints.directives.auth.DittoGatewayAuthenticationDirectiveFactory
# Creator of props of HTTP request actors. Must implement HttpRequestActorPropsFactory.
http-request-actor-props-factory = org.eclipse.ditto.gateway.service.endpoints.actors.DefaultHttpRequestActorPropsFactory
# The listener for SSE events
sse-event-sniffer = org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseEventSniffer
}

service-name = "gateway"
Expand Down Expand Up @@ -193,10 +195,6 @@ ditto {
# The supervisor for SSE connections
connection-supervisor = "org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseConnectionSupervisor"
connection-supervisor = ${?GATEWAY_SSE_CONNECTION_SUPERVISOR}

# The listener for SSE events
event-sniffer = "org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseEventSniffer"
event-sniffer = ${?GATEWAY_SSE_EVENT_SNIFFER}
}
}

Expand Down
2 changes: 1 addition & 1 deletion gateway/service/src/test/resources/test.conf
Expand Up @@ -142,6 +142,7 @@ ditto {
}
http-bind-flow-provider = org.eclipse.ditto.gateway.service.endpoints.routes.LoggingHttpBindFlowProvider
websocket-config-provider = org.eclipse.ditto.gateway.service.endpoints.routes.websocket.NoOpWebSocketConfigProvider
sse-event-sniffer = org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseEventSniffer
}

mapping-strategy.implementation = "org.eclipse.ditto.gateway.service.util.GatewayMappingStrategies"
Expand All @@ -159,7 +160,6 @@ ditto {
sse {
authorization-enforcer = "org.eclipse.ditto.gateway.service.streaming.NoOpAuthorizationEnforcer"
connection-supervisor = "org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseConnectionSupervisor"
event-sniffer = "org.eclipse.ditto.gateway.service.endpoints.routes.sse.NoOpSseEventSniffer"
}
websocket {
# the max queue size of how many inflight Commands a single Websocket client can have
Expand Down

0 comments on commit b396ba8

Please sign in to comment.