From 84fc4f003ef7ca2707b85492baf3534fef21d5a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20J=C3=A4ckle?= Date: Tue, 27 Jun 2023 17:22:26 +0200 Subject: [PATCH] also made WoT based queries asynchronous MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * quite a refactoring was needed overall * still with an open "TODO" as I got not rid of blocking on the Akka dispatcher thread when persisting a "CreateThing" Signed-off-by: Thomas Jäckle --- .../AbstractPersistenceActor.java | 53 ++-- .../results/MutationResult.java | 2 +- .../persistentactors/results/QueryResult.java | 12 +- .../results/ResultFactory.java | 13 + .../results/ResultVisitor.java | 2 +- .../actors/PolicyPersistenceActor.java | 3 +- .../TopLevelPolicyActionCommandStrategy.java | 2 +- .../AbstractPolicyCommandStrategyTest.java | 2 +- .../commands/PolicyConflictStrategyTest.java | 2 +- .../actors/ThingPersistenceActor.java | 30 +- .../commands/RetrieveFeatureStrategy.java | 31 +- .../commands/RetrieveThingStrategy.java | 22 +- .../commands/AbstractCommandStrategyTest.java | 26 +- .../commands/ResultFactoryTest.java | 9 +- .../commands/ThingConflictStrategyTest.java | 2 +- .../DefaultWotThingDescriptionGenerator.java | 86 +++--- ...DefaultWotThingModelExtensionResolver.java | 103 ++++--- .../DefaultWotThingSkeletonGenerator.java | 284 +++++++++--------- .../WotThingDescriptionGenerator.java | 3 +- .../WotThingModelExtensionResolver.java | 6 +- .../generator/WotThingSkeletonGenerator.java | 5 +- .../DefaultWotThingDescriptionProvider.java | 122 ++++---- .../provider/DefaultWotThingModelFetcher.java | 7 +- .../provider/WotThingDescriptionProvider.java | 4 +- 24 files changed, 465 insertions(+), 366 deletions(-) diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java index 6e808fb533..84aadf0a07 100755 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java @@ -15,6 +15,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Optional; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -26,6 +27,7 @@ import org.eclipse.ditto.base.api.commands.sudo.SudoCommand; import org.eclipse.ditto.base.model.entity.id.EntityId; import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId; +import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; @@ -532,7 +534,7 @@ protected void persistAndApplyEvent(final CompletionStage event, final BiCons handler.accept(persistedEvent, entity); }); } - }); + }).toCompletableFuture().join(); // TODO TJ is this bad or not? - propbably yes :/ } /** @@ -662,9 +664,14 @@ private > void handleByStrategy(final T command, @Nullable try { result = strategy.apply(getStrategyContext(), workEntity, getNextRevisionNumber(), (T) tracedCommand); result.accept(this); - } catch (final DittoRuntimeException e) { + } catch (final CompletionException | DittoRuntimeException e) { + final DittoRuntimeException dittoRuntimeException = + DittoRuntimeException.asDittoRuntimeException(e, throwable -> + DittoInternalErrorException.newBuilder() + .dittoHeaders(command.getDittoHeaders()) + .build()); startedSpan.tagAsFailed(e); - result = ResultFactory.newErrorResult(e, tracedCommand); + result = ResultFactory.newErrorResult(dittoRuntimeException, tracedCommand); result.accept(this); } finally { startedSpan.finish(); @@ -677,9 +684,10 @@ public void onMutation(final Command command, final CompletionStage event, final CompletionStage response, final boolean becomeCreated, final boolean becomeDeleted) { + final ActorRef sender = getSender(); persistAndApplyEvent(event, (persistedEvent, resultingEntity) -> { if (shouldSendResponse(command.getDittoHeaders())) { - notifySender(response); + notifySender(sender, response); } if (becomeDeleted) { becomeDeletedHandler(); @@ -691,9 +699,10 @@ public void onMutation(final Command command, final CompletionStage event, } @Override - public void onQuery(final Command command, final WithDittoHeaders response) { + public void onQuery(final Command command, final CompletionStage response) { if (command.getDittoHeaders().isResponseRequired()) { - notifySender(response); + final ActorRef sender = getSender(); + response.thenAccept(r -> notifySender(sender, r)); } } @@ -806,8 +815,8 @@ private void notifySender(final WithDittoHeaders message) { notifySender(getSender(), message); } - private void notifySender(final CompletionStage message) { - message.thenAccept(msg -> notifySender(getSender(), msg)); + private void notifySender(final ActorRef sender, final CompletionStage message) { + message.thenAccept(msg -> notifySender(sender, msg)); } private void takeSnapshotByInterval(final Control takeSnapshot) { @@ -995,20 +1004,22 @@ public void onMutation(final Command command, final CompletionStage event, } @Override - public void onQuery(final Command command, final WithDittoHeaders response) { + public void onQuery(final Command command, final CompletionStage response) { if (command.getDittoHeaders().isResponseRequired()) { - final WithDittoHeaders theResponseToSend; - if (response instanceof DittoHeadersSettable dittoHeadersSettable) { - final DittoHeaders queryCommandHeaders = response.getDittoHeaders(); - final DittoHeaders adjustedHeaders = queryCommandHeaders.toBuilder() - .putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), - historicalDittoHeaders.toJson().toString()) - .build(); - theResponseToSend = dittoHeadersSettable.setDittoHeaders(adjustedHeaders); - } else { - theResponseToSend = response; - } - notifySender(sender, theResponseToSend); + response.thenAccept(r -> { + final WithDittoHeaders theResponseToSend; + if (response instanceof DittoHeadersSettable dittoHeadersSettable) { + final DittoHeaders queryCommandHeaders = r.getDittoHeaders(); + final DittoHeaders adjustedHeaders = queryCommandHeaders.toBuilder() + .putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), + historicalDittoHeaders.toJson().toString()) + .build(); + theResponseToSend = dittoHeadersSettable.setDittoHeaders(adjustedHeaders); + } else { + theResponseToSend = r; + } + notifySender(sender, theResponseToSend); + }); } } diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/MutationResult.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/MutationResult.java index 529bf85fac..eee22c24e4 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/MutationResult.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/MutationResult.java @@ -59,7 +59,7 @@ public String toString() { return this.getClass().getSimpleName() + " [" + "command=" + command + ", eventToPersistStage=" + eventToPersistStage + - ", response=" + responseStage + + ", responseStage=" + responseStage + ", becomeCreated=" + becomeCreated + ", becomeDeleted=" + becomeDeleted + ']'; diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/QueryResult.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/QueryResult.java index 21911d6d06..87ec7d0d40 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/QueryResult.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/QueryResult.java @@ -27,28 +27,28 @@ public final class QueryResult> implements Result { private final Command command; - private final WithDittoHeaders response; + private final CompletionStage responseStage; - QueryResult(final Command command, final WithDittoHeaders response) { + QueryResult(final Command command, final CompletionStage responseStage) { this.command = command; - this.response = response; + this.responseStage = responseStage; } @Override public String toString() { return this.getClass().getSimpleName() + " [" + "command=" + command + - ", response=" + response + + ", responseStage=" + responseStage + ']'; } @Override public void accept(final ResultVisitor visitor) { - visitor.onQuery(command, response); + visitor.onQuery(command, responseStage); } @Override public > Result map(final Function, CompletionStage> mappingFunction) { - return new QueryResult<>(command, response); + return new QueryResult<>(command, responseStage); } } diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultFactory.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultFactory.java index 1020b4f6f5..b86e54dcfb 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultFactory.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultFactory.java @@ -128,6 +128,19 @@ public static > Result newErrorResult(final DittoRuntimeEx */ public static > Result newQueryResult(final Command command, final WithDittoHeaders response) { + return newQueryResult(command, CompletableFuture.completedStage(response)); + } + + /** + * Create a query result. + * + * @param command the query command. + * @param response the response. + * @param type of events (irrelevant). + * @return the result. + */ + public static > Result newQueryResult(final Command command, + final CompletionStage response) { return new QueryResult<>(command, response); } diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultVisitor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultVisitor.java index 57548b391c..a4a1c6212b 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultVisitor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/results/ResultVisitor.java @@ -51,7 +51,7 @@ void onMutation(Command command, CompletionStage event, CompletionStage command, WithDittoHeaders response); + void onQuery(Command command, CompletionStage response); /** * Evaluate an error result. diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java index a2d454f53b..1aaf5b622b 100755 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java @@ -250,9 +250,10 @@ public void onMutation(final Command command, final CompletionStage response, final boolean becomeCreated, final boolean becomeDeleted) { + final ActorRef sender = getSender(); persistAndApplyEvent(event, (persistedEvent, resultingEntity) -> { if (shouldSendResponse(command.getDittoHeaders())) { - response.thenAccept(rsp -> notifySender(getSender(), rsp)); + response.thenAccept(rsp -> notifySender(sender, rsp)); } if (becomeDeleted) { becomeDeletedHandler(); diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/TopLevelPolicyActionCommandStrategy.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/TopLevelPolicyActionCommandStrategy.java index ff93ab88ab..4c9aee4c86 100644 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/TopLevelPolicyActionCommandStrategy.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/TopLevelPolicyActionCommandStrategy.java @@ -197,7 +197,7 @@ public void onMutation(final Command command, final CompletionStage command, final WithDittoHeaders response) { + public void onQuery(final Command command, final CompletionStage response) { // do nothing } diff --git a/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/AbstractPolicyCommandStrategyTest.java b/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/AbstractPolicyCommandStrategyTest.java index 49d0625fc5..3f89658b64 100644 --- a/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/AbstractPolicyCommandStrategyTest.java +++ b/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/AbstractPolicyCommandStrategyTest.java @@ -215,7 +215,7 @@ public void onMutation(final Command command, final CompletionStage event, } @Override - public void onQuery(final Command command, final WithDittoHeaders response) { + public void onQuery(final Command command, final CompletionStage response) { throw new AssertionError("Expect mutation result, got query response: " + response); } diff --git a/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/PolicyConflictStrategyTest.java b/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/PolicyConflictStrategyTest.java index f19460f103..f8f440e0c3 100644 --- a/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/PolicyConflictStrategyTest.java +++ b/policies/service/src/test/java/org/eclipse/ditto/policies/service/persistence/actors/strategies/commands/PolicyConflictStrategyTest.java @@ -109,7 +109,7 @@ public void onMutation(final Command command, final CompletionStage command, final CompletionStage response) { throw new AssertionError("Expect error, got query: " + response); } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java index 5245df3341..a27224e559 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java @@ -13,6 +13,7 @@ package org.eclipse.ditto.things.service.persistence.actors; import java.time.Instant; +import java.util.concurrent.CompletionStage; import javax.annotation.Nullable; @@ -118,18 +119,25 @@ public static Props props(final ThingId thingId, } @Override - public void onQuery(final Command command, final WithDittoHeaders response) { - if (response.getDittoHeaders().didLiveChannelConditionMatch()) { - final var liveChannelTimeoutStrategy = response.getDittoHeaders() - .getLiveChannelTimeoutStrategy() - .orElse(LiveChannelTimeoutStrategy.FAIL); - if (liveChannelTimeoutStrategy != LiveChannelTimeoutStrategy.USE_TWIN && - response instanceof ThingQueryCommandResponse queryResponse) { - super.onQuery(command, queryResponse.setEntity(JsonFactory.nullLiteral())); - return; + public void onQuery(final Command command, final CompletionStage response) { + final ActorRef sender = getSender(); + response.thenAccept(r -> { + if (r.getDittoHeaders().didLiveChannelConditionMatch()) { + final var liveChannelTimeoutStrategy = r.getDittoHeaders() + .getLiveChannelTimeoutStrategy() + .orElse(LiveChannelTimeoutStrategy.FAIL); + if (liveChannelTimeoutStrategy != LiveChannelTimeoutStrategy.USE_TWIN && + r instanceof ThingQueryCommandResponse queryResponse && + command.getDittoHeaders().isResponseRequired()) { + notifySender(sender, queryResponse.setEntity(JsonFactory.nullLiteral())); + return; + } } - } - super.onQuery(command, response); + + if (command.getDittoHeaders().isResponseRequired()) { + notifySender(sender, r); + } + }); } @Override diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/RetrieveFeatureStrategy.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/RetrieveFeatureStrategy.java index f5602ea3e4..e0669d82d3 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/RetrieveFeatureStrategy.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/RetrieveFeatureStrategy.java @@ -13,6 +13,8 @@ package org.eclipse.ditto.things.service.persistence.actors.strategies.commands; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; @@ -20,7 +22,7 @@ import org.eclipse.ditto.base.model.entity.metadata.Metadata; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.base.model.headers.DittoHeadersSettable; +import org.eclipse.ditto.base.model.headers.WithDittoHeaders; import org.eclipse.ditto.base.model.headers.contenttype.ContentType; import org.eclipse.ditto.base.model.headers.entitytag.EntityTag; import org.eclipse.ditto.base.model.signals.FeatureToggle; @@ -89,7 +91,7 @@ protected Result> doApply(final Context context, } } - private DittoHeadersSettable getRetrieveThingDescriptionResponse(@Nullable final Thing thing, + private CompletionStage getRetrieveThingDescriptionResponse(@Nullable final Thing thing, final RetrieveFeature command) { final String featureId = command.getFeatureId(); if (thing != null) { @@ -98,18 +100,23 @@ private DittoHeadersSettable getRetrieveThingDescriptionResponse(@Nullable fi .map(feature -> wotThingDescriptionProvider .provideFeatureTD(command.getEntityId(), thing, feature, command.getDittoHeaders()) ) - .map(td -> RetrieveWotThingDescriptionResponse.of(command.getEntityId(), td.toJson(), - command.getDittoHeaders() - .toBuilder() - .contentType(ContentType.APPLICATION_TD_JSON) - .build()) + .map(tdStage -> tdStage.thenApply(td -> + RetrieveWotThingDescriptionResponse.of(command.getEntityId(), td.toJson(), + command.getDittoHeaders() + .toBuilder() + .contentType(ContentType.APPLICATION_TD_JSON) + .build() + ) + ).thenApply(WithDittoHeaders.class::cast) ) - .map(DittoHeadersSettable.class::cast) - .orElseGet(() -> ExceptionFactory.featureNotFound(command.getEntityId(), featureId, - command.getDittoHeaders())); + .orElseGet(() -> CompletableFuture.completedStage(ExceptionFactory.featureNotFound(command.getEntityId(), featureId, + command.getDittoHeaders()) + )); } else { - return ExceptionFactory.featureNotFound(command.getEntityId(), featureId, - command.getDittoHeaders()); + return CompletableFuture.completedStage( + ExceptionFactory.featureNotFound(command.getEntityId(), featureId, + command.getDittoHeaders()) + ); } } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/RetrieveThingStrategy.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/RetrieveThingStrategy.java index d8e33c3244..e7f343082f 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/RetrieveThingStrategy.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/RetrieveThingStrategy.java @@ -14,6 +14,8 @@ import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; @@ -21,6 +23,7 @@ import org.eclipse.ditto.base.model.entity.metadata.Metadata; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeadersSettable; +import org.eclipse.ditto.base.model.headers.WithDittoHeaders; import org.eclipse.ditto.base.model.headers.contenttype.ContentType; import org.eclipse.ditto.base.model.headers.entitytag.EntityTag; import org.eclipse.ditto.base.model.signals.FeatureToggle; @@ -39,7 +42,6 @@ import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand; import org.eclipse.ditto.things.model.signals.events.ThingEvent; import org.eclipse.ditto.wot.integration.provider.WotThingDescriptionProvider; -import org.eclipse.ditto.wot.model.ThingDescription; import akka.actor.ActorSystem; @@ -122,18 +124,24 @@ private static JsonObject getThingJson(final Thing thing, final ThingQueryComman .orElseGet(() -> thing.toJson(command.getImplementedSchemaVersion())); } - private DittoHeadersSettable getRetrieveThingDescriptionResponse(@Nullable final Thing thing, + private CompletionStage getRetrieveThingDescriptionResponse(@Nullable final Thing thing, final RetrieveThing command) { if (thing != null) { - final ThingDescription wotThingDescription = wotThingDescriptionProvider + return wotThingDescriptionProvider .provideThingTD(thing.getDefinition().orElse(null), command.getEntityId(), thing, - command.getDittoHeaders()); - return RetrieveWotThingDescriptionResponse.of(command.getEntityId(), wotThingDescription.toJson(), - command.getDittoHeaders().toBuilder().contentType(ContentType.APPLICATION_TD_JSON).build()); + command.getDittoHeaders()) + .thenApply(wotThingDescription -> + RetrieveWotThingDescriptionResponse.of(command.getEntityId(), + wotThingDescription.toJson(), + command.getDittoHeaders().toBuilder() + .contentType(ContentType.APPLICATION_TD_JSON) + .build() + ) + ); } else { - return notAccessible(command); + return CompletableFuture.completedStage(notAccessible(command)); } } diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/AbstractCommandStrategyTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/AbstractCommandStrategyTest.java index b8945fa6c8..ca612d293c 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/AbstractCommandStrategyTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/AbstractCommandStrategyTest.java @@ -128,9 +128,20 @@ protected static > void assertQueryResult( final Result> thingEventResult = applyStrategy(underTest, getDefaultContext(), thing, command); final ResultVisitor> mock = mock(Dummy.class); thingEventResult.accept(mock); - final ArgumentCaptor> captor = ArgumentCaptor.forClass(CommandResponse.class); - verify(mock).onQuery(any(), captor.capture()); - commandResponseAssertions.accept(captor.getValue()); + final ArgumentCaptor> responseStage = + ArgumentCaptor.forClass(CompletionStage.class); + verify(mock).onQuery(any(), responseStage.capture()); + + assertThat(responseStage.getValue()).isInstanceOf(CompletionStage.class); + CompletableFutureAssert.assertThatCompletionStage(responseStage.getValue()) + .isCompletedWithValueMatching(p -> { + try { + commandResponseAssertions.accept((CommandResponse) p); + return true; + } catch (final AssertionError ae) { + return false; + } + }); } @@ -170,7 +181,14 @@ private static > T assertModificationResult(fina private static void assertInfoResult(final Result> result, final WithDittoHeaders infoResponse) { final ResultVisitor> mock = mock(Dummy.class); result.accept(mock); - verify(mock).onQuery(any(), eq(infoResponse)); + final ArgumentCaptor> responseStage = + ArgumentCaptor.forClass(CompletionStage.class); + + verify(mock).onQuery(any(), responseStage.capture()); + + assertThat(responseStage.getValue()).isInstanceOf(CompletionStage.class); + CompletableFutureAssert.assertThatCompletionStage(responseStage.getValue()) + .isCompletedWithValue(infoResponse); } private static > Result> applyStrategy( diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ResultFactoryTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ResultFactoryTest.java index cbde9a335f..785930ac39 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ResultFactoryTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ResultFactoryTest.java @@ -58,7 +58,14 @@ public void assertImmutability() { public void notifyQueryResponse() { final Result> result = ResultFactory.newQueryResult(thingQueryCommand, response); result.accept(mock); - verify(mock).onQuery(eq(thingQueryCommand), eq(response)); + final ArgumentCaptor> responseStage = + ArgumentCaptor.forClass(CompletionStage.class); + + verify(mock).onQuery(eq(thingQueryCommand), responseStage.capture()); + + assertThat(responseStage.getValue()).isInstanceOf(CompletionStage.class); + CompletableFutureAssert.assertThatCompletionStage(responseStage.getValue()) + .isCompletedWithValue(response); } @Test diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ThingConflictStrategyTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ThingConflictStrategyTest.java index 3d5173f37e..97f0a1d17b 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ThingConflictStrategyTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/strategies/commands/ThingConflictStrategyTest.java @@ -104,7 +104,7 @@ public void onMutation(final Command command, final CompletionStage command, final WithDittoHeaders response) { + public void onQuery(final Command command, final CompletionStage response) { throw new AssertionError("Expect error, got query: " + response); } diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingDescriptionGenerator.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingDescriptionGenerator.java index f084c3cd57..9bc61a95fb 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingDescriptionGenerator.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingDescriptionGenerator.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -135,7 +136,7 @@ final class DefaultWotThingDescriptionGenerator implements WotThingDescriptionGe } @Override - public ThingDescription generateThingDescription(final ThingId thingId, + public CompletionStage generateThingDescription(final ThingId thingId, @Nullable final Thing thing, @Nullable final JsonObject placeholderLookupObject, @Nullable final String featureId, @@ -144,46 +145,51 @@ public ThingDescription generateThingDescription(final ThingId thingId, final DittoHeaders dittoHeaders) { // generation rules defined at: https://w3c.github.io/wot-thing-description/#thing-model-td-generation + return thingModelExtensionResolver + .resolveThingModelExtensions(thingModel, dittoHeaders) + .thenCompose(thingModelWithExtensions -> + thingModelExtensionResolver.resolveThingModelRefs(thingModelWithExtensions, dittoHeaders) + ) + .thenApply(thingModelWithExtensionsAndImports -> { + LOGGER.withCorrelationId(dittoHeaders) + .debug("ThingModel after resolving extensions + refs: <{}>", + thingModelWithExtensionsAndImports); + + final ThingModel cleanedTm = + removeThingModelSpecificElements(thingModelWithExtensionsAndImports, dittoHeaders); + + final ThingDescription.Builder tdBuilder = ThingDescription.newBuilder(cleanedTm.toJson()); + addBase(tdBuilder, thingId, featureId); + addInstanceVersion(tdBuilder, cleanedTm.getVersion().orElse(null)); + addThingDescriptionLinks(tdBuilder, thingModelUrl, null != featureId, thingId); + convertThingDescriptionTmSubmodelLinksToItems(tdBuilder, dittoHeaders); + addThingDescriptionTemplateFromConfig(tdBuilder); + addThingDescriptionAdditionalMetadata(tdBuilder, thing); + if (null == featureId) { + addThingDescriptionForms(cleanedTm, tdBuilder, Thing.JsonFields.ATTRIBUTES.getPointer()); + } else { + addThingDescriptionForms(cleanedTm, tdBuilder, Feature.JsonFields.PROPERTIES.getPointer()); + } - final ThingModel thingModelWithExtensions = thingModelExtensionResolver - .resolveThingModelExtensions(thingModel, dittoHeaders); - final ThingModel thingModelWithExtensionsAndImports = thingModelExtensionResolver - .resolveThingModelRefs(thingModelWithExtensions, dittoHeaders); - - LOGGER.withCorrelationId(dittoHeaders) - .debug("ThingModel after resolving extensions + refs: <{}>", thingModelWithExtensionsAndImports); - - final ThingModel cleanedTm = removeThingModelSpecificElements(thingModelWithExtensionsAndImports, dittoHeaders); - - final ThingDescription.Builder tdBuilder = ThingDescription.newBuilder(cleanedTm.toJson()); - addBase(tdBuilder, thingId, featureId); - addInstanceVersion(tdBuilder, cleanedTm.getVersion().orElse(null)); - addThingDescriptionLinks(tdBuilder, thingModelUrl, null != featureId, thingId); - convertThingDescriptionTmSubmodelLinksToItems(tdBuilder, dittoHeaders); - addThingDescriptionTemplateFromConfig(tdBuilder); - addThingDescriptionAdditionalMetadata(tdBuilder, thing); - if (null == featureId) { - addThingDescriptionForms(cleanedTm, tdBuilder, Thing.JsonFields.ATTRIBUTES.getPointer()); - } else { - addThingDescriptionForms(cleanedTm, tdBuilder, Feature.JsonFields.PROPERTIES.getPointer()); - } - - tdBuilder.setUriVariables(provideUriVariables( - DittoHeaderDefinition.CHANNEL.getKey(), - DittoHeaderDefinition.TIMEOUT.getKey(), - DittoHeaderDefinition.RESPONSE_REQUIRED.getKey(), - DITTO_FIELDS_URI_VARIABLE - )); - tdBuilder.setSchemaDefinitions(SchemaDefinitions.of( - Map.of(SCHEMA_DITTO_ERROR, buildDittoErrorSchema()) - )); - - final ThingDescription thingDescription = resolvePlaceholders(tdBuilder.build(), placeholderLookupObject, - dittoHeaders); - LOGGER.withCorrelationId(dittoHeaders) - .info("Created ThingDescription for thingId <{}> and featureId <{}>: <{}>", thingId, featureId, - thingDescription); - return thingDescription; + tdBuilder.setUriVariables(provideUriVariables( + DittoHeaderDefinition.CHANNEL.getKey(), + DittoHeaderDefinition.TIMEOUT.getKey(), + DittoHeaderDefinition.RESPONSE_REQUIRED.getKey(), + DITTO_FIELDS_URI_VARIABLE + )); + tdBuilder.setSchemaDefinitions(SchemaDefinitions.of( + Map.of(SCHEMA_DITTO_ERROR, buildDittoErrorSchema()) + )); + + final ThingDescription thingDescription = + resolvePlaceholders(tdBuilder.build(), placeholderLookupObject, + dittoHeaders); + LOGGER.withCorrelationId(dittoHeaders) + .info("Created ThingDescription for thingId <{}> and featureId <{}>: <{}>", thingId, + featureId, + thingDescription); + return thingDescription; + }); } private ObjectSchema buildDittoErrorSchema() { diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingModelExtensionResolver.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingModelExtensionResolver.java index 39cf8a5bbc..6a52e95765 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingModelExtensionResolver.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingModelExtensionResolver.java @@ -14,12 +14,11 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; -import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.json.JsonCollectors; @@ -41,9 +40,6 @@ final class DefaultWotThingModelExtensionResolver implements WotThingModelExtens private static final String TM_EXTENDS = "tm:extends"; private static final String TM_REF = "tm:ref"; - private static final Duration MAX_FETCH_MODEL_DURATION = Duration.ofSeconds(10); - private static final Duration MAX_JOIN_WAIT_DURATION = Duration.ofSeconds(20); - private final WotThingModelFetcher thingModelFetcher; private final Executor executor; @@ -54,61 +50,85 @@ final class DefaultWotThingModelExtensionResolver implements WotThingModelExtens } @Override - public ThingModel resolveThingModelExtensions(final ThingModel thingModel, final DittoHeaders dittoHeaders) { + public CompletionStage resolveThingModelExtensions(final ThingModel thingModel, + final DittoHeaders dittoHeaders) { final ThingModel.Builder tmBuilder = thingModel.toBuilder(); - thingModel.getLinks() + return thingModel.getLinks() .map(links -> { final List> fetchedModelFutures = links.stream() .filter(baseLink -> baseLink.getRel().filter(TM_EXTENDS::equals).isPresent()) .map(extendsLink -> thingModelFetcher.fetchThingModel(extendsLink.getHref(), dittoHeaders)) .map(CompletionStage::toCompletableFuture) - .map(cf -> cf.orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS)) .toList(); - final CompletableFuture allModelFuture = + final CompletionStage allModelFuture = CompletableFuture.allOf(fetchedModelFutures.toArray(new CompletableFuture[0])); return allModelFuture .thenApplyAsync(aVoid -> fetchedModelFutures.stream() - .map(CompletableFuture::join) // joining does not block anything here as "allOf" already guaranteed that all futures are ready - .toList(), executor) - .orTimeout(MAX_JOIN_WAIT_DURATION.toSeconds(), TimeUnit.SECONDS) - .join(); + .map(CompletableFuture::join) // joining does not block anything here as "allOf" already guaranteed that all futures are ready + .toList(), executor + ); } ) - .ifPresent(extendedModels -> extendedModels.forEach(extendedModel -> { - final ThingModel extendedRecursedModel = - resolveThingModelExtensions(extendedModel, dittoHeaders); // recurse! + .map(extendedModelsFut -> extendedModelsFut.thenComposeAsync(extendedModels -> { + if (extendedModels.isEmpty()) { + return CompletableFuture.completedStage(thingModel); + } else { + CompletionStage currentStage = + resolveThingModelExtensions(extendedModels.get(0), dittoHeaders) // recurse! + .thenApply(extendedModel -> + mergeThingModelIntoBuilder().apply(tmBuilder, extendedModel) + ); + for (int i = 1; i < extendedModels.size(); i++) { + currentStage = currentStage.thenCombine( + resolveThingModelExtensions(extendedModels.get(i), dittoHeaders), // recurse! + mergeThingModelIntoBuilder() + ); + } + return currentStage.thenApply(ThingModel.Builder::build); + } + }, executor)) + .orElse(CompletableFuture.completedStage(thingModel)); + } - final JsonObject mergedTmObject = JsonFactory - .mergeJsonValues(tmBuilder.build(), extendedRecursedModel).asObject(); - tmBuilder.removeAll(); - tmBuilder.setAll(mergedTmObject); - })); - return tmBuilder.build(); + private BiFunction mergeThingModelIntoBuilder() { + return (builder, model) -> { + final JsonObject mergedTmObject = JsonFactory.mergeJsonValues(builder.build(), model).asObject(); + builder.removeAll(); + builder.setAll(mergedTmObject); + return builder; + }; } @Override - public ThingModel resolveThingModelRefs(final ThingModel thingModel, final DittoHeaders dittoHeaders) { - final JsonObject thingModelObject = potentiallyResolveRefs(thingModel, dittoHeaders); - return ThingModel.fromJson(thingModelObject); + public CompletionStage resolveThingModelRefs(final ThingModel thingModel, final DittoHeaders dittoHeaders) { + return potentiallyResolveRefs(thingModel, dittoHeaders).thenApply(ThingModel::fromJson); } - private JsonObject potentiallyResolveRefs(final JsonObject jsonObject, final DittoHeaders dittoHeaders) { - return jsonObject.stream() + private CompletionStage potentiallyResolveRefs(final JsonObject jsonObject, + final DittoHeaders dittoHeaders) { + final List> completionStages = jsonObject.stream() .map(field -> { if (field.getValue().isObject() && field.getValue().asObject().contains(TM_REF)) { - return JsonField.newInstance(field.getKey(), - resolveRefs(field.getValue().asObject(), dittoHeaders)); + return resolveRefs(field.getValue().asObject(), dittoHeaders) + .thenApply(refs -> JsonField.newInstance(field.getKey(), refs)); } else if (field.getValue().isObject()) { - return JsonField.newInstance(field.getKey(), - potentiallyResolveRefs(field.getValue().asObject(), dittoHeaders)); // recurse! + return potentiallyResolveRefs(field.getValue().asObject(), dittoHeaders) // recurse! + .thenApply(refs -> JsonField.newInstance(field.getKey(), refs)); } else { - return field; + return CompletableFuture.completedStage(field); } }) - .collect(JsonCollectors.fieldsToObject()); + .map(CompletionStage::toCompletableFuture) + .toList(); + return CompletableFuture.allOf(completionStages.toArray(CompletableFuture[]::new)) + .thenApplyAsync(v -> completionStages.stream() + .map(CompletableFuture::join) + .collect(JsonCollectors.fieldsToObject()), + executor + ); } - private JsonValue resolveRefs(final JsonObject objectWithTmRef, final DittoHeaders dittoHeaders) { + private CompletionStage resolveRefs(final JsonObject objectWithTmRef, final DittoHeaders dittoHeaders) { final String tmRef = objectWithTmRef.getValue(TM_REF) .filter(JsonValue::isString) .map(JsonValue::asString) @@ -118,20 +138,17 @@ private JsonValue resolveRefs(final JsonObject objectWithTmRef, final DittoHeade if (urlAndPointer.length != 2) { throw WotThingModelRefInvalidException.newBuilder(tmRef).dittoHeaders(dittoHeaders).build(); } - final JsonObject refObject = thingModelFetcher.fetchThingModel(IRI.of(urlAndPointer[0]), dittoHeaders) - .toCompletableFuture() - .orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS) + return thingModelFetcher.fetchThingModel(IRI.of(urlAndPointer[0]), dittoHeaders) .thenApply(thingModel -> thingModel.getValue(JsonPointer.of(urlAndPointer[1]))) .thenComposeAsync(optJsonValue -> optJsonValue .filter(JsonValue::isObject) .map(JsonValue::asObject) .map(CompletableFuture::completedStage) .orElseGet(() -> CompletableFuture.completedStage(null)) - , executor) - .toCompletableFuture() - .orTimeout(MAX_JOIN_WAIT_DURATION.toSeconds(), TimeUnit.SECONDS) - .join(); - - return JsonFactory.mergeJsonValues(objectWithTmRef.remove(TM_REF), refObject).asObject(); + , executor + ) + .thenApply(refObject -> + JsonFactory.mergeJsonValues(objectWithTmRef.remove(TM_REF), refObject).asObject() + ); } } diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingSkeletonGenerator.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingSkeletonGenerator.java index 5c3e39dcac..33b1db0156 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingSkeletonGenerator.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/DefaultWotThingSkeletonGenerator.java @@ -16,7 +16,6 @@ import java.net.MalformedURLException; import java.net.URL; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -24,8 +23,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -34,8 +33,8 @@ import javax.annotation.concurrent.Immutable; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger; import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger; import org.eclipse.ditto.json.JsonArray; import org.eclipse.ditto.json.JsonArrayBuilder; import org.eclipse.ditto.json.JsonObject; @@ -73,6 +72,7 @@ import org.eclipse.ditto.wot.model.WotThingModelInvalidException; import akka.actor.ActorSystem; +import akka.japi.Pair; /** * Default Ditto specific implementation of {@link WotThingSkeletonGenerator}. @@ -80,17 +80,14 @@ @Immutable final class DefaultWotThingSkeletonGenerator implements WotThingSkeletonGenerator { - private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(DefaultWotThingSkeletonGenerator.class); + private static final ThreadSafeDittoLogger LOGGER = + DittoLoggerFactory.getThreadSafeLogger(DefaultWotThingSkeletonGenerator.class); private static final String TM_EXTENDS = "tm:extends"; private static final String TM_SUBMODEL = "tm:submodel"; private static final String TM_SUBMODEL_INSTANCE_NAME = "instanceName"; - private static final Duration MAX_FETCH_MODEL_DURATION = Duration.ofSeconds(10); - private static final Duration MAX_JOIN_WAIT_DURATION = Duration.ofSeconds(20); - - private final WotThingModelFetcher thingModelFetcher; private final Executor executor; private final WotThingModelExtensionResolver thingModelExtensionResolver; @@ -102,55 +99,63 @@ final class DefaultWotThingSkeletonGenerator implements WotThingSkeletonGenerato } @Override - public Optional generateThingSkeleton(final ThingId thingId, + public CompletionStage> generateThingSkeleton(final ThingId thingId, final ThingModel thingModel, final URL thingModelUrl, final DittoHeaders dittoHeaders) { - final ThingModel thingModelWithExtensions = thingModelExtensionResolver - .resolveThingModelExtensions(thingModel, dittoHeaders); - final ThingModel thingModelWithExtensionsAndImports = thingModelExtensionResolver - .resolveThingModelRefs(thingModelWithExtensions, dittoHeaders); - - final Optional dittoExtensionPrefix = thingModelWithExtensionsAndImports.getAtContext() - .determinePrefixFor(DittoWotExtension.DITTO_WOT_EXTENSION); - - LOGGER.withCorrelationId(dittoHeaders) - .debug("ThingModel for generating Thing skeleton after resolving extensions + refs: <{}>", - thingModelWithExtensionsAndImports); - - final ThingBuilder.FromScratch builder = Thing.newBuilder(); - thingModelWithExtensionsAndImports.getProperties() - .map(properties -> { - final JsonObjectBuilder jsonObjectBuilder = JsonObject.newBuilder(); - final Map attributesCategories = new LinkedHashMap<>(); - - fillPropertiesInOptionalCategories( - properties, - thingModelWithExtensionsAndImports.getTmOptional().orElse(null), - jsonObjectBuilder, - attributesCategories, - property -> dittoExtensionPrefix.flatMap(prefix -> - property.getValue(prefix + ":" + DittoWotExtension.DITTO_WOT_EXTENSION_CATEGORY) + return thingModelExtensionResolver + .resolveThingModelExtensions(thingModel, dittoHeaders) + .thenCompose(thingModelWithExtensions -> + thingModelExtensionResolver.resolveThingModelRefs(thingModelWithExtensions, dittoHeaders) + ) + .thenApply(thingModelWithExtensionsAndImports -> { + final Optional dittoExtensionPrefix = thingModelWithExtensionsAndImports.getAtContext() + .determinePrefixFor(DittoWotExtension.DITTO_WOT_EXTENSION); + + LOGGER.withCorrelationId(dittoHeaders) + .debug("ThingModel for generating Thing skeleton after resolving extensions + refs: <{}>", + thingModelWithExtensionsAndImports); + + final ThingBuilder.FromScratch builder = Thing.newBuilder(); + thingModelWithExtensionsAndImports.getProperties() + .map(properties -> { + final JsonObjectBuilder jsonObjectBuilder = JsonObject.newBuilder(); + final Map attributesCategories = new LinkedHashMap<>(); + + fillPropertiesInOptionalCategories( + properties, + thingModelWithExtensionsAndImports.getTmOptional().orElse(null), + jsonObjectBuilder, + attributesCategories, + property -> dittoExtensionPrefix.flatMap(prefix -> + property.getValue(prefix + ":" + + DittoWotExtension.DITTO_WOT_EXTENSION_CATEGORY + ) + ) + .filter(JsonValue::isString) + .map(JsonValue::asString) + ); + + final AttributesBuilder attributesBuilder = Attributes.newBuilder(); + if (attributesCategories.size() > 0) { + attributesCategories.forEach((attributeCategory, categoryObjBuilder) -> + attributesBuilder.set(attributeCategory, categoryObjBuilder.build()) + ); + } + attributesBuilder.setAll(jsonObjectBuilder.build()); + return attributesBuilder.build(); + }).ifPresent(builder::setAttributes); + + return Pair.apply(thingModelWithExtensionsAndImports, builder); + }) + .thenCompose(pair -> + createFeaturesFromSubmodels(pair.first(), dittoHeaders) + .thenApply(features -> + features.map(f -> pair.second().setFeatures(f)).orElse(pair.second()) ) - .filter(JsonValue::isString) - .map(JsonValue::asString) - ); - - final AttributesBuilder attributesBuilder = Attributes.newBuilder(); - if (attributesCategories.size() > 0) { - attributesCategories.forEach((attributeCategory, categoryObjBuilder) -> - attributesBuilder.set(attributeCategory, categoryObjBuilder.build()) - ); - } - attributesBuilder.setAll(jsonObjectBuilder.build()); - return attributesBuilder.build(); - }).ifPresent(builder::setAttributes); - - createFeaturesFromSubmodels(thingModelWithExtensionsAndImports, dittoHeaders) - .ifPresent(builder::setFeatures); - - return Optional.of(builder.build()); + ) + .thenApply(builder -> Optional.of(builder.build())); } private static void fillPropertiesInOptionalCategories(final Properties properties, @@ -185,7 +190,7 @@ private static void fillPropertiesInOptionalCategories(final Properties properti ); } - private Optional createFeaturesFromSubmodels(final ThingModel thingModel, + private CompletionStage> createFeaturesFromSubmodels(final ThingModel thingModel, final DittoHeaders dittoHeaders) { final FeaturesBuilder featuresBuilder = Features.newBuilder(); @@ -211,9 +216,7 @@ private Optional createFeaturesFromSubmodels(final ThingModel thingMod ) .orElseGet(Stream::empty) .map(submodel -> thingModelFetcher.fetchThingModel(submodel.href, dittoHeaders) - .toCompletableFuture() - .orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS) - .thenApplyAsync(subThingModel -> + .thenComposeAsync(subThingModel -> generateFeatureSkeleton(submodel.instanceName, subThingModel, submodel.href, @@ -223,26 +226,24 @@ private Optional createFeaturesFromSubmodels(final ThingModel thingMod ) .toList(); - final List features = CompletableFuture.allOf(futureList.toArray(CompletableFuture[]::new)) - .thenApplyAsync(v -> futureList.stream() - .map(CompletableFuture::join) - .filter(Optional::isPresent) - .map(Optional::get) - .toList(), + return CompletableFuture.allOf(futureList.toArray(CompletableFuture[]::new)) + .thenApplyAsync(v -> { + if (futureList.isEmpty()) { + return Optional.empty(); + } else { + featuresBuilder.setAll(futureList.stream() + .map(CompletableFuture::join) + .filter(Optional::isPresent) + .map(Optional::get) + .toList()); + return Optional.of(featuresBuilder.build()); + } + }, executor - ) - .orTimeout(MAX_JOIN_WAIT_DURATION.toSeconds(), TimeUnit.SECONDS) - .join(); - - if (features.isEmpty()) { - return Optional.empty(); - } else { - featuresBuilder.setAll(features); - return Optional.of(featuresBuilder.build()); - } + ); } - private Optional generateFeatureSkeleton(final String featureId, + private CompletionStage> generateFeatureSkeleton(final String featureId, final ThingModel thingModel, final IRI thingModelIri, final DittoHeaders dittoHeaders) { @@ -256,54 +257,61 @@ private Optional generateFeatureSkeleton(final String featureId, } @Override - public Optional generateFeatureSkeleton(final String featureId, + public CompletionStage> generateFeatureSkeleton(final String featureId, final ThingModel thingModel, final URL thingModelUrl, final DittoHeaders dittoHeaders) { - final ThingModel thingModelWithExtensions = thingModelExtensionResolver - .resolveThingModelExtensions(thingModel, dittoHeaders); - final ThingModel thingModelWithExtensionsAndImports = thingModelExtensionResolver - .resolveThingModelRefs(thingModelWithExtensions, dittoHeaders); - - final Optional dittoExtensionPrefix = thingModelWithExtensionsAndImports.getAtContext() - .determinePrefixFor(DittoWotExtension.DITTO_WOT_EXTENSION); - - LOGGER.withCorrelationId(dittoHeaders) - .debug("ThingModel for generating Feature skeleton after resolving extensions + refs: <{}>", - thingModelWithExtensionsAndImports); - - final FeatureBuilder.FromScratchBuildable builder = Feature.newBuilder(); - thingModelWithExtensionsAndImports.getProperties() - .map(properties -> { - final JsonObjectBuilder jsonObjectBuilder = JsonObject.newBuilder(); - final Map propertiesCategories = new LinkedHashMap<>(); - - fillPropertiesInOptionalCategories( - properties, - thingModelWithExtensionsAndImports.getTmOptional().orElse(null), - jsonObjectBuilder, - propertiesCategories, - property -> dittoExtensionPrefix.flatMap(prefix -> - property.getValue(prefix + ":" + DittoWotExtension.DITTO_WOT_EXTENSION_CATEGORY) - ) - .filter(JsonValue::isString) - .map(JsonValue::asString) - ); - - final FeaturePropertiesBuilder propertiesBuilder = FeatureProperties.newBuilder(); - if (propertiesCategories.size() > 0) { - propertiesCategories.forEach((propertyCategory, categoryObjBuilder) -> - propertiesBuilder.set(propertyCategory, categoryObjBuilder.build()) - ); - } - propertiesBuilder.setAll(jsonObjectBuilder.build()); - return propertiesBuilder.build(); - }).ifPresent(builder::properties); + return thingModelExtensionResolver + .resolveThingModelExtensions(thingModel, dittoHeaders) + .thenCompose(thingModelWithExtensions -> thingModelExtensionResolver + .resolveThingModelRefs(thingModelWithExtensions, dittoHeaders)) + .thenCombine(resolveFeatureDefinition(thingModel, thingModelUrl, dittoHeaders), + (thingModelWithExtensionsAndImports, featureDefinition) -> { + final Optional dittoExtensionPrefix = + thingModelWithExtensionsAndImports.getAtContext() + .determinePrefixFor(DittoWotExtension.DITTO_WOT_EXTENSION); + + LOGGER.withCorrelationId(dittoHeaders) + .debug("ThingModel for generating Feature skeleton after resolving extensions + refs: <{}>", + thingModelWithExtensionsAndImports); + + final FeatureBuilder.FromScratchBuildable builder = Feature.newBuilder(); + thingModelWithExtensionsAndImports.getProperties() + .map(properties -> { + final JsonObjectBuilder jsonObjectBuilder = JsonObject.newBuilder(); + final Map propertiesCategories = + new LinkedHashMap<>(); + + fillPropertiesInOptionalCategories( + properties, + thingModelWithExtensionsAndImports.getTmOptional().orElse(null), + jsonObjectBuilder, + propertiesCategories, + property -> dittoExtensionPrefix.flatMap(prefix -> + property.getValue( + prefix + ":" + + DittoWotExtension.DITTO_WOT_EXTENSION_CATEGORY) + ) + .filter(JsonValue::isString) + .map(JsonValue::asString) + ); + + final FeaturePropertiesBuilder propertiesBuilder = + FeatureProperties.newBuilder(); + if (propertiesCategories.size() > 0) { + propertiesCategories.forEach((propertyCategory, categoryObjBuilder) -> + propertiesBuilder.set(propertyCategory, categoryObjBuilder.build()) + ); + } + propertiesBuilder.setAll(jsonObjectBuilder.build()); + return propertiesBuilder.build(); + }).ifPresent(builder::properties); - builder.definition(resolveFeatureDefinition(thingModel, thingModelUrl, dittoHeaders)); + builder.definition(featureDefinition); - return Optional.of(builder.withId(featureId).build()); + return Optional.of(builder.withId(featureId).build()); + }); } private static Optional determineInitialPropertyValue(final SingleDataSchema dataSchema) { @@ -458,14 +466,17 @@ private static String provideNeutralStringElement(@Nullable final Integer minLen return ""; } - private FeatureDefinition resolveFeatureDefinition(final ThingModel thingModel, final URL thingModelUrl, + private CompletionStage resolveFeatureDefinition(final ThingModel thingModel, final URL thingModelUrl, final DittoHeaders dittoHeaders) { - return FeatureDefinition.fromIdentifier(thingModelUrl.toString(), - determineFurtherFeatureDefinitionIdentifiers(thingModel, dittoHeaders) - .toArray(new DefinitionIdentifier[]{})); + return determineFurtherFeatureDefinitionIdentifiers(thingModel, dittoHeaders) + .thenApply(definitionIdentifiers -> FeatureDefinition.fromIdentifier( + thingModelUrl.toString(), + definitionIdentifiers.toArray(DefinitionIdentifier[]::new) + )); } - private List determineFurtherFeatureDefinitionIdentifiers(final ThingModel thingModel, + private CompletionStage> determineFurtherFeatureDefinitionIdentifiers( + final ThingModel thingModel, final DittoHeaders dittoHeaders) { return thingModel.getLinks().map(links -> { final Optional> extendsLink = links.stream() @@ -474,26 +485,23 @@ private List determineFurtherFeatureDefinitionIdentifiers( if (extendsLink.isPresent()) { final BaseLink link = extendsLink.get(); - final List recursedSubmodels = - thingModelFetcher.fetchThingModel(link.getHref(), dittoHeaders) - .toCompletableFuture() - .orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS) - .thenApplyAsync(subThingModel -> - determineFurtherFeatureDefinitionIdentifiers( // recurse! - subThingModel, - dittoHeaders - ), executor) - .toCompletableFuture() - .orTimeout(MAX_JOIN_WAIT_DURATION.toSeconds(), TimeUnit.SECONDS) - .join(); - final List combinedIdentifiers = new ArrayList<>(); - combinedIdentifiers.add(ThingsModelFactory.newFeatureDefinitionIdentifier(link.getHref())); - combinedIdentifiers.addAll(recursedSubmodels); - return combinedIdentifiers; + return thingModelFetcher.fetchThingModel(link.getHref(), dittoHeaders) + .thenComposeAsync(subThingModel -> + determineFurtherFeatureDefinitionIdentifiers( // recurse! + subThingModel, + dittoHeaders + ), executor + ) + .thenApply(recursedSubmodels -> { + final List combinedIdentifiers = new ArrayList<>(); + combinedIdentifiers.add(ThingsModelFactory.newFeatureDefinitionIdentifier(link.getHref())); + combinedIdentifiers.addAll(recursedSubmodels); + return combinedIdentifiers; + }); } else { - return Collections.emptyList(); + return CompletableFuture.completedStage(Collections.emptyList()); } - }).orElseGet(Collections::emptyList); + }).orElseGet(() -> CompletableFuture.completedStage(Collections.emptyList())); } private static class Submodel { diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingDescriptionGenerator.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingDescriptionGenerator.java index 799d565e5a..c40d8f1b57 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingDescriptionGenerator.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingDescriptionGenerator.java @@ -13,6 +13,7 @@ package org.eclipse.ditto.wot.integration.generator; import java.net.URL; +import java.util.concurrent.CompletionStage; import javax.annotation.Nullable; @@ -54,7 +55,7 @@ public interface WotThingDescriptionGenerator { * @throws org.eclipse.ditto.wot.model.WotThingModelInvalidException if the WoT ThingModel did not contain the * mandatory {@code "@type"} being {@code "tm:ThingModel"} */ - ThingDescription generateThingDescription(ThingId thingId, + CompletionStage generateThingDescription(ThingId thingId, @Nullable Thing thing, @Nullable JsonObject placeholderLookupObject, @Nullable String featureId, diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingModelExtensionResolver.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingModelExtensionResolver.java index 251602bc63..1cd749befe 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingModelExtensionResolver.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingModelExtensionResolver.java @@ -12,6 +12,8 @@ */ package org.eclipse.ditto.wot.integration.generator; +import java.util.concurrent.CompletionStage; + import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.wot.model.ThingModel; @@ -37,7 +39,7 @@ public interface WotThingModelExtensionResolver { * @throws org.eclipse.ditto.wot.model.WotThingModelInvalidException if the fetched extended ThingModel could not be * parsed/interpreted as correct WoT ThingModel. */ - ThingModel resolveThingModelExtensions(ThingModel thingModel, DittoHeaders dittoHeaders); + CompletionStage resolveThingModelExtensions(ThingModel thingModel, DittoHeaders dittoHeaders); /** * Resolves the "references" ({@code tm:ref}) contained in the passed {@code thingModel} and merges them into the @@ -53,5 +55,5 @@ public interface WotThingModelExtensionResolver { * @throws org.eclipse.ditto.wot.model.WotThingModelInvalidException if the fetched referenced ThingModel could not * be parsed/interpreted as correct WoT ThingModel. */ - ThingModel resolveThingModelRefs(ThingModel thingModel, DittoHeaders dittoHeaders); + CompletionStage resolveThingModelRefs(ThingModel thingModel, DittoHeaders dittoHeaders); } diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingSkeletonGenerator.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingSkeletonGenerator.java index c61ca23845..b458ddb63b 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingSkeletonGenerator.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/generator/WotThingSkeletonGenerator.java @@ -14,6 +14,7 @@ import java.net.URL; import java.util.Optional; +import java.util.concurrent.CompletionStage; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.things.model.Feature; @@ -50,7 +51,7 @@ public interface WotThingSkeletonGenerator { * @throws org.eclipse.ditto.wot.model.WotThingModelInvalidException if the WoT ThingModel did not contain the * mandatory {@code "@type"} being {@code "tm:ThingModel"} */ - Optional generateThingSkeleton(ThingId thingId, + CompletionStage> generateThingSkeleton(ThingId thingId, ThingModel thingModel, URL thingModelUrl, DittoHeaders dittoHeaders); @@ -71,7 +72,7 @@ Optional generateThingSkeleton(ThingId thingId, * @throws org.eclipse.ditto.wot.model.WotThingModelInvalidException if the WoT ThingModel did not contain the * mandatory {@code "@type"} being {@code "tm:ThingModel"} */ - Optional generateFeatureSkeleton(String featureId, + CompletionStage> generateFeatureSkeleton(String featureId, ThingModel thingModel, URL thingModelUrl, DittoHeaders dittoHeaders); diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/DefaultWotThingDescriptionProvider.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/DefaultWotThingDescriptionProvider.java index 58b6a2a760..46e1158128 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/DefaultWotThingDescriptionProvider.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/DefaultWotThingDescriptionProvider.java @@ -15,12 +15,10 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; import java.net.URL; -import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; @@ -61,8 +59,6 @@ final class DefaultWotThingDescriptionProvider implements WotThingDescriptionPro public static final String MODEL_PLACEHOLDERS_KEY = "model-placeholders"; - private static final Duration MAX_JOIN_WAIT_DURATION = Duration.ofSeconds(20); - private final WotConfig wotConfig; private final WotThingModelFetcher thingModelFetcher; private final WotThingDescriptionGenerator thingDescriptionGenerator; @@ -90,7 +86,7 @@ public static DefaultWotThingDescriptionProvider of(final ActorSystem actorSyste } @Override - public ThingDescription provideThingTD(@Nullable final ThingDefinition thingDefinition, + public CompletionStage provideThingTD(@Nullable final ThingDefinition thingDefinition, final ThingId thingId, @Nullable final Thing thing, final DittoHeaders dittoHeaders) { @@ -104,7 +100,7 @@ public ThingDescription provideThingTD(@Nullable final ThingDefinition thingDefi } @Override - public ThingDescription provideFeatureTD(final ThingId thingId, + public CompletionStage provideFeatureTD(final ThingId thingId, @Nullable final Thing thing, final Feature feature, final DittoHeaders dittoHeaders) { @@ -134,7 +130,7 @@ public CompletionStage> provideThingSkeletonForCreation(final Th logger.debug("Fetching ThingModel from <{}> in order to create Thing skeleton for new Thing " + "with id <{}>", url, thingId); return thingModelFetcher.fetchThingModel(url, dittoHeaders) - .thenApplyAsync(thingModel -> thingSkeletonGenerator + .thenComposeAsync(thingModel -> thingSkeletonGenerator .generateThingSkeleton(thingId, thingModel, url, dittoHeaders), executor ) @@ -144,7 +140,7 @@ public CompletionStage> provideThingSkeletonForCreation(final Th return thingSkeleton; }) .exceptionally(throwable -> { - logger.info("Could not fetch ThingModel or generate Feature skeleton based on it due " + + logger.info("Could not fetch ThingModel or generate Thing skeleton based on it due " + "to: <{}: {}>", throwable.getClass().getSimpleName(),throwable.getMessage(), throwable); return Optional.empty(); @@ -172,7 +168,7 @@ public CompletionStage> provideFeatureSkeletonForCreation(fina logger.debug("Fetching ThingModel from <{}> in order to create Feature skeleton for new Feature " + "with id <{}>", url, featureId); return thingModelFetcher.fetchThingModel(url, dittoHeaders) - .thenApplyAsync(thingModel -> thingSkeletonGenerator + .thenComposeAsync(thingModel -> thingSkeletonGenerator .generateFeatureSkeleton(featureId, thingModel, url, dittoHeaders), executor ) @@ -198,7 +194,7 @@ public CompletionStage> provideFeatureSkeletonForCreation(fina /** * Download TM, add it to local cache + build TD + return it */ - private ThingDescription getWotThingDescriptionForThing(final ThingDefinition definitionIdentifier, + private CompletionStage getWotThingDescriptionForThing(final ThingDefinition definitionIdentifier, final ThingId thingId, @Nullable final Thing thing, final DittoHeaders dittoHeaders) { @@ -206,35 +202,30 @@ private ThingDescription getWotThingDescriptionForThing(final ThingDefinition de final Optional urlOpt = definitionIdentifier.getUrl(); if (urlOpt.isPresent()) { final URL url = urlOpt.get(); - try { - return thingModelFetcher.fetchThingModel(url, dittoHeaders) - .thenApplyAsync(thingModel -> thingDescriptionGenerator - .generateThingDescription(thingId, - thing, - Optional.ofNullable(thing) - .flatMap(Thing::getAttributes) - .flatMap(a -> a.getValue(MODEL_PLACEHOLDERS_KEY)) - .filter(JsonValue::isObject) - .map(JsonValue::asObject) - .orElse(null), - null, - thingModel, - url, - dittoHeaders - ), - executor - ) - .toCompletableFuture() - .orTimeout(MAX_JOIN_WAIT_DURATION.toSeconds(), TimeUnit.SECONDS) - .join(); - } catch (final Exception e) { - throw DittoRuntimeException.asDittoRuntimeException(e, throwable -> - WotInternalErrorException.newBuilder() - .dittoHeaders(dittoHeaders) - .cause(e) - .build() - ); - } + return thingModelFetcher.fetchThingModel(url, dittoHeaders) + .thenComposeAsync(thingModel -> thingDescriptionGenerator + .generateThingDescription(thingId, + thing, + Optional.ofNullable(thing) + .flatMap(Thing::getAttributes) + .flatMap(a -> a.getValue(MODEL_PLACEHOLDERS_KEY)) + .filter(JsonValue::isObject) + .map(JsonValue::asObject) + .orElse(null), + null, + thingModel, + url, + dittoHeaders + ), + executor + ) + .exceptionally(throwable -> { + throw DittoRuntimeException.asDittoRuntimeException(throwable, t -> + WotInternalErrorException.newBuilder() + .dittoHeaders(dittoHeaders) + .cause(t) + .build()); + }); } else { throw ThingDefinitionInvalidException.newBuilder(definitionIdentifier) .dittoHeaders(dittoHeaders) @@ -245,7 +236,7 @@ private ThingDescription getWotThingDescriptionForThing(final ThingDefinition de /** * Download TM, add it to local cache + build TD + return it */ - private ThingDescription getWotThingDescriptionForFeature(final ThingId thingId, + private CompletionStage getWotThingDescriptionForFeature(final ThingId thingId, @Nullable final Thing thing, final Feature feature, final DittoHeaders dittoHeaders) { @@ -255,34 +246,29 @@ private ThingDescription getWotThingDescriptionForFeature(final ThingId thingId, final Optional urlOpt = definitionIdentifier.flatMap(DefinitionIdentifier::getUrl); if (urlOpt.isPresent()) { final URL url = urlOpt.get(); - try { - return thingModelFetcher.fetchThingModel(url, dittoHeaders) - .thenApplyAsync(thingModel -> thingDescriptionGenerator - .generateThingDescription(thingId, - thing, - feature.getProperties() - .flatMap(p -> p.getValue(MODEL_PLACEHOLDERS_KEY)) - .filter(JsonValue::isObject) - .map(JsonValue::asObject) - .orElse(null), - feature.getId(), - thingModel, - url, - dittoHeaders - ), - executor - ) - .toCompletableFuture() - .orTimeout(MAX_JOIN_WAIT_DURATION.toSeconds(), TimeUnit.SECONDS) - .join(); - } catch (final Exception e) { - throw DittoRuntimeException.asDittoRuntimeException(e, throwable -> - WotInternalErrorException.newBuilder() - .dittoHeaders(dittoHeaders) - .cause(e) - .build() - ); - } + return thingModelFetcher.fetchThingModel(url, dittoHeaders) + .thenComposeAsync(thingModel -> thingDescriptionGenerator + .generateThingDescription(thingId, + thing, + feature.getProperties() + .flatMap(p -> p.getValue(MODEL_PLACEHOLDERS_KEY)) + .filter(JsonValue::isObject) + .map(JsonValue::asObject) + .orElse(null), + feature.getId(), + thingModel, + url, + dittoHeaders + ), + executor + ) + .exceptionally(throwable -> { + throw DittoRuntimeException.asDittoRuntimeException(throwable, t -> + WotInternalErrorException.newBuilder() + .dittoHeaders(dittoHeaders) + .cause(t) + .build()); + }); } else { throw ThingDefinitionInvalidException.newBuilder(definitionIdentifier.orElse(null)) .dittoHeaders(dittoHeaders) diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/DefaultWotThingModelFetcher.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/DefaultWotThingModelFetcher.java index cce98ff243..f9c8406b38 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/DefaultWotThingModelFetcher.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/DefaultWotThingModelFetcher.java @@ -15,10 +15,12 @@ import java.net.MalformedURLException; import java.net.URL; import java.text.MessageFormat; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -63,6 +65,8 @@ final class DefaultWotThingModelFetcher implements WotThingModelFetcher { private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(DefaultWotThingModelFetcher.class); + private static final Duration MAX_FETCH_MODEL_DURATION = Duration.ofSeconds(10); + private static final HttpHeader ACCEPT_HEADER = Accept.create( MediaRanges.create(MediaTypes.applicationWithOpenCharset("tm+json")), MediaRanges.create(MediaTypes.APPLICATION_JSON) @@ -98,7 +102,8 @@ public CompletableFuture fetchThingModel(final URL url, final DittoH LOGGER.withCorrelationId(dittoHeaders) .debug("Fetching ThingModel (from cache or downloading as fallback) from URL: <{}>", url); return thingModelCache.get(url) - .thenApply(optTm -> resolveThingModel(optTm.orElse(null), url, dittoHeaders)); + .thenApply(optTm -> resolveThingModel(optTm.orElse(null), url, dittoHeaders)) + .orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS); } private ThingModel resolveThingModel(@Nullable final ThingModel thingModel, diff --git a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/WotThingDescriptionProvider.java b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/WotThingDescriptionProvider.java index 5191f6b9c2..72ca8c754b 100644 --- a/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/WotThingDescriptionProvider.java +++ b/wot/integration/src/main/java/org/eclipse/ditto/wot/integration/provider/WotThingDescriptionProvider.java @@ -53,7 +53,7 @@ public interface WotThingDescriptionProvider extends Extension { * @throws org.eclipse.ditto.wot.model.WotThingModelNotAccessibleException if the ThingModel could not be accessed/ * downloaded. */ - ThingDescription provideThingTD(@Nullable ThingDefinition thingDefinition, + CompletionStage provideThingTD(@Nullable ThingDefinition thingDefinition, ThingId thingId, @Nullable Thing thing, DittoHeaders dittoHeaders); @@ -72,7 +72,7 @@ ThingDescription provideThingTD(@Nullable ThingDefinition thingDefinition, * @throws org.eclipse.ditto.wot.model.WotThingModelNotAccessibleException if the ThingModel could not be accessed/ * downloaded. */ - ThingDescription provideFeatureTD(ThingId thingId, + CompletionStage provideFeatureTD(ThingId thingId, @Nullable Thing thing, Feature feature, DittoHeaders dittoHeaders);