Skip to content

Commit

Permalink
Issue #ditto/757: Add acknowledgements to live commands.
Browse files Browse the repository at this point in the history
For usage example, see:

DittoClientLiveTest.testThingCommandAcknowledgement().

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 16, 2020
1 parent 4c3c3e1 commit b285baf
Show file tree
Hide file tree
Showing 22 changed files with 583 additions and 1,040 deletions.
Expand Up @@ -15,7 +15,6 @@
import java.util.Collection;
import java.util.function.Consumer;

import org.eclipse.ditto.client.changes.AcknowledgementRequestHandle;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;

/**
Expand Down
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.client.changes;
package org.eclipse.ditto.client.ack;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
Expand Down
Expand Up @@ -20,7 +20,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.client.changes.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.ack.AcknowledgementRequestHandle;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
Expand Down
Expand Up @@ -13,17 +13,14 @@
package org.eclipse.ditto.client.changes;

import java.time.Instant;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import org.eclipse.ditto.client.ack.Acknowledgeable;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.entity.type.WithEntityType;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.signals.base.WithId;
Expand Down
Expand Up @@ -26,7 +26,7 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.client.ack.internal.ImmutableAcknowledgementRequestHandle;
import org.eclipse.ditto.client.changes.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.ack.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.changes.Change;
import org.eclipse.ditto.client.changes.ChangeAction;
import org.eclipse.ditto.json.JsonObject;
Expand Down
Expand Up @@ -23,7 +23,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.client.changes.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.ack.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.changes.Change;
import org.eclipse.ditto.client.changes.ChangeAction;
import org.eclipse.ditto.client.changes.FeatureChange;
Expand Down
Expand Up @@ -23,7 +23,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.client.changes.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.ack.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.changes.Change;
import org.eclipse.ditto.client.changes.ChangeAction;
import org.eclipse.ditto.client.changes.FeaturesChange;
Expand Down
Expand Up @@ -23,7 +23,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.client.changes.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.ack.AcknowledgementRequestHandle;
import org.eclipse.ditto.client.changes.Change;
import org.eclipse.ditto.client.changes.ChangeAction;
import org.eclipse.ditto.client.changes.ThingChange;
Expand Down
Expand Up @@ -12,7 +12,16 @@
*/
package org.eclipse.ditto.client.live;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Stream;

import org.eclipse.ditto.client.live.commands.LiveCommandHandler;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.live.base.LiveCommand;
import org.eclipse.ditto.signals.commands.live.base.LiveCommandAnswer;
import org.eclipse.ditto.signals.commands.live.base.LiveCommandAnswerBuilder;
import org.slf4j.Logger;

/**
* Internal interface for implementations capable of processing {@link LiveCommand}s.
Expand All @@ -21,13 +30,86 @@
*/
public interface LiveCommandProcessor {

/**
* Get a concurrent map of live command handlers.
*
* @return the live command handler.
*/
Map<Class<? extends LiveCommand<?, ?>>, LiveCommandHandler<?, ?>> getLiveCommandHandlers();

/**
* Publish a signal.
*
* @param signal the signal to publish.
*/
void publishLiveSignal(Signal<?> signal);

/**
* Retrieve the logger.
*
* @return the logger.
*/
Logger getLogger();

/**
* Register a live command handler.
*
* @param liveCommandHandler the live command handler.
* @throws java.lang.IllegalStateException if the live command handled by the live command handler is already
* registered.
*/
default void register(final LiveCommandHandler<?, ?> liveCommandHandler) {
final Class<? extends LiveCommand<?, ?>> liveCommandClass = liveCommandHandler.getType();
getLiveCommandHandlers().compute(liveCommandClass, (clazz, handler) -> {
if (getLiveCommandHandlers().containsKey(liveCommandClass)) {
throw new IllegalStateException(
"A Function for '" + liveCommandClass.getSimpleName() + "' is already " +
"defined. Stop the registered handler before registering a new handler.");
} else {
return liveCommandHandler;
}
});
}

/**
* Remove the registration for a live command.
*
* @param liveCommandClass the class of the live command whose handler should be removed.
*/
default void unregister(final Class<? extends LiveCommand<?, ?>> liveCommandClass) {
getLiveCommandHandlers().remove(liveCommandClass);
}

/**
* Processes the passed {@link LiveCommand} and reports the successful processing via return value.
*
* @param liveCommand the live command to process
* @return {@code true} when the passed {@code liveCommand} was successfully processed, {@code false} if either the
* implementation did not have a function to handle the type or a RuntimeException occurred during invocation.
*/
boolean processLiveCommand(LiveCommand liveCommand);
default boolean processLiveCommand(final LiveCommand<?, ?> liveCommand) {
return Arrays.stream(liveCommand.getClass().getInterfaces())
.flatMap(clazz -> {
final LiveCommandHandler<?, ?> handler = getLiveCommandHandlers().get(clazz);
return handler == null ? Stream.empty() : Stream.of(handler);
})
.map(handler -> {
try {
final LiveCommandAnswerBuilder.BuildStep builder =
handler.castAndApply(liveCommand, this::publishLiveSignal);
final LiveCommandAnswer liveCommandAnswer = builder.build();
liveCommandAnswer.getResponse().ifPresent(this::publishLiveSignal);
liveCommandAnswer.getEvent().ifPresent(this::publishLiveSignal);
return true;
} catch (final RuntimeException e) {
getLogger().error(
"User defined function which processed LiveCommand '{}' threw RuntimeException: {}",
liveCommand.getType(), e.getMessage(), e);
return false;
}
})
.findAny()
.orElse(false);
}

}
Expand Up @@ -14,6 +14,7 @@

