Skip to content

Commit

Permalink
added timer metric for measuring search updater consistency lag
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jan 28, 2021
1 parent 50d45c0 commit b3bc9e0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 21 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.services.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;

import akka.actor.ActorRef;
Expand All @@ -43,20 +44,23 @@ public final class Metadata {
@Nullable private final PolicyId policyId;
@Nullable private final Long policyRevision;
@Nullable final Instant modified;
@Nullable private final StartedTimer timer;
private final List<ActorRef> senders;

private Metadata(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision,
@Nullable final Instant modified,
@Nullable final StartedTimer timer,
final List<ActorRef> senders) {

this.thingId = thingId;
this.thingRevision = thingRevision;
this.policyId = policyId;
this.policyRevision = policyRevision;
this.modified = modified;
this.timer = timer;
this.senders = senders; // does not need to be made unmodifiable as there is no getter returning that to the "outside world"
}

Expand All @@ -67,14 +71,16 @@ private Metadata(final ThingId thingId,
* @param thingRevision the Thing revision.
* @param policyId the Policy ID if the Thing has one.
* @param policyRevision the Policy revision if the Thing has a policy, or null if it does not.
* @param timer an optional timer measuring the search updater's consistency lag.
* @return the new Metadata object.
*/
public static Metadata of(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision) {
@Nullable final Long policyRevision,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, null, List.of());
return new Metadata(thingId, thingRevision, policyId, policyRevision, null, timer, List.of());
}

/**
Expand All @@ -84,16 +90,18 @@ public static Metadata of(final ThingId thingId,
* @param thingRevision the Thing revision.
* @param policyId the Policy ID if the Thing has one.
* @param policyRevision the Policy revision if the Thing has a policy, or null if it does not.
* @param sender the sender
* @param timer an optional timer measuring the search updater's consistency lag.
* @param sender the sender.
* @return the new Metadata object.
*/
public static Metadata of(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision,
@Nullable final StartedTimer timer,
final ActorRef sender) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, null, List.of(sender));
return new Metadata(thingId, thingRevision, policyId, policyRevision, null, timer, List.of(sender));
}

/**
Expand All @@ -104,15 +112,17 @@ public static Metadata of(final ThingId thingId,
* @param policyId the Policy ID if the Thing has one.
* @param policyRevision the Policy revision if the Thing has a policy, or null if it does not.
* @param modified the timestamp of the last change incorporated into the search index, or null if not known.
* @param timer an optional timer measuring the search updater's consistency lag.
* @return the new Metadata object.
*/
public static Metadata of(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyId policyId,
@Nullable final Long policyRevision,
@Nullable final Instant modified) {
@Nullable final Instant modified,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, List.of());
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, timer, List.of());
}

