Skip to content

Commit

Permalink
Add counters for search background sync stream.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 12, 2021
1 parent 4086558 commit 92d5658
Showing 1 changed file with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.eclipse.ditto.internal.utils.akka.streaming.TimestampPersistence;
import org.eclipse.ditto.internal.utils.health.AbstractBackgroundStreamingActorWithConfigWithStatusReport;
import org.eclipse.ditto.internal.utils.health.StatusDetailMessage;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.things.model.ThingConstants;
import org.eclipse.ditto.things.model.ThingId;
Expand Down Expand Up @@ -61,6 +63,10 @@ public final class BackgroundSyncActor
private final BackgroundSyncStream backgroundSyncStream;
private final ActorRef thingsUpdater;

private final Counter streamedSnapshots = DittoMetrics.counter("search_streamed_snapshots");
private final Counter scannedIndexDocs = DittoMetrics.counter("search_scanned_index_docs");
private final Counter inconsistentThings = DittoMetrics.counter("search_inconsistent_things");

private ThingId progressPersisted = EMPTY_THING_ID;
private ThingId progressIndexed = EMPTY_THING_ID;

Expand Down Expand Up @@ -112,10 +118,10 @@ public static Props props(final BackgroundSyncConfig config,
@Override
protected void preEnhanceSleepingBehavior(final ReceiveBuilder sleepingReceiveBuilder) {
sleepingReceiveBuilder.matchEquals(Control.BOOKMARK_THING_ID,
trigger ->
// ignore scheduled bookmark messages when sleeping
log.debug("Ignoring: <{}>", trigger)
)
trigger ->
// ignore scheduled bookmark messages when sleeping
log.debug("Ignoring: <{}>", trigger)
)
.match(ThingId.class, thingId ->
// got outdated progress update message after actor resumes sleeping; ignore it.
log.debug("Ignoring: <{}>", thingId)
Expand Down Expand Up @@ -173,10 +179,12 @@ protected StatusDetailMessage.Level getMostSevereLevelFromEvents(final Deque<Pai
}

private Source<Metadata, NotUsed> streamMetadataFromLowerBound(final ThingId lowerBound) {
final Source<Metadata, NotUsed> persistedMetadata =
getPersistedMetadataSourceWithProgressReporting(lowerBound);
final Source<Metadata, NotUsed> indexedMetadata = getIndexedMetadataSource(lowerBound);
return backgroundSyncStream.filterForInconsistencies(persistedMetadata, indexedMetadata);
final Source<Metadata, NotUsed> persistedMetadata = getPersistedMetadataSourceWithProgressReporting(lowerBound)
.wireTap(x -> streamedSnapshots.increment());
final Source<Metadata, NotUsed> indexedMetadata = getIndexedMetadataSource(lowerBound)
.wireTap(x -> scannedIndexDocs.increment());
return backgroundSyncStream.filterForInconsistencies(persistedMetadata, indexedMetadata)
.wireTap(x -> inconsistentThings.increment());
}

private void setProgress(ProgressReport progress) {
Expand Down

0 comments on commit 92d5658

Please sign in to comment.