Skip to content

Commit

Permalink
Remove blocking instantiation of DittoClient
Browse files Browse the repository at this point in the history
* Always return DisconnectedDittoClient which can be connected
  asynchronously by calling DisconnectedDittoClient#connect()

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Feb 2, 2021
1 parent 77d1cfd commit 59b2b9f
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 147 deletions.
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@
<!-- globally set version for checking binary compatibility against -->
<!-- whoever changes this to ${revision} or ${project.version} is responsible for API breakage caused by this! -->
<!-- in other words: never do that here! exclude the 'breakages' locally in the japicmp maven plugin if you intentionally break something -->
<binary-compatibility-check.version>1.4.0</binary-compatibility-check.version>
<binary-compatibility-check.version>${revision}</binary-compatibility-check.version>
<!-- skip until first release: -->
<binary-compatibility-check.skip>false</binary-compatibility-check.skip>

Expand Down
108 changes: 5 additions & 103 deletions java/src/main/java/org/eclipse/ditto/client/DittoClients.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,115 +37,17 @@ private DittoClients() {
throw new AssertionError();
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient} with a shared {@code Twin} and {@code Live}
* {@link org.eclipse.ditto.client.messaging.MessagingProvider}.
*
* @param messagingProvider the messaging provider for this client.
* @return the client.
* @throws org.eclipse.ditto.client.messaging.AuthenticationException if authentication failed.
* @throws org.eclipse.ditto.client.messaging.MessagingException if a connection to the configured endpoint
* could not be established
*/
public static DittoClient newInstance(final MessagingProvider messagingProvider) {
return newInstance(messagingProvider, messagingProvider);
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient} with a specific {@code Twin} and {@code Live}
* {@link org.eclipse.ditto.client.messaging.MessagingProvider}.
*
* @param twinMessagingProvider the messaging provider for the {@code Twin} part of the client.
* @param liveMessagingProvider the messaging provider for the {@code Live} part of the client.
* @return the client.
* @throws org.eclipse.ditto.client.messaging.AuthenticationException if authentication failed.
* @throws org.eclipse.ditto.client.messaging.MessagingException if a connection to the configured endpoint
* could not be established
*/
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider) {

return newInstance(twinMessagingProvider, liveMessagingProvider, twinMessagingProvider);
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient} with a specific {@code Twin} and {@code Live}
* {@link org.eclipse.ditto.client.messaging.MessagingProvider}.
*
* @param twinMessagingProvider the messaging provider for the {@code Twin} part of the client.
* @param liveMessagingProvider the messaging provider for the {@code Live} part of the client.
* @param policyMessagingProvider the messaging provider for the {@code Policy} part of the client.
* @return the client.
* @throws org.eclipse.ditto.client.messaging.AuthenticationException if authentication failed.
* @throws org.eclipse.ditto.client.messaging.MessagingException if a connection to the configured endpoint
* could not be established
* @since 1.1.0
*/
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider,
final MessagingProvider policyMessagingProvider) {

final MessageSerializerRegistry messageSerializerRegistry =
MessageSerializerFactory.newInstance().getMessageSerializerRegistry();
return newInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider,
messageSerializerRegistry);
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient} with a specific {@code Twin} and {@code Live}
* {@link org.eclipse.ditto.client.messaging.MessagingProvider}.
*
* @param twinMessagingProvider the messaging provider for the {@code Twin} part of the client.
* @param liveMessagingProvider the messaging provider for the {@code Live} part of the client.
* @param messageSerializerRegistry a registry of {@code MessageSerializer}s for the {@code Live} part of the client.
* @return the client.
* @throws org.eclipse.ditto.client.messaging.AuthenticationException if authentication failed.
* @throws org.eclipse.ditto.client.messaging.MessagingException if a connection to the configured endpoint
* could not be established
*/
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

return newInstance(twinMessagingProvider, liveMessagingProvider, twinMessagingProvider,
messageSerializerRegistry);
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient} with a specific {@code Twin}, {@code Live} and
* {@code Policy} {@link org.eclipse.ditto.client.messaging.MessagingProvider}.
*
* @param twinMessagingProvider the messaging provider for the {@code Twin} part of the client.
* @param liveMessagingProvider the messaging provider for the {@code Live} part of the client.
* @param policyMessagingProvider the messaging provider for the {@code Policy} part of the client.
* @param messageSerializerRegistry a registry of {@code MessageSerializer}s for the {@code Live} part of the client.
* @return the client.
* @throws org.eclipse.ditto.client.messaging.AuthenticationException if authentication failed.
* @throws org.eclipse.ditto.client.messaging.MessagingException if a connection to the configured endpoint
* could not be established
* @since 1.1.0
*/
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider, final MessagingProvider policyMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

