Skip to content

Commit

Permalink
Added new MessagingConfiguration parameter fo intial reconnection beh…
Browse files Browse the repository at this point in the history
…aviour

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Sep 25, 2020
1 parent 3d257c7 commit bf95662
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ public interface MessagingConfiguration {
*/
boolean isReconnectEnabled();

/**
* @return {@code true} if client should retry when connection initialization failed.
*
* @since 1.3.0
*/
boolean isInitialConnectRetryEnabled();

/**
* Returns the proxy configuration.
*
Expand Down Expand Up @@ -117,6 +124,17 @@ interface Builder {
*/
Builder reconnectEnabled(boolean reconnectEnabled);

/**
* Sets if {@code initialConnectRetryEnbaled}.
* <p> Default is disabled. When establishing a new connection, the client doesn't try to reconnect.
*
* @param initialConnectRetryEnabled enables/disables retrying connection initialization.
* @return this builder.
*
* @since 1.3.0
*/
Builder initialConnectRetryEnabled(boolean initialConnectRetryEnabled);

/**
* Sets the {@code proxyConfiguration}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public final class WebSocketMessagingConfiguration implements MessagingConfigura
private final JsonSchemaVersion jsonSchemaVersion;
private final URI endpointUri;
private final boolean reconnectEnabled;
private final boolean initialConnectRetryEnabled;
@Nullable private final ProxyConfiguration proxyConfiguration;
@Nullable private final TrustStoreConfiguration trustStoreConfiguration;
@Nullable private final Consumer<Throwable> connectionErrorHandler;
Expand All @@ -49,6 +50,7 @@ public WebSocketMessagingConfiguration(final WebSocketMessagingConfigurationBuil

jsonSchemaVersion = builder.jsonSchemaVersion;
reconnectEnabled = builder.reconnectEnabled;
initialConnectRetryEnabled = builder.initialConnectRetryEnabled;
proxyConfiguration = builder.proxyConfiguration;
trustStoreConfiguration = builder.trustStoreConfiguration;
connectionErrorHandler = builder.connectionErrorHandler;
Expand Down Expand Up @@ -80,6 +82,11 @@ public boolean isReconnectEnabled() {
return reconnectEnabled;
}

@Override
public boolean isInitialConnectRetryEnabled() {
return initialConnectRetryEnabled;
}

@Override
public Optional<ProxyConfiguration> getProxyConfiguration() {
return Optional.ofNullable(proxyConfiguration);
Expand All @@ -105,13 +112,15 @@ private static final class WebSocketMessagingConfigurationBuilder implements Mes
private Duration timeout = Duration.ofSeconds(60L);
private URI endpointUri;
private boolean reconnectEnabled;
private boolean initialConnectRetryEnabled;
@Nullable private ProxyConfiguration proxyConfiguration;
private TrustStoreConfiguration trustStoreConfiguration;
@Nullable private Consumer<Throwable> connectionErrorHandler;

private WebSocketMessagingConfigurationBuilder() {
jsonSchemaVersion = JsonSchemaVersion.LATEST;
reconnectEnabled = true;
initialConnectRetryEnabled = false;
proxyConfiguration = null;
connectionErrorHandler = null;
}
Expand Down Expand Up @@ -147,6 +156,12 @@ public MessagingConfiguration.Builder reconnectEnabled(final boolean reconnectEn
return this;
}

@Override
public MessagingConfiguration.Builder initialConnectRetryEnabled(final boolean initialConnectRetryEnabled) {
this.initialConnectRetryEnabled = initialConnectRetryEnabled;
return this;
}

@Override
public MessagingConfiguration.Builder proxyConfiguration(
@Nullable final ProxyConfiguration proxyConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,12 @@ public synchronized void initialize() {
if (webSocket.get() == null) {
final ScheduledExecutorService executor = createConnectionExecutor();
try {
setWebSocket(connectWithRetries(this::createWebsocket, executor).toCompletableFuture().join());
setWebSocket(
connectWithPotentialRetries(this::createWebsocket, executor).toCompletableFuture().join());
initiallyConnected.set(true);
} catch (CompletionException e) {
LOGGER.error("Encountered error during initial connection.");
throw e;
} finally {
executor.shutdownNow();
}
Expand Down Expand Up @@ -316,16 +320,21 @@ public void onError(final WebSocket websocket, final WebSocketException cause) {
});
}

private CompletionStage<WebSocket> connectWithRetries(final Supplier<WebSocket> webSocket,
private CompletionStage<WebSocket> connectWithPotentialRetries(final Supplier<WebSocket> webSocket,
final ScheduledExecutorService executorService) {

return Retry.retryTo("initialize WebSocket connection",
() -> initiateConnection(webSocket.get(), reconnectExecutor), isCancelled::get)
.inClientSession(sessionId)
.withExecutor(executorService)
.notifyOnError(messagingConfiguration.getConnectionErrorHandler().orElse(null))
.isRecoverable(WebSocketMessagingProvider::isRecoverable)
.get();
if (messagingConfiguration.isInitialConnectRetryEnabled()) {
return Retry.retryTo("initialize WebSocket connection",
() -> initiateConnection(webSocket.get(), reconnectExecutor), isCancelled::get)
.inClientSession(sessionId)
.withExecutor(executorService)
.notifyOnError(messagingConfiguration.getConnectionErrorHandler().orElse(null))
.isRecoverable(WebSocketMessagingProvider::isRecoverable)
.get();
} else {
LOGGER.info("Client <{}>: Initializing WebSocket connection without retrying", sessionId);
return initiateConnection(webSocket.get(), executorService);
}
}

private void handleReconnectionIfEnabled() {
Expand All @@ -344,7 +353,7 @@ private void handleReconnectionIfEnabled() {
}

private void reconnectWithRetries() {
this.connectWithRetries(this::recreateWebSocket, reconnectExecutor)
this.connectWithPotentialRetries(this::recreateWebSocket, reconnectExecutor)
.thenAccept(reconnectedWebSocket -> {
setWebSocket(reconnectedWebSocket);
reconnecting.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ private static MessagingConfiguration getDefaultMessagingConfiguration(final Jso
return WebSocketMessagingConfiguration.newBuilder()
.endpoint("ws://localhost:8080")
.jsonSchemaVersion(version)
.initialConnectRetryEnabled(true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ private MessagingConfiguration configOf(final String uri, final Consumer<Throwab
.endpoint(uri)
.connectionErrorHandler(errorHandler)
.reconnectEnabled(true)
.initialConnectRetryEnabled(true)
.build();
}

Expand Down

0 comments on commit bf95662

Please sign in to comment.