Skip to content

Commit

Permalink
#1893 fix ensuring the consistency when doing signal enrichment
Browse files Browse the repository at this point in the history
  • Loading branch information
thjaeckle committed Feb 16, 2024
1 parent 0d04168 commit f72c079
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 122 deletions.
Expand Up @@ -38,11 +38,11 @@ public interface CachingSignalEnrichmentFacade extends SignalEnrichmentFacade {
*
* @param thingId the thing to retrieve.
* @param events received thing events to reduce traffic. If there are no events, a fresh entry is retrieved.
* @param minAcceptableSeqNr the minimum sequence number acceptable as result. If negative,
* @param atRevisionNumber the revision/sequence number to retrieve the thing at. If negative,
* cache loading is forced.
* @return future of the retrieved thing.
*/
CompletionStage<JsonObject> retrieveThing(ThingId thingId, List<ThingEvent<?>> events, long minAcceptableSeqNr);
CompletionStage<JsonObject> retrieveThing(ThingId thingId, List<ThingEvent<?>> events, long atRevisionNumber);

default JsonObject applyJsonFieldSelector(final JsonObject jsonObject,
@Nullable final JsonFieldSelector fieldSelector) {
Expand Down
Expand Up @@ -14,18 +14,19 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithResource;
import org.eclipse.ditto.internal.utils.cache.Cache;
Expand Down Expand Up @@ -54,7 +55,8 @@
*/
public class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade {

private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
private static final String CACHE_NAME_SUFFIX = "_signal_enrichment_cache";

protected final Cache<SignalEnrichmentCacheKey, JsonObject> extraFieldsCache;
Expand Down Expand Up @@ -93,25 +95,47 @@ public static DittoCachingSignalEnrichmentFacade newInstance(final SignalEnrichm

@Override
public CompletionStage<JsonObject> retrieveThing(final ThingId thingId, final List<ThingEvent<?>> events,
final long minAcceptableSeqNr) {
final long atRevisionNumber) {

final DittoHeaders dittoHeaders = DittoHeaders.empty();
final DittoHeaders dittoHeadersNotAddedToCacheKey =
buildDittoHeadersNotAddedToCacheKey(events, atRevisionNumber);

final JsonFieldSelector fieldSelector = determineSelector(thingId.getNamespace());

if (minAcceptableSeqNr < 0) {
final var cacheKey =
SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, fieldSelector));
if (atRevisionNumber < 0) {
final var cacheKey = SignalEnrichmentCacheKey.of(
thingId,
SignalEnrichmentContext.of(DittoHeaders.empty(), dittoHeadersNotAddedToCacheKey, fieldSelector));
extraFieldsCache.invalidate(cacheKey);
return doCacheLookup(cacheKey, dittoHeaders);
return doCacheLookup(cacheKey, dittoHeadersNotAddedToCacheKey);
} else {
final var cachingParameters =
new CachingParameters(fieldSelector, events, false, minAcceptableSeqNr);
new CachingParameters(fieldSelector, events, false, atRevisionNumber);

return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters);
return doRetrievePartialThing(thingId, DittoHeaders.empty(), dittoHeadersNotAddedToCacheKey,
cachingParameters);
}
}

private static DittoHeaders buildDittoHeadersNotAddedToCacheKey(final List<ThingEvent<?>> events,
final long atRevisionNumber) {
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = DittoHeaders.newBuilder();
if (!events.isEmpty()) {
dittoHeadersBuilder.correlationId(
events.get(events.size() - 1)
.getDittoHeaders()
.getCorrelationId()
.orElseGet(() -> UUID.randomUUID().toString())
+ "-enrichment"
);
}
if (atRevisionNumber > 0) {
dittoHeadersBuilder
.putHeader(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(atRevisionNumber));
}
return dittoHeadersBuilder.build();
}

@Override
public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
@Nullable final JsonFieldSelector jsonFieldSelector, final DittoHeaders dittoHeaders,
Expand All @@ -125,50 +149,22 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
final var cachingParameters =
new CachingParameters(jsonFieldSelector, thingEvents, true, 0);

return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters)
.thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector));
}