return DefaultDittoClient.newInstance(twinMessagingProvider,
liveMessagingProvider,
policyMessagingProvider,
messageSerializerRegistry);
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient} with a shared {@code Twin} and {@code Live}
* {@link org.eclipse.ditto.client.messaging.MessagingProvider} but does not attempt to connect to the configured
* back-end.
*
* @param messagingProvider the messaging provider for this client.
* @return the disconnected client.
* @since 1.3.0
* @since 2.0.0
*/
public static DisconnectedDittoClient newDisconnectedInstance(final MessagingProvider messagingProvider) {
return newDisconnectedInstance(messagingProvider, messagingProvider, messagingProvider,
public static DisconnectedDittoClient newInstance(final MessagingProvider messagingProvider) {
return newInstance(messagingProvider, messagingProvider, messagingProvider,
MessageSerializerFactory.newInstance().getMessageSerializerRegistry());
}

Expand All @@ -159,9 +61,9 @@ public static DisconnectedDittoClient newDisconnectedInstance(final MessagingPro
* @param policyMessagingProvider the messaging provider for the {@code Policy} part of the client.
* @param messageSerializerRegistry a registry of {@code MessageSerializer}s for the {@code Live} part of the client.
* @return the disconnected client.
* @since 1.3.0
* @since 2.0.0
*/
public static DisconnectedDittoClient newDisconnectedInstance(final MessagingProvider twinMessagingProvider,
public static DisconnectedDittoClient newInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider, final MessagingProvider policyMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,6 @@ private DefaultDittoClient(final TwinImpl twin, final LiveImpl live, final Polic
handleSpontaneousErrors();
}

/**
* Creates a new {@link org.eclipse.ditto.client.DittoClient}.
*
* @param twinMessagingProvider the messaging provider to use for the {@code Twin} aspect.
* @param liveMessagingProvider the messaging provider to use for the {@code Live} aspect.
* @param policyMessagingProvider the messaging provider for the {@code Policy} part of the client.
* @param messageSerializerRegistry registry for all serializers of live messages.
* @return the client.
*/
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider,
final MessagingProvider policyMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

final DisconnectedDittoClient disconnectedClient =
newDisconnectedInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider,
messageSerializerRegistry);
final CompletionStage<DittoClient> connectFuture = disconnectedClient.connect();

connectFuture.whenComplete((result, error) -> {
if (error != null) {
disconnectedClient.destroy();
}
});

return connectFuture.toCompletableFuture().join();
}

/**
* Create a Ditto client object but do not attempt to connect to the configured back-end.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@
*/
public interface MessagingProvider {

/**
* Initializes the Messaging Provider by opening the underlying connections, etc.
* Blocks the calling thread until messaging provider is ready.
* @deprecated since 1.3.0. Use {@code initializeAsync} instead.
*/
@Deprecated
default void initialize() {
initializeAsync().toCompletableFuture().join();
}

/**
* Perform initialization asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public final class DittoClientPoliciesTest extends AbstractDittoClientTest {
@Test
public void verifyClientDefaultsToSchemaVersion2ForPolicyCommands() {
messaging = new MockMessagingProvider(JsonSchemaVersion.V_1);
final DittoClient client = DittoClients.newInstance(messaging);
final DittoClient client = DittoClients.newInstance(messaging)
.connect()
.toCompletableFuture()
.join();
assertEventualCompletion(client.policies().retrieve(POLICY_ID).thenRun(client::destroy));
final RetrievePolicy command = expectMsgClass(RetrievePolicy.class);
reply(RetrievePolicyResponse.of(POLICY_ID, POLICY, command.getDittoHeaders()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,14 @@ public final class DittoClientUsageExamples {
private static final Properties CONFIG;

public static void main(final String... args) throws ExecutionException, InterruptedException {
final DittoClient client = DittoClients.newInstance(createMessagingProvider());
final DittoClient client2 = DittoClients.newInstance(createMessagingProvider());
final DittoClient client = DittoClients.newInstance(createMessagingProvider())
.connect()
.toCompletableFuture()
.join();
final DittoClient client2 = DittoClients.newInstance(createMessagingProvider())
.connect()
.toCompletableFuture()
.join();

if (shouldNotSkip("twin.examples")) {
final JsonifiableAdaptable jsonifiableAdaptable = ProtocolFactory.jsonifiableAdaptableFromJson(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ public void before() {
LOGGER.debug("active threads before test: {}", startingThreadNames);
messaging = new MockMessagingProvider();
messaging.onSend(m -> LOGGER.info("Send message: " + m));
client = DittoClients.newInstance(messaging);
client = DittoClients.newInstance(messaging)
.connect()
.toCompletableFuture()
.join();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void noThreadLeakWithWebsocketMessagingProvider() throws Exception {
stringFuture.toCompletableFuture().join();

// WHEN: client is created
final DisconnectedDittoClient client = DittoClients.newDisconnectedInstance(messaging);
final DisconnectedDittoClient client = DittoClients.newInstance(messaging);

// THEN: threads are allocated
Assertions.assertThat(getActiveThreads(startingThreadNames)).isNotEmpty();
Expand Down

0 comments on commit 59b2b9f

Please sign in to comment.