Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1893 fix ensuring the consistency when doing signal enrichment #1904

Merged
merged 2 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public interface CachingSignalEnrichmentFacade extends SignalEnrichmentFacade {
*
* @param thingId the thing to retrieve.
* @param events received thing events to reduce traffic. If there are no events, a fresh entry is retrieved.
* @param minAcceptableSeqNr the minimum sequence number acceptable as result. If negative,
* @param atRevisionNumber the revision/sequence number to retrieve the thing at. If negative,
* cache loading is forced.
* @return future of the retrieved thing.
*/
CompletionStage<JsonObject> retrieveThing(ThingId thingId, List<ThingEvent<?>> events, long minAcceptableSeqNr);
CompletionStage<JsonObject> retrieveThing(ThingId thingId, List<ThingEvent<?>> events, long atRevisionNumber);

default JsonObject applyJsonFieldSelector(final JsonObject jsonObject,
@Nullable final JsonFieldSelector fieldSelector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
Expand All @@ -25,7 +26,9 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithResource;
import org.eclipse.ditto.internal.utils.cache.Cache;
Expand Down Expand Up @@ -54,7 +57,8 @@
*/
public class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade {

private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
private static final String CACHE_NAME_SUFFIX = "_signal_enrichment_cache";

protected final Cache<SignalEnrichmentCacheKey, JsonObject> extraFieldsCache;
Expand Down Expand Up @@ -93,23 +97,45 @@ public static DittoCachingSignalEnrichmentFacade newInstance(final SignalEnrichm

@Override
public CompletionStage<JsonObject> retrieveThing(final ThingId thingId, final List<ThingEvent<?>> events,
final long minAcceptableSeqNr) {
final long atRevisionNumber) {

final DittoHeaders dittoHeaders = DittoHeaders.empty();
final DittoHeaders dittoHeadersNotAddedToCacheKey =
buildDittoHeadersNotAddedToCacheKey(events, atRevisionNumber);

final JsonFieldSelector fieldSelector = determineSelector(thingId.getNamespace());

if (minAcceptableSeqNr < 0) {
final var cacheKey =
SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, fieldSelector));
if (atRevisionNumber < 0) {
final var cacheKey = SignalEnrichmentCacheKey.of(
thingId,
SignalEnrichmentContext.of(DittoHeaders.empty(), dittoHeadersNotAddedToCacheKey, fieldSelector));
extraFieldsCache.invalidate(cacheKey);
return doCacheLookup(cacheKey, dittoHeaders);
return doCacheLookup(cacheKey, dittoHeadersNotAddedToCacheKey);
} else {
final var cachingParameters =
new CachingParameters(fieldSelector, events, false, minAcceptableSeqNr);
new CachingParameters(fieldSelector, events, false, atRevisionNumber);

return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters);
return doRetrievePartialThing(thingId, DittoHeaders.empty(), dittoHeadersNotAddedToCacheKey,
cachingParameters);
}
}

private static DittoHeaders buildDittoHeadersNotAddedToCacheKey(final List<ThingEvent<?>> events,
final long atRevisionNumber) {
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = DittoHeaders.newBuilder();
if (!events.isEmpty()) {
dittoHeadersBuilder.correlationId(
events.get(events.size() - 1)
.getDittoHeaders()
.getCorrelationId()
.orElseGet(() -> UUID.randomUUID().toString())
+ "-enrichment"
);
}
if (atRevisionNumber > 0) {
dittoHeadersBuilder
.putHeader(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(atRevisionNumber));
}
return dittoHeadersBuilder.build();
}

@Override
Expand All @@ -125,7 +151,7 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
final var cachingParameters =
new CachingParameters(jsonFieldSelector, thingEvents, true, 0);

return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters)
return doRetrievePartialThing(thingId, dittoHeaders, null, cachingParameters)
.thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector));
}

Expand All @@ -140,7 +166,7 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
* @param minAcceptableSeqNr minimum sequence number of the concerned signals to not invalidate the cache.
* @return future that completes with the parts of a thing or fails with an error.
*/
@SuppressWarnings("java:S1612")
@SuppressWarnings({"java:S1612", "unused"})
public CompletionStage<JsonObject> retrievePartialThing(final EntityId thingId,
final JsonFieldSelector jsonFieldSelector,
final DittoHeaders dittoHeaders,
Expand All @@ -156,19 +182,22 @@ public CompletionStage<JsonObject> retrievePartialThing(final EntityId thingId,
final var cachingParameters =
new CachingParameters(jsonFieldSelector, thingEvents, true, minAcceptableSeqNr);

return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters)
return doRetrievePartialThing(thingId, dittoHeaders, null, cachingParameters)
.thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector));
}

protected CompletionStage<JsonObject> doRetrievePartialThing(final EntityId thingId,
final DittoHeaders dittoHeaders,
final CachingParameters cachingParameters) {
final DittoHeaders dittoHeaders,
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
final CachingParameters cachingParameters) {

final var fieldSelector = cachingParameters.fieldSelector;
final JsonFieldSelector enhancedFieldSelector = enhanceFieldSelectorWithRevision(fieldSelector);

final var idWithResourceType =
SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, enhancedFieldSelector));
final var idWithResourceType = SignalEnrichmentCacheKey.of(
thingId,
SignalEnrichmentContext.of(dittoHeaders, dittoHeadersNotAddedToCacheKey, enhancedFieldSelector)
);

final var cachingParametersWithEnhancedFieldSelector = new CachingParameters(enhancedFieldSelector,
cachingParameters.concernedEvents,
Expand Down Expand Up @@ -282,7 +311,7 @@ private static DittoHeaders getLastDittoHeaders(final List<? extends Signal<?>>
}

protected CompletableFuture<JsonObject> doCacheLookup(final SignalEnrichmentCacheKey cacheKey,
final DittoHeaders dittoHeaders) {
final DittoHeaders dittoHeaders) {
LOGGER.withCorrelationId(dittoHeaders).debug("Looking up cache entry for <{}>", cacheKey);

return extraFieldsCache.get(cacheKey)
Expand Down Expand Up @@ -343,16 +372,11 @@ private CompletionStage<JsonObject> handleNextExpectedThingEvents(final SignalEn
JsonObject jsonObject = cachedJsonObject;
for (final ThingEvent<?> thingEvent : concernedSignals) {

switch (thingEvent.getCommandCategory()) {
case MERGE:
jsonObject = getMergeJsonObject(jsonObject, thingEvent);
break;
case DELETE:
jsonObject = getDeleteJsonObject(jsonObject, thingEvent);
break;
default:
jsonObject = getDefaultJsonObject(jsonObject, thingEvent);
}
jsonObject = switch (thingEvent.getCommandCategory()) {
case MERGE -> getMergeJsonObject(jsonObject, thingEvent);
case DELETE -> getDeleteJsonObject(jsonObject, thingEvent);
default -> getDefaultJsonObject(jsonObject, thingEvent);
};
// invalidate cache on policy change if the flag is set
if (cachingParameters.invalidateCacheOnPolicyChange) {
final var optionalCompletionStage =
Expand Down Expand Up @@ -392,7 +416,8 @@ private static JsonObject getDeleteJsonObject(final JsonObject jsonObject, final
} else if (resourcePath.isEmpty()) {
result = JsonObject.empty();
} else {
result = jsonObject.remove(resourcePath).remove(Thing.JsonFields.METADATA.getPointer().append(resourcePath));
result =
jsonObject.remove(resourcePath).remove(Thing.JsonFields.METADATA.getPointer().append(resourcePath));
}

return result;
Expand Down Expand Up @@ -463,9 +488,9 @@ protected static final class CachingParameters {
private final long minAcceptableSeqNr;

public CachingParameters(@Nullable final JsonFieldSelector fieldSelector,
final List<ThingEvent<?>> concernedEvents,
final boolean invalidateCacheOnPolicyChange,
final long minAcceptableSeqNr) {
final List<ThingEvent<?>> concernedEvents,
final boolean invalidateCacheOnPolicyChange,
final long minAcceptableSeqNr) {

this.fieldSelector = fieldSelector;
this.concernedEvents = concernedEvents;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* Implementation for a {@link CacheKey} used in scope of signal enrichment.
*/
@Immutable
final class SignalEnrichmentCacheKey implements CacheKey<SignalEnrichmentContext> {
public final class SignalEnrichmentCacheKey implements CacheKey<SignalEnrichmentContext> {

static final String DELIMITER = ":";

Expand Down Expand Up @@ -63,8 +63,7 @@ public Optional<SignalEnrichmentContext> getCacheLookupContext() {

@Override
public boolean equals(@Nullable final Object o) {
if (o instanceof SignalEnrichmentCacheKey) {
final var that = (SignalEnrichmentCacheKey) o;
if (o instanceof SignalEnrichmentCacheKey that) {
return isIdEqualValueBased(that) && Objects.equals(context, that.context);
} else {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public CompletableFuture<JsonObject> asyncLoad(final SignalEnrichmentCacheKey ke
final ThingId thingId = ThingId.of(key.getId());
final JsonFieldSelector jsonFieldSelector = selectorOptional.orElse(null);
final DittoHeaders dittoHeaders = context.getDittoHeaders();
return facade.retrievePartialThing(thingId, jsonFieldSelector, dittoHeaders, null)
final DittoHeaders retrieveHeaders = context.getDittoHeadersNotAddedToCacheKey()
.map(extraHeaders -> (DittoHeaders) dittoHeaders.toBuilder().putHeaders(extraHeaders).build())
.orElse(dittoHeaders);
return facade.retrievePartialThing(thingId, jsonFieldSelector, retrieveHeaders, null)
.toCompletableFuture();
} else {
// no context; nothing to load.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,36 @@
* signal enrichment caching.
*/
@Immutable
final class SignalEnrichmentContext implements CacheLookupContext {
public final class SignalEnrichmentContext implements CacheLookupContext {

private final DittoHeaders dittoHeaders;
@Nullable private final DittoHeaders dittoHeadersNotAddedToCacheKey;
@Nullable private final JsonFieldSelector jsonFieldSelector;

private SignalEnrichmentContext(final DittoHeaders dittoHeaders,
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
@Nullable final JsonFieldSelector jsonFieldSelector) {
this.dittoHeaders = checkNotNull(dittoHeaders, "dittoHeaders");
this.dittoHeadersNotAddedToCacheKey = dittoHeadersNotAddedToCacheKey;
this.jsonFieldSelector = jsonFieldSelector;
}

/**
* Creates a new SignalEnrichmentContext from the passed optional {@code dittoHeaders} and {@code jsonFieldSelector}
* retaining the for caching relevant {@code dittoHeaders} from the passed ones.
* Creates a new SignalEnrichmentContext from the passed optional {@code dittoHeaders},
* {@code dittoHeadersNotAddedToCacheKey} and {@code jsonFieldSelector}
* retaining the for caching relevant {@code dittoHeaders} from the passed ones, but not adding the
* {@code dittoHeadersNotAddedToCacheKey} to hashCode/equals, ignoring those for caching.
*
* @param dittoHeaders the DittoHeaders to use as key in the cache lookup context.
* @param dittoHeadersNotAddedToCacheKey the DittoHeaders to additionally use, but not include in the cache key.
* @param jsonFieldSelector the JsonFieldSelector to use in the cache lookup context.
* @return the created context.
*/
static SignalEnrichmentContext of(final DittoHeaders dittoHeaders,
@Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey,
@Nullable final JsonFieldSelector jsonFieldSelector) {

return new SignalEnrichmentContext(dittoHeaders, jsonFieldSelector);
return new SignalEnrichmentContext(dittoHeaders, dittoHeadersNotAddedToCacheKey, jsonFieldSelector);
}

/**
Expand All @@ -63,6 +70,14 @@ public DittoHeaders getDittoHeaders() {
return dittoHeaders;
}

/**
* @return the additional DittoHeaders to use, e.g. for looking up the cache entry, but which are not part of the
* cache key.
*/
public Optional<DittoHeaders> getDittoHeadersNotAddedToCacheKey() {
return Optional.ofNullable(dittoHeadersNotAddedToCacheKey);
}

/**
* Returns the optional JsonFieldSelector this context provides.
*
Expand Down Expand Up @@ -94,6 +109,7 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + " [" +
"dittoHeaders=" + dittoHeaders +
", dittoHeadersNotAddedToCacheKey=" + dittoHeadersNotAddedToCacheKey +
", jsonFieldSelector=" + jsonFieldSelector +
"]";
}
Expand Down