Skip to content

Commit

Permalink
Use ScheduledExecutorService to avoid StackOverflowException because of
Browse files Browse the repository at this point in the history
recursion and avoid interrupting threads.

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 13, 2020
1 parent fae4b73 commit edce2a8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 39 deletions.
Expand Up @@ -12,14 +12,17 @@
*/
package org.eclipse.ditto.client.messaging.internal;

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,13 +41,13 @@ final class Retry<T> implements Supplier<CompletionStage<T>> {
private final String sessionId;
private final String nameOfAction;
private final Supplier<T> retriedSupplier;
private final ExecutorService executorService;
private final ScheduledExecutorService executorService;


private Retry(final String nameOfAction,
final String sessionId,
final Supplier<T> retriedSupplier,
final ExecutorService executorService) {
final ScheduledExecutorService executorService) {

this.sessionId = sessionId;
this.nameOfAction = nameOfAction;
Expand All @@ -69,35 +72,30 @@ private static int ensureIndexIntoTimeToWaitBounds(final int index) {
*/
@Override
public CompletionStage<T> get() {
return CompletableFuture.supplyAsync(() -> this.doGet(1), executorService);
final CompletableFuture<T> result = new CompletableFuture<>();
executorService.submit(() -> this.completeFutureEventually(1, result));
return result;
}

private T doGet(int attempt) {
private void completeFutureEventually(int attempt, final CompletableFuture<T> resultToComplete) {
try {
return retriedSupplier.get();
resultToComplete.complete(retriedSupplier.get());
} catch (final RuntimeException e) {
// log error, but try again (don't end loop)
LOGGER.error("Client <{}>: Failed to <{}>: {}.", sessionId, nameOfAction, e.getMessage());
waitFor(getTimeToWaitForAttempt(attempt));
return doGet(attempt + 1);

final int timeToWaitInSeconds = getTimeToWaitInSecondsForAttempt(attempt);
LOGGER.info("Client <{}>: Waiting for <{}> second(s) before retrying to <{}>.",
sessionId, timeToWaitInSeconds, nameOfAction);
executorService.schedule(() -> this.completeFutureEventually(attempt + 1, resultToComplete),
timeToWaitInSeconds,
TimeUnit.SECONDS);
}
}

private Duration getTimeToWaitForAttempt(int attempt) {
private int getTimeToWaitInSecondsForAttempt(int attempt) {
final int attemptIndex = ensureIndexIntoTimeToWaitBounds(attempt - 1);
return Duration.ofSeconds(TIME_TO_WAIT_BETWEEN_RETRIES_IN_SECONDS[attemptIndex]);
}

private void waitFor(final Duration timeToWait) {
try {
LOGGER.info("Client <{}>: Waiting for <{}> second(s) before retrying to <{}>.",
sessionId, timeToWait.getSeconds(), nameOfAction);
TimeUnit.SECONDS.sleep(timeToWait.getSeconds());
} catch (final InterruptedException ie) {
LOGGER.error("Client <{}>: Interrupted while waiting for the next try to <{}>.",
sessionId, nameOfAction, ie);
Thread.currentThread().interrupt();
}
return TIME_TO_WAIT_BETWEEN_RETRIES_IN_SECONDS[attemptIndex];
}

/**
Expand Down Expand Up @@ -143,7 +141,7 @@ interface RetryBuilderFinal<T> extends Supplier<CompletionStage<T>> {
* @param executorService the executor service.
* @return a new instance of this builder step.
*/
RetryBuilderFinal<T> withExecutor(final ExecutorService executorService);
RetryBuilderFinal<T> withExecutor(final ScheduledExecutorService executorService);

/**
* Executes the provided supplier unit the supplier returns a result.
Expand All @@ -165,16 +163,16 @@ static class RetryBuilder<T> implements RetryBuilderStep1<T>, RetryBuilderFinal<
private final String nameOfAction;
private final Supplier<T> retriedSupplier;
private final String sessionId;
private final ExecutorService executorService;
@Nullable private final ScheduledExecutorService executorService;

private RetryBuilder(final String nameOfAction, final Supplier<T> retriedSupplier) {
this(nameOfAction, retriedSupplier, "", ForkJoinPool.commonPool());
this(nameOfAction, retriedSupplier, "", null);
}

private RetryBuilder(final String nameOfAction,
final Supplier<T> retriedSupplier,
final String sessionId,
final ExecutorService executorService) {
@Nullable final ScheduledExecutorService executorService) {

this.nameOfAction = nameOfAction;
this.retriedSupplier = retriedSupplier;
Expand All @@ -188,12 +186,13 @@ public RetryBuilderFinal<T> inClientSession(final String sessionId) {
}

@Override
public RetryBuilderFinal<T> withExecutor(final ExecutorService executorService) {
public RetryBuilderFinal<T> withExecutor(final ScheduledExecutorService executorService) {
return new RetryBuilder<>(nameOfAction, retriedSupplier, sessionId, executorService);
}

@Override
public CompletionStage<T> get() {
checkNotNull(executorService, "executorService");
return new Retry<>(nameOfAction, sessionId, retriedSupplier, executorService).get();
}

Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -102,8 +103,8 @@ private WebSocketMessagingProvider(final AdaptableBus adaptableBus,
webSocket = new AtomicReference<>();
}

private static ScheduledThreadPoolExecutor createReconnectExecutor() {
return new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory("ditto-client-reconnect"));
private static ScheduledExecutorService createReconnectExecutor() {
return Executors.newScheduledThreadPool(1, new DefaultThreadFactory("ditto-client-reconnect"));
}

/**
Expand Down Expand Up @@ -162,7 +163,7 @@ public MessagingProvider unregisterSubscriptionMessage(final Object key) {
public void initialize() {
synchronized (webSocket) {
if (webSocket.get() == null) {
final ExecutorService connectionExecutor = createConnectionExecutor();
final ScheduledExecutorService connectionExecutor = createConnectionExecutor();
try {
final WebSocket connectedWebSocket = connectWithRetries(this::createWebsocket, connectionExecutor)
.toCompletableFuture()
Expand Down Expand Up @@ -285,13 +286,8 @@ private WebSocket initiateConnection(final WebSocket ws) {
}
}

private static ExecutorService createConnectionExecutor() {
final ThreadPoolExecutor connectionExecutor = new ThreadPoolExecutor(
1, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new DefaultThreadFactory("ditto-client-connect"),
new ThreadPoolExecutor.CallerRunsPolicy());
connectionExecutor.allowCoreThreadTimeOut(true);
return connectionExecutor;
private static ScheduledExecutorService createConnectionExecutor() {
return Executors.newScheduledThreadPool(1, new DefaultThreadFactory("ditto-client-connect"));
}

@Override
Expand Down Expand Up @@ -394,7 +390,7 @@ public void onError(final WebSocket websocket, final WebSocketException cause) {
}

private CompletionStage<WebSocket> connectWithRetries(final Supplier<WebSocket> webSocket,
final ExecutorService executorService) {
final ScheduledExecutorService executorService) {

return Retry
.retryTo("initialize WebSocket connection", () -> initiateConnection(webSocket.get()))
Expand Down

0 comments on commit edce2a8

Please sign in to comment.