/**
Expand All @@ -124,7 +134,8 @@ public static Metadata of(final ThingId thingId,
public static Metadata fromResponse(final UpdateThingResponse updateThingResponse) {
return of(updateThingResponse.getThingId(), updateThingResponse.getThingRevision(),
updateThingResponse.getPolicyId().orElse(null),
updateThingResponse.getPolicyRevision().orElse(null));
updateThingResponse.getPolicyRevision().orElse(null),
null);
}

/**
Expand Down Expand Up @@ -192,7 +203,7 @@ public Metadata prependSenders(final Metadata newMetadata) {
final List<ActorRef> newSenders =
Stream.concat(newMetadata.senders.stream(), senders.stream()).collect(Collectors.toList());
return new Metadata(newMetadata.thingId, newMetadata.thingRevision, newMetadata.policyId,
newMetadata.policyRevision, newMetadata.modified, newSenders);
newMetadata.policyRevision, newMetadata.modified, newMetadata.timer, newSenders);
}

/**
Expand All @@ -212,6 +223,9 @@ public void sendAck() {
}

private void send(final Acknowledgement ack) {
if (null != timer) {
timer.tag("success", ack.isSuccess()).stop();
}
senders.forEach(sender -> sender.tell(ack, ActorRef.noSender()));
}

Expand Down
Expand Up @@ -38,6 +38,9 @@
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.streaming.StreamAck;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.timer.OnStopHandler;
import org.eclipse.ditto.services.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.signals.events.things.ThingEvent;

import akka.actor.AbstractActor;
Expand All @@ -52,6 +55,8 @@
*/
final class ThingUpdater extends AbstractActor {

private static final String SEARCH_UPDATER_CONSISTENCY_LAG = "things_search_updater_consistency_lag";

private static final AcknowledgementRequest SEARCH_PERSISTED_REQUEST =
AcknowledgementRequest.of(DittoAcknowledgementLabel.SEARCH_PERSISTED);

Expand Down Expand Up @@ -113,24 +118,27 @@ private void stopThisActor(final ReceiveTimeout receiveTimeout) {

/**
* Export the metadata of this updater.
*
* @param timer an optional timer measuring the search updater's consistency lag.
*/
private Metadata exportMetadata() {
return Metadata.of(thingId, thingRevision, policyId, policyRevision);
private Metadata exportMetadata(@Nullable final StartedTimer timer) {
return Metadata.of(thingId, thingRevision, policyId, policyRevision, timer);
}

private Metadata exportMetadataWithSender(final boolean shouldAcknowledge, final ActorRef sender) {
private Metadata exportMetadataWithSender(final boolean shouldAcknowledge, final ActorRef sender,
final StartedTimer consistencyLagTimer) {
if (shouldAcknowledge) {
return Metadata.of(thingId, thingRevision, policyId, policyRevision, sender);
return Metadata.of(thingId, thingRevision, policyId, policyRevision, consistencyLagTimer, sender);
} else {
return exportMetadata();
return exportMetadata(consistencyLagTimer);
}
}

/**
* Push metadata of this updater to the queue of thing-changes to be streamed into the persistence.
*/
private void enqueueMetadata() {
enqueueMetadata(exportMetadata());
enqueueMetadata(exportMetadata(null));
}

private void enqueueMetadata(final Metadata metadata) {
Expand Down Expand Up @@ -160,7 +168,7 @@ private void updateThing(final UpdateThing updateThing) {

private void processUpdateThingResponse(final UpdateThingResponse response) {
if (!response.isSuccess()) {
final Metadata metadata = exportMetadata();
final Metadata metadata = exportMetadata(null);
log.warning("Got negative acknowledgement for <{}>; updating to <{}>.",
Metadata.fromResponse(response),
metadata);
Expand Down Expand Up @@ -189,20 +197,23 @@ private void processPolicyReferenceTag(final PolicyReferenceTag policyReferenceT
}

private void processThingEvent(final ThingEvent<?> thingEvent) {
log.withCorrelationId(thingEvent);
log.debug("Received new thing event for thing id <{}> with revision <{}>.", thingId, thingEvent.getRevision());
final DittoDiagnosticLoggingAdapter l = log.withCorrelationId(thingEvent);
l.debug("Received new thing event for thing id <{}> with revision <{}>.", thingId, thingEvent.getRevision());
final boolean shouldAcknowledge =
thingEvent.getDittoHeaders().getAcknowledgementRequests().contains(SEARCH_PERSISTED_REQUEST);

// check if the revision is valid (thingEvent.revision = 1 + sequenceNumber)
if (thingEvent.getRevision() <= thingRevision && !shouldAcknowledge) {
log.debug("Dropped thing event for thing id <{}> with revision <{}> because it was older than or "
l.debug("Dropped thing event for thing id <{}> with revision <{}> because it was older than or "
+ "equal to the current sequence number <{}> of the update actor.", thingId,
thingEvent.getRevision(), thingRevision);
} else {
log.debug("Applying thing event <{}>.", thingEvent);
l.debug("Applying thing event <{}>.", thingEvent);
thingRevision = thingEvent.getRevision();
enqueueMetadata(exportMetadataWithSender(shouldAcknowledge, getSender()));
final StartedTimer timer = DittoMetrics.expiringTimer(SEARCH_UPDATER_CONSISTENCY_LAG).build()
.onStop(new OnStopHandler(stimer ->
l.debug("Applied thing event to search index in duration: <{}>", stimer.getDuration())));
enqueueMetadata(exportMetadataWithSender(shouldAcknowledge, getSender(), timer));
}
}

Expand Down
Expand Up @@ -17,7 +17,7 @@
/**
* Contains {@code stoppedTimerConsumer} to be invoked when a Timer stops.
*/
public class OnStopHandler {
public final class OnStopHandler {

private final Consumer<StoppedTimer> stoppedTimerConsumer;

Expand Down

0 comments on commit b3bc9e0

Please sign in to comment.