Use CachingSignalEnrichmentFacade for search index update.
- Remove priority for control messages in ChangeQueueActor in
  order to aggregate updates better.

- Remove deprecated method use in BulkWriteResultAckFlowTest.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
yufei-cai committed Aug 21, 2021
1 parent 75ae753 commit e50ed57
Showing 11 changed files with 563 additions and 103 deletions.
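The updater-side call that feeds events and a minimum revision into the facade sits in one of the changed files not rendered on this page, so the following is only a rough caller-side sketch of the new contract, with stand-in types (String for the thing JSON, Long for event revisions) instead of the real Ditto classes:

import java.util.List;
import java.util.concurrent.CompletionStage;

final class FacadeContractSketch {

    // Shape of the method changed in the first hunk below, with Ditto types elided.
    interface SignalEnrichmentFacade {
        CompletionStage<String> retrieveThing(String thingId, List<Long> eventRevisions, long minAcceptableSeqNr);
    }

    static CompletionStage<String> forIndexUpdate(final SignalEnrichmentFacade facade, final String thingId,
            final List<Long> eventRevisions, final long expectedRevision) {
        // expectedRevision >= 0: the result must reflect at least this revision;
        // a negative value forces cache invalidation and a fresh load.
        return facade.retrieveThing(thingId, eventRevisions, expectedRevision);
    }
}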
@@ -93,7 +93,7 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,

// as second step only return what was originally requested as fields:
final List<Signal<?>> concernedSignals = concernedSignal == null ? List.of() : List.of(concernedSignal);
- return doRetrievePartialThing(thingId, jsonFieldSelector, dittoHeaders, concernedSignals, true)
+ return doRetrievePartialThing(thingId, jsonFieldSelector, dittoHeaders, concernedSignals, true, 0)
.thenApply(jsonObject -> jsonObject.get(jsonFieldSelector));
}

@@ -102,24 +102,28 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,
*
* @param thingId the thing to retrieve.
* @param events received thing events to reduce traffic. If there are no events, a fresh entry is retrieved.
+ * @param minAcceptableSeqNr the minimum sequence number acceptable as result. If negative,
+ * cache loading is forced.
* @return future of the retrieved thing.
*/
- public CompletionStage<JsonObject> retrieveThing(final ThingId thingId, final List<ThingEvent<?>> events) {
- if (events.isEmpty()) {
+ public CompletionStage<JsonObject> retrieveThing(final ThingId thingId, final List<ThingEvent<?>> events,
+ final long minAcceptableSeqNr) {
+ if (minAcceptableSeqNr < 0) {
final var cacheKey =
CacheKey.of(thingId, CacheFactory.newCacheLookupContext(DittoHeaders.empty(), null));
extraFieldsCache.invalidate(cacheKey);
return doCacheLookup(cacheKey, DittoHeaders.empty());
} else {
- return doRetrievePartialThing(thingId, null, DittoHeaders.empty(), events, false);
+ return doRetrievePartialThing(thingId, null, DittoHeaders.empty(), events, false, minAcceptableSeqNr);
}
}

private CompletionStage<JsonObject> doRetrievePartialThing(final ThingId thingId,
@Nullable final JsonFieldSelector jsonFieldSelector,
final DittoHeaders dittoHeaders,
final List<? extends Signal<?>> concernedSignals,
- final boolean invalidateCacheOnPolicyChange) {
+ final boolean invalidateCacheOnPolicyChange,
+ final long minAcceptableSeqNr) {

final JsonFieldSelector enhancedFieldSelector;
if (jsonFieldSelector == null) {
@@ -135,7 +139,7 @@ private CompletionStage<JsonObject> doRetrievePartialThing(final ThingId thingId
CacheKey.of(thingId, CacheFactory.newCacheLookupContext(dittoHeaders, enhancedFieldSelector));

return smartUpdateCachedObject(enhancedFieldSelector, idWithResourceType, concernedSignals,
- invalidateCacheOnPolicyChange);
+ invalidateCacheOnPolicyChange, minAcceptableSeqNr);
}

private Optional<Integer> findLastThingDeletedOrCreated(final List<ThingEvent<?>> thingEvents) {
@@ -149,19 +153,25 @@ private Optional<List<ThingEvent<?>
}

private Optional<List<ThingEvent<?>>> extractConsecutiveTwinEvents(
- final List<? extends Signal<?>> concernedSignals) {
+ final List<? extends Signal<?>> concernedSignals, final long minAcceptableSeqNr) {
final List<ThingEvent<?>> thingEvents = concernedSignals.stream()
.filter(signal -> (signal instanceof ThingEvent) && !(ProtocolAdapter.isLiveSignal(signal)))
.map(signal -> (ThingEvent<?>) signal)
.collect(Collectors.toList());

// ignore events before ThingDeleted or ThingCreated
+ // not safe to ignore events before ThingModified because ThingModified has merge semantics at the top level
final var events = findLastThingDeletedOrCreated(thingEvents)
.map(i -> thingEvents.subList(i, thingEvents.size()))
.orElse(thingEvents);

+ // Check if minimum acceptable sequence number is met
+ if (minAcceptableSeqNr >= 0 && (events.isEmpty() || getLast(events).getRevision() < minAcceptableSeqNr)) {
+ return Optional.empty();
+ }
+
+ // Validate sequence numbers. Discard if events have gaps
if (!events.isEmpty()) {
- // Validate sequence numbers. Discard
long lastSeq = -1;
for (final ThingEvent<?> event : events) {
if (lastSeq >= 0 && event.getRevision() != lastSeq + 1) {
@@ -171,6 +181,7 @@ private Optional<List<ThingEvent<?>>> extractConsecutiveTwinEvents(
}
}
}

return Optional.of(events);
}

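Condensed into a standalone sketch, the two checks added above work like this (plain revision numbers stand in for ThingEvent.getRevision(); this mirrors the diff but is not the production code):

import java.util.List;
import java.util.Optional;

final class ConsecutiveEventsSketch {

    // Accept a batch only if it reaches minAcceptableSeqNr (when non-negative) and has no gaps.
    static Optional<List<Long>> extractConsecutive(final List<Long> revisions, final long minAcceptableSeqNr) {
        if (minAcceptableSeqNr >= 0 &&
                (revisions.isEmpty() || revisions.get(revisions.size() - 1) < minAcceptableSeqNr)) {
            return Optional.empty();
        }
        long lastSeq = -1;
        for (final long seq : revisions) {
            if (lastSeq >= 0 && seq != lastSeq + 1) {
                return Optional.empty(); // gap: the caller must reload instead of patching
            }
            lastSeq = seq;
        }
        return Optional.of(revisions);
    }

    public static void main(final String[] args) {
        System.out.println(extractConsecutive(List.of(4L, 5L, 6L), 6)); // Optional[[4, 5, 6]]
        System.out.println(extractConsecutive(List.of(4L, 6L), 6));     // Optional.empty (gap at 5)
        System.out.println(extractConsecutive(List.of(4L, 5L), 6));     // Optional.empty (below minimum)
    }
}

An empty Optional makes the caller invalidate the cache entry and do a fresh lookup, as smartUpdateCachedObject does in the next hunk.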
@@ -187,19 +198,22 @@ private CompletableFuture<JsonObject> smartUpdateCachedObject(
@Nullable final JsonFieldSelector enhancedFieldSelector,
final CacheKey idWithResourceType,
final List<? extends Signal<?>> concernedSignals,
- final boolean invalidateCacheOnPolicyChange) {
+ final boolean invalidateCacheOnPolicyChange,
+ final long minAcceptableSeqNr) {

- final Optional<List<ThingEvent<?>>> thingEventsOptional = extractConsecutiveTwinEvents(concernedSignals);
+ final Optional<List<ThingEvent<?>>> thingEventsOptional =
+ extractConsecutiveTwinEvents(concernedSignals, minAcceptableSeqNr);
final var dittoHeaders = getLastDittoHeaders(concernedSignals);

- // there are twin events, but their sequence numbers have gaps
+ // there are twin events, but their sequence numbers have gaps or do not reach the min acceptable seq nr
if (thingEventsOptional.isEmpty()) {
extraFieldsCache.invalidate(idWithResourceType);
return doCacheLookup(idWithResourceType, dittoHeaders);
}

- // there are no twin event; return the cached thing
final var thingEvents = thingEventsOptional.orElseThrow();
+
+ // there are no twin event; return the cached thing
if (thingEvents.isEmpty()) {
return doCacheLookup(idWithResourceType, dittoHeaders);
}
@@ -214,14 +228,16 @@ private CompletableFuture<JsonObject> smartUpdateCachedObject(
// there are twin events; perform smart update
return doCacheLookup(idWithResourceType, dittoHeaders).thenCompose(cachedJsonObject -> {
final long cachedRevision = cachedJsonObject.getValue(Thing.JsonFields.REVISION).orElse(0L);
- final long lastRevision = getLast(thingEvents).getRevision();
- if (cachedRevision >= lastRevision) {
+ final var relevantEvents = thingEvents.stream()
+ .filter(e -> e.getRevision() > cachedRevision)
+ .collect(Collectors.toList());
+ if (relevantEvents.isEmpty()) {
// the cache entry was more up-to-date
return CompletableFuture.completedFuture(cachedJsonObject);
- } else if (cachedRevision + 1 == getFirst(thingEvents).getRevision()) {
+ } else if (cachedRevision + 1 == getFirst(relevantEvents).getRevision()) {
// the cache entry was already present and the first thingEvent was the next expected revision no
// -> we have all information necessary to calculate it without making another roundtrip
- return handleNextExpectedThingEvents(enhancedFieldSelector, idWithResourceType, thingEvents,
+ return handleNextExpectedThingEvents(enhancedFieldSelector, idWithResourceType, relevantEvents,
cachedJsonObject, invalidateCacheOnPolicyChange);
} else {
// the cache entry was already present, but we missed sth and need to invalidate the cache
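One detail of the smartUpdateCachedObject change above deserves spelling out: the old code rejected any batch whose first event was not exactly cachedRevision + 1, even when the cache had already seen the older events. Filtering to the not-yet-seen revisions first saves a reload in that case. A sketch with plain numbers (stand-ins for ThingEvent.getRevision(); not the production code):

import java.util.List;
import java.util.stream.Collectors;

final class RelevantEventsSketch {

    static List<Long> relevantRevisions(final List<Long> eventRevisions, final long cachedRevision) {
        return eventRevisions.stream()
                .filter(rev -> rev > cachedRevision)
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        // cachedRevision = 5, incoming events 4..7: previously 5 + 1 != 4 forced a cache
        // reload; now the batch shrinks to [6, 7], 6 == 5 + 1 holds, and the cached thing
        // can be patched without another round trip.
        System.out.println(relevantRevisions(List.of(4L, 5L, 6L, 7L), 5)); // [6, 7]
    }
}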
========== next changed file ==========
@@ -53,10 +53,10 @@ public CompletableFuture<JsonObject> asyncLoad(final CacheKey key, final Executo
final Optional<CacheLookupContext> contextOptional = key.getCacheLookupContext();
final Optional<JsonFieldSelector> selectorOptional =
contextOptional.flatMap(CacheLookupContext::getJsonFieldSelector);
- if (contextOptional.isPresent() && selectorOptional.isPresent()) {
+ if (contextOptional.isPresent()) {
final CacheLookupContext context = contextOptional.get();
final ThingId thingId = ThingId.of(key.getId());
- final JsonFieldSelector jsonFieldSelector = selectorOptional.get();
+ final JsonFieldSelector jsonFieldSelector = selectorOptional.orElse(null);
final DittoHeaders dittoHeaders = context.getDittoHeaders().orElseGet(DittoHeaders::empty);
return facade.retrievePartialThing(thingId, jsonFieldSelector, dittoHeaders, null)
.toCompletableFuture();
========== next changed file ==========
@@ -13,6 +13,7 @@
package org.eclipse.ditto.thingsearch.service.persistence.write.mapping;

import java.util.Collections;
+ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
@@ -104,8 +105,8 @@ public static ThingWriteModel toWriteModel(final JsonObject thing,
final var nullablePolicyId = thing.getValue(Thing.JsonFields.POLICY_ID).map(PolicyId::of).orElse(null);
final var metadata = Metadata.of(thingId, thingRevision, nullablePolicyId, policyRevision,
Optional.ofNullable(oldMetadata).flatMap(Metadata::getModified).orElse(null),
- Optional.ofNullable(oldMetadata).map(Metadata::getTimers).orElse(Collections.emptyList()),
- Optional.ofNullable(oldMetadata).map(Metadata::getSenders).orElse(Collections.emptyList()));
+ Optional.ofNullable(oldMetadata).map(Metadata::getTimers).orElse(List.of()),
+ Optional.ofNullable(oldMetadata).map(Metadata::getSenders).orElse(List.of()));

// hierarchical values for sorting
final BsonValue thingCopyForSorting = JsonToBson.convert(pruneArrays(thing, maxArraySize));
========== next changed file ==========
@@ -25,11 +25,12 @@
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
+ import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
+ import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;
+ import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThingResponse;
- import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
- import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;

import akka.actor.ActorRef;

@@ -43,6 +44,7 @@ public final class Metadata {
@Nullable private final PolicyId policyId;
@Nullable private final Long policyRevision;
@Nullable final Instant modified;
+ private final List<ThingEvent<?>> events;
private final List<StartedTimer> timers;
private final List<ActorRef> senders;
private final boolean invalidateCache;
@@ -52,6 +54,7 @@ private Metadata(final ThingId thingId,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision,
@Nullable final Instant modified,
+ final List<ThingEvent<?>> events,
final Collection<StartedTimer> timers,
final Collection<ActorRef> senders,
final boolean invalidateCache) {
@@ -61,6 +64,7 @@ private Metadata(final ThingId thingId,
this.policyId = policyId;
this.policyRevision = policyRevision;
this.modified = modified;
+ this.events = events;
this.timers = List.copyOf(timers);
this.senders = List.copyOf(senders);
this.invalidateCache = invalidateCache;
@@ -83,7 +87,7 @@ public static Metadata of(final ThingId thingId,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, null,
- null != timer ? List.of(timer) : List.of(), List.of(), false);
+ List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false);
}

/**
@@ -101,11 +105,13 @@ public static Metadata of(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision,
+ final List<ThingEvent<?>> events,
@Nullable final StartedTimer timer,
- final ActorRef sender) {
+ @Nullable final ActorRef sender) {

- return new Metadata(thingId, thingRevision, policyId, policyRevision, null,
- null != timer ? List.of(timer) : List.of(), List.of(sender), false);
+ return new Metadata(thingId, thingRevision, policyId, policyRevision, null, events,
+ null != timer ? List.of(timer) : List.of(),
+ null != sender ? List.of(sender) : List.of(), false);
}

/**
@@ -128,7 +134,7 @@ public static Metadata of(final ThingId thingId,
final Collection<StartedTimer> timers,
final Collection<ActorRef> senders) {

- return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, timers, senders,
+ return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, List.of(), timers, senders,
false);
}

@@ -151,7 +157,7 @@ public static Metadata of(final ThingId thingId,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, modified,
null != timer ? List.of(timer) : List.of(), List.of(), false);
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false);
}

/**
@@ -173,7 +179,7 @@ public static Metadata fromResponse(final UpdateThingResponse updateThingRespons
* @return the copy.
*/
public Metadata invalidateCache() {
- return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, timers, senders, true);
+ return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders, true);
}

/**
@@ -231,6 +237,15 @@ public Optional<Instant> getModified() {
return Optional.ofNullable(modified);
}

+ /**
+ * Returns the known thing events.
+ *
+ * @return the known thing events.
+ */
+ public List<ThingEvent<?>> getEvents() {
+ return events;
+ }

/**
* Returns the timers measuring the consistency lag.
*
@@ -273,13 +288,15 @@ public boolean shouldInvalidateCache() {
* @param newMetadata a previous metadata record.
* @return the new metadata with concatenated senders.
*/
- public Metadata prependTimersAndSenders(final Metadata newMetadata) {
+ public Metadata append(final Metadata newMetadata) {
+ final List<ThingEvent<?>> newEvents =
+ Stream.concat(events.stream(), newMetadata.events.stream()).collect(Collectors.toList());
final List<StartedTimer> newTimers =
- Stream.concat(newMetadata.timers.stream(), timers.stream()).collect(Collectors.toList());
+ Stream.concat(timers.stream(), newMetadata.timers.stream()).collect(Collectors.toList());
final List<ActorRef> newSenders =
- Stream.concat(newMetadata.senders.stream(), senders.stream()).collect(Collectors.toList());
+ Stream.concat(senders.stream(), newMetadata.senders.stream()).collect(Collectors.toList());
return new Metadata(newMetadata.thingId, newMetadata.thingRevision, newMetadata.policyId,
- newMetadata.policyRevision, newMetadata.modified, newTimers, newSenders,
+ newMetadata.policyRevision, newMetadata.modified, newEvents, newTimers, newSenders,
invalidateCache || newMetadata.invalidateCache);
}

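Note the flipped concatenation order in the renamed append: the existing record's entries now come first and the incoming record's second, which is the order event replay needs. A minimal illustration (plain lists stand in for the events, timers, and senders fields):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class AppendOrderSketch {

    static <T> List<T> append(final List<T> existing, final List<T> incoming) {
        return Stream.concat(existing.stream(), incoming.stream()).collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        // The event for revision 4 was queued first; revision 5 is merged in later.
        // Existing-before-incoming keeps the replay order intact.
        System.out.println(append(List.of("rev-4"), List.of("rev-5"))); // [rev-4, rev-5]
    }
}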
@@ -322,14 +339,15 @@ public boolean equals(final Object o) {
Objects.equals(thingId, that.thingId) &&
Objects.equals(policyId, that.policyId) &&
Objects.equals(modified, that.modified) &&
+ Objects.equals(events, that.events) &&
Objects.equals(timers, that.timers) &&
Objects.equals(senders, that.senders) &&
invalidateCache == that.invalidateCache;
}

@Override
public int hashCode() {
- return Objects.hash(thingId, thingRevision, policyId, policyRevision, modified, timers, senders,
+ return Objects.hash(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders,
invalidateCache);
}

@@ -341,6 +359,7 @@ public String toString() {
", policyId=" + policyId +
", policyRevision=" + policyRevision +
", modified=" + modified +
", events=" + events +
", timers=" + timers +
", senders=" + senders +
", invalidateCache=" + invalidateCache +
========== next changed file ==========
@@ -23,9 +23,6 @@
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
- import akka.dispatch.ControlMessage;
- import akka.dispatch.RequiresMessageQueue;
- import akka.dispatch.UnboundedControlAwareMessageQueueSemantics;
import akka.japi.function.Function;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
@@ -35,8 +32,7 @@
/**
* Collects changes from ThingUpdaters and forward them downstream on demand.
*/
- public final class ChangeQueueActor extends AbstractActor
- implements RequiresMessageQueue<UnboundedControlAwareMessageQueueSemantics> {
+ public final class ChangeQueueActor extends AbstractActor {

/**
* Name of this actor.
@@ -81,10 +77,10 @@ public Receive createReceive() {
private void enqueue(final Metadata metadata) {
if (metadata.getSenders().isEmpty()) {
ConsistencyLag.startS1InChangeQueue(metadata);
- cache.merge(metadata.getThingId(), metadata, Metadata::prependTimersAndSenders);
+ cache.merge(metadata.getThingId(), metadata, Metadata::append);
} else {
ConsistencyLag.startS1InChangeQueue(metadata);
- cacheShouldAcknowledge.merge(metadata.getThingId(), metadata, Metadata::prependTimersAndSenders);
+ cacheShouldAcknowledge.merge(metadata.getThingId(), metadata, Metadata::append);
}
}

@@ -131,19 +127,22 @@ private void dump(final Control dump) {
@SuppressWarnings("unchecked")
private static Function<Control, Source<Map<ThingId, Metadata>, NotUsed>> askSelf(final ActorRef self) {
return message -> Source.completionStageSource(
Patterns.ask(self, message, ASK_SELF_TIMEOUT)
.handle((result, error) -> {
if (result instanceof Map) {
return Source.single((Map<ThingId, Metadata>) result);
} else {
return Source.empty();
}
}))
.withAttributes(Attributes.inputBuffer(1, 1))
.mapMaterializedValue(whatever -> NotUsed.getInstance());
}

- private enum Control implements ControlMessage {
+ // DO NOT give control messages priority over Metadata.
+ // We want Metadata of each thing to stay in change queue as long as possible in order to aggregate updates
+ // to reduce DB load.
+ enum Control {
DUMP,
DUMP_SHOULD_ACKNOWLEDGE
}
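The effect of dropping the control-aware mailbox can be modelled with a plain map: while a DUMP message waits its turn behind incoming Metadata, updates for the same thing keep merging, so each drain produces fewer, larger index writes. A toy model (strings stand in for Metadata and Metadata::append; not the actor code):

import java.util.HashMap;
import java.util.Map;

final class ChangeQueueSketch {

    public static void main(final String[] args) {
        final Map<String, String> cache = new HashMap<>();
        // Three updates for the same thing are enqueued before DUMP gets processed...
        cache.merge("thing:1", "rev-4", (existing, next) -> existing + "+" + next);
        cache.merge("thing:1", "rev-5", (existing, next) -> existing + "+" + next);
        cache.merge("thing:1", "rev-6", (existing, next) -> existing + "+" + next);
        // ...so the eventual dump sees one aggregated entry instead of three separate writes.
        System.out.println(cache); // {thing:1=rev-4+rev-5+rev-6}
    }
}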
(remaining changed files not shown)