Skip to content

Commit

Permalink
Augment requests with OAuth2 tokens; add togglable validation that th…
Browse files Browse the repository at this point in the history
…e token endpoint protocol is HTTPS.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 29, 2021
1 parent 2a51e05 commit 5eee329
Show file tree
Hide file tree
Showing 16 changed files with 413 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ final class DefaultHttpPushConfig implements HttpPushConfig, WithStringMapDecodi
private final Duration requestTimeout;
private final HttpProxyConfig httpProxyConfig;
private final Map<String, String> hmacAlgorithms;
private final OAuth2Config oAuth2Config;

private DefaultHttpPushConfig(final ScopedConfig config) {
maxQueueSize = config.getPositiveIntOrThrow(ConfigValue.MAX_QUEUE_SIZE);
requestTimeout = config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.REQUEST_TIMEOUT);
httpProxyConfig = DefaultHttpProxyConfig.ofProxy(config);
hmacAlgorithms = asStringMap(config, ConfigValue.HMAC_ALGORITHMS.getConfigPath());
oAuth2Config = DefaultOAuth2Config.of(config);
}

static DefaultHttpPushConfig of(final Config config) {
Expand All @@ -69,6 +71,11 @@ public Map<String, String> getHmacAlgorithms() {
return hmacAlgorithms;
}

@Override
public OAuth2Config getOAuth2Config() {
return oAuth2Config;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -81,12 +88,13 @@ public boolean equals(final Object o) {
return maxQueueSize == that.maxQueueSize &&
Objects.equals(requestTimeout, that.requestTimeout) &&
Objects.equals(httpProxyConfig, that.httpProxyConfig) &&
Objects.equals(hmacAlgorithms, that.hmacAlgorithms);
Objects.equals(hmacAlgorithms, that.hmacAlgorithms) &&
Objects.equals(oAuth2Config, that.oAuth2Config);
}

@Override
public int hashCode() {
return Objects.hash(maxQueueSize, httpProxyConfig, hmacAlgorithms, requestTimeout);
return Objects.hash(maxQueueSize, httpProxyConfig, hmacAlgorithms, requestTimeout, oAuth2Config);
}

@Override
Expand All @@ -96,6 +104,7 @@ public String toString() {
", requestTimeout=" + requestTimeout +
", httpProxyConfig=" + httpProxyConfig +
", hmacAlgorithms=" + hmacAlgorithms +
", oAuth2Config=" + oAuth2Config +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

import com.typesafe.config.Config;

/**
* This class is the default implementation of {@link OAuth2Config}.
*/
@Immutable
final class DefaultOAuth2Config implements OAuth2Config {

private static final String CONFIG_PATH = "oauth2";

private final Duration maxClockSkew;
private final boolean enforceHttps;

private DefaultOAuth2Config(final ScopedConfig config) {
maxClockSkew = config.getDuration(ConfigValue.MAX_CLOCK_SKEW.getConfigPath());
enforceHttps = config.getBoolean(ConfigValue.ENFORCE_HTTPS.getConfigPath());
}

static DefaultOAuth2Config of(final Config config) {
return new DefaultOAuth2Config(ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
}

@Override
public Duration getMaxClockSkew() {
return maxClockSkew;
}

@Override
public boolean shouldEnforceHttps() {
return enforceHttps;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DefaultOAuth2Config that = (DefaultOAuth2Config) o;
return Objects.equals(maxClockSkew, that.maxClockSkew) && enforceHttps == that.enforceHttps;
}

@Override
public int hashCode() {
return Objects.hash(enforceHttps, maxClockSkew);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"requestTimeout=" + maxClockSkew +
", httpProxyConfig=" + enforceHttps +
"]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,23 @@
import org.eclipse.ditto.base.service.config.http.HttpProxyConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

import com.typesafe.config.Config;

/**
* Provides configuration settings of the http-push connection type.
*/
public interface HttpPushConfig {

/**
* Create an HTTP Push config object from HOCON.
*
* @param config the HOCON.
* @return the HTTP Push config object.
*/
static HttpPushConfig of(final Config config) {
return DefaultHttpPushConfig.of(config);
}

/**
* @return maximum number of messages buffered at the publisher actor before dropping them.
*/
Expand All @@ -43,6 +55,11 @@ public interface HttpPushConfig {
*/
Map<String, String> getHmacAlgorithms();

/**
* @return configuration for OAuth2.
*/
OAuth2Config getOAuth2Config();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code HttpPushConfig}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;

import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

import com.typesafe.config.Config;

/**
* Provides configuration settings of the http-push connection for OAuth2.
*/
public interface OAuth2Config {

/**
* Create an {@code OAuth2Config} object.
*
* @param config the HOCON.
* @return the OAuth2Config object.
*/
static OAuth2Config of(final Config config) {
return DefaultOAuth2Config.of(config);
}

/**
* @return Maximum expected clock skew. Tokens are renewed before they expire this much time into the future.
*/
Duration getMaxClockSkew();

/**
* @return Whether HTTPS is the required protocol of the token endpoint. Should be true in a production
* environment to avoid transmitting client secret in plain text.
*/
boolean shouldEnforceHttps();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code OAuthConfig}.
*/
enum ConfigValue implements KnownConfigValue {

/**
* Maximum expected clock skew.
*/
MAX_CLOCK_SKEW("max-clock-skew", Duration.ofMinutes(1L)),

/**
* Whether HTTPS is the required protocol of the token endpoint.
*/
ENFORCE_HTTPS("enforce-https", true);

private final String path;
private final Object defaultValue;

ConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.service.UriEncoding;
import org.eclipse.ditto.connectivity.model.OAuthClientCredentials;
import org.eclipse.ditto.connectivity.service.config.HttpPushConfig;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
Expand Down Expand Up @@ -65,6 +67,19 @@ public final class ClientCredentialsFlow {
this.maxClockSkew = maxClockSkew;
}

/**
* Create a {@code ClientCredentialsFlow} object.
*
* @param credentials the credentials.
* @param config the HTTP-Push config.
* @return the object.
*/
public static ClientCredentialsFlow of(final OAuthClientCredentials credentials, final HttpPushConfig config) {
return new ClientCredentialsFlow(credentials.getTokenEndpoint(), credentials.getClientId(),
credentials.getClientSecret(), credentials.getRequestedScopes(),
config.getOAuth2Config().getMaxClockSkew());
}

/**
* Augment HTTP requests with OAuth2 bearer tokens.
*
Expand Down Expand Up @@ -99,7 +114,7 @@ Source<JsonWebToken, NotUsed> getTokenSource(final Flow<HttpRequest, Try<HttpRes
Source.single(token).concatLazy(
Source.repeat(token)
.takeWhile(this::shouldNotRefresh)
.concatLazy(Source.lazily(() -> getTokenSource(httpFlow)))
.concatLazy(Source.lazySource(() -> getTokenSource(httpFlow)))
)
).withAttributes(Attributes.inputBuffer(1, 1));
}
Expand All @@ -112,8 +127,6 @@ Source<JsonWebToken, NotUsed> getTokenSource(final Flow<HttpRequest, Try<HttpRes
* @return Source of a single token, or a failed source.
*/
Source<JsonWebToken, NotUsed> getSingleTokenSource(final Flow<HttpRequest, Try<HttpResponse>, ?> httpFlow) {

final HttpPushContext context = response -> {};
return Source.single(tokenRequest)
.via(httpFlow)
.flatMapConcat(this::asJsonWebToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import org.eclipse.ditto.connectivity.model.ClientCertificateCredentials;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.CredentialsVisitor;
import org.eclipse.ditto.connectivity.model.HmacCredentials;
import org.eclipse.ditto.connectivity.model.OAuthClientCredentials;
import org.eclipse.ditto.connectivity.model.SshPublicKeyCredentials;
import org.eclipse.ditto.connectivity.model.UserPasswordCredentials;
import org.eclipse.ditto.connectivity.service.config.HttpPushConfig;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.japi.Pair;
import akka.stream.javadsl.Flow;

/**
* Visitor to create a flow that augment requests with bearer tokens.
*/
final class ClientCredentialsFlowVisitor implements
CredentialsVisitor<Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed>> {

private final ActorSystem actorSystem;
private final HttpPushConfig config;

private ClientCredentialsFlowVisitor(final ActorSystem actorSystem, final HttpPushConfig config) {
this.actorSystem = actorSystem;
this.config = config;
}

static Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed> eval(
final ActorSystem actorSystem, final HttpPushConfig config, final Connection connection) {
return connection.getCredentials()
.map(credentials -> credentials.accept(new ClientCredentialsFlowVisitor(actorSystem, config)))
.orElseGet(Flow::create);
}

@Override
public Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed> clientCertificate(
final ClientCertificateCredentials credentials) {
return Flow.create();
}

@Override
public Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed> usernamePassword(
final UserPasswordCredentials credentials) {
return Flow.create();
}

@Override
public Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed>
sshPublicKeyAuthentication(final SshPublicKeyCredentials credentials) {
return Flow.create();
}

@Override
public Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed> hmac(
final HmacCredentials credentials) {
return Flow.create();
}

@Override
public Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed> oauthClientCredentials(
final OAuthClientCredentials credentials) {
return ClientCredentialsFlow.of(credentials, config).withToken(Http.get(actorSystem));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ static Props props(final Connection connection, final HttpPushFactory factory, f
final Consumer<Duration> logRequestTimes =
duration -> connectionLogger.success("HTTP request took <{0}> ms.", duration.toMillis());

final Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed> oauthFlow =
ClientCredentialsFlowVisitor.eval(getContext().getSystem(), config, connection);

final Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed> requestSigningFlow =
Flow.<Pair<HttpRequest, HttpPushContext>>create()
.flatMapConcat(pair -> httpRequestSigning.sign(pair.first())
Expand All @@ -196,7 +199,7 @@ static Props props(final Connection connection, final HttpPushFactory factory, f
factory.<HttpPushContext>createFlow(getContext().getSystem(), logger, requestTimeout, timer,
logRequestTimes);

return requestSigningFlow.via(httpPushFlow);
return oauthFlow.via(requestSigningFlow).via(httpPushFlow);
}

@Override
Expand Down
Loading

0 comments on commit 5eee329

Please sign in to comment.