Skip to content

Commit

Permalink
also made WoT based queries asynchronous
Browse files Browse the repository at this point in the history
* quite a refactoring was needed overall
* still with an open "TODO" as I got not rid of blocking on the Akka dispatcher thread when persisting a "CreateThing"

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jun 27, 2023
1 parent 9e696d6 commit 84fc4f0
Show file tree
Hide file tree
Showing 24 changed files with 465 additions and 366 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand All @@ -26,6 +27,7 @@
import org.eclipse.ditto.base.api.commands.sudo.SudoCommand;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
Expand Down Expand Up @@ -532,7 +534,7 @@ protected void persistAndApplyEvent(final CompletionStage<E> event, final BiCons
handler.accept(persistedEvent, entity);
});
}
});
}).toCompletableFuture().join(); // TODO TJ is this bad or not? - propbably yes :/
}

/**
Expand Down Expand Up @@ -662,9 +664,14 @@ private <T extends Command<?>> void handleByStrategy(final T command, @Nullable
try {
result = strategy.apply(getStrategyContext(), workEntity, getNextRevisionNumber(), (T) tracedCommand);
result.accept(this);
} catch (final DittoRuntimeException e) {
} catch (final CompletionException | DittoRuntimeException e) {
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(e, throwable ->
DittoInternalErrorException.newBuilder()
.dittoHeaders(command.getDittoHeaders())
.build());
startedSpan.tagAsFailed(e);
result = ResultFactory.newErrorResult(e, tracedCommand);
result = ResultFactory.newErrorResult(dittoRuntimeException, tracedCommand);
result.accept(this);
} finally {
startedSpan.finish();
Expand All @@ -677,9 +684,10 @@ public void onMutation(final Command<?> command, final CompletionStage<E> event,
final CompletionStage<WithDittoHeaders> response,
final boolean becomeCreated, final boolean becomeDeleted) {

final ActorRef sender = getSender();
persistAndApplyEvent(event, (persistedEvent, resultingEntity) -> {
if (shouldSendResponse(command.getDittoHeaders())) {
notifySender(response);
notifySender(sender, response);
}
if (becomeDeleted) {
becomeDeletedHandler();
Expand All @@ -691,9 +699,10 @@ public void onMutation(final Command<?> command, final CompletionStage<E> event,
}

@Override
public void onQuery(final Command<?> command, final WithDittoHeaders response) {
public void onQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
if (command.getDittoHeaders().isResponseRequired()) {
notifySender(response);
final ActorRef sender = getSender();
response.thenAccept(r -> notifySender(sender, r));
}
}

Expand Down Expand Up @@ -806,8 +815,8 @@ private void notifySender(final WithDittoHeaders message) {
notifySender(getSender(), message);
}

private void notifySender(final CompletionStage<WithDittoHeaders> message) {
message.thenAccept(msg -> notifySender(getSender(), msg));
private void notifySender(final ActorRef sender, final CompletionStage<WithDittoHeaders> message) {
message.thenAccept(msg -> notifySender(sender, msg));
}

private void takeSnapshotByInterval(final Control takeSnapshot) {
Expand Down Expand Up @@ -995,20 +1004,22 @@ public void onMutation(final Command<?> command, final CompletionStage<E> event,
}

@Override
public void onQuery(final Command<?> command, final WithDittoHeaders response) {
public void onQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
if (command.getDittoHeaders().isResponseRequired()) {
final WithDittoHeaders theResponseToSend;
if (response instanceof DittoHeadersSettable<?> dittoHeadersSettable) {
final DittoHeaders queryCommandHeaders = response.getDittoHeaders();
final DittoHeaders adjustedHeaders = queryCommandHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(),
historicalDittoHeaders.toJson().toString())
.build();
theResponseToSend = dittoHeadersSettable.setDittoHeaders(adjustedHeaders);
} else {
theResponseToSend = response;
}
notifySender(sender, theResponseToSend);
response.thenAccept(r -> {
final WithDittoHeaders theResponseToSend;
if (response instanceof DittoHeadersSettable<?> dittoHeadersSettable) {
final DittoHeaders queryCommandHeaders = r.getDittoHeaders();
final DittoHeaders adjustedHeaders = queryCommandHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(),
historicalDittoHeaders.toJson().toString())
.build();
theResponseToSend = dittoHeadersSettable.setDittoHeaders(adjustedHeaders);
} else {
theResponseToSend = r;
}
notifySender(sender, theResponseToSend);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public String toString() {
return this.getClass().getSimpleName() + " [" +
"command=" + command +
", eventToPersistStage=" + eventToPersistStage +
", response=" + responseStage +
", responseStage=" + responseStage +
", becomeCreated=" + becomeCreated +
", becomeDeleted=" + becomeDeleted +
']';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,28 @@
public final class QueryResult<E extends Event<?>> implements Result<E> {

private final Command<?> command;
private final WithDittoHeaders response;
private final CompletionStage<WithDittoHeaders> responseStage;

QueryResult(final Command<?> command, final WithDittoHeaders response) {
QueryResult(final Command<?> command, final CompletionStage<WithDittoHeaders> responseStage) {
this.command = command;
this.response = response;
this.responseStage = responseStage;
}

@Override
public String toString() {
return this.getClass().getSimpleName() + " [" +
"command=" + command +
", response=" + response +
", responseStage=" + responseStage +
']';
}

@Override
public void accept(final ResultVisitor<E> visitor) {
visitor.onQuery(command, response);
visitor.onQuery(command, responseStage);
}

@Override
public <F extends Event<?>> Result<F> map(final Function<CompletionStage<E>, CompletionStage<F>> mappingFunction) {
return new QueryResult<>(command, response);
return new QueryResult<>(command, responseStage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ public static <E extends Event<?>> Result<E> newErrorResult(final DittoRuntimeEx
*/
public static <E extends Event<?>> Result<E> newQueryResult(final Command<?> command,
final WithDittoHeaders response) {
return newQueryResult(command, CompletableFuture.completedStage(response));
}

/**
* Create a query result.
*
* @param command the query command.
* @param response the response.
* @param <E> type of events (irrelevant).
* @return the result.
*/
public static <E extends Event<?>> Result<E> newQueryResult(final Command<?> command,
final CompletionStage<WithDittoHeaders> response) {
return new QueryResult<>(command, response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void onMutation(Command<?> command, CompletionStage<E> event, CompletionStage<Wi
* @param command the query command.
* @param response the response.
*/
void onQuery(Command<?> command, WithDittoHeaders response);
void onQuery(Command<?> command, CompletionStage<WithDittoHeaders> response);

/**
* Evaluate an error result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,10 @@ public void onMutation(final Command<?> command, final CompletionStage<PolicyEve
final CompletionStage<WithDittoHeaders> response,
final boolean becomeCreated, final boolean becomeDeleted) {

final ActorRef sender = getSender();
persistAndApplyEvent(event, (persistedEvent, resultingEntity) -> {
if (shouldSendResponse(command.getDittoHeaders())) {
response.thenAccept(rsp -> notifySender(getSender(), rsp));
response.thenAccept(rsp -> notifySender(sender, rsp));
}
if (becomeDeleted) {
becomeDeletedHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void onMutation(final Command<?> command, final CompletionStage<PolicyAct
}

@Override
public void onQuery(final Command<?> command, final WithDittoHeaders response) {
public void onQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
// do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void onMutation(final Command<?> command, final CompletionStage<E> event,
}

@Override
public void onQuery(final Command<?> command, final WithDittoHeaders response) {
public void onQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
throw new AssertionError("Expect mutation result, got query response: " + response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void onMutation(final Command<?> command, final CompletionStage<PolicyEve
}

@Override
public void onQuery(final Command command, final WithDittoHeaders response) {
public void onQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
throw new AssertionError("Expect error, got query: " + response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.things.service.persistence.actors;

import java.time.Instant;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -118,18 +119,25 @@ public static Props props(final ThingId thingId,
}

@Override
public void onQuery(final Command<?> command, final WithDittoHeaders response) {
if (response.getDittoHeaders().didLiveChannelConditionMatch()) {
final var liveChannelTimeoutStrategy = response.getDittoHeaders()
.getLiveChannelTimeoutStrategy()
.orElse(LiveChannelTimeoutStrategy.FAIL);
if (liveChannelTimeoutStrategy != LiveChannelTimeoutStrategy.USE_TWIN &&
response instanceof ThingQueryCommandResponse<?> queryResponse) {
super.onQuery(command, queryResponse.setEntity(JsonFactory.nullLiteral()));
return;
public void onQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
final ActorRef sender = getSender();
response.thenAccept(r -> {
if (r.getDittoHeaders().didLiveChannelConditionMatch()) {
final var liveChannelTimeoutStrategy = r.getDittoHeaders()
.getLiveChannelTimeoutStrategy()
.orElse(LiveChannelTimeoutStrategy.FAIL);
if (liveChannelTimeoutStrategy != LiveChannelTimeoutStrategy.USE_TWIN &&
r instanceof ThingQueryCommandResponse<?> queryResponse &&
command.getDittoHeaders().isResponseRequired()) {
notifySender(sender, queryResponse.setEntity(JsonFactory.nullLiteral()));
return;
}
}
}
super.onQuery(command, response);

if (command.getDittoHeaders().isResponseRequired()) {
notifySender(sender, r);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
package org.eclipse.ditto.things.service.persistence.actors.strategies.commands;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.contenttype.ContentType;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
Expand Down Expand Up @@ -89,7 +91,7 @@ protected Result<ThingEvent<?>> doApply(final Context<ThingId> context,
}
}

private DittoHeadersSettable<?> getRetrieveThingDescriptionResponse(@Nullable final Thing thing,
private CompletionStage<WithDittoHeaders> getRetrieveThingDescriptionResponse(@Nullable final Thing thing,
final RetrieveFeature command) {
final String featureId = command.getFeatureId();
if (thing != null) {
Expand All @@ -98,18 +100,23 @@ private DittoHeadersSettable<?> getRetrieveThingDescriptionResponse(@Nullable fi
.map(feature -> wotThingDescriptionProvider
.provideFeatureTD(command.getEntityId(), thing, feature, command.getDittoHeaders())
)
.map(td -> RetrieveWotThingDescriptionResponse.of(command.getEntityId(), td.toJson(),
command.getDittoHeaders()
.toBuilder()
.contentType(ContentType.APPLICATION_TD_JSON)
.build())
.map(tdStage -> tdStage.thenApply(td ->
RetrieveWotThingDescriptionResponse.of(command.getEntityId(), td.toJson(),
command.getDittoHeaders()
.toBuilder()
.contentType(ContentType.APPLICATION_TD_JSON)
.build()
)
).thenApply(WithDittoHeaders.class::cast)
)
.map(DittoHeadersSettable.class::cast)
.orElseGet(() -> ExceptionFactory.featureNotFound(command.getEntityId(), featureId,
command.getDittoHeaders()));
.orElseGet(() -> CompletableFuture.completedStage(ExceptionFactory.featureNotFound(command.getEntityId(), featureId,
command.getDittoHeaders())
));
} else {
return ExceptionFactory.featureNotFound(command.getEntityId(), featureId,
command.getDittoHeaders());
return CompletableFuture.completedStage(
ExceptionFactory.featureNotFound(command.getEntityId(), featureId,
command.getDittoHeaders())
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.contenttype.ContentType;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
Expand All @@ -39,7 +42,6 @@
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.wot.integration.provider.WotThingDescriptionProvider;
import org.eclipse.ditto.wot.model.ThingDescription;

import akka.actor.ActorSystem;

Expand Down Expand Up @@ -122,18 +124,24 @@ private static JsonObject getThingJson(final Thing thing, final ThingQueryComman
.orElseGet(() -> thing.toJson(command.getImplementedSchemaVersion()));
}

private DittoHeadersSettable<?> getRetrieveThingDescriptionResponse(@Nullable final Thing thing,
private CompletionStage<WithDittoHeaders> getRetrieveThingDescriptionResponse(@Nullable final Thing thing,
final RetrieveThing command) {
if (thing != null) {
final ThingDescription wotThingDescription = wotThingDescriptionProvider
return wotThingDescriptionProvider
.provideThingTD(thing.getDefinition().orElse(null),
command.getEntityId(),
thing,
command.getDittoHeaders());
return RetrieveWotThingDescriptionResponse.of(command.getEntityId(), wotThingDescription.toJson(),
command.getDittoHeaders().toBuilder().contentType(ContentType.APPLICATION_TD_JSON).build());
command.getDittoHeaders())
.thenApply(wotThingDescription ->
RetrieveWotThingDescriptionResponse.of(command.getEntityId(),
wotThingDescription.toJson(),
command.getDittoHeaders().toBuilder()
.contentType(ContentType.APPLICATION_TD_JSON)
.build()
)
);
} else {
return notAccessible(command);
return CompletableFuture.completedStage(notAccessible(command));
}
}

Expand Down

0 comments on commit 84fc4f0

Please sign in to comment.