Navigation Menu

Skip to content

Commit

Permalink
Adds tag to search tracing timers to distinguish between count and query
Browse files Browse the repository at this point in the history
Signed-off-by: Klem Yannic (INST/ECS1) <Yannic.Klem@bosch-si.com>
  • Loading branch information
Yannic92 committed Jul 10, 2018
1 parent 30816e9 commit e46b5f9
Showing 1 changed file with 57 additions and 14 deletions.
Expand Up @@ -105,9 +105,13 @@ public final class SearchActor extends AbstractActor {
private static final int QUERY_ASK_TIMEOUT = 500;
private static final int THINGS_ASK_TIMEOUT = 60 * 1000;

private static final String TRACING_QUERY_PARSING = "things_search_query_parsing";
private static final String TRACING_DATABASE_ACCESS = "things_search_database_access";
private static final String TRACING_THINGS_SERVICE_ACCESS = "things_search_things_service_access";
private static final String TRACING_THINGS_SEARCH = "things_search";
private static final String QUERY_PARSING_SEGMENT_SUFFIX = "_query_parsing";
private static final String TRACING_QUERY_PARSING = TRACING_THINGS_SEARCH + QUERY_PARSING_SEGMENT_SUFFIX;
private static final String DATABASE_ACCESS_SUFFIX = "_database_access";
private static final String TRACING_DATABASE_ACCESS = TRACING_THINGS_SEARCH + DATABASE_ACCESS_SUFFIX;
private static final String TRACING_THINGS_SERVICE_ACCESS = TRACING_THINGS_SEARCH + "_things_service_access";
private static final String QUERY_TYPE_TAG = "query_type";

private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
private final QueryFilterCriteriaFactory queryFilterCriteriaFactory =
Expand Down Expand Up @@ -189,9 +193,12 @@ private void count(final Command countThings) {
log.info("Processing CountThings command: {}", countThings);
final JsonSchemaVersion version = countThings.getImplementedSchemaVersion();

final String queryType = "count";

final MutableKamonTimer countTimer =
startNewTimer(TRACING_THINGS_SEARCH, queryType, correlationIdOpt.orElse(null));
final MutableKamonTimer queryParsingTimer =
startNewTimer(TRACING_QUERY_PARSING, correlationIdOpt.orElse(null));
startNewTimer(TRACING_QUERY_PARSING, queryType, correlationIdOpt.orElse(null));

final ActorRef sender = getSender();

Expand All @@ -204,15 +211,29 @@ private void count(final Command countThings) {
LogUtil.enhanceLogWithCorrelationId(log, correlationIdOpt);
queryParsingTimer.stop();
if (query instanceof PolicyRestrictedSearchAggregation) {
final MutableKamonTimer databaseAccessTimer =
startNewTimer(TRACING_DATABASE_ACCESS, queryType,
correlationIdOpt.orElse(null));
// aggregation-based count for things with policies
return processSearchPersistenceResult(
() -> searchPersistence.count((PolicyRestrictedSearchAggregation) query),
dittoHeaders)
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
}))
.map(count -> CountThingsResponse.of(count, dittoHeaders));
} else if (query instanceof Query) {
final MutableKamonTimer databaseAccessTimer =
startNewTimer(TRACING_DATABASE_ACCESS, queryType,
correlationIdOpt.orElse(null));
// count without aggregation for things without policies
return processSearchPersistenceResult(() -> searchPersistence.count((Query) query),
dittoHeaders)
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
}))
.map(count -> CountThingsResponse.of(count, dittoHeaders));
} else if (query instanceof DittoRuntimeException) {
log.info("QueryActor responded with DittoRuntimeException: {}", query);
Expand All @@ -222,6 +243,10 @@ private void count(final Command countThings) {
return Source.single(CountThingsResponse.of(-1, dittoHeaders));
}
})
.via(Flow.fromFunction(result -> {
countTimer.stop();
return result;
}))
.runWith(Sink.head(), materializer), dispatcher)
.to(sender);
}
Expand All @@ -233,7 +258,11 @@ private void query(final QueryThings queryThings) {
log.info("Processing QueryThings command: {}", queryThings);
final JsonSchemaVersion version = queryThings.getImplementedSchemaVersion();

final MutableKamonTimer queryParsingTimer = startNewTimer(TRACING_QUERY_PARSING, correlationIdOpt.orElse(null));
final String queryType = "query";
final MutableKamonTimer searchTimer =
startNewTimer(TRACING_THINGS_SEARCH, queryType, correlationIdOpt.orElse(null));
final MutableKamonTimer queryParsingTimer =
startNewTimer(TRACING_QUERY_PARSING, queryType, correlationIdOpt.orElse(null));

final ActorRef sender = getSender();

Expand All @@ -247,15 +276,29 @@ private void query(final QueryThings queryThings) {
queryParsingTimer.stop();

if (query instanceof PolicyRestrictedSearchAggregation) {
final MutableKamonTimer databaseAccessTimer =
startNewTimer(TRACING_DATABASE_ACCESS, queryType,
correlationIdOpt.orElse(null));
// policy-based search via aggregation
return processSearchPersistenceResult(
() -> searchPersistence.findAll((PolicyRestrictedSearchAggregation) query),
dittoHeaders)
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
}))
.flatMapConcat(resultList -> retrieveThingsForIds(resultList, queryThings));
} else if (query instanceof Query) {
final MutableKamonTimer databaseAccessTimer =
startNewTimer(TRACING_DATABASE_ACCESS, queryType,
correlationIdOpt.orElse(null));
// api/1 search via 'find'
return processSearchPersistenceResult(() -> searchPersistence.findAll((Query) query),
dittoHeaders)
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
}))
.flatMapConcat(resultList -> retrieveThingsForIds(resultList, queryThings));
} else if (query instanceof DittoRuntimeException) {
log.info("QueryActor responded with DittoRuntimeException: {}", query);
Expand All @@ -267,23 +310,22 @@ private void query(final QueryThings queryThings) {
QueryThingsResponse.of(SearchModelFactory.emptySearchResult(), dittoHeaders));
}
})
.via(Flow.fromFunction(result -> {
searchTimer.stop();
return result;
}))
.runWith(Sink.head(), materializer), dispatcher)
.to(sender);
}