import java.util.function.Function;

import org.eclipse.ditto.client.live.LiveCommandProcessor;
import org.eclipse.ditto.signals.commands.live.base.LiveCommandAnswerBuilder;
import org.eclipse.ditto.signals.commands.live.modify.DeleteFeaturePropertiesLiveCommand;
import org.eclipse.ditto.signals.commands.live.modify.DeleteFeaturePropertyLiveCommand;
Expand All @@ -29,7 +30,7 @@
*
* @since 1.0.0
*/
public interface FeaturePropertiesCommandHandling {
public interface FeaturePropertiesCommandHandling extends LiveCommandProcessor {

/**
* Registers a handler to receive {@link org.eclipse.ditto.signals.commands.things.modify.ModifyFeatureProperties
Expand All @@ -41,14 +42,18 @@ public interface FeaturePropertiesCommandHandling {
* calling this method
* @see #stopHandlingModifyFeaturePropertiesCommands()
*/
void handleModifyFeaturePropertiesCommands(
Function<ModifyFeaturePropertiesLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler);
default void handleModifyFeaturePropertiesCommands(
final Function<ModifyFeaturePropertiesLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler) {
register(LiveCommandHandler.of(ModifyFeaturePropertiesLiveCommand.class, handler));
}

/**
* De-registers the handler to receive {@link org.eclipse.ditto.signals.commands.things.modify.ModifyFeatureProperties
* ModifyFeatureProperties} commands.
*/
void stopHandlingModifyFeaturePropertiesCommands();
default void stopHandlingModifyFeaturePropertiesCommands() {
unregister(ModifyFeaturePropertiesLiveCommand.class);
}

/**
* Registers a handler to receive {@link org.eclipse.ditto.signals.commands.things.modify.DeleteFeatureProperties
Expand All @@ -60,14 +65,18 @@ void handleModifyFeaturePropertiesCommands(
* calling this method
* @see #stopHandlingDeleteFeaturePropertiesCommands()
*/
void handleDeleteFeaturePropertiesCommands(
Function<DeleteFeaturePropertiesLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler);
default void handleDeleteFeaturePropertiesCommands(
final Function<DeleteFeaturePropertiesLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler) {
register(LiveCommandHandler.of(DeleteFeaturePropertiesLiveCommand.class, handler));
}

/**
* De-registers the handler to receive {@link org.eclipse.ditto.signals.commands.things.modify.DeleteFeatureProperties
* DeleteFeatureProperties} commands.
*/
void stopHandlingDeleteFeaturePropertiesCommands();
default void stopHandlingDeleteFeaturePropertiesCommands() {
unregister(DeleteFeaturePropertiesLiveCommand.class);
}

/**
* Registers a handler to receive {@link org.eclipse.ditto.signals.commands.things.modify.ModifyFeatureProperty
Expand All @@ -79,14 +88,18 @@ void handleDeleteFeaturePropertiesCommands(
* calling this method
* @see #stopHandlingModifyFeaturePropertyCommands()
*/
void handleModifyFeaturePropertyCommands(
Function<ModifyFeaturePropertyLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler);
default void handleModifyFeaturePropertyCommands(
final Function<ModifyFeaturePropertyLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler) {
register(LiveCommandHandler.of(ModifyFeaturePropertyLiveCommand.class, handler));
}

/**
* De-registers the handler to receive {@link org.eclipse.ditto.signals.commands.things.modify.ModifyFeatureProperty
* ModifyFeatureProperty} commands.
*/
void stopHandlingModifyFeaturePropertyCommands();
default void stopHandlingModifyFeaturePropertyCommands() {
unregister(ModifyFeaturePropertyLiveCommand.class);
}

/**
* Registers a handler to receive {@link org.eclipse.ditto.signals.commands.things.modify.DeleteFeatureProperty
Expand All @@ -98,14 +111,18 @@ void handleModifyFeaturePropertyCommands(
* calling this method
* @see #stopHandlingDeleteFeaturePropertyCommands()
*/
void handleDeleteFeaturePropertyCommands(
Function<DeleteFeaturePropertyLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler);
default void handleDeleteFeaturePropertyCommands(
final Function<DeleteFeaturePropertyLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler) {
register(LiveCommandHandler.of(DeleteFeaturePropertyLiveCommand.class, handler));
}

/**
* De-registers the handler to receive {@link org.eclipse.ditto.signals.commands.things.modify.DeleteFeatureProperty
* DeleteFeatureProperty} commands.
*/
void stopHandlingDeleteFeaturePropertyCommands();
default void stopHandlingDeleteFeaturePropertyCommands() {
unregister(DeleteFeaturePropertyLiveCommand.class);
}

/**
* Registers a handler to receive {@link org.eclipse.ditto.signals.commands.things.query.RetrieveFeatureProperty
Expand All @@ -117,14 +134,18 @@ void handleDeleteFeaturePropertyCommands(
* calling this method
* @see #stopHandlingRetrieveFeaturePropertyCommands()
*/
void handleRetrieveFeaturePropertyCommands(
Function<RetrieveFeaturePropertyLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler);
default void handleRetrieveFeaturePropertyCommands(
final Function<RetrieveFeaturePropertyLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler) {
register(LiveCommandHandler.of(RetrieveFeaturePropertyLiveCommand.class, handler));
}

/**
* De-registers the handler to receive {@link org.eclipse.ditto.signals.commands.things.query.RetrieveFeatureProperty
* RetrieveFeatureProperty} commands.
*/
void stopHandlingRetrieveFeaturePropertyCommands();
default void stopHandlingRetrieveFeaturePropertyCommands() {
unregister(RetrieveFeaturePropertyLiveCommand.class);
}

/**
* Registers a handler to receive {@link org.eclipse.ditto.signals.commands.things.query.RetrieveFeatureProperties
Expand All @@ -136,13 +157,17 @@ void handleRetrieveFeaturePropertyCommands(
* calling this method
* @see #stopHandlingRetrieveFeaturePropertiesCommands()
*/
void handleRetrieveFeaturePropertiesCommands(
Function<RetrieveFeaturePropertiesLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler);
default void handleRetrieveFeaturePropertiesCommands(
Function<RetrieveFeaturePropertiesLiveCommand, LiveCommandAnswerBuilder.BuildStep> handler) {
register(LiveCommandHandler.of(RetrieveFeaturePropertiesLiveCommand.class, handler));
}

/**
* De-registers the handler to receive {@link org.eclipse.ditto.signals.commands.things.query.RetrieveFeatureProperties
* RetrieveFeatureProperties} commands.
*/
void stopHandlingRetrieveFeaturePropertiesCommands();
default void stopHandlingRetrieveFeaturePropertiesCommands() {
unregister(RetrieveFeaturePropertiesLiveCommand.class);
}

}

0 comments on commit b285baf

Please sign in to comment.