Skip to content

Commit

Permalink
enhance WoT skeleton creation with more options
Browse files Browse the repository at this point in the history
* whether to generate defaults for "optional" WoT TM properties
* whether to throw exceptions related to WoT models instead of silently swallowing them

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Dec 19, 2023
1 parent 28bccd8 commit 075bd85
Show file tree
Hide file tree
Showing 17 changed files with 531 additions and 137 deletions.
2 changes: 1 addition & 1 deletion edge/service/src/main/resources/ditto-edge-service.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ ditto {

ask-with-retry {
# maximum duration to wait for answers from entity shard regions
ask-timeout = 3s
ask-timeout = 5s
ask-timeout = ${?CONCIERGE_CACHES_ASK_TIMEOUT}

# one of: OFF, NO_DELAY, FIXED_DELAY, BACKOFF_DELAY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ public interface Cache<K, V> {
*/
CompletableFuture<Optional<V>> get(K key);

/**
* Returns a {@link CompletableFuture} returning the value which is associated with the specified key, specifying
* an {@code errorHandler}.
*
* @param key the key to get the associated value for.
* @param errorHandler function to invoke when a {@code Throwable} is encountered by the cache loader.
* @return a {@link CompletableFuture} returning the value which is associated with the specified key or an empty
* {@link Optional}.
* @throws NullPointerException if {@code key} is {@code null}.
*/
CompletableFuture<Optional<V>> get(K key, Function<Throwable, Optional<V>> errorHandler);

/**
* Retrieve the value associated with a key in a future if it exists in the cache, or a future empty optional if
* it does not. The cache loader will never be called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -165,6 +167,20 @@ public CompletableFuture<Optional<V>> get(final K key) {
return asyncCache.get(key, asyncLoad).thenApply(Optional::ofNullable);
}

@Override
public CompletableFuture<Optional<V>> get(final K key, final Function<Throwable, Optional<V>> errorHandler) {
requireNonNull(key);

return asyncCache.get(key, asyncLoad).thenApply(Optional::ofNullable)
.exceptionally(throwable -> {
if (throwable instanceof CompletionException completionException) {
return errorHandler.apply(completionException.getCause());
} else {
return errorHandler.apply(throwable);
}
});
}

/**
* Lookup a value in cache, or create it via {@code mappingFunction} and store it if the value was not cached.
* Only available for Caffeine caches.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public CompletableFuture<Optional<U>> get(final K key) {
return cache.get(key).thenApply(projectOptional);
}

@Override
public CompletableFuture<Optional<U>> get(final K key, final Function<Throwable, Optional<U>> errorHandler) {
return cache.get(key, throwable -> errorHandler.apply(throwable).map(embed)).thenApply(projectOptional);
}

@Override
public CompletableFuture<Optional<U>> getIfPresent(final K key) {
return cache.getIfPresent(key).thenApply(projectOptional);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,20 @@ private record PersistEventAsync<
/**
* Persist an event, modify actor state by the event strategy, then invoke the handler.
*
* @param event the event to persist and apply.
* @param eventStage the event stage to persist and apply.
* @param handler what happens afterwards.
* @param errorHandler the errorHandler to invoke for encountered throwables from the CompletionStage
*/
protected void persistAndApplyEventAsync(final CompletionStage<E> event, final BiConsumer<E, S> handler) {
Patterns.pipe(event.thenApply(e -> new PersistEventAsync<>(e, handler)), getContext().getDispatcher())
.to(getSelf());
protected void persistAndApplyEventAsync(final CompletionStage<E> eventStage, final BiConsumer<E, S> handler,
final Consumer<Throwable> errorHandler) {
Patterns.pipe(eventStage.handle((e, throwable) -> {
if (throwable != null) {
errorHandler.accept(throwable);
return null;
} else {
return new PersistEventAsync<>(e, handler);
}
}), getContext().getDispatcher()).to(getSelf());
}

/**
Expand Down Expand Up @@ -736,9 +744,11 @@ public void onMutation(final Command<?> command, final E event, final WithDittoH
}

@Override
public void onStagedMutation(final Command<?> command, final CompletionStage<E> event,
public void onStagedMutation(final Command<?> command,
final CompletionStage<E> event,
final CompletionStage<WithDittoHeaders> response,
final boolean becomeCreated, final boolean becomeDeleted) {
final boolean becomeCreated,
final boolean becomeDeleted) {

final ActorRef sender = getSender();
persistAndApplyEventAsync(event, (persistedEvent, resultingEntity) -> {
Expand All @@ -751,6 +761,16 @@ public void onStagedMutation(final Command<?> command, final CompletionStage<E>
if (becomeCreated) {
becomeCreatedHandler();
}
}, throwable -> {
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
DittoInternalErrorException.newBuilder()
.cause(t)
.dittoHeaders(command.getDittoHeaders())
.build());
if (shouldSendResponse(command.getDittoHeaders())) {
notifySender(sender, dittoRuntimeException);
}
});
}

Expand All @@ -766,7 +786,13 @@ public void onQuery(final Command<?> command, final WithDittoHeaders response) {
public void onStagedQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
if (command.getDittoHeaders().isResponseRequired()) {
final ActorRef sender = getSender();
response.thenAccept(r -> notifySender(sender, r));
response.whenComplete((r, throwable) -> {
if (throwable instanceof DittoRuntimeException dittoRuntimeException) {
notifySender(sender, dittoRuntimeException);
} else {
notifySender(sender, r);
}
});
}
}

Expand Down Expand Up @@ -883,7 +909,13 @@ private void notifySender(final WithDittoHeaders message) {
}

private void notifySender(final ActorRef sender, final CompletionStage<WithDittoHeaders> message) {
message.thenAccept(msg -> notifySender(sender, msg));
message.whenComplete((msg, throwable) -> {
if (throwable instanceof DittoRuntimeException dittoRuntimeException) {
notifySender(sender, dittoRuntimeException);
} else {
notifySender(sender, msg);
}
});
}

private void takeSnapshotByInterval(final Control takeSnapshot) {
Expand Down Expand Up @@ -1097,19 +1129,23 @@ public void onQuery(final Command<?> command, final WithDittoHeaders response) {
@Override
public void onStagedQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
if (command.getDittoHeaders().isResponseRequired()) {
response.thenAccept(r -> {
final WithDittoHeaders theResponseToSend;
if (response instanceof DittoHeadersSettable<?> dittoHeadersSettable) {
final DittoHeaders queryCommandHeaders = r.getDittoHeaders();
final DittoHeaders adjustedHeaders = queryCommandHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(),
historicalDittoHeaders.toJson().toString())
.build();
theResponseToSend = dittoHeadersSettable.setDittoHeaders(adjustedHeaders);
response.whenComplete((r, throwable) -> {
if (throwable instanceof DittoRuntimeException dittoRuntimeException) {
notifySender(sender, dittoRuntimeException);
} else {
theResponseToSend = r;
final WithDittoHeaders theResponseToSend;
if (response instanceof DittoHeadersSettable<?> dittoHeadersSettable) {
final DittoHeaders queryCommandHeaders = r.getDittoHeaders();
final DittoHeaders adjustedHeaders = queryCommandHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(),
historicalDittoHeaders.toJson().toString())
.build();
theResponseToSend = dittoHeadersSettable.setDittoHeaders(adjustedHeaders);
} else {
theResponseToSend = r;
}
notifySender(sender, theResponseToSend);
}
notifySender(sender, theResponseToSend);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public abstract class AbstractPersistenceSupervisor<E extends EntityId, S extend
@Nullable protected ActorRef persistenceActorChild;
@Nullable protected ActorRef enforcerChild;

private final Duration localAskTimeout;
protected final Duration localAskTimeout;

private final ExponentialBackOffConfig exponentialBackOffConfig;
private final SignalTransformer signalTransformer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
import java.util.function.Function;

import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
Expand Down Expand Up @@ -70,6 +70,12 @@ public CompletableFuture<Optional<Entry<PolicyEnforcer>>> get(final PolicyId key
return delegate.get(key);
}

@Override
public CompletableFuture<Optional<Entry<PolicyEnforcer>>> get(final PolicyId key,
final Function<Throwable, Optional<Entry<PolicyEnforcer>>> errorHandler) {
return delegate.get(key, errorHandler);
}

@Override
public CompletableFuture<Optional<Entry<PolicyEnforcer>>> getIfPresent(final PolicyId key) {
return delegate.getIfPresent(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.persistence.RecoveryCompleted;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand Down Expand Up @@ -47,11 +53,6 @@
import org.eclipse.ditto.policies.service.persistence.actors.strategies.commands.PolicyCommandStrategies;
import org.eclipse.ditto.policies.service.persistence.actors.strategies.events.PolicyEventStrategies;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.persistence.RecoveryCompleted;

/**
* PersistentActor which "knows" the state of a single {@link Policy}.
*/
Expand Down Expand Up @@ -279,6 +280,16 @@ public void onStagedMutation(final Command<?> command, final CompletionStage<Pol
if (becomeCreated) {
becomeCreatedHandler();
}
}, throwable -> {
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
DittoInternalErrorException.newBuilder()
.cause(t)
.dittoHeaders(command.getDittoHeaders())
.build());
if (shouldSendResponse(command.getDittoHeaders())) {
notifySender(sender, dittoRuntimeException);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.AskTimeoutException;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -67,19 +80,6 @@
import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;

import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.AskTimeoutException;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

/**
* Supervisor for {@link ThingPersistenceActor} which means it will create, start and watch it as child actor.
* <p>
Expand Down Expand Up @@ -244,7 +244,10 @@ public static Props props(final ActorRef pubSubMediator,
@Override
protected CompletionStage<Object> askEnforcerChild(final Signal<?> signal) {

if (signal instanceof ThingCommandResponse<?> thingCommandResponse &&
if (signal instanceof CreateThing createThing && createThing.getThing().getDefinition().isPresent()) {
// for thing creations containing a "definition", retrieving WoT model from URL is involved, give more time:
return Patterns.ask(enforcerChild, signal, localAskTimeout.multipliedBy(3));
} else if (signal instanceof ThingCommandResponse<?> thingCommandResponse &&
CommandResponse.isLiveCommandResponse(thingCommandResponse)) {

return signal.getDittoHeaders().getCorrelationId()
Expand Down Expand Up @@ -337,10 +340,11 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
@Override
protected CompletableFuture<Object> handleTargetActorAndEnforcerException(final Signal<?> signal, final Throwable throwable) {
if (RollbackCreatedPolicy.shouldRollbackBasedOnException(signal, throwable)) {
final Throwable cause = throwable.getCause();
log.withCorrelationId(signal)
.info("Target actor exception received: <{}>. " +
.info("Target actor exception received: <{}>, cause: <{}>. " +
"Sending RollbackCreatedPolicy msg to self, potentially rolling back a created policy.",
throwable.getClass().getSimpleName());
throwable.getClass().getSimpleName(), cause != null ? cause.getClass().getSimpleName() : "-");
final CompletableFuture<Object> responseFuture = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(signal, throwable, responseFuture), getSelf());
return responseFuture;
Expand Down
24 changes: 24 additions & 0 deletions things/service/src/main/resources/things.conf
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,30 @@ ditto {
expire-after-access = ${?THINGS_WOT_THING_MODEL_CACHE_EXPIRE_AFTER_ACCESS}
}

tm-based-creation {
thing {
skeleton-creation-enabled = true
skeleton-creation-enabled = ${?THINGS_WOT_TM_BASED_CREATION_THING_SKELETON_CREATION_ENABLED}

generate-defaults-for-optional-properties = false
generate-defaults-for-optional-properties = ${?THINGS_WOT_TM_BASED_CREATION_THING_GENERATE_DEFAULTS_FOR_OPTIONAL_PROPERTIES}

throw-exception-on-wot-errors = true
throw-exception-on-wot-errors = ${?THINGS_WOT_TM_BASED_CREATION_THING_THROW_EXCEPTION_ON_WOT_ERRORS}
}

feature {
skeleton-creation-enabled = true
skeleton-creation-enabled = ${?THINGS_WOT_TM_BASED_CREATION_FEATURE_SKELETON_CREATION_ENABLED}

generate-defaults-for-optional-properties = false
generate-defaults-for-optional-properties = ${?THINGS_WOT_TM_BASED_CREATION_FEATURE_GENERATE_DEFAULTS_FOR_OPTIONAL_PROPERTIES}

throw-exception-on-wot-errors = true
throw-exception-on-wot-errors = ${?THINGS_WOT_TM_BASED_CREATION_FEATURE_THROW_EXCEPTION_ON_WOT_ERRORS}
}
}

to-thing-description {
base-prefix = "http://localhost:8080"
base-prefix = ${?THINGS_WOT_TO_THING_DESCRIPTION_BASE_PREFIX}
Expand Down
Loading

0 comments on commit 075bd85

Please sign in to comment.