Skip to content

Commit

Permalink
#17 finished token refresh implementation
Browse files Browse the repository at this point in the history
Signed-off-by: krj1imb <johannes.schneider@bosch-si.com>
  • Loading branch information
krj1imb committed Oct 11, 2019
1 parent a2d9833 commit 2cd5fbb
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 52 deletions.
Expand Up @@ -12,15 +12,10 @@
*/
package org.eclipse.ditto.client.messaging;

import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.ditto.client.configuration.internal.AccessTokenAuthenticationConfiguration;
import org.eclipse.ditto.client.configuration.internal.BasicAuthenticationConfiguration;
import org.eclipse.ditto.client.configuration.internal.ClientCredentialsAuthenticationConfiguration;
import org.eclipse.ditto.client.configuration.internal.DummyAuthenticationConfiguration;
import org.eclipse.ditto.client.internal.DefaultThreadFactory;
import org.eclipse.ditto.client.messaging.internal.AccessTokenAuthenticationProvider;
import org.eclipse.ditto.client.messaging.internal.BasicAuthenticationProvider;
import org.eclipse.ditto.client.messaging.internal.ClientCredentialsAuthenticationProvider;
Expand All @@ -45,8 +40,7 @@ private AuthenticationProviders() {
*/
public static AuthenticationProvider accessToken(final AccessTokenAuthenticationConfiguration configuration) {

return new AccessTokenAuthenticationProvider(configuration,
createDefaultExecutorService(UUID.randomUUID().toString()));
return new AccessTokenAuthenticationProvider(configuration);
}

/**
Expand All @@ -69,8 +63,7 @@ public static AuthenticationProvider basic(final BasicAuthenticationConfiguratio
public static AuthenticationProvider clientCredentials(
final ClientCredentialsAuthenticationConfiguration configuration) {

return new ClientCredentialsAuthenticationProvider(configuration,
createDefaultExecutorService(UUID.randomUUID().toString()));
return new ClientCredentialsAuthenticationProvider(configuration);
}

/**
Expand All @@ -85,8 +78,4 @@ public static AuthenticationProvider dummy(
return new DummyAuthenticationProvider(configuration);
}

private static ScheduledExecutorService createDefaultExecutorService(final String name) {
return Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("ditto-client-scheduler-" + name));
}

}
Expand Up @@ -17,9 +17,14 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.annotation.concurrent.ThreadSafe;

import org.eclipse.ditto.client.internal.DefaultThreadFactory;
import org.eclipse.ditto.client.messaging.AuthenticationProvider;
import org.eclipse.ditto.model.jwt.JsonWebToken;

Expand All @@ -32,57 +37,76 @@
*/
abstract class AbstractTokenAuthenticationProvider implements AuthenticationProvider<WebSocket> {

private static final long TOKEN_GRACE_SECONDS = 5L;
private static final String TOKEN_MESSAGE_TEMPLATE = "JWT-TOKEN?jwtToken=%s";
private static final String PROTOCOL_CMD_JWT_TOKEN_TEMPLATE = "JWT-TOKEN?jwtToken=%s";

private final Map<String, String> additionalHeaders;
private final JsonWebTokenSupplier jsonWebTokenSupplier;
private final Scheduler scheduler;
private final JwtRefreshScheduler jwtRefreshScheduler;

AbstractTokenAuthenticationProvider(final Map<String, String> additionalHeaders,
final JsonWebTokenSupplier jsonWebTokenSupplier,
final ScheduledExecutorService executorService) {
final JsonWebTokenSupplier jsonWebTokenSupplier) {
this.additionalHeaders = checkNotNull(additionalHeaders, "additionalHeaders");
this.jsonWebTokenSupplier = checkNotNull(jsonWebTokenSupplier, "accessTokenSupplier");
scheduler = new Scheduler(jsonWebTokenSupplier, executorService);
jwtRefreshScheduler = JwtRefreshScheduler.of(jsonWebTokenSupplier);
}

@Override
public void prepareAuthentication(final WebSocket webSocket) {
final JsonWebToken jsonWebToken = jsonWebTokenSupplier.get();
final String authorizationHeader = String.format("Bearer %s", jsonWebToken.getToken());
final JsonWebToken jwt = jsonWebTokenSupplier.get();
final String authorizationHeader = String.format("Bearer %s", jwt.getToken());
webSocket.addHeader("Authorization", authorizationHeader);
additionalHeaders.forEach(webSocket::addHeader);
scheduler.scheduleTokenRefresh(webSocket, jsonWebToken.getExpirationTime());
jwtRefreshScheduler.scheduleRefresh(jwt.getExpirationTime(), newJwt -> sendJwt(webSocket, newJwt));
}

private void sendJwt(final WebSocket webSocket, final JsonWebToken jsonWebToken) {
webSocket.sendText(String.format(PROTOCOL_CMD_JWT_TOKEN_TEMPLATE, jsonWebToken.getToken()));
}

@Override
public void destroy() {
scheduler.destroy();
jwtRefreshScheduler.destroy();
}

private static final class Scheduler {
@ThreadSafe
private static final class JwtRefreshScheduler {

private static final long EXPIRY_GRACE_SECONDS = 5L;

private final JsonWebTokenSupplier jsonWebTokenSupplier;
private final ScheduledExecutorService executorService;

private Scheduler(final JsonWebTokenSupplier jsonWebTokenSupplier,
final ScheduledExecutorService executorService) {
private JwtRefreshScheduler(final JsonWebTokenSupplier jsonWebTokenSupplier) {
this.jsonWebTokenSupplier = checkNotNull(jsonWebTokenSupplier, "jsonWebTokenSupplier");
this.executorService = checkNotNull(executorService, "executorService");
executorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("scheduler"));
}

static JwtRefreshScheduler of(final JsonWebTokenSupplier jsonWebTokenSupplier) {
return new JwtRefreshScheduler(jsonWebTokenSupplier);
}

void scheduleTokenRefresh(final WebSocket webSocket, final Instant expiry) {
final Instant expiration = expiry.minusSeconds(TOKEN_GRACE_SECONDS);
final Duration delay = Duration.between(Instant.now(), expiration);
executorService.schedule(() -> getToken(webSocket), delay.toMillis(), TimeUnit.MILLISECONDS);
/**
* Schedules a fresh {@code JsonWebToken} from the configured {@code JsonWebTokenSupplier}. The refresh will
* be triggered
* {@link org.eclipse.ditto.client.messaging.internal.AbstractTokenAuthenticationProvider.JwtRefreshScheduler#EXPIRY_GRACE_SECONDS}
* seconds before the actual expiry to account for network latency.
*
* @param due the instant when the fresh token is due.
* @param consumer provides the fresh token.
*/
void scheduleRefresh(final Instant due, final Consumer<JsonWebToken> consumer) {
final Instant expiration = due.minusSeconds(EXPIRY_GRACE_SECONDS);
final Instant now = Instant.now();
if (now.isBefore(expiration)) {
final long delay = Duration.between(now, expiration).toMillis();
executorService.schedule(() -> doRefresh(consumer), delay, TimeUnit.MILLISECONDS);
}
}

private void getToken(final WebSocket webSocket) {
private void doRefresh(final Consumer<JsonWebToken> consumer) {
final JsonWebToken jsonWebToken = jsonWebTokenSupplier.get();
final String tokenMessage = String.format(TOKEN_MESSAGE_TEMPLATE, jsonWebToken.getToken());
webSocket.sendText(tokenMessage);
scheduleTokenRefresh(webSocket, jsonWebToken.getExpirationTime());
consumer.accept(jsonWebToken);
scheduleRefresh(jsonWebToken.getExpirationTime(), consumer);
}

void destroy() {
Expand Down
Expand Up @@ -14,8 +14,6 @@

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.ditto.client.configuration.AuthenticationConfiguration;
import org.eclipse.ditto.client.configuration.internal.AccessTokenAuthenticationConfiguration;

Expand All @@ -28,9 +26,8 @@ public final class AccessTokenAuthenticationProvider extends AbstractTokenAuthen

private final AccessTokenAuthenticationConfiguration configuration;

public AccessTokenAuthenticationProvider(final AccessTokenAuthenticationConfiguration configuration,
final ScheduledExecutorService executorService) {
super(configuration.getAdditionalHeaders(), configuration.getJsonWebTokenSupplier(), executorService);
public AccessTokenAuthenticationProvider(final AccessTokenAuthenticationConfiguration configuration) {
super(configuration.getAdditionalHeaders(), configuration.getJsonWebTokenSupplier());
this.configuration = checkNotNull(configuration, "configuration");
}

Expand Down
Expand Up @@ -14,8 +14,6 @@

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.ditto.client.configuration.AuthenticationConfiguration;
import org.eclipse.ditto.client.configuration.internal.ClientCredentialsAuthenticationConfiguration;

Expand All @@ -28,10 +26,8 @@ public final class ClientCredentialsAuthenticationProvider extends AbstractToken

private final ClientCredentialsAuthenticationConfiguration configuration;

public ClientCredentialsAuthenticationProvider(final ClientCredentialsAuthenticationConfiguration configuration,
final ScheduledExecutorService executorService) {
super(configuration.getAdditionalHeaders(), ClientCredentialsJsonWebTokenSupplier.newInstance(configuration),
executorService);
public ClientCredentialsAuthenticationProvider(final ClientCredentialsAuthenticationConfiguration configuration) {
super(configuration.getAdditionalHeaders(), ClientCredentialsJsonWebTokenSupplier.newInstance(configuration));
this.configuration = checkNotNull(configuration, "configuration");
}

Expand Down
Expand Up @@ -133,6 +133,8 @@ public final class WebSocketMessagingProvider extends WebSocketAdapter implement
private static final String PROTOCOL_CMD_START_SEND_LIVE_EVENTS = "START-SEND-LIVE-EVENTS";
private static final String PROTOCOL_CMD_STOP_SEND_LIVE_EVENTS = "STOP-SEND-LIVE-EVENTS";

private static final String PROTOCOL_CMD_JWT_TOKEN = "JWT-TOKEN";

/**
* The backend sends the protocol message above suffixed by ":ACK" when the subscription was created. E.g.: {@code
* START-SEND-EVENTS:ACK}
Expand Down Expand Up @@ -790,6 +792,9 @@ private void handleIncomingMessage(final String message) {
case PROTOCOL_CMD_STOP_SEND_LIVE_EVENTS + PROTOCOL_CMD_ACK_SUFFIX:
ackSubscription(PROTOCOL_CMD_STOP_SEND_LIVE_EVENTS);
return;
case PROTOCOL_CMD_JWT_TOKEN + PROTOCOL_CMD_ACK_SUFFIX:
LOGGER.trace("Ack for JWT received.");
return;
default:
// no protocol message, treat as JSON below ..
}
Expand Down
Expand Up @@ -15,6 +15,7 @@
import static java.util.Arrays.asList;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -29,8 +30,7 @@
public class ClientShutdownTest {

private static final List<String> ALLOWED_THREADS = asList("main", "Monitor Ctrl-Break", "BundleWatcher: 1",
"surefire-forkedjvm-command-thread", "surefire-forkedjvm-ping-30s", "ping-30s", "Attach API wait loop",
"pool-1-thread-1");
"surefire-forkedjvm-command-thread", "surefire-forkedjvm-ping-30s", "ping-30s", "Attach API wait loop");

@Test
public void testNoMoreActiveThreads() throws InterruptedException {
Expand All @@ -42,7 +42,7 @@ public void testNoMoreActiveThreads() throws InterruptedException {
DittoClients.newInstance(messaging).destroy();

// wait some time for executors/threads to shutdown
Thread.sleep(2000);
TimeUnit.SECONDS.sleep(2L);

final Thread[] threads = new Thread[Thread.activeCount()];
Thread.enumerate(threads);
Expand Down
Expand Up @@ -13,12 +13,12 @@
package org.eclipse.ditto.client.messaging.internal;

import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import java.time.Instant;
import java.util.Base64;
import java.util.concurrent.Executors;

import org.eclipse.ditto.client.configuration.internal.AccessTokenAuthenticationConfiguration;
import org.eclipse.ditto.model.jwt.ImmutableJsonWebToken;
Expand All @@ -45,14 +45,27 @@ public void tokenRefreshIsCalledBeforeExpiry() {

underTest.prepareAuthentication(webSocket);

verify(webSocket, timeout(12000L)).sendText(startsWith("JWT-TOKEN?jwtToken="));
verify(webSocket, timeout(10000L)).sendText(startsWith("JWT-TOKEN?jwtToken="));

underTest.destroy();
}

@Test
public void tokenRefreshIsNotCalledWithNegativeExpiry() {
final AccessTokenAuthenticationProvider underTest = getAccessTokenAuthenticationProvider(0L);

underTest.prepareAuthentication(webSocket);

verify(webSocket, never()).sendText(startsWith("JWT-TOKEN?jwtToken="));

underTest.destroy();
}

private static AccessTokenAuthenticationProvider getAccessTokenAuthenticationProvider(final long exp) {
return new AccessTokenAuthenticationProvider(AccessTokenAuthenticationConfiguration.newBuilder()
.identifier("bumlux")
.accessTokenSupplier(() -> getJsonWebToken(exp))
.build(), Executors.newSingleThreadScheduledExecutor());
.build());
}

private static JsonWebToken getJsonWebToken(final long exp) {
Expand Down

0 comments on commit 2cd5fbb

Please sign in to comment.