Skip to content

Commit

Permalink
Add ClientCredentialsFlow for HTTP connections. Make JwtInvalidExcept…
Browse files Browse the repository at this point in the history
…ion a user-indicated error.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 27, 2021
1 parent 1e4e1fb commit 1d989d7
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
import java.time.Duration;
import java.time.Instant;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.service.UriEncoding;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
import org.eclipse.ditto.jwt.model.JsonWebToken;
import org.eclipse.ditto.jwt.model.JwtInvalidException;

import akka.NotUsed;
import akka.http.javadsl.model.ContentTypes;
Expand Down Expand Up @@ -58,6 +62,12 @@ public final class ClientCredentialsFlow {
this.maxClockSkew = maxClockSkew;
}

/**
* Augment HTTP requests with OAuth2 bearer tokens.
*
* @param httpFlow Flow with which to send HTTP requests.
* @return The request-augmenting flow.
*/
public Flow<Pair<HttpRequest, HttpPushContext>, Pair<HttpRequest, HttpPushContext>, NotUsed> withToken(
final Flow<Pair<HttpRequest, HttpPushContext>, Pair<Try<HttpResponse>, HttpPushContext>, ?> httpFlow) {

Expand Down Expand Up @@ -100,13 +110,76 @@ Source<JsonWebToken, NotUsed> getSingleTokenSource(
final HttpPushContext context = response -> {};
return Source.single(Pair.create(tokenRequest, context))
.via(httpFlow)
.flatMapConcat(ClientCredentialsFlow::asJsonWebToken);
.flatMapConcat(this::asJsonWebToken);
}

private boolean shouldNotRefresh(final JsonWebToken jwt) {
return maxClockSkew.minus(Duration.between(Instant.now(), jwt.getExpirationTime())).isNegative();
}

private Source<JsonWebToken, NotUsed> asJsonWebToken(final Pair<Try<HttpResponse>, HttpPushContext> pair) {
final var tryResponse = pair.first();
if (tryResponse.isFailure()) {
return Source.failed(convertException(tryResponse.failed().get()));
} else {
return parseJwt(tryResponse.get());
}
}

private Source<JsonWebToken, NotUsed> parseJwt(final HttpResponse response) {
final boolean areStatusAndContentTypeExpected = response.status().isSuccess() &&
response.entity().getContentType().equals(ContentTypes.APPLICATION_JSON);
if (areStatusAndContentTypeExpected) {
return response.entity()
.getDataBytes()
.fold(ByteString.emptyByteString(), ByteString::concat)
.map(ByteString::utf8String)
.flatMapConcat(this::extractJwt)
.mapMaterializedValue(any -> NotUsed.getInstance());
} else {
final String description = String.format("Response status is <%d> and content type is <%s>.",
response.status().intValue(), response.entity().getContentType());
return Source.failed(getJwtInvalidExceptionForResponse().description(description).build());
}
}

private Source<JsonWebToken, NotUsed> extractJwt(final String body) {
try {
final var json = JsonObject.of(body);
return Source.single(ImmutableJsonWebToken.fromToken(json.getValueOrThrow(ACCESS_TOKEN)));
} catch (final NullPointerException | IllegalArgumentException | JsonRuntimeException |
DittoRuntimeException e) {
final JwtInvalidException jwtInvalid;
if (e instanceof JwtInvalidException) {
jwtInvalid = (JwtInvalidException) e;
} else {
final var bodySummary = body.length() > 100 ? body.substring(0, 100) + "..." : body;
jwtInvalid = getJwtInvalidExceptionForResponse()
.description(String.format("Response body: <%s>", bodySummary))
.build();
}
return Source.failed(jwtInvalid);
}
}

private JwtInvalidException convertException(final Throwable error) {
if (error instanceof JwtInvalidException) {
return (JwtInvalidException) error;
} else {
return JwtInvalidException.newBuilder()
.message(String.format("Request to token endpoint <%s> failed.", tokenRequest.getUri()))
.description(
String.format("Cause: %s: %s", error.getClass().getCanonicalName(), error.getMessage()))
.build();
}
}

private DittoRuntimeExceptionBuilder<JwtInvalidException> getJwtInvalidExceptionForResponse() {
return JwtInvalidException.newBuilder()
.message(String.format("Received invalid JSON web token response from <%s>.", tokenRequest.getUri()))
.description("Please verify that the token endpoint and client credentials are correct.");
}

private static HttpRequest toTokenRequest(final String tokenEndpoint,
final String clientId,
final String clientSecret,
Expand Down Expand Up @@ -136,35 +209,4 @@ private static Pair<HttpRequest, HttpPushContext> augmentRequestWithJwt(
pair.first().first().addCredentials(HttpCredentials.createOAuth2BearerToken(jwt.getToken()));
return Pair.create(augmentedRequest, context);
}

private static Source<JsonWebToken, NotUsed> asJsonWebToken(final Pair<Try<HttpResponse>, HttpPushContext> pair) {
final var tryResponse = pair.first();
if (tryResponse.isFailure()) {
return Source.failed(tryResponse.failed().get());
} else {
return parseJwt(tryResponse.get());
}
}

private static Source<JsonWebToken, NotUsed> parseJwt(final HttpResponse response) {
final boolean areStatusAndContentTypeExpected = response.status().isSuccess() &&
response.entity().getContentType().equals(ContentTypes.APPLICATION_JSON);
if (areStatusAndContentTypeExpected) {
return response.entity()
.getDataBytes()
.fold(ByteString.emptyByteString(), ByteString::concat)
.map(ByteString::utf8String)
.map(ClientCredentialsFlow::extractJwt)
.mapMaterializedValue(any -> NotUsed.getInstance());
} else {
// TODO: use misconfiguration exception
return Source.failed(new IllegalStateException("Unexpected response from token endpoint"));
}
}

private static JsonWebToken extractJwt(final String body) {
// TODO: classify JSON exception as misconfiguration
final var json = JsonObject.of(body);
return ImmutableJsonWebToken.fromToken(json.getValueOrThrow(ACCESS_TOKEN));
}
}
2 changes: 2 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ ditto {
{exceptionName: "org.apache.qpid.jms.provider.ProviderException", messagePattern: ".*"}
{exceptionName: "org.apache.qpid.jms.provider.exceptions.ProviderSecurityException", messagePattern: ".*"}
{exceptionName: "org.apache.qpid.jms.provider.exceptions.ProviderInvalidDestinationException", messagePattern: ".*"}
# HTTP
{exceptionName: "org.eclipse.ditto.jwt.model.JwtInvalidException", messagePattern: ".*"}
# Common cert exceptions
{exceptionName: "java.security.cert.CertificateException", messagePattern: ".*"}
{exceptionName: "java.security.cert.CertPathValidatorException", messagePattern: ".*"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -21,9 +22,11 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionException;

import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
import org.eclipse.ditto.jwt.model.JsonWebToken;
import org.eclipse.ditto.jwt.model.JwtInvalidException;
import org.junit.After;
import org.junit.Test;

Expand Down Expand Up @@ -153,6 +156,65 @@ public void expiredTokensAreUsedExactlyOnce() {
assertThat(uniqueTokens).containsExactlyInAnyOrderElementsOf(result);
}

@Test
public void failedRequest() {
final var underTest = newClientCredentialsFlow(Duration.ZERO);
final var result = underTest.getSingleTokenSource(httpFlow).runWith(Sink.ignore(), actorSystem);
responseProbe.expectRequest();
responseProbe.sendNext(Try.apply(() -> {
throw new IllegalStateException("Expected error");
}));
assertThatExceptionOfType(CompletionException.class)
.isThrownBy(result.toCompletableFuture()::join)
.withCauseInstanceOf(JwtInvalidException.class);
}

@Test
public void incorrectStatusCode() {
final var underTest = newClientCredentialsFlow(Duration.ZERO);
final var result = underTest.getSingleTokenSource(httpFlow).runWith(Sink.ignore(), actorSystem);
responseProbe.expectRequest();
responseProbe.sendNext(Try.apply(() -> HttpResponse.create().withStatus(400)));
assertThatExceptionOfType(CompletionException.class)
.isThrownBy(result.toCompletableFuture()::join)
.withCauseInstanceOf(JwtInvalidException.class);
}

@Test
public void incorrectContentType() {
final var underTest = newClientCredentialsFlow(Duration.ZERO);
final var result = underTest.getSingleTokenSource(httpFlow).runWith(Sink.ignore(), actorSystem);
responseProbe.expectRequest();
responseProbe.sendNext(Try.apply(() -> HttpResponse.create().withStatus(200)
.withEntity(HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, "hello world"))));
assertThatExceptionOfType(CompletionException.class)
.isThrownBy(result.toCompletableFuture()::join)
.withCauseInstanceOf(JwtInvalidException.class);
}

@Test
public void invalidJson() {
final var underTest = newClientCredentialsFlow(Duration.ZERO);
final var result = underTest.getSingleTokenSource(httpFlow).runWith(Sink.ignore(), actorSystem);
responseProbe.expectRequest();
responseProbe.sendNext(Try.apply(() -> HttpResponse.create().withStatus(200)
.withEntity(HttpEntities.create(ContentTypes.APPLICATION_JSON, "hello world"))));
assertThatExceptionOfType(CompletionException.class)
.isThrownBy(result.toCompletableFuture()::join)
.withCauseInstanceOf(JwtInvalidException.class);
}

@Test
public void invalidJwt() {
final var underTest = newClientCredentialsFlow(Duration.ZERO);
final var result = underTest.getSingleTokenSource(httpFlow).runWith(Sink.ignore(), actorSystem);
responseProbe.expectRequest();
responseProbe.sendNext(Try.apply(() -> getTokenResponse(Duration.ZERO, "one!.invalid!.token")));
assertThatExceptionOfType(CompletionException.class)
.isThrownBy(result.toCompletableFuture()::join)
.withCauseInstanceOf(JwtInvalidException.class);
}

private static ClientCredentialsFlow newClientCredentialsFlow(final Duration maxClockSkew) {
return new ClientCredentialsFlow(URI, CLIENT_ID, CLIENT_SECRET, CLIENT_SCOPE, maxClockSkew);
}
Expand Down

0 comments on commit 1d989d7

Please sign in to comment.