Skip to content

Commit

Permalink
Merge pull request #928 from bosch-io/feature/search-persisted
Browse files Browse the repository at this point in the history
Add built-in acknowledgement label "search-persisted"
  • Loading branch information
yufei-cai committed Dec 23, 2020
2 parents cf1a678 + 57c6e57 commit b7d5f5b
Show file tree
Hide file tree
Showing 23 changed files with 312 additions and 78 deletions.
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<!-- ### Testing dependencies versions -->
<junit.version>4.13.1</junit.version>
<assertj.version>3.12.0</assertj.version>
<mutability-detector.version>0.10.2</mutability-detector.version>
<mutability-detector.version>0.10.4</mutability-detector.version>
<equals-verifier.version>3.1.10</equals-verifier.version>
<mockito.version>3.1.0</mockito.version>
<jsonassert.version>1.2.3</jsonassert.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ It is ignored for commands in the live channel.
* **live-response**: For acknowledgement requests of live commands and live messages.
It is fulfilled when a subscriber of the live command or message sends a corresponding response.
It is ignored for commands in the twin channel.
* **search-persisted**: For acknowledgement requests of twin modifying commands.
It is fulfilled when a modifying command has successfully updated the search index of the digital twin.
It is ignored for commands in the live channel.

### Custom acknowledgement labels
In addition to the [built-in](#built-in-acknowledgement-labels) acknowledgement requests,
Expand Down
4 changes: 4 additions & 0 deletions documentation/src/main/resources/pages/ditto/basic-search.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ That means that when a thing is updated and the API (e.g. the HTTP endpoint) ret
will not reflect that change in that instant. The change will most likely be reflected in the search index within
1-2 seconds. In rare cases the duration until consistency is reached again might be higher.

If it is important to know when a twin modification is reflected in the search index, request the
[built-in acknowledgement](#built-in-acknowledgement-labels) `search-persisted` in the corresponding command.
Search index update is successful if the status code of `search-persisted` in the command response is 204 "no content".
Status codes at or above 400 indicate failed search index update due to client or server errors.

## Search queries

Expand Down
2 changes: 1 addition & 1 deletion legal/3rd-party-dependencies/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ org.hamcrest:hamcrest-library:jar:1.3:test
org.json:json:jar:20090211:test
org.json:json:jar:20180130:test
org.mockito:mockito-core:jar:3.1.0:test
org.mutabilitydetector:MutabilityDetector:jar:0.10.2:test
org.mutabilitydetector:MutabilityDetector:jar:0.10.4:test
org.objenesis:objenesis:jar:2.6:test
org.openjdk.jmh:jmh-core:jar:1.21:test
org.openjdk.jmh:jmh-generator-annprocess:jar:1.21:test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,31 @@ public abstract class AbstractCommandAckRequestSetter<C extends WithDittoHeaders
private final AcknowledgementLabel implicitAcknowledgementLabel;
private final Set<AcknowledgementLabel> negatedDittoAcknowledgementLabels;

/**
* Create a command acknowledgment request setter that sets one Ditto acknowledgement label implicitly and
* filters out other Ditto acknowledgment5 labels.
*
* @param implicitAcknowledgementLabel the label to set if the header 'requested-acks' is absent.
*/
protected AbstractCommandAckRequestSetter(final AcknowledgementLabel implicitAcknowledgementLabel) {
negatedDittoAcknowledgementLabels = Collections.unmodifiableSet(
Arrays.stream(DittoAcknowledgementLabel.values())
this(implicitAcknowledgementLabel,
Collections.unmodifiableSet(Arrays.stream(DittoAcknowledgementLabel.values())
.filter(v -> !implicitAcknowledgementLabel.equals(v))
.collect(Collectors.toSet()));
.collect(Collectors.toSet()))
);
}

/**
* Create a command acknowledgement request setter.
*
* @param implicitAcknowledgementLabel the label to set if the header 'requested-acks' is absent.
* @param negatedDittoAcknowledgementLabels the labels to filter out if present.
* @since 2.0.0
*/
protected AbstractCommandAckRequestSetter(final AcknowledgementLabel implicitAcknowledgementLabel,
final Set<AcknowledgementLabel> negatedDittoAcknowledgementLabels) {
this.implicitAcknowledgementLabel = implicitAcknowledgementLabel;
this.negatedDittoAcknowledgementLabels = negatedDittoAcknowledgementLabels;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ public final class DittoAcknowledgementLabel implements AcknowledgementLabel {
*/
public static final DittoAcknowledgementLabel LIVE_RESPONSE = new DittoAcknowledgementLabel("live-response");

/**
* Label for Acknowledgements indicating that a change to an entity (e. g. a thing) has successfully been reflected
* in the search index.
*
* @since 2.0.0
*/
public static final DittoAcknowledgementLabel SEARCH_PERSISTED = new DittoAcknowledgementLabel("search-persisted");

private final AcknowledgementLabel delegate;

private DittoAcknowledgementLabel(final CharSequence labelValue) {
Expand All @@ -52,7 +60,7 @@ private DittoAcknowledgementLabel(final CharSequence labelValue) {
* @return an array containing the Ditto acknowledgement labels, in the order they're declared.
*/
public static AcknowledgementLabel[] values() {
return new AcknowledgementLabel[]{TWIN_PERSISTED, LIVE_RESPONSE};
return new AcknowledgementLabel[]{TWIN_PERSISTED, LIVE_RESPONSE, SEARCH_PERSISTED};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private Acknowledgement getNack(final AcknowledgementLabel label,
dittoRuntimeException.toJson());
}

static boolean isNotBuiltIn(final AcknowledgementRequest request) {
static boolean isNotTwinPersistedOrLiveResponse(final AcknowledgementRequest request) {
return isNotLiveResponse(request) && isNotTwinPersisted(request);
}

Expand All @@ -231,7 +231,8 @@ static boolean isLiveSignal(final Signal<?> signal) {
static boolean hasEffectiveAckRequests(final Signal<?> signal, final Set<AcknowledgementRequest> ackRequests) {
final boolean isLiveSignal = isLiveSignal(signal);
if (signal instanceof ThingEvent && !isLiveSignal) {
return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotBuiltIn);
return ackRequests.stream()
.anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersistedOrLiveResponse);
} else if (signal instanceof MessageCommand || (isLiveSignal && signal instanceof ThingCommand)) {
return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersisted);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Map;
import java.util.stream.Stream;

import javax.annotation.Nullable;

import org.bson.BsonArray;
import org.bson.BsonString;
import org.bson.BsonValue;
Expand Down Expand Up @@ -77,7 +79,7 @@ public static ThingWriteModel toWriteModel(final JsonObject thing,
final Enforcer enforcer,
final long policyRevision) {

return toWriteModel(thing, enforcer, policyRevision, -1);
return toWriteModel(thing, enforcer, policyRevision, -1, null);
}

/**
Expand All @@ -87,13 +89,15 @@ public static ThingWriteModel toWriteModel(final JsonObject thing,
* @param enforcer the policy- or ACL-enforcer of the Thing.
* @param policyRevision revision of the policy for an policy enforcer, or any number for an ACL enforcer.
* @param maxArraySize only arrays smaller than this are indexed.
* @param oldMetadata the meatadata that triggered the search update, possibly containing sender information.
* @return BSON document to write into the search index.
* @throws org.eclipse.ditto.json.JsonMissingFieldException if Thing ID or revision is missing.
*/
public static ThingWriteModel toWriteModel(final JsonObject thing,
final Enforcer enforcer,
final long policyRevision,
final int maxArraySize) {
final int maxArraySize,
@Nullable final Metadata oldMetadata) {

final String extractedThing = thing.getValueOrThrow(Thing.JsonFields.ID);
final ThingId thingId = ThingId.of(extractedThing);
Expand All @@ -117,7 +121,8 @@ public static ThingWriteModel toWriteModel(final JsonObject thing,
.append(FIELD_SORTING, thingCopyForSorting)
.append(FIELD_INTERNAL, flattenedValues);

return ThingWriteModel.of(metadata, thingDocument);
final Metadata metadataToRetain = oldMetadata == null ? metadata : oldMetadata.prependSenders(metadata);
return ThingWriteModel.of(metadataToRetain, thingDocument);
}

private static BsonArray getGlobalRead(final Enforcer enforcer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,24 @@
package org.eclipse.ditto.services.thingsearch.persistence.write.model;

import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;

import akka.actor.ActorRef;

/**
* Data class holding information about a "thingEntities" database record.
Expand All @@ -34,18 +43,21 @@ public final class Metadata {
@Nullable private final PolicyId policyId;
@Nullable private final Long policyRevision;
@Nullable final Instant modified;
private final List<ActorRef> senders;

private Metadata(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision,
@Nullable final Instant modified) {
@Nullable final Instant modified,
final List<ActorRef> senders) {

this.thingId = thingId;
this.thingRevision = thingRevision;
this.policyId = policyId;
this.policyRevision = policyRevision;
this.modified = modified;
this.senders = senders; // does not need to be made unmodifiable as there is no getter returning that to the "outside world"
}

/**
Expand All @@ -62,7 +74,26 @@ public static Metadata of(final ThingId thingId,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, null);
return new Metadata(thingId, thingRevision, policyId, policyRevision, null, List.of());
}

/**
* Create an Metadata object retaining the original sender of an event.
*
* @param thingId the Thing ID.
* @param thingRevision the Thing revision.
* @param policyId the Policy ID if the Thing has one.
* @param policyRevision the Policy revision if the Thing has a policy, or null if it does not.
* @param sender the sender
* @return the new Metadata object.
*/
public static Metadata of(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision,
final ActorRef sender) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, null, List.of(sender));
}

/**
Expand All @@ -81,7 +112,7 @@ public static Metadata of(final ThingId thingId,
@Nullable final Long policyRevision,
@Nullable final Instant modified) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, modified);
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, List.of());
}

/**
Expand Down Expand Up @@ -151,6 +182,39 @@ public Optional<Instant> getModified() {
return Optional.ofNullable(modified);
}

/**
* Prepend new senders to the senders stored in this object.
*
* @param newMetadata a previous metadata record.
* @return the new metadata with concatenated senders.
*/
public Metadata prependSenders(final Metadata newMetadata) {
final List<ActorRef> newSenders =
Stream.concat(newMetadata.senders.stream(), senders.stream()).collect(Collectors.toList());
return new Metadata(newMetadata.thingId, newMetadata.thingRevision, newMetadata.policyId,
newMetadata.policyRevision, newMetadata.modified, newSenders);
}

/**
* Send negative acknowledgements to senders.
*/
public void sendNAck() {
send(Acknowledgement.of(DittoAcknowledgementLabel.SEARCH_PERSISTED, thingId,
HttpStatusCode.INTERNAL_SERVER_ERROR, DittoHeaders.empty()));
}

/**
* Send positive acknowledgements to senders.
*/
public void sendAck() {
send(Acknowledgement.of(DittoAcknowledgementLabel.SEARCH_PERSISTED, thingId, HttpStatusCode.NO_CONTENT,
DittoHeaders.empty()));
}

private void send(final Acknowledgement ack) {
senders.forEach(sender -> sender.tell(ack, ActorRef.noSender()));
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -164,12 +228,13 @@ public boolean equals(final Object o) {
Objects.equals(policyRevision, that.policyRevision) &&
Objects.equals(thingId, that.thingId) &&
Objects.equals(policyId, that.policyId) &&
Objects.equals(modified, that.modified);
Objects.equals(modified, that.modified) &&
Objects.equals(senders, that.senders);
}

@Override
public int hashCode() {
return Objects.hash(thingId, thingRevision, policyId, policyRevision, modified);
return Objects.hash(thingId, thingRevision, policyId, policyRevision, modified, senders);
}

@Override
Expand All @@ -180,6 +245,7 @@ public String toString() {
", policyId=" + policyId +
", policyRevision=" + policyRevision +
", modified=" + modified +
", senders=" + senders +
"]";
}

Expand Down

0 comments on commit b7d5f5b

Please sign in to comment.