Skip to content

Commit

Permalink
[HWKMETRICS-777] (#948)
Browse files Browse the repository at this point in the history
* [HWKMETRICS-774] add some logging for tag queries

* [HWKMETICS-774] more logging

* [HWKMETRICS-774] fix logger formatting
  • Loading branch information
John Sanda committed Apr 16, 2018
1 parent 418076c commit 710c07c
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c
setDefaultTTL(session, keyspace);
initMetrics();

tagQueryParser = new SimpleTagQueryParser(this.dataAccess, this, disableACostOptimization);
int defaultPageSize = session.getCluster().getConfiguration().getQueryOptions().getFetchSize();
int pageThreshold = Integer.getInteger("hawkular.metrics.page-threshold", 10);
tagQueryParser = new SimpleTagQueryParser(this.dataAccess, this, disableACostOptimization, defaultPageSize,
pageThreshold);
expresssionTagQueryParser = new ExpressionTagQueryParser(this.dataAccess, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package org.hawkular.metrics.core.service.tags;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import org.hawkular.metrics.core.service.DataAccess;
Expand All @@ -34,8 +36,10 @@
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.jboss.logging.Logger;

import com.datastax.driver.core.Row;
import com.google.common.base.MoreObjects;

import rx.Observable;
import rx.functions.Func1;
Expand All @@ -47,14 +51,21 @@
*/
public class SimpleTagQueryParser {

private static final Logger logger = Logger.getLogger(SimpleTagQueryParser.class);

private DataAccess dataAccess;
private MetricsService metricsService;
private boolean enableACostQueries;
private int pageSize;
private int pageThreshold;

public SimpleTagQueryParser(DataAccess dataAccess, MetricsService metricsService, boolean disableACostQueries) {
public SimpleTagQueryParser(DataAccess dataAccess, MetricsService metricsService, boolean disableACostQueries,
int pageSize, int pageThreshold) {
this.dataAccess = dataAccess;
this.metricsService = metricsService;
this.enableACostQueries = !disableACostQueries;
this.pageSize = pageSize;
this.pageThreshold = pageThreshold;
}

static class Query {
Expand Down Expand Up @@ -82,6 +93,17 @@ public String getTagValueMatcher() {
public String[] getTagValues() {
return tagValues;
}

public static String toString(String tenantId, Query query, String queryType) {
return MoreObjects.toStringHelper(query)
.omitNullValues()
.add("tenantId", tenantId)
.add("queryType", queryType)
.add("tagName", query.tagName)
.add("tagValueMatcher", query.tagValueMatcher)
.add("tagValues", query.tagValues == null ? null : Arrays.toString(query.tagValues))
.toString();
}
}

// Arrange the queries:
Expand Down Expand Up @@ -171,6 +193,9 @@ public static QueryOptimizer.RegExpOptimizer optimalStrategy(String tagValuesQue
public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId, MetricType<?> metricType,
Map<String, String> tagsQueries) {

logger.debugf("Preparing to optimize and execute %s for tenant %s and for metric type %s", tagsQueries,
tenantId, metricType);

Map<Long, List<Query>> costSortedMap =
QueryOptimizer.reOrderTagsQuery(tagsQueries, enableACostQueries);

Expand Down Expand Up @@ -200,7 +225,8 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,
groupMetrics = Observable.from(groupAEntries)
.flatMap(e -> dataAccess.findMetricsByTagNameValue(tenantId, e.getTagName(), e.getTagValueMatcher())
.compose(new TagsIndexRowTransformerFilter<>(metricType))
.compose(new ItemsToSetTransformer<>()))
.compose(new ItemsToSetTransformer<>())
.doOnNext(metricIds -> logQuery(tenantId, e, "A", metricIds)))
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
Expand All @@ -217,6 +243,7 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,
.flatMap(e -> dataAccess.findMetricsByTagNameValue(tenantId, e.getTagName(), e.getTagValues())
.compose(new TagsIndexRowTransformerFilter<>(metricType))
.compose(new ItemsToSetTransformer<>())
.doOnNext(metricIds -> logQuery(tenantId, e, "A_OR", metricIds))
.reduce((s1, s2) -> {
s1.addAll(s2);
return s1;
Expand All @@ -240,8 +267,14 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,

// Options A+B, A+B+C and A+C
if(!groupBEntries.isEmpty() || !groupCEntries.isEmpty()) {
logger.debug("Fetching metric definitions");
AtomicLong count = new AtomicLong();
Observable<Metric<?>> enrichedMetrics = groupMetrics
.flatMap(metricsService::findMetric);
.flatMap(mId -> metricsService.findMetric(mId)
.doOnNext(m -> count.incrementAndGet())
.doOnTerminate(() ->
logger.debugf("Fetched %d metric definitions for tenant %s", count.get(),
tenantId)));

// Option +B (A+B and A+B+C)
if(!groupBEntries.isEmpty()) {
Expand All @@ -259,14 +292,18 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,
// Options B
// Fetch everything from the tagsQueries
groupMetrics = Observable.from(groupBEntries)
.flatMap(e -> dataAccess.findMetricsByTagName(tenantId, e.getTagName())
.flatMap(e -> {
logger.debugf("Fetching all metrics for %s", Query.toString(tenantId, e, "B"));
return dataAccess.findMetricsByTagName(tenantId, e.getTagName())
.filter(tagValueFilter(e.getTagValueMatcher(), 3))
.compose(new TagsIndexRowTransformerFilter<>(metricType))
.compose(new ItemsToSetTransformer<>())
.doOnNext(metricIds -> logQuery(tenantId, e, "B", metricIds))
.reduce((s1, s2) -> {
s1.addAll(s2);
return s1;
}))
});
})
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
Expand All @@ -281,6 +318,8 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,
} else {
// Option C
// Fetch all the available metrics for this tenant
logger.warnf("Fetching all metric definitions for tenant %s. This can be an expensive operation for " +
"large data sets which can result in timeouts!", tenantId);
Observable<? extends MetricId<?>> tagsMetrics = dataAccess.findAllMetricsFromTagsIndex()
.compose(new TagsIndexRowTransformerFilter<>(metricType))
.filter(mId -> mId.getTenantId().equals(tenantId));
Expand All @@ -289,18 +328,37 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,
.filter(m -> m.getTenantId().equals(tenantId))
.filter(metricTypeFilter(metricType));

AtomicLong count = new AtomicLong();
groupMetrics = applyCFilters(
Observable
.concat(tagsMetrics, dataMetrics)
.distinct()
.flatMap(metricsService::findMetric),
.flatMap(mId -> metricsService.findMetric(mId)
.doOnNext(m -> count.incrementAndGet())
.doAfterTerminate(() ->
logger.infof("Fetched %d metric definitions for tenant %s", count.get(),
tenantId))),
groupCEntries)
.map(Metric::getMetricId);
}

return groupMetrics;
}

private void logQuery(String tenantId, Query query, String queryType, Set<? extends MetricId<?>> metricIds) {
// If debug is enabled, then always log the query info; otherwise, only log query info if the page threshold
// is exceeded so as to avoid spamming the log file. The page threshold is a simple mechanism to let us know
// that a particular query has a large result set and requires a number of round trips to Cassandra that exceeds
// the threshold.
if (logger.isDebugEnabled()) {
logger.debugf("Tag query %s returned %d rows", Query.toString(tenantId, query, queryType),
metricIds.size());
} else if ((metricIds.size() / pageSize) > pageThreshold) {
logger.infof("Tag query %s returned %d rows", Query.toString(tenantId, query, queryType),
metricIds.size());
}
}

private Observable<Metric<?>> applyBFilters(Observable<Metric<?>> metrics, List<Query> groupBEntries) {
for (Query groupBQuery : groupBEntries) {
metrics = metrics
Expand Down

0 comments on commit 710c07c

Please sign in to comment.