Skip to content

Commit

Permalink
Minor review adaptions
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed May 10, 2022
1 parent 469c48b commit b7adb4d
Show file tree
Hide file tree
Showing 34 changed files with 132 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private Route ws(final RequestContext ctx, final CharSequence correlationId,
final ProtocolAdapter chosenProtocolAdapter =
protocolAdapterProvider.getProtocolAdapter(userAgent);
return websocketRouteBuilder.build(wsVersion, correlationId, dittoHeaders,
chosenProtocolAdapter);
chosenProtocolAdapter, ctx);
});
}
)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.gateway.service.streaming.StreamingAuthorizationEnforcer;

import akka.actor.ActorRef;
import akka.http.javadsl.server.RequestContext;
Expand All @@ -37,7 +38,7 @@ public interface SseRouteBuilder {
* @return this builder instance to allow method chaining.
* @throws NullPointerException if {@code enforcer} is {@code null}.
*/
SseRouteBuilder withAuthorizationEnforcer(SseAuthorizationEnforcer enforcer);
SseRouteBuilder withAuthorizationEnforcer(StreamingAuthorizationEnforcer enforcer);

/**
* Sets the given event sniffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@
import org.eclipse.ditto.gateway.service.endpoints.routes.AbstractRoute;
import org.eclipse.ditto.gateway.service.endpoints.routes.things.ThingsParameter;
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.gateway.service.streaming.Connect;
import org.eclipse.ditto.gateway.service.streaming.StartStreaming;
import org.eclipse.ditto.gateway.service.streaming.StreamingAuthorizationEnforcer;
import org.eclipse.ditto.gateway.service.streaming.signals.Connect;
import org.eclipse.ditto.gateway.service.streaming.signals.StartStreaming;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.gateway.service.streaming.actors.StreamingSession;
import org.eclipse.ditto.gateway.service.streaming.actors.SupervisedStream;
Expand Down Expand Up @@ -135,7 +136,7 @@ public final class ThingsSseRouteBuilder extends RouteDirectives implements SseR
private final ActorRef pubSubMediator;
private SseConnectionSupervisor sseConnectionSupervisor;
private SseEventSniffer eventSniffer;
private SseAuthorizationEnforcer sseAuthorizationEnforcer;
private StreamingAuthorizationEnforcer sseAuthorizationEnforcer;
@Nullable private GatewaySignalEnrichmentProvider signalEnrichmentProvider;
@Nullable private ActorRef proxyActor;

Expand All @@ -150,7 +151,7 @@ private ThingsSseRouteBuilder(final ActorSystem actorSystem,
this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
this.pubSubMediator = pubSubMediator;
eventSniffer = SseEventSniffer.get(actorSystem);
sseAuthorizationEnforcer = SseAuthorizationEnforcer.get(actorSystem);
sseAuthorizationEnforcer = StreamingAuthorizationEnforcer.sse(actorSystem);
sseConnectionSupervisor = SseConnectionSupervisor.get(actorSystem);
}

Expand Down Expand Up @@ -179,7 +180,7 @@ public static ThingsSseRouteBuilder getInstance(final ActorSystem actorSystem,
}

@Override
public SseRouteBuilder withAuthorizationEnforcer(final SseAuthorizationEnforcer enforcer) {
public SseRouteBuilder withAuthorizationEnforcer(final StreamingAuthorizationEnforcer enforcer) {
sseAuthorizationEnforcer = checkNotNull(enforcer, "enforcer");
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.gateway.service.streaming.StreamControlMessage;
import org.eclipse.ditto.gateway.service.streaming.signals.StreamControlMessage;

/**
* This is a {@code StreamControlMessage} to explicitly express, that the WebSocket stream should remain as it is.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.gateway.service.streaming.Jwt;
import org.eclipse.ditto.gateway.service.streaming.StartStreaming;
import org.eclipse.ditto.gateway.service.streaming.StopStreaming;
import org.eclipse.ditto.gateway.service.streaming.StreamControlMessage;
import org.eclipse.ditto.gateway.service.streaming.signals.Jwt;
import org.eclipse.ditto.gateway.service.streaming.signals.StartStreaming;
import org.eclipse.ditto.gateway.service.streaming.signals.StopStreaming;
import org.eclipse.ditto.gateway.service.streaming.signals.StreamControlMessage;
import org.eclipse.ditto.things.model.ThingFieldSelector;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;

/**
* Extracts WebSocket Protocol message from the given payload string and returns a {@link org.eclipse.ditto.gateway.service.streaming.StartStreaming},
* {@link org.eclipse.ditto.gateway.service.streaming.StopStreaming} instance or {@code null} if the payload did not contain one of the defined
* Extracts WebSocket Protocol message from the given payload string and returns a {@link org.eclipse.ditto.gateway.service.streaming.signals.StartStreaming},
* {@link org.eclipse.ditto.gateway.service.streaming.signals.StopStreaming} instance or {@code null} if the payload did not contain one of the defined
* {@link ProtocolMessageType}.
*/
final class ProtocolMessageExtractor implements Function<String, Optional<StreamControlMessage>> {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@
import org.eclipse.ditto.gateway.api.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.gateway.api.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.gateway.service.endpoints.routes.AbstractRoute;
import org.eclipse.ditto.gateway.service.streaming.StreamingAuthorizationEnforcer;
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.gateway.service.security.HttpHeader;
import org.eclipse.ditto.gateway.service.streaming.Connect;
import org.eclipse.ditto.gateway.service.streaming.IncomingSignal;
import org.eclipse.ditto.gateway.service.streaming.StreamControlMessage;
import org.eclipse.ditto.gateway.service.streaming.StreamingAck;
import org.eclipse.ditto.gateway.service.streaming.signals.Connect;
import org.eclipse.ditto.gateway.service.streaming.signals.IncomingSignal;
import org.eclipse.ditto.gateway.service.streaming.signals.StreamControlMessage;
import org.eclipse.ditto.gateway.service.streaming.signals.StreamingAck;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor;
import org.eclipse.ditto.gateway.service.streaming.actors.SupervisedStream;
Expand Down Expand Up @@ -104,6 +105,7 @@
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebSocketUpgrade;
import akka.http.javadsl.server.Directives;
import akka.http.javadsl.server.RequestContext;
import akka.http.javadsl.server.Route;
import akka.japi.Pair;
import akka.japi.function.Function;
Expand Down Expand Up @@ -169,7 +171,7 @@ public final class WebSocketRoute implements WebSocketRouteBuilder {

private IncomingWebSocketEventSniffer incomingMessageSniffer;
private OutgoingWebSocketEventSniffer outgoingMessageSniffer;
private WebSocketAuthorizationEnforcer authorizationEnforcer;
private StreamingAuthorizationEnforcer authorizationEnforcer;
private WebSocketSupervisor webSocketSupervisor;
@Nullable private GatewaySignalEnrichmentProvider signalEnrichmentProvider;
private HeaderTranslator headerTranslator;
Expand All @@ -185,7 +187,7 @@ private WebSocketRoute(final ActorSystem actorSystem,

incomingMessageSniffer = IncomingWebSocketEventSniffer.get(actorSystem);
outgoingMessageSniffer = OutgoingWebSocketEventSniffer.get(actorSystem);
authorizationEnforcer = WebSocketAuthorizationEnforcer.get(actorSystem);
authorizationEnforcer = StreamingAuthorizationEnforcer.ws(actorSystem);
webSocketSupervisor = WebSocketSupervisor.get(actorSystem);
webSocketConfigProvider = WebSocketConfigProvider.get(actorSystem);
signalEnrichmentProvider = null;
Expand Down Expand Up @@ -224,7 +226,7 @@ public WebSocketRouteBuilder withOutgoingEventSniffer(final OutgoingWebSocketEve
}

@Override
public WebSocketRouteBuilder withAuthorizationEnforcer(final WebSocketAuthorizationEnforcer enforcer) {
public WebSocketRouteBuilder withAuthorizationEnforcer(final StreamingAuthorizationEnforcer enforcer) {
authorizationEnforcer = checkNotNull(enforcer, "enforcer");
return this;
}
Expand Down Expand Up @@ -264,11 +266,12 @@ public WebSocketRouteBuilder withWebSocketConfigProvider(final WebSocketConfigPr
public Route build(final JsonSchemaVersion version,
final CharSequence correlationId,
final DittoHeaders dittoHeaders,
final ProtocolAdapter chosenProtocolAdapter) {
final ProtocolAdapter chosenProtocolAdapter,
final RequestContext ctx) {

return Directives.extractWebSocketUpgrade(websocketUpgrade -> Directives.extractRequest(request -> {
final CompletionStage<DittoHeaders> checkAuthorization =
authorizationEnforcer.checkAuthorization(dittoHeaders);
authorizationEnforcer.checkAuthorization(ctx, dittoHeaders);
return Directives.completeWithFuture(checkAuthorization.thenCompose(authorizedHeaders ->
createWebSocket(websocketUpgrade, version, correlationId.toString(),
authorizedHeaders, chosenProtocolAdapter, request)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.gateway.service.streaming.StreamingAuthorizationEnforcer;
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;

import akka.http.javadsl.server.RequestContext;
import akka.http.javadsl.server.Route;

/**
Expand Down Expand Up @@ -53,7 +55,7 @@ public interface WebSocketRouteBuilder {
* @return this builder instance to allow method chaining.
* @throws NullPointerException if {@code enforcer} is {@code null}.
*/
WebSocketRouteBuilder withAuthorizationEnforcer(WebSocketAuthorizationEnforcer enforcer);
WebSocketRouteBuilder withAuthorizationEnforcer(StreamingAuthorizationEnforcer enforcer);

/**
* Sets the given supervisor.
Expand Down Expand Up @@ -101,12 +103,14 @@ public interface WebSocketRouteBuilder {
* @param correlationId the correlation ID of the request to open the WS connection.
* @param dittoHeaders the ditto headers of the WS connection.
* @param chosenProtocolAdapter protocol adapter to map incoming and outgoing signals.
* @param ctx the request context.
* @return the route.
* @throws NullPointerException if any argument is {@code null}.
*/
Route build(JsonSchemaVersion version,
CharSequence correlationId,
DittoHeaders dittoHeaders,
ProtocolAdapter chosenProtocolAdapter);
ProtocolAdapter chosenProtocolAdapter,
RequestContext ctx);

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.eclipse.ditto.gateway.service.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.gateway.service.util.config.health.HealthCheckConfig;
import org.eclipse.ditto.gateway.service.util.config.security.AuthenticationConfig;
import org.eclipse.ditto.gateway.service.util.config.security.DevOpsConfig;
import org.eclipse.ditto.gateway.service.util.config.security.OAuthConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
Expand Down Expand Up @@ -121,16 +122,17 @@ private GatewayRootActor(final GatewayConfig gatewayConfig, final ActorRef pubSu

final DefaultHttpClientFacade httpClient =
DefaultHttpClientFacade.getInstance(actorSystem, authenticationConfig.getHttpProxyConfig());
final OAuthConfig oAuthConfig = authenticationConfig.getOAuthConfig();

final JwtAuthenticationFactory jwtAuthenticationFactory =
getJwtAuthenticationFactory(httpClient, publicKeysConfig, authenticationConfig, actorSystem);
JwtAuthenticationFactory.newInstance(oAuthConfig, publicKeysConfig, httpClient, actorSystem);

final JwtAuthenticationResultProvider jwtAuthenticationResultProvider =
JwtAuthenticationResultProvider.get(actorSystem);

final DevOpsConfig devOpsConfig = authenticationConfig.getDevOpsConfig();
final DevopsAuthenticationDirectiveFactory devopsAuthenticationDirectiveFactory =
getDevopsAuthenticationDirectiveFactory(httpClient, publicKeysConfig, authenticationConfig,
actorSystem);
getDevopsAuthenticationDirectiveFactory(httpClient, publicKeysConfig, devOpsConfig, actorSystem);

final ProtocolAdapterProvider protocolAdapterProvider =
ProtocolAdapterProvider.load(gatewayConfig.getProtocolConfig(), actorSystem);
Expand Down Expand Up @@ -259,9 +261,10 @@ private static Route createRoute(final ActorSystem actorSystem,
.devopsRoute(new DevOpsRoute(routeBaseProperties, devopsAuthenticationDirective))
.policiesRoute(new PoliciesRoute(routeBaseProperties,
OAuthTokenIntegrationSubjectIdFactory.of(authConfig.getOAuthConfig())))
.sseThingsRoute(ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor, streamingConfig, pubSubMediator)
.withProxyActor(proxyActor)
.withSignalEnrichmentProvider(signalEnrichmentProvider))
.sseThingsRoute(
ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor, streamingConfig, pubSubMediator)
.withProxyActor(proxyActor)
.withSignalEnrichmentProvider(signalEnrichmentProvider))
.thingsRoute(new ThingsRoute(routeBaseProperties,
gatewayConfig.getMessageConfig(),
gatewayConfig.getClaimMessageConfig()))
Expand Down Expand Up @@ -311,29 +314,19 @@ private ActorRef startProxyActor(final ActorRefFactory actorSystem, final ActorR

}

private static JwtAuthenticationFactory getJwtAuthenticationFactory(final HttpClientFacade httpClient,
final CacheConfig publicKeysConfig,
final AuthenticationConfig authenticationConfig,
final ActorSystem actorSystem) {

final OAuthConfig oAuthConfig = authenticationConfig.getOAuthConfig();
return JwtAuthenticationFactory.newInstance(oAuthConfig, publicKeysConfig, httpClient, actorSystem);
}

private static DevopsAuthenticationDirectiveFactory getDevopsAuthenticationDirectiveFactory(
final HttpClientFacade httpClient,
final CacheConfig publicKeysConfig,
final AuthenticationConfig authenticationConfig,
final DevOpsConfig devOpsConfig,
final ActorSystem actorSystem) {

final OAuthConfig devopsOauthConfig = authenticationConfig.getDevOpsConfig().getOAuthConfig();
final OAuthConfig devopsOauthConfig = devOpsConfig.getOAuthConfig();
final JwtAuthenticationFactory devopsJwtAuthenticationFactory =
JwtAuthenticationFactory.newInstance(devopsOauthConfig, publicKeysConfig, httpClient, actorSystem);
return DevopsAuthenticationDirectiveFactory.newInstance(devopsJwtAuthenticationFactory,
authenticationConfig.getDevOpsConfig());
return DevopsAuthenticationDirectiveFactory.newInstance(devopsJwtAuthenticationFactory, devOpsConfig);
}

private String getHostname(final org.eclipse.ditto.base.service.config.http.HttpConfig httpConfig) {
private String getHostname(final HttpConfig httpConfig) {
String hostname = httpConfig.getHostname();
if (hostname.isEmpty()) {
hostname = LocalHostAddressSupplier.getInstance().get();
Expand Down
Loading

0 comments on commit b7adb4d

Please sign in to comment.