Skip to content

Commit

Permalink
Merge pull request #1658 from eclipse-ditto/bugfix/wot-dispatcher
Browse files Browse the repository at this point in the history
Fix WoT dispatcher starvation by adding timeouts to fetch models
  • Loading branch information
thjaeckle committed Jun 22, 2023
2 parents dd9683c + a6fc08e commit c0e8e0f
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 6 deletions.
4 changes: 4 additions & 0 deletions things/service/src/main/resources/things.conf
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ wot-dispatcher {
type = Dispatcher
executor = "org.eclipse.ditto.internal.utils.metrics.executor.InstrumentedThreadPoolExecutorServiceConfigurator"
}
wot-dispatcher-cache-loader {
type = Dispatcher
executor = "org.eclipse.ditto.internal.utils.metrics.executor.InstrumentedThreadPoolExecutorServiceConfigurator"
}

blocked-namespaces-dispatcher {
type = Dispatcher
Expand Down
4 changes: 4 additions & 0 deletions things/service/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ wot-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
}
wot-dispatcher-cache-loader {
type = Dispatcher
executor = "thread-pool-executor"
}

blocked-namespaces-dispatcher {
type = Dispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.json.JsonCollectors;
Expand All @@ -39,6 +41,8 @@ final class DefaultWotThingModelExtensionResolver implements WotThingModelExtens
private static final String TM_EXTENDS = "tm:extends";
private static final String TM_REF = "tm:ref";

private static final Duration MAX_FETCH_MODEL_DURATION = Duration.ofSeconds(30);

private final WotThingModelFetcher thingModelFetcher;
private final Executor executor;

Expand All @@ -57,12 +61,14 @@ public ThingModel resolveThingModelExtensions(final ThingModel thingModel, final
.filter(baseLink -> baseLink.getRel().filter(TM_EXTENDS::equals).isPresent())
.map(extendsLink -> thingModelFetcher.fetchThingModel(extendsLink.getHref(), dittoHeaders))
.map(CompletionStage::toCompletableFuture)
.map(cf -> cf.orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS))
.toList();
final CompletableFuture<Void> allModelFuture =
CompletableFuture.allOf(fetchedModelFutures.toArray(new CompletableFuture[0]));
return allModelFuture.thenApplyAsync(aVoid -> fetchedModelFutures.stream()
.map(CompletableFuture::join) // joining does not block anything here as "allOf" already guaranteed that all futures are ready
.toList(), executor)
return allModelFuture
.thenApplyAsync(aVoid -> fetchedModelFutures.stream()
.map(CompletableFuture::join) // joining does not block anything here as "allOf" already guaranteed that all futures are ready
.toList(), executor)
.join();
}
)
Expand Down Expand Up @@ -111,13 +117,17 @@ private JsonValue resolveRefs(final JsonObject objectWithTmRef, final DittoHeade
throw WotThingModelRefInvalidException.newBuilder(tmRef).dittoHeaders(dittoHeaders).build();
}
final JsonObject refObject = thingModelFetcher.fetchThingModel(IRI.of(urlAndPointer[0]), dittoHeaders)
.toCompletableFuture()
.orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS)
.thenApply(thingModel -> thingModel.getValue(JsonPointer.of(urlAndPointer[1])))
.thenComposeAsync(optJsonValue -> optJsonValue
.filter(JsonValue::isObject)
.map(JsonValue::asObject)
.map(CompletableFuture::completedStage)
.orElseGet(() -> CompletableFuture.completedStage(null))
, executor).toCompletableFuture().join();
, executor)
.toCompletableFuture()
.join();

return JsonFactory.mergeJsonValues(objectWithTmRef.remove(TM_REF), refObject).asObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
Expand All @@ -24,6 +25,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -85,6 +87,7 @@ final class DefaultWotThingSkeletonGenerator implements WotThingSkeletonGenerato
private static final String TM_SUBMODEL = "tm:submodel";
private static final String TM_SUBMODEL_INSTANCE_NAME = "instanceName";

private static final Duration MAX_FETCH_MODEL_DURATION = Duration.ofSeconds(30);


private final WotThingModelFetcher thingModelFetcher;
Expand Down Expand Up @@ -207,11 +210,14 @@ private Optional<Features> createFeaturesFromSubmodels(final ThingModel thingMod
)
.orElseGet(Stream::empty)
.map(submodel -> thingModelFetcher.fetchThingModel(submodel.href, dittoHeaders)
.toCompletableFuture()
.orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS)
.thenApplyAsync(subThingModel ->
generateFeatureSkeleton(submodel.instanceName,
subThingModel,
submodel.href,
dittoHeaders), executor)
dittoHeaders
), executor)
.toCompletableFuture()
)
.toList();
Expand Down Expand Up @@ -466,6 +472,8 @@ private List<DefinitionIdentifier> determineFurtherFeatureDefinitionIdentifiers(
final BaseLink<?> link = extendsLink.get();
final List<DefinitionIdentifier> recursedSubmodels =
thingModelFetcher.fetchThingModel(link.getHref(), dittoHeaders)
.toCompletableFuture()
.orTimeout(MAX_FETCH_MODEL_DURATION.toSeconds(), TimeUnit.SECONDS)
.thenApplyAsync(subThingModel ->
determineFurtherFeatureDefinitionIdentifiers( // recurse!
subThingModel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ final class DefaultWotThingModelFetcher implements WotThingModelFetcher {
thingModelCache = CacheFactory.createCache(loader,
wotConfig.getCacheConfig(),
"ditto_wot_thing_model_cache",
actorSystem.dispatchers().lookup("wot-dispatcher"));
actorSystem.dispatchers().lookup("wot-dispatcher-cache-loader"));
}

@Override
Expand Down

0 comments on commit c0e8e0f

Please sign in to comment.