Skip to content

Commit

Permalink
fix WoT skeleton creation doing blocking operations on Akka dispatche…
Browse files Browse the repository at this point in the history
…r thread

* whenever creating a Thing using the WoT "ThingModel" included in the to-be-created thing a `.join()` of a CompletableFuture is called on the main Akka dispatcher thread
* this causes that the complete ActorSystem might be blocked to process any futher messages
* fixed that by making ResultVisitor work with a CompletionStage instead

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jun 26, 2023
1 parent b9667f6 commit 0d67706
Show file tree
Hide file tree
Showing 29 changed files with 346 additions and 196 deletions.
Expand Up @@ -571,8 +571,9 @@ private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence cor
}

@Override
public void onMutation(final Command<?> command, final ConnectivityEvent<?> event,
final WithDittoHeaders response, final boolean becomeCreated, final boolean becomeDeleted) {
public void onMutation(final Command<?> command, final CompletionStage<ConnectivityEvent<?>> event,
final CompletionStage<WithDittoHeaders> response, final boolean becomeCreated,
final boolean becomeDeleted) {
if (command instanceof StagedCommand stagedCommand) {
interpretStagedCommand(stagedCommand.withSenderUnlessDefined(getSender()));
} else {
Expand Down Expand Up @@ -612,7 +613,7 @@ private void interpretStagedCommand(final StagedCommand command) {
"Failed to handle staged command because required event wasn't present: <{}>",
command));
case PERSIST_AND_APPLY_EVENT -> command.getEvent().ifPresentOrElse(
event -> persistAndApplyEvent(event,
event -> persistAndApplyEvent(CompletableFuture.completedStage(event),
(unusedEvent, connection) -> interpretStagedCommand(command.next())),
() -> log.error("Failed to handle staged command because required event wasn't present: <{}>",
command));
Expand Down
Expand Up @@ -15,6 +15,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand Down Expand Up @@ -518,18 +519,20 @@ protected void becomeDeletedHandler() {
* @param event the event to persist and apply.
* @param handler what happens afterwards.
*/
protected void persistAndApplyEvent(final E event, final BiConsumer<E, S> handler) {
protected void persistAndApplyEvent(final CompletionStage<E> event, final BiConsumer<E, S> handler) {

final E modifiedEvent = modifyEventBeforePersist(event);
if (modifiedEvent.getDittoHeaders().isDryRun()) {
handler.accept(modifiedEvent, entity);
} else {
persistEvent(modifiedEvent, persistedEvent -> {
// after the event was persisted, apply the event on the current actor state
applyEvent(persistedEvent);
handler.accept(persistedEvent, entity);
});
}
event.thenAccept(completedEvent -> {
final E modifiedEvent = modifyEventBeforePersist(completedEvent);
if (modifiedEvent.getDittoHeaders().isDryRun()) {
handler.accept(modifiedEvent, entity);
} else {
persistEvent(modifiedEvent, persistedEvent -> {
// after the event was persisted, apply the event on the current actor state
applyEvent(persistedEvent);
handler.accept(persistedEvent, entity);
});
}
});
}

/**
Expand Down Expand Up @@ -670,8 +673,9 @@ private <T extends Command<?>> void handleByStrategy(final T command, @Nullable
}

@Override
public void onMutation(final Command<?> command, final E event, final WithDittoHeaders response,
final boolean becomeCreated, final boolean becomeDeleted) {
public void onMutation(final Command<?> command, final CompletionStage<E> event,
final CompletionStage<WithDittoHeaders> response,
final boolean becomeCreated, final boolean becomeDeleted) {

persistAndApplyEvent(event, (persistedEvent, resultingEntity) -> {
if (shouldSendResponse(command.getDittoHeaders())) {
Expand Down Expand Up @@ -802,6 +806,10 @@ private void notifySender(final WithDittoHeaders message) {
notifySender(getSender(), message);
}

private void notifySender(final CompletionStage<WithDittoHeaders> message) {
message.thenAccept(msg -> notifySender(getSender(), msg));
}

private void takeSnapshotByInterval(final Control takeSnapshot) {
takeSnapshot("snapshot interval has passed");
}
Expand Down Expand Up @@ -980,8 +988,8 @@ private HistoricalResultListener(final ActorRef sender, final DittoHeaders histo
}

@Override
public void onMutation(final Command<?> command, final E event,
final WithDittoHeaders response,
public void onMutation(final Command<?> command, final CompletionStage<E> event,
final CompletionStage<WithDittoHeaders> response,
final boolean becomeCreated, final boolean becomeDeleted) {
throw new UnsupportedOperationException("Mutating historical entity not supported.");
}
Expand Down
Expand Up @@ -97,7 +97,7 @@ protected Result<E> doApply(final Context<K> context, @Nullable final S entity,
if (commandStrategy != null) {
context.getLog().withCorrelationId(command)
.debug("Applying command <{}>", command);
return commandStrategy.apply(context, entity, nextRevision, command).map(x -> x);
return (Result<E>) commandStrategy.apply(context, entity, nextRevision, command);
} else {
// this may happen when subclasses override the "isDefined" condition.
return unhandled(context, entity, nextRevision, command);
Expand Down
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.internal.utils.persistentactors.results;

import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import org.eclipse.ditto.base.model.signals.events.Event;
Expand Down Expand Up @@ -42,7 +43,7 @@ public void accept(final ResultVisitor<E> visitor) {
}

@Override
public <F extends Event<?>> Result<F> map(final Function<E, F> mappingFunction) {
public <F extends Event<?>> Result<F> map(final Function<CompletionStage<E>, CompletionStage<F>> mappingFunction) {
return getInstance();
}

Expand Down
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.internal.utils.persistentactors.results;

import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -45,7 +46,7 @@ public void accept(final ResultVisitor<E> visitor) {
}

@Override
public <F extends Event<?>> Result<F> map(final Function<E, F> mappingFunction) {
public <F extends Event<?>> Result<F> map(final Function<CompletionStage<E>, CompletionStage<F>> mappingFunction) {
return new ErrorResult<>(dittoRuntimeException, errorCausingCommand);
}
}
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.internal.utils.persistentactors.results;

import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand All @@ -26,37 +27,39 @@
public final class MutationResult<E extends Event<?>> implements Result<E> {

private final Command<?> command;
private final E eventToPersist;
private final WithDittoHeaders response;
private final CompletionStage<E> eventToPersistStage;
private final CompletionStage<WithDittoHeaders> responseStage;
private final boolean becomeCreated;
private final boolean becomeDeleted;

MutationResult(final Command<?> command, final E eventToPersist, final WithDittoHeaders response,
MutationResult(final Command<?> command,
final CompletionStage<E> eventToPersistStage,
final CompletionStage<WithDittoHeaders> responseStage,
final boolean becomeCreated, final boolean becomeDeleted) {
this.command = command;
this.eventToPersist = eventToPersist;
this.response = response;
this.eventToPersistStage = eventToPersistStage;
this.responseStage = responseStage;
this.becomeCreated = becomeCreated;
this.becomeDeleted = becomeDeleted;
}

@Override
public void accept(final ResultVisitor<E> visitor) {
visitor.onMutation(command, eventToPersist, response, becomeCreated, becomeDeleted);
visitor.onMutation(command, eventToPersistStage, responseStage, becomeCreated, becomeDeleted);
}

@Override
public <F extends Event<?>> Result<F> map(final Function<E, F> mappingFunction) {
return new MutationResult<>(command, mappingFunction.apply(eventToPersist), response, becomeCreated,
public <F extends Event<?>> Result<F> map(final Function<CompletionStage<E>, CompletionStage<F>> mappingFunction) {
return new MutationResult<>(command, mappingFunction.apply(eventToPersistStage), responseStage, becomeCreated,
becomeDeleted);
}

@Override
public String toString() {
return this.getClass().getSimpleName() + " [" +
"command=" + command +
", eventToPersist=" + eventToPersist +
", response=" + response +
", eventToPersistStage=" + eventToPersistStage +
", response=" + responseStage +
", becomeCreated=" + becomeCreated +
", becomeDeleted=" + becomeDeleted +
']';
Expand Down
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.internal.utils.persistentactors.results;

import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand Down Expand Up @@ -47,7 +48,7 @@ public void accept(final ResultVisitor<E> visitor) {
}

@Override
public <F extends Event<?>> Result<F> map(final Function<E, F> mappingFunction) {
public <F extends Event<?>> Result<F> map(final Function<CompletionStage<E>, CompletionStage<F>> mappingFunction) {
return new QueryResult<>(command, response);
}
}
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.internal.utils.persistentactors.results;

import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import org.eclipse.ditto.base.model.signals.events.Event;
Expand All @@ -36,7 +37,7 @@ public interface Result<E extends Event<?>> {
* @return the new result.
* @since 2.0.0
*/
<F extends Event<?>> Result<F> map(Function<E, F> mappingFunction);
<F extends Event<?>> Result<F> map(Function<CompletionStage<E>, CompletionStage<F>> mappingFunction);

/**
* @return the empty result
Expand Down
Expand Up @@ -12,6 +12,9 @@
*/
package org.eclipse.ditto.internal.utils.persistentactors.results;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -41,6 +44,23 @@ private ResultFactory() {
public static <E extends Event<?>> Result<E> newMutationResult(final Command<?> command, final E eventToPersist,
final WithDittoHeaders response) {

return newMutationResult(command, CompletableFuture.completedStage(eventToPersist),
CompletableFuture.completedStage(response));
}

/**
* Create a mutation result.
*
* @param command command that caused the mutation.
* @param eventToPersist event of the mutation.
* @param response response of the command.
* @param <E> type of the event.
* @return the result.
*/
public static <E extends Event<?>> Result<E> newMutationResult(final Command<?> command,
final CompletionStage<E> eventToPersist,
final CompletionStage<WithDittoHeaders> response) {

return new MutationResult<>(command, eventToPersist, response, false, false);
}

Expand All @@ -61,6 +81,27 @@ public static <E extends Event<?>> Result<E> newMutationResult(final Command<?>
final boolean becomeCreated,
final boolean becomeDeleted) {

return newMutationResult(command, CompletableFuture.completedStage(eventToPersist),
CompletableFuture.completedStage(response), becomeCreated, becomeDeleted);
}

/**
* Create a mutation result.
*
* @param command command that caused the mutation.
* @param eventToPersist event of the mutation.
* @param response response of the command.
* @param becomeCreated whether the actor should behave as if the entity is created.
* @param becomeDeleted whether the actor should behave as if the entity is deleted.
* @param <E> type of the event.
* @return the result.
*/
public static <E extends Event<?>> Result<E> newMutationResult(final Command<?> command,
final CompletionStage<E> eventToPersist,
final CompletionStage<WithDittoHeaders> response,
final boolean becomeCreated,
final boolean becomeDeleted) {

return new MutationResult<>(command, eventToPersist, response, becomeCreated, becomeDeleted);
}

Expand Down
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.internal.utils.persistentactors.results;

import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.Command;
Expand Down Expand Up @@ -40,8 +42,8 @@ default void onEmpty() {
* @param becomeCreated whether the actor should behave as if the entity is created.
* @param becomeDeleted whether the actor should behave as if the entity is deleted.
*/
void onMutation(Command<?> command, E event, WithDittoHeaders response, boolean becomeCreated,
boolean becomeDeleted);
void onMutation(Command<?> command, CompletionStage<E> event, CompletionStage<WithDittoHeaders> response,
boolean becomeCreated, boolean becomeDeleted);

/**
* Evaluate a query result.
Expand Down
Expand Up @@ -67,10 +67,10 @@ protected AbstractPolicyActionEvent(final String type,
*/
protected SubjectsModifiedPartially aggregateWithSubjectCreatedOrModified(
final Map<Label, Collection<Subject>> initialModifiedSubjects,
final Collection<PolicyActionEvent<?>> otherEvents) {
final Collection<PolicyActionEvent> otherEvents) {

final Map<Label, Collection<Subject>> modifiedSubjects = new LinkedHashMap<>(initialModifiedSubjects);
for (final PolicyActionEvent<?> event : otherEvents) {
for (final PolicyActionEvent event : otherEvents) {
if (event instanceof SubjectCreated) {
final SubjectCreated subjectCreated = (SubjectCreated) event;
final Set<Subject> mergedSubjects =
Expand Down Expand Up @@ -99,10 +99,10 @@ protected SubjectsModifiedPartially aggregateWithSubjectCreatedOrModified(
*/
protected SubjectsDeletedPartially aggregateWithSubjectDeleted(
final Map<Label, Collection<SubjectId>> initialDeletedSubjectIds,
final Collection<PolicyActionEvent<?>> otherEvents) {
final Collection<PolicyActionEvent> otherEvents) {

final Map<Label, Collection<SubjectId>> deletedSubjectIds = new LinkedHashMap<>(initialDeletedSubjectIds);
for (final PolicyActionEvent<?> event : otherEvents) {
for (final PolicyActionEvent event : otherEvents) {
if (event instanceof SubjectDeleted) {
final SubjectDeleted subjectDeleted = (SubjectDeleted) event;
final Collection<SubjectId> existingSubjectIds = deletedSubjectIds.get(subjectDeleted.getLabel());
Expand Down
Expand Up @@ -28,5 +28,5 @@ public interface PolicyActionEvent<T extends PolicyActionEvent<T>> extends Polic
* @param otherPolicyActionEvents the collection of policy action events.
* @return the aggregated event.
*/
PolicyEvent<?> aggregateWith(Collection<PolicyActionEvent<?>> otherPolicyActionEvents);
PolicyEvent<?> aggregateWith(Collection<PolicyActionEvent> otherPolicyActionEvents);
}
Expand Up @@ -202,7 +202,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js
}

@Override
public SubjectsModifiedPartially aggregateWith(final Collection<PolicyActionEvent<?>> otherPolicyActionEvents) {
public SubjectsModifiedPartially aggregateWith(final Collection<PolicyActionEvent> otherPolicyActionEvents) {
final Map<Label, Collection<Subject>> initialCreatedSubjects =
Stream.of(0).collect(Collectors.toMap(i -> label, i -> Collections.singleton(subject),
(u, v) -> {
Expand Down
Expand Up @@ -188,7 +188,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js
}

@Override
public SubjectsDeletedPartially aggregateWith(final Collection<PolicyActionEvent<?>> otherPolicyActionEvents) {
public SubjectsDeletedPartially aggregateWith(final Collection<PolicyActionEvent> otherPolicyActionEvents) {
final Map<Label, Collection<SubjectId>> initialDeletedSubjectId =
Stream.of(0).collect(Collectors.toMap(i -> label, i -> Collections.singleton(subjectId),
(u, v) -> {
Expand Down
Expand Up @@ -202,7 +202,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js
}

@Override
public SubjectsModifiedPartially aggregateWith(final Collection<PolicyActionEvent<?>> otherPolicyActionEvents) {
public SubjectsModifiedPartially aggregateWith(final Collection<PolicyActionEvent> otherPolicyActionEvents) {
final Map<Label, Collection<Subject>> initialModifiedSubjects =
Stream.of(0).collect(Collectors.toMap(i -> label, i -> Collections.singleton(subject),
(u, v) -> {
Expand Down
Expand Up @@ -190,7 +190,7 @@ public int hashCode() {
}

@Override
public SubjectsDeletedPartially aggregateWith(final Collection<PolicyActionEvent<?>> otherPolicyActionEvents) {
public SubjectsDeletedPartially aggregateWith(final Collection<PolicyActionEvent> otherPolicyActionEvents) {
return aggregateWithSubjectDeleted(deletedSubjectIds, otherPolicyActionEvents);
}

Expand Down
Expand Up @@ -184,7 +184,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js
}

@Override
public SubjectsModifiedPartially aggregateWith(final Collection<PolicyActionEvent<?>> otherPolicyActionEvents) {
public SubjectsModifiedPartially aggregateWith(final Collection<PolicyActionEvent> otherPolicyActionEvents) {
return aggregateWithSubjectCreatedOrModified(modifiedSubjects, otherPolicyActionEvents);
}

Expand Down

0 comments on commit 0d67706

Please sign in to comment.