Skip to content

Commit

Permalink
Adjust to JDK 11 concurrency implementation; make thread leackage tes…
Browse files Browse the repository at this point in the history
…t effective; fix it.

1. Executors do not allocate threads before tasks are submitted. This
   made the thread leak test ineffective as no thread was allocated
   to begin with.

2. Runnables obtained from Executor.shutdownNow() cannot be run
   without another executor because they override Runnable.run()
   to NOT run when the parent executor was shut down.
   Refactor initialization to not rely on executing executor
   tasks in the calling thread.

3. Added an asynchronous client creation interface
   DisconnectedDittoClient to ensure that resources are released
   if client creation failed.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 29, 2020
1 parent dfa519b commit 4fd6fe3
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 250 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.client;

import java.util.concurrent.CompletionStage;

/**
* Client interface before connecting.
*/
public interface DisconnectedDittoClient {

/**
* Connect the client to the configured Ditto back-end.
* If this method is called more than once, the result is not defined.
*
* @return a future that completes with the connected client.
*/
CompletionStage<DittoClient> connect();

/**
* Release resources held by this client.
*/
void destroy();
}
36 changes: 36 additions & 0 deletions java/src/main/java/org/eclipse/ditto/client/DittoClients.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,40 @@ public static DittoClient newInstance(final MessagingProvider twinMessagingProvi
messageSerializerRegistry);
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient} with a shared {@code Twin} and {@code Live}
* {@link org.eclipse.ditto.client.messaging.MessagingProvider} but do not attempt to connect to the configured
* back-end.
*
* @param messagingProvider the messaging provider for this client.
* @return the disconnected client.
* @since 1.3.0
*/
public static DisconnectedDittoClient newDisconnectedInstance(final MessagingProvider messagingProvider) {
return newDisconnectedInstance(messagingProvider, messagingProvider, messagingProvider,
MessageSerializerFactory.newInstance().getMessageSerializerRegistry());
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient} with a specific {@code Twin}, {@code Live} and
* {@code Policy} {@link org.eclipse.ditto.client.messaging.MessagingProvider} but do not attempt to the configured
* back-end.
*
* @param twinMessagingProvider the messaging provider for the {@code Twin} part of the client.
* @param liveMessagingProvider the messaging provider for the {@code Live} part of the client.
* @param policyMessagingProvider the messaging provider for the {@code Policy} part of the client.
* @param messageSerializerRegistry a registry of {@code MessageSerializer}s for the {@code Live} part of the client.
* @return the disconnected client.
* @since 1.3.0
*/
public static DisconnectedDittoClient newDisconnectedInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider, final MessagingProvider policyMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

return DefaultDittoClient.newDisconnectedInstance(twinMessagingProvider,
liveMessagingProvider,
policyMessagingProvider,
messageSerializerRegistry);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

import java.text.MessageFormat;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.eclipse.ditto.client.DisconnectedDittoClient;
import org.eclipse.ditto.client.DittoClient;
import org.eclipse.ditto.client.changes.ChangeAction;
import org.eclipse.ditto.client.changes.internal.ImmutableChange;
Expand Down Expand Up @@ -81,7 +83,7 @@
*
* @since 1.0.0
*/
public final class DefaultDittoClient implements DittoClient {
public final class DefaultDittoClient implements DittoClient, DisconnectedDittoClient {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDittoClient.class);

Expand Down Expand Up @@ -120,6 +122,35 @@ public static DittoClient newInstance(final MessagingProvider twinMessagingProvi
final MessagingProvider liveMessagingProvider,
final MessagingProvider policyMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

final DisconnectedDittoClient disconnectedClient =
newDisconnectedInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider,
messageSerializerRegistry);
final CompletionStage<DittoClient> connectFuture = disconnectedClient.connect();

connectFuture.whenComplete((result, error) -> {
if (error != null) {
disconnectedClient.destroy();
}
});

return connectFuture.toCompletableFuture().join();
}

/**
* Create a Ditto client object but do not attempt to connect to the configured back-end.
*
* @param twinMessagingProvider the messaging provider to use for the {@code Twin} aspect.
* @param liveMessagingProvider the messaging provider to use for the {@code Live} aspect.
* @param policyMessagingProvider the messaging provider for the {@code Policy} part of the client.
* @param messageSerializerRegistry registry for all serializers of live messages.
* @return the disconnected client.
*/
public static DisconnectedDittoClient newDisconnectedInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider,
final MessagingProvider policyMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

final TwinImpl twin = configureTwin(twinMessagingProvider);
final LiveImpl live = configureLive(liveMessagingProvider, messageSerializerRegistry);
final PoliciesImpl policy = configurePolicyClient(policyMessagingProvider);
Expand Down Expand Up @@ -233,16 +264,9 @@ private static OutgoingMessageFactory getOutgoingMessageFactoryForPolicies(
}

private static void init(final PointerBus bus, final MessagingProvider messagingProvider) {
try {
registerKeyBasedDistributorForIncomingEvents(bus);
registerKeyBasedHandlersForIncomingEvents(bus, messagingProvider,
DittoProtocolAdapter.of(HeaderTranslator.empty()));
messagingProvider.initialize();
} catch (final RuntimeException e) {
bus.close();
messagingProvider.close();
throw e;
}
registerKeyBasedDistributorForIncomingEvents(bus);
registerKeyBasedHandlersForIncomingEvents(bus, messagingProvider,
DittoProtocolAdapter.of(HeaderTranslator.empty()));
}

private static void registerKeyBasedDistributorForIncomingEvents(final PointerBus bus) {
Expand Down Expand Up @@ -555,4 +579,12 @@ private static void registerKeyBasedHandlersForIncomingEvents(final PointerBus b
emitAcknowledgement)
);
}

@Override
public CompletionStage<DittoClient> connect() {
return twin.messagingProvider.initializeAsync()
.thenCompose(result -> live.messagingProvider.initializeAsync())
.thenCompose(result -> policies.messagingProvider.initializeAsync())
.thenApply(result -> this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ public void publish(final String message) {

@Override
public void shutdownExecutor() {
LOGGER.trace("Shutting down AdaptableBus Executor");
LOGGER.trace("Shutting down AdaptableBus Executors");
singleThreadedExecutorService.shutdownNow();
scheduledExecutorService.shutdownNow();
}

// call this in a single-threaded executor so that ordering is preserved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

Expand All @@ -41,8 +42,20 @@ public interface MessagingProvider {
/**
* Initializes the Messaging Provider by opening the underlying connections, etc.
* Blocks the calling thread until messaging provider is ready.
* Use {@code initializeAsync} instead.
*/
void initialize();
@Deprecated
default void initialize() {
initializeAsync().toCompletableFuture().join();
}

/**
* Perform initialization asynchronously.
*
* @return a future that completes after initialization completes.
* @since 1.3.0
*/
CompletionStage<?> initializeAsync();

/**
* Returns the {@code AuthenticationConfiguration} of this provider.
Expand Down
Loading

0 comments on commit 4fd6fe3

Please sign in to comment.