private <T> Source<T, NotUsed> processSearchPersistenceResult(final Supplier<Source<T, NotUsed>> resultSupplier,
final DittoHeaders dittoHeaders) {
final Optional<String> correlationIdOpt = dittoHeaders.getCorrelationId();
final MutableKamonTimer databaseAccessTimer =
startNewTimer(TRACING_DATABASE_ACCESS, correlationIdOpt.orElse(null));

final Source<T, NotUsed> source = resultSupplier.get();

final Flow<T, T, NotUsed> logAndFinishPersistenceSegmentFlow =
Flow.fromFunction(result -> {
// we know that the source provides exactly one ResultList
LogUtil.enhanceLogWithCorrelationId(log, correlationIdOpt);
databaseAccessTimer.stop();
LogUtil.enhanceLogWithCorrelationId(log, dittoHeaders.getCorrelationId());
log.debug("Persistence returned: {}", result);
return result;
});
Expand Down Expand Up @@ -326,7 +368,7 @@ private Graph<SourceShape<QueryThingsResponse>, NotUsed> retrieveThingsForIds(fi

log.debug("About to send command to Things: {}", retrieveThings);
final MutableKamonTimer thingsServiceAccessTimer =
startNewTimer(TRACING_THINGS_SERVICE_ACCESS, correlationIdOpt.orElse(null));
startNewTimer(TRACING_THINGS_SERVICE_ACCESS, "query", correlationIdOpt.orElse(null));

result = retrieveFromThings(thingIds, retrieveThings, thingsServiceAccessTimer);
}
Expand Down Expand Up @@ -382,8 +424,9 @@ private Graph<SourceShape<QueryThingsResponse>, NotUsed> retrieveFromThings(fina
});
}

private static MutableKamonTimer startNewTimer(final String tracingFilter, @Nullable final String correlationId) {
final MutableKamonTimerBuilder timerBuilder = TraceUtils.newTimer(tracingFilter);
private static MutableKamonTimer startNewTimer(final String tracingFilter, final String queryType,
@Nullable final String correlationId) {
final MutableKamonTimerBuilder timerBuilder = TraceUtils.newTimer(tracingFilter).tag(QUERY_TYPE_TAG, queryType);
if (correlationId != null) {
timerBuilder.tag(TracingTags.CORRELATION_ID, correlationId);
}
Expand Down

0 comments on commit e46b5f9

Please sign in to comment.