Skip to content

Commit

Permalink
Optimize ThingEvent/ Signal casting in DittoCachingSignalEnrichmentFa…
Browse files Browse the repository at this point in the history
…cade

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Oct 25, 2021
1 parent a3e03cf commit 3da7994
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import java.util.List;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

/**
Expand All @@ -33,6 +33,6 @@ public interface CachingSignalEnrichmentFacade extends SignalEnrichmentFacade{
* cache loading is forced.
* @return future of the retrieved thing.
*/
CompletionStage<JsonObject> retrieveThing(EntityId thingId,List<ThingEvent<?>> events, long minAcceptableSeqNr);
CompletionStage<JsonObject> retrieveThing(ThingId thingId, List<ThingEvent<?>> events, long minAcceptableSeqNr);

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static DittoCachingSignalEnrichmentFacade newInstance(final SignalEnrichm
}

@Override
public CompletionStage<JsonObject> retrieveThing(final EntityId thingId, final List<ThingEvent<?>> events,
public CompletionStage<JsonObject> retrieveThing(final ThingId thingId, final List<ThingEvent<?>> events,
final long minAcceptableSeqNr) {

if (minAcceptableSeqNr < 0) {
Expand All @@ -116,9 +116,38 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
final DittoHeaders dittoHeaders,
@Nullable final Signal<?> concernedSignal) {

final List<ThingEvent<?>> thingEvents =
(concernedSignal instanceof ThingEvent) && !(ProtocolAdapter.isLiveSignal(concernedSignal)) ?
List.of((ThingEvent<?>) concernedSignal) : List.of();

// as second step only return what was originally requested as fields:
final List<Signal<?>> concernedSignals = concernedSignal == null ? List.of() : List.of(concernedSignal);
final var cachingParameters = new CachingParameters(jsonFieldSelector, concernedSignals, true, 0);
final var cachingParameters = new CachingParameters(jsonFieldSelector, thingEvents, true, 0);
return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters)
.thenApply(jsonObject -> jsonObject.get(jsonFieldSelector));
}

/**
* Retrieve parts of a thing.
*
* @param thingId ID of the thing.
* @param jsonFieldSelector the selected fields of the thing.
* @param dittoHeaders Ditto headers containing authorization information.
* @param concernedSignals the Signals which caused that this partial thing retrieval was triggered
* (e.g. a {@code ThingEvent})
* @return future that completes with the parts of a thing or fails with an error.
*/
public CompletionStage<JsonObject> retrievePartialThing(final EntityId thingId,
final JsonFieldSelector jsonFieldSelector,
final DittoHeaders dittoHeaders,
final Collection<Signal<?>> concernedSignals) {

final List<ThingEvent<?>> thingEvents = concernedSignals.stream()
.filter(signal -> (signal instanceof ThingEvent) && !(ProtocolAdapter.isLiveSignal(signal)))
.map(event -> (ThingEvent<?>) event)
.collect(Collectors.toList());

// as second step only return what was originally requested as fields:
final var cachingParameters = new CachingParameters(jsonFieldSelector, thingEvents, true, 0);
return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters)
.thenApply(jsonObject -> jsonObject.get(jsonFieldSelector));
}
Expand All @@ -133,7 +162,7 @@ private CompletionStage<JsonObject> doRetrievePartialThing(final EntityId thingI
CacheKey.of(thingId, CacheFactory.newCacheLookupContext(dittoHeaders, enhancedFieldSelector));

final var cachingParametersWithEnhancedFieldSelector = new CachingParameters(enhancedFieldSelector,
cachingParameters.concernedSignals,
cachingParameters.concernedEvents,
cachingParameters.invalidateCacheOnPolicyChange,
cachingParameters.minAcceptableSeqNr);

Expand All @@ -160,7 +189,7 @@ private CompletableFuture<JsonObject> smartUpdateCachedObject(final CacheKey idW
final CompletableFuture<JsonObject> result;

final var invalidateCacheOnPolicyChange = cachingParameters.invalidateCacheOnPolicyChange;
final var concernedSignals = cachingParameters.concernedSignals;
final var concernedSignals = cachingParameters.concernedEvents;
final var fieldSelector = cachingParameters.fieldSelector;

final Optional<List<ThingEvent<?>>> thingEventsOptional =
Expand Down Expand Up @@ -196,12 +225,7 @@ private CompletableFuture<JsonObject> smartUpdateCachedObject(final CacheKey idW
}

private static Optional<List<ThingEvent<?>>> extractConsecutiveTwinEvents(
final Collection<? extends Signal<?>> concernedSignals, final long minAcceptableSeqNr) {

final List<ThingEvent<?>> thingEvents = concernedSignals.stream()
.filter(signal -> (signal instanceof ThingEvent) && !(ProtocolAdapter.isLiveSignal(signal)))
.map(event -> (ThingEvent<?>) event)
.collect(Collectors.toList());
final List<ThingEvent<?>> thingEvents, final long minAcceptableSeqNr) {

// ignore events before ThingDeleted or ThingCreated
// not safe to ignore events before ThingModified because ThingModified has merge semantics at the top level
Expand Down Expand Up @@ -268,8 +292,7 @@ private CompletionStage<JsonObject> doSmartUpdateCachedObject(final CacheKey idW
final CompletionStage<JsonObject> result;

final long cachedRevision = cachedJsonObject.getValue(Thing.JsonFields.REVISION).orElse(0L);
final List<ThingEvent<?>> relevantEvents = cachingParameters.concernedSignals.stream()
.map(event -> (ThingEvent<?>) event)
final List<ThingEvent<?>> relevantEvents = cachingParameters.concernedEvents.stream()
.filter(e -> e.getRevision() > cachedRevision)
.collect(Collectors.toList());

Expand Down Expand Up @@ -306,7 +329,7 @@ private CompletionStage<JsonObject> handleNextExpectedThingEvents(
final CacheKey idWithResourceType, final JsonObject cachedJsonObject,
final CachingParameters cachingParameters) {

final var concernedSignals = (List<ThingEvent<?>>) cachingParameters.concernedSignals;
final var concernedSignals = cachingParameters.concernedEvents;
final var enhancedFieldSelector = cachingParameters.fieldSelector;
final Optional<String> cachedPolicyIdOpt = cachedJsonObject.getValue(Thing.JsonFields.POLICY_ID);
JsonObject jsonObject = cachedJsonObject;
Expand Down Expand Up @@ -403,17 +426,17 @@ private static JsonObject enhanceJsonObject(final JsonObject jsonObject, final L
private static final class CachingParameters {

@Nullable private final JsonFieldSelector fieldSelector;
private final List<? extends Signal<?>> concernedSignals;
private final List<ThingEvent<?>> concernedEvents;
private final boolean invalidateCacheOnPolicyChange;
private final long minAcceptableSeqNr;

private CachingParameters(@Nullable final JsonFieldSelector fieldSelector,
final List<? extends Signal<?>> concernedSignals,
final List<ThingEvent<?>> concernedEvents,
final boolean invalidateCacheOnPolicyChange,
final long minAcceptableSeqNr) {

this.fieldSelector = fieldSelector;
this.concernedSignals = concernedSignals;
this.concernedEvents = concernedEvents;
this.invalidateCacheOnPolicyChange = invalidateCacheOnPolicyChange;
this.minAcceptableSeqNr = minAcceptableSeqNr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.base.model.signals.Signal;

/**
* Asynchronous interface for retrieving things to enrich signals from and to those things,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ private JsonObject render() {
}

/**
* Get the most severe log level from evennts.
* Get the most severe log level from events.
*
* @return The most severe log level to report.
*/
Expand Down

0 comments on commit 3da7994

Please sign in to comment.