Skip to content

Commit

Permalink
Issue #792: Make -AutthorizationEnforcer asynchronous for Websocket a…
Browse files Browse the repository at this point in the history
…nd SSE.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 24, 2020
1 parent da2489c commit ceeb2a3
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 23 deletions.
Expand Up @@ -78,6 +78,24 @@ public AcknowledgementLabelInvalidException(final CharSequence label) {
DEFAULT_HREF);
}

/**
* Create an {@code AcknowledgementLabelInvalidException} with custom description, hyperlink and headers.
*
* @param label the invalid label.
* @param description the custom description.
* @param href hyperlink in the exception.
* @param dittoHeaders the headers of the exception.
* @return the exception.
*/
public static AcknowledgementLabelInvalidException of(final CharSequence label,
@Nullable final String description,
@Nullable final URI href,
final DittoHeaders dittoHeaders) {

return new AcknowledgementLabelInvalidException(dittoHeaders, MessageFormat.format(MESSAGE_TEMPLATE, label),
description, null, href);
}

/**
* Constructs a new {@code AcknowledgementLabelInvalidException} object with the exception message extracted from
* the given JSON object.
Expand Down
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.services.gateway.endpoints.routes.sse;

import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.model.base.headers.DittoHeaders;

import akka.http.javadsl.server.RequestContext;
Expand All @@ -29,7 +31,8 @@ public interface SseAuthorizationEnforcer {
* @param requestContext the context of the HTTP request for opening the connection.
* @param dittoHeaders the DittoHeaders with authentication information for opening the connection.
* @throws NullPointerException if any argument is {@code null}.
* @return a successful future if validation succeeds or a failed future if validation fails.
*/
void checkAuthorization(RequestContext requestContext, DittoHeaders dittoHeaders);
CompletionStage<Void> checkAuthorization(RequestContext requestContext, DittoHeaders dittoHeaders);

}
Expand Up @@ -243,11 +243,8 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
final SignalEnrichmentFacade facade =
signalEnrichmentProvider == null ? null : signalEnrichmentProvider.getFacade(ctx.getRequest());

final CompletionStage<Source<ServerSentEvent, NotUsed>> sseSourceStage =
dittoHeadersStage.thenApply(dittoHeaders -> {

sseAuthorizationEnforcer.checkAuthorization(ctx, dittoHeaders);

final CompletionStage<Source<ServerSentEvent, NotUsed>> sseSourceStage = dittoHeadersStage.thenCompose(
dittoHeaders -> sseAuthorizationEnforcer.checkAuthorization(ctx, dittoHeaders).thenApply(_void -> {
if (filterString != null) {
// will throw an InvalidRqlExpressionException if the RQL expression was not valid:
queryFilterCriteriaFactory.filterCriteria(filterString, dittoHeaders);
Expand Down Expand Up @@ -298,7 +295,8 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
// sniffer shouldn't sniff heartbeats
.viaMat(eventSniffer.toAsyncFlow(ctx.getRequest()), Keep.none())
.keepAlive(Duration.ofSeconds(1), ServerSentEvent::heartbeat);
});
})
);

return completeOKWithFuture(sseSourceStage, EventStreamMarshalling.toEventStream());
}
Expand Down Expand Up @@ -457,8 +455,9 @@ private static Counter getCounterFor(final String path) {
private static final class NoOpSseAuthorizationEnforcer implements SseAuthorizationEnforcer {

@Override
public void checkAuthorization(final RequestContext requestContext, final DittoHeaders dittoHeaders) {
// Does nothing.
public CompletionStage<Void> checkAuthorization(final RequestContext requestContext,
final DittoHeaders dittoHeaders) {
return CompletableFuture.completedStage(null);
}

}
Expand Down
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.services.gateway.endpoints.routes.websocket;

import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.model.base.headers.DittoHeaders;

/**
Expand All @@ -27,8 +29,8 @@ public interface WebSocketAuthorizationEnforcer {
*
* @param dittoHeaders the DittoHeaders containing already gathered context information.
* @throws NullPointerException if any argument is {@code null}.
* @throws org.eclipse.ditto.model.base.exceptions.DittoRuntimeException if the check failed
* @return a successful future if the check succeeded, or a failed future if the check failed.
*/
void checkAuthorization(DittoHeaders dittoHeaders);
CompletionStage<Void> checkAuthorization(DittoHeaders dittoHeaders);

}
Expand Up @@ -244,14 +244,13 @@ public Route build(final JsonSchemaVersion version,
final DittoHeaders dittoHeaders,
final ProtocolAdapter chosenProtocolAdapter) {

return Directives.extractWebSocketUpgrade(
websocketUpgrade -> Directives.extractRequest(
request -> {
authorizationEnforcer.checkAuthorization(dittoHeaders);
return Directives.completeWithFuture(
createWebSocket(websocketUpgrade, version, correlationId.toString(),
dittoHeaders, chosenProtocolAdapter, request));
}));
return Directives.extractWebSocketUpgrade(websocketUpgrade -> Directives.extractRequest(request -> {
final CompletionStage<Void> checkAuthorization =
authorizationEnforcer.checkAuthorization(dittoHeaders);
return Directives.completeWithFuture(checkAuthorization.thenCompose(_void ->
createWebSocket(websocketUpgrade, version, correlationId.toString(),
dittoHeaders, chosenProtocolAdapter, request)));
}));
}

private CompletionStage<WebsocketConfig> retrieveWebsocketConfig() {
Expand Down Expand Up @@ -757,11 +756,9 @@ private static Optional<JsonWebToken> extractJwtFromRequestIfPresent(final HttpR
private static final class NoOpAuthorizationEnforcer implements WebSocketAuthorizationEnforcer {

@Override
public void checkAuthorization(final DittoHeaders dittoHeaders) {

// Does nothing.
public CompletionStage<Void> checkAuthorization(final DittoHeaders dittoHeaders) {
return CompletableFuture.completedStage(null);
}

}

/**
Expand Down

0 comments on commit ceeb2a3

Please sign in to comment.