/**
* Retrieve parts of a thing.
*
* @param thingId ID of the thing.
* @param jsonFieldSelector the selected fields of the thing.
* @param dittoHeaders Ditto headers containing authorization information.
* @param concernedSignals the Signals which caused that this partial thing retrieval was triggered
* (e.g. a {@code ThingEvent})
* @param minAcceptableSeqNr minimum sequence number of the concerned signals to not invalidate the cache.
* @return future that completes with the parts of a thing or fails with an error.
*/
@SuppressWarnings("java:S1612")
public CompletionStage<JsonObject> retrievePartialThing(final EntityId thingId,
final JsonFieldSelector jsonFieldSelector,
final DittoHeaders dittoHeaders,
final Collection<? extends Signal<?>> concernedSignals,
final long minAcceptableSeqNr) {

final List<ThingEvent<?>> thingEvents = concernedSignals.stream()
.filter(signal -> signal instanceof ThingEvent && !Signal.isChannelLive(signal))
.map(signal -> (ThingEvent<?>) signal)
.collect(Collectors.toList());

// as second step only return what was originally requested as fields:
final var cachingParameters =
new CachingParameters(jsonFieldSelector, thingEvents, true, minAcceptableSeqNr);

return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters)
return doRetrievePartialThing(thingId, dittoHeaders, null, cachingParameters)
.thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector));
}

protected CompletionStage<JsonObject> doRetrievePartialThing(final EntityId thingId,
final DittoHeaders dittoHeaders,
final CachingParameters cachingParameters) {
final DittoHeaders dittoHeaders,
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
final CachingParameters cachingParameters) {

final var fieldSelector = cachingParameters.fieldSelector;
final JsonFieldSelector enhancedFieldSelector = enhanceFieldSelectorWithRevision(fieldSelector);

final var idWithResourceType =
SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, enhancedFieldSelector));
final var idWithResourceType = SignalEnrichmentCacheKey.of(
thingId,
SignalEnrichmentContext.of(dittoHeaders, dittoHeadersNotAddedToCacheKey, enhancedFieldSelector)
);

final var cachingParametersWithEnhancedFieldSelector = new CachingParameters(enhancedFieldSelector,
cachingParameters.concernedEvents,
Expand Down Expand Up @@ -282,7 +278,7 @@ private static DittoHeaders getLastDittoHeaders(final List<? extends Signal<?>>
}

protected CompletableFuture<JsonObject> doCacheLookup(final SignalEnrichmentCacheKey cacheKey,
final DittoHeaders dittoHeaders) {
final DittoHeaders dittoHeaders) {
LOGGER.withCorrelationId(dittoHeaders).debug("Looking up cache entry for <{}>", cacheKey);

return extraFieldsCache.get(cacheKey)
Expand Down Expand Up @@ -343,16 +339,11 @@ private CompletionStage<JsonObject> handleNextExpectedThingEvents(final SignalEn
JsonObject jsonObject = cachedJsonObject;
for (final ThingEvent<?> thingEvent : concernedSignals) {

switch (thingEvent.getCommandCategory()) {
case MERGE:
jsonObject = getMergeJsonObject(jsonObject, thingEvent);
break;
case DELETE:
jsonObject = getDeleteJsonObject(jsonObject, thingEvent);
break;
default:
jsonObject = getDefaultJsonObject(jsonObject, thingEvent);
}
jsonObject = switch (thingEvent.getCommandCategory()) {
case MERGE -> getMergeJsonObject(jsonObject, thingEvent);
case DELETE -> getDeleteJsonObject(jsonObject, thingEvent);
default -> getDefaultJsonObject(jsonObject, thingEvent);
};
// invalidate cache on policy change if the flag is set
if (cachingParameters.invalidateCacheOnPolicyChange) {
final var optionalCompletionStage =
Expand Down Expand Up @@ -392,7 +383,8 @@ private static JsonObject getDeleteJsonObject(final JsonObject jsonObject, final
} else if (resourcePath.isEmpty()) {
result = JsonObject.empty();
} else {
result = jsonObject.remove(resourcePath).remove(Thing.JsonFields.METADATA.getPointer().append(resourcePath));
result =
jsonObject.remove(resourcePath).remove(Thing.JsonFields.METADATA.getPointer().append(resourcePath));
}

return result;
Expand Down Expand Up @@ -463,9 +455,9 @@ protected static final class CachingParameters {
private final long minAcceptableSeqNr;

public CachingParameters(@Nullable final JsonFieldSelector fieldSelector,
final List<ThingEvent<?>> concernedEvents,
final boolean invalidateCacheOnPolicyChange,
final long minAcceptableSeqNr) {
final List<ThingEvent<?>> concernedEvents,
final boolean invalidateCacheOnPolicyChange,
final long minAcceptableSeqNr) {

this.fieldSelector = fieldSelector;
this.concernedEvents = concernedEvents;
Expand Down
Expand Up @@ -27,7 +27,7 @@
* Implementation for a {@link CacheKey} used in scope of signal enrichment.
*/
@Immutable
final class SignalEnrichmentCacheKey implements CacheKey<SignalEnrichmentContext> {
public final class SignalEnrichmentCacheKey implements CacheKey<SignalEnrichmentContext> {

static final String DELIMITER = ":";

Expand Down Expand Up @@ -63,8 +63,7 @@ public Optional<SignalEnrichmentContext> getCacheLookupContext() {

@Override
public boolean equals(@Nullable final Object o) {
if (o instanceof SignalEnrichmentCacheKey) {
final var that = (SignalEnrichmentCacheKey) o;
if (o instanceof SignalEnrichmentCacheKey that) {
return isIdEqualValueBased(that) && Objects.equals(context, that.context);
} else {
return false;
Expand Down
Expand Up @@ -56,7 +56,10 @@ public CompletableFuture<JsonObject> asyncLoad(final SignalEnrichmentCacheKey ke
final ThingId thingId = ThingId.of(key.getId());
final JsonFieldSelector jsonFieldSelector = selectorOptional.orElse(null);
final DittoHeaders dittoHeaders = context.getDittoHeaders();
return facade.retrievePartialThing(thingId, jsonFieldSelector, dittoHeaders, null)
final DittoHeaders retrieveHeaders = context.getDittoHeadersNotAddedToCacheKey()
.map(extraHeaders -> (DittoHeaders) dittoHeaders.toBuilder().putHeaders(extraHeaders).build())
.orElse(dittoHeaders);
return facade.retrievePartialThing(thingId, jsonFieldSelector, retrieveHeaders, null)
.toCompletableFuture();
} else {
// no context; nothing to load.
Expand Down
Expand Up @@ -29,29 +29,36 @@
* signal enrichment caching.
*/
@Immutable
final class SignalEnrichmentContext implements CacheLookupContext {
public final class SignalEnrichmentContext implements CacheLookupContext {

private final DittoHeaders dittoHeaders;
@Nullable private final DittoHeaders dittoHeadersNotAddedToCacheKey;
@Nullable private final JsonFieldSelector jsonFieldSelector;

private SignalEnrichmentContext(final DittoHeaders dittoHeaders,
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
@Nullable final JsonFieldSelector jsonFieldSelector) {
this.dittoHeaders = checkNotNull(dittoHeaders, "dittoHeaders");
this.dittoHeadersNotAddedToCacheKey = dittoHeadersNotAddedToCacheKey;
this.jsonFieldSelector = jsonFieldSelector;
}

/**
* Creates a new SignalEnrichmentContext from the passed optional {@code dittoHeaders} and {@code jsonFieldSelector}
* retaining the for caching relevant {@code dittoHeaders} from the passed ones.
* Creates a new SignalEnrichmentContext from the passed optional {@code dittoHeaders},
* {@code dittoHeadersNotAddedToCacheKey} and {@code jsonFieldSelector}
* retaining the for caching relevant {@code dittoHeaders} from the passed ones, but not adding the
* {@code dittoHeadersNotAddedToCacheKey} to hashCode/equals, ignoring those for caching.
*
* @param dittoHeaders the DittoHeaders to use as key in the cache lookup context.
* @param dittoHeadersNotAddedToCacheKey the DittoHeaders to additionally use, but not include in the cache key.
* @param jsonFieldSelector the JsonFieldSelector to use in the cache lookup context.
* @return the created context.
*/
static SignalEnrichmentContext of(final DittoHeaders dittoHeaders,
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
@Nullable final JsonFieldSelector jsonFieldSelector) {

return new SignalEnrichmentContext(dittoHeaders, jsonFieldSelector);
return new SignalEnrichmentContext(dittoHeaders, dittoHeadersNotAddedToCacheKey, jsonFieldSelector);
}

/**
Expand All @@ -63,6 +70,14 @@ public DittoHeaders getDittoHeaders() {
return dittoHeaders;
}

/**
* @return the additional DittoHeaders to use, e.g. for looking up the cache entry, but which are not part of the
* cache key.
*/
public Optional<DittoHeaders> getDittoHeadersNotAddedToCacheKey() {
return Optional.ofNullable(dittoHeadersNotAddedToCacheKey);
}

/**
* Returns the optional JsonFieldSelector this context provides.
*
Expand Down Expand Up @@ -94,6 +109,7 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + " [" +
"dittoHeaders=" + dittoHeaders +
", dittoHeadersNotAddedToCacheKey=" + dittoHeadersNotAddedToCacheKey +
", jsonFieldSelector=" + jsonFieldSelector +
"]";
}
Expand Down

0 comments on commit f72c079

Please sign in to comment.