Skip to content

Commit

Permalink
Replace return of CompletableFuture with CompletionStage
Browse files Browse the repository at this point in the history
* The completion stages returned by the ditto client are not meant
  to be completed by the user.
* If it's required to still have get and join methods use toCompeltableFuture
  on the CompletionStage

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Feb 8, 2021
1 parent 5acab79 commit ce597dd
Show file tree
Hide file tree
Showing 30 changed files with 394 additions and 375 deletions.
6 changes: 3 additions & 3 deletions java/src/main/java/org/eclipse/ditto/client/DittoClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package org.eclipse.ditto.client;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.client.live.Live;
import org.eclipse.ditto.client.policies.Policies;
Expand Down Expand Up @@ -52,10 +52,10 @@ public interface DittoClient {
* Directly sends a Ditto Protocol {@link Adaptable} message to the established Ditto backend connection.
*
* @param dittoProtocolAdaptable the adaptable to send
* @return a CompletableFuture containing the correlated response to the sent {@code dittoProtocolAdaptable}
* @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable}
* @throws IllegalStateException when no twin/live connection was configured for this client
*/
CompletableFuture<Adaptable> sendDittoProtocol(Adaptable dittoProtocolAdaptable);
CompletionStage<Adaptable> sendDittoProtocol(Adaptable dittoProtocolAdaptable);

/**
* Returns the client's {@link Policies} singleton which provides the necessary functionality to manage and monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ protected CommonManagementImpl(
}

@Override
public CompletableFuture<Void> startConsumption() {
public CompletionStage<Void> startConsumption() {
// do not call doStartConsumption directly
return startConsumption(new Option[]{});
}

@Override
public CompletableFuture<Void> startConsumption(final Option<?>... consumptionOptions) {
public CompletionStage<Void> startConsumption(final Option<?>... consumptionOptions) {

// concurrent consumption requests can have strange effects, so better avoid it
if (!subscriptionRequestPending.compareAndSet(false, true)) {
Expand Down Expand Up @@ -162,9 +162,9 @@ public CompletableFuture<Void> startConsumption(final Option<?>... consumptionOp
* Starts the consumption of twin events / messages / live events and commands.
*
* @param consumptionConfig the configuration Map to apply for the consumption.
* @return a CompletableFuture that terminates when the start operation was successful.
* @return a CompletionStage that terminates when the start operation was successful.
*/
protected abstract CompletableFuture<Void> doStartConsumption(Map<String, String> consumptionConfig);
protected abstract CompletionStage<Void> doStartConsumption(Map<String, String> consumptionConfig);

/**
* Returns the MessagingProvider this CommonManagement uses.
Expand Down Expand Up @@ -261,7 +261,7 @@ protected Optional<F> getFeatureHandle(final ThingId thingId, final String featu
}

@Override
public CompletableFuture<Thing> create(final Option<?>... options) {
public CompletionStage<Thing> create(final Option<?>... options) {
// as the backend adds the default namespace, we can here simply use the empty namespace.
final Thing thing = ThingsModelFactory.newThingBuilder()
.setId(ThingId.generateRandom())
Expand All @@ -270,7 +270,7 @@ public CompletableFuture<Thing> create(final Option<?>... options) {
}

@Override
public CompletableFuture<Thing> create(final ThingId thingId, final Option<?>... options) {
public CompletionStage<Thing> create(final ThingId thingId, final Option<?>... options) {
argumentNotNull(thingId);
argumentNotEmpty(thingId);

Expand All @@ -281,12 +281,12 @@ public CompletableFuture<Thing> create(final ThingId thingId, final Option<?>...
}

@Override
public CompletableFuture<Thing> create(final Thing thing, final Option<?>... options) {
public CompletionStage<Thing> create(final Thing thing, final Option<?>... options) {
return processCreate(thing, null, options);
}

@Override
public CompletableFuture<Thing> create(final JsonObject jsonObject, final Option<?>... options) {
public CompletionStage<Thing> create(final JsonObject jsonObject, final Option<?>... options) {
argumentNotNull(jsonObject);

final Optional<JsonObject> initialPolicy = getInlinePolicyFromThingJson(jsonObject);
Expand All @@ -296,7 +296,7 @@ public CompletableFuture<Thing> create(final JsonObject jsonObject, final Option
}

@Override
public CompletableFuture<Thing> create(final Policy initialPolicy, final Option<?>... options) {
public CompletionStage<Thing> create(final Policy initialPolicy, final Option<?>... options) {
argumentNotNull(initialPolicy, ARGUMENT_INITIAL_POLICY);
// as the backend adds the default namespace, we can here simply use the empty namespace.
final Thing thing = ThingsModelFactory.newThingBuilder()
Expand All @@ -306,7 +306,7 @@ public CompletableFuture<Thing> create(final Policy initialPolicy, final Option<
}

@Override
public CompletableFuture<Thing> create(final ThingId thingId, final JsonObject initialPolicy,
public CompletionStage<Thing> create(final ThingId thingId, final JsonObject initialPolicy,
final Option<?>... options) {
argumentNotNull(thingId, ARGUMENT_THING_ID);
argumentNotEmpty(thingId);
Expand All @@ -320,7 +320,7 @@ public CompletableFuture<Thing> create(final ThingId thingId, final JsonObject i


@Override
public CompletableFuture<Thing> create(final ThingId thingId, final Policy initialPolicy,
public CompletionStage<Thing> create(final ThingId thingId, final Policy initialPolicy,
final Option<?>... options) {
argumentNotNull(thingId, ARGUMENT_THING_ID);
argumentNotEmpty(thingId);
Expand All @@ -333,7 +333,7 @@ public CompletableFuture<Thing> create(final ThingId thingId, final Policy initi
}

@Override
public CompletableFuture<Thing> create(final JsonObject thing, final JsonObject initialPolicy,
public CompletionStage<Thing> create(final JsonObject thing, final JsonObject initialPolicy,
final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
argumentNotNull(initialPolicy, ARGUMENT_INITIAL_POLICY);
Expand All @@ -342,7 +342,7 @@ public CompletableFuture<Thing> create(final JsonObject thing, final JsonObject
}

@Override
public CompletableFuture<Thing> create(final JsonObject thing, final Policy initialPolicy,
public CompletionStage<Thing> create(final JsonObject thing, final Policy initialPolicy,
final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
argumentNotNull(initialPolicy, ARGUMENT_INITIAL_POLICY);
Expand All @@ -351,7 +351,7 @@ public CompletableFuture<Thing> create(final JsonObject thing, final Policy init
}

@Override
public CompletableFuture<Thing> create(final Thing thing, final JsonObject initialPolicy,
public CompletionStage<Thing> create(final Thing thing, final JsonObject initialPolicy,
final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
assertThatThingHasId(thing);
Expand All @@ -361,7 +361,7 @@ public CompletableFuture<Thing> create(final Thing thing, final JsonObject initi
}

@Override
public CompletableFuture<Thing> create(final Thing thing, final Policy initialPolicy,
public CompletionStage<Thing> create(final Thing thing, final Policy initialPolicy,
final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
assertThatThingHasId(thing);
Expand All @@ -370,7 +370,7 @@ public CompletableFuture<Thing> create(final Thing thing, final Policy initialPo
return processCreate(thing, initialPolicy.toJson(), options);
}

private CompletableFuture<Thing> processCreate(final Thing thing, @Nullable final JsonObject initialPolicy,
private CompletionStage<Thing> processCreate(final Thing thing, @Nullable final JsonObject initialPolicy,
final Option<?>... options) {
argumentNotNull(thing);
assertThatThingHasId(thing);
Expand All @@ -380,8 +380,7 @@ private CompletableFuture<Thing> processCreate(final Thing thing, @Nullable fina
return askThingCommand(command,
// response could be CreateThingResponse or ModifyThingResponse or Acknowledgements.
CommandResponse.class,
CommonManagementImpl::transformModifyResponse)
.toCompletableFuture();
CommonManagementImpl::transformModifyResponse);
}


Expand All @@ -405,14 +404,14 @@ public CompletableFuture<Void> merge(final ThingId thingId, final Thing thing,
}

@Override
public CompletableFuture<Optional<Thing>> put(final Thing thing, final Option<?>... options) {
public CompletionStage<Optional<Thing>> put(final Thing thing, final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
assertThatThingHasId(thing);
return processPut(thing, null, options);
}

@Override
public CompletableFuture<Optional<Thing>> put(final JsonObject jsonObject, final Option<?>... options) {
public CompletionStage<Optional<Thing>> put(final JsonObject jsonObject, final Option<?>... options) {
argumentNotNull(jsonObject);

final Optional<JsonObject> initialPolicy = getInlinePolicyFromThingJson(jsonObject);
Expand All @@ -422,7 +421,7 @@ public CompletableFuture<Optional<Thing>> put(final JsonObject jsonObject, final
}

@Override
public CompletableFuture<Optional<Thing>> put(final JsonObject thing, final JsonObject initialPolicy,
public CompletionStage<Optional<Thing>> put(final JsonObject thing, final JsonObject initialPolicy,
final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
argumentNotNull(initialPolicy, ARGUMENT_INITIAL_POLICY);
Expand All @@ -431,7 +430,7 @@ public CompletableFuture<Optional<Thing>> put(final JsonObject thing, final Json
}

@Override
public CompletableFuture<Optional<Thing>> put(final JsonObject thing, final Policy initialPolicy,
public CompletionStage<Optional<Thing>> put(final JsonObject thing, final Policy initialPolicy,
final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
argumentNotNull(initialPolicy, ARGUMENT_INITIAL_POLICY);
Expand All @@ -440,7 +439,7 @@ public CompletableFuture<Optional<Thing>> put(final JsonObject thing, final Poli
}

@Override
public CompletableFuture<Optional<Thing>> put(final Thing thing, final JsonObject initialPolicy,
public CompletionStage<Optional<Thing>> put(final Thing thing, final JsonObject initialPolicy,
final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
assertThatThingHasId(thing);
Expand All @@ -450,7 +449,7 @@ public CompletableFuture<Optional<Thing>> put(final Thing thing, final JsonObjec
}

@Override
public CompletableFuture<Optional<Thing>> put(final Thing thing, final Policy initialPolicy,
public CompletionStage<Optional<Thing>> put(final Thing thing, final Policy initialPolicy,
final Option<?>... options) {
argumentNotNull(thing, ARGUMENT_THING);
assertThatThingHasId(thing);
Expand All @@ -459,7 +458,7 @@ public CompletableFuture<Optional<Thing>> put(final Thing thing, final Policy in
return processPut(thing, initialPolicy.toJson(), options);
}

private CompletableFuture<Optional<Thing>> processPut(final Thing thing, @Nullable final JsonObject initialPolicy,
private CompletionStage<Optional<Thing>> processPut(final Thing thing, @Nullable final JsonObject initialPolicy,
final Option<?>... options) {
argumentNotNull(thing);
assertThatThingHasId(thing);
Expand All @@ -469,43 +468,43 @@ private CompletableFuture<Optional<Thing>> processPut(final Thing thing, @Nullab
// response could be CreateThingResponse or ModifyThingResponse or Acknowledgements.
CommandResponse.class,
response -> Optional.ofNullable(transformModifyResponse(response))
).toCompletableFuture();
);
}

@Override
public CompletableFuture<Void> update(final Thing thing, final Option<?>... options) {
public CompletionStage<Void> update(final Thing thing, final Option<?>... options) {
argumentNotNull(thing);
assertThatThingHasId(thing);

return askThingCommand(outgoingMessageFactory.updateThing(thing, options), CommandResponse.class,
this::toVoid).toCompletableFuture();
this::toVoid);
}

@Override
public CompletableFuture<Void> update(final JsonObject jsonObject, final Option<?>... options) {
public CompletionStage<Void> update(final JsonObject jsonObject, final Option<?>... options) {
argumentNotNull(jsonObject);

final Thing thing = ThingsModelFactory.newThing(jsonObject);
return update(thing, options);
}

@Override
public CompletableFuture<Void> delete(final ThingId thingId, final Option<?>... options) {
public CompletionStage<Void> delete(final ThingId thingId, final Option<?>... options) {
argumentNotNull(thingId);

final DeleteThing command = outgoingMessageFactory.deleteThing(thingId, options);
return askThingCommand(command, CommandResponse.class, this::toVoid).toCompletableFuture();
return askThingCommand(command, CommandResponse.class, this::toVoid);
}

@Override
public CompletableFuture<List<Thing>> retrieve(final Iterable<ThingId> thingIds) {
public CompletionStage<List<Thing>> retrieve(final Iterable<ThingId> thingIds) {
argumentNotNull(thingIds);

return sendRetrieveThingsMessage(outgoingMessageFactory.retrieveThings(thingIds));
}

@Override
public CompletableFuture<List<Thing>> retrieve(final ThingId thingId, final ThingId... thingIds) {
public CompletionStage<List<Thing>> retrieve(final ThingId thingId, final ThingId... thingIds) {
argumentNotNull(thingId);
argumentNotNull(thingIds);

Expand All @@ -517,7 +516,7 @@ public CompletableFuture<List<Thing>> retrieve(final ThingId thingId, final Thin
}

@Override
public CompletableFuture<List<Thing>> retrieve(final JsonFieldSelector fieldSelector, final ThingId thingId,
public CompletionStage<List<Thing>> retrieve(final JsonFieldSelector fieldSelector, final ThingId thingId,
final ThingId... thingIds) {

argumentNotNull(thingId);
Expand All @@ -531,7 +530,7 @@ public CompletableFuture<List<Thing>> retrieve(final JsonFieldSelector fieldSele
}

@Override
public CompletableFuture<List<Thing>> retrieve(final JsonFieldSelector fieldSelector,
public CompletionStage<List<Thing>> retrieve(final JsonFieldSelector fieldSelector,
final Iterable<ThingId> thingIds) {

argumentNotNull(fieldSelector);
Expand Down Expand Up @@ -783,9 +782,8 @@ private static void assertThatThingHasId(final Thing thing) {
}
}

private CompletableFuture<List<Thing>> sendRetrieveThingsMessage(final RetrieveThings command) {
return askThingCommand(command, RetrieveThingsResponse.class, RetrieveThingsResponse::getThings)
.toCompletableFuture();
private CompletionStage<List<Thing>> sendRetrieveThingsMessage(final RetrieveThings command) {
return askThingCommand(command, RetrieveThingsResponse.class, RetrieveThingsResponse::getThings);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.text.MessageFormat;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

Expand Down Expand Up @@ -151,7 +150,7 @@ public Policies policies() {
}

@Override
public CompletableFuture<Adaptable> sendDittoProtocol(final Adaptable dittoProtocolAdaptable) {
public CompletionStage<Adaptable> sendDittoProtocol(final Adaptable dittoProtocolAdaptable) {

final TopicPath.Group group = dittoProtocolAdaptable.getTopicPath().getGroup();
switch (group) {
Expand All @@ -164,7 +163,7 @@ public CompletableFuture<Adaptable> sendDittoProtocol(final Adaptable dittoProto
}
}

private CompletableFuture<Adaptable> sendDittoProtocolForThingsGroup(final Adaptable dittoProtocolAdaptable) {
private CompletionStage<Adaptable> sendDittoProtocolForThingsGroup(final Adaptable dittoProtocolAdaptable) {
final TopicPath.Channel channel = dittoProtocolAdaptable.getTopicPath().getChannel();
switch (channel) {
case TWIN:
Expand All @@ -176,7 +175,7 @@ private CompletableFuture<Adaptable> sendDittoProtocolForThingsGroup(final Adapt
}
}

private CompletableFuture<Adaptable> sendDittoProtocolForPoliciesGroup(final Adaptable dittoProtocolAdaptable) {
private CompletionStage<Adaptable> sendDittoProtocolForPoliciesGroup(final Adaptable dittoProtocolAdaptable) {
final TopicPath.Channel channel = dittoProtocolAdaptable.getTopicPath().getChannel();
if (TopicPath.Channel.NONE.equals(channel)) {
return policies.getMessagingProvider().sendAdaptable(dittoProtocolAdaptable);
Expand Down
10 changes: 5 additions & 5 deletions java/src/main/java/org/eclipse/ditto/client/live/Live.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package org.eclipse.ditto.client.live;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.client.live.commands.FeaturePropertiesCommandHandling;
import org.eclipse.ditto.client.live.commands.FeaturesCommandHandling;
Expand Down Expand Up @@ -60,11 +60,11 @@ public interface Live extends CommonManagement<LiveThingHandle, LiveFeatureHandl
/**
* Start consuming changes, messages and commands on this {@code live()} channel.
*
* @return a CompletableFuture that terminates when the start operation was successful.
* @return a CompletionStage that terminates when the start operation was successful.
*/
@Override
// overwritten in order to display a better suiting javadoc for the user
CompletableFuture<Void> startConsumption();
CompletionStage<Void> startConsumption();

/**
* Start consuming changes, messages and commands on this {@code live()} channel with the passed {@code
Expand All @@ -75,10 +75,10 @@ public interface Live extends CommonManagement<LiveThingHandle, LiveFeatureHandl
* <pre>{@code Options.Consumption.namespaces("org.eclipse.ditto.namespace1","org.eclipse.ditto.namespace2");
* Options.Consumption.filter("gt(attributes/counter,42)");}
* </pre>
* @return a CompletableFuture that terminates when the start operation was successful.
* @return a CompletionStage that terminates when the start operation was successful.
*/
@Override
// overwritten in order to display a better suiting javadoc for the user
CompletableFuture<Void> startConsumption(Option<?>... consumptionOptions);
CompletionStage<Void> startConsumption(Option<?>... consumptionOptions);

}
Loading

0 comments on commit ce597dd

Please sign in to comment.