Skip to content

Commit

Permalink
Merge pull request #921 from rubenvp8510/HWKMETRICS-759
Browse files Browse the repository at this point in the history
[HWKMETRICS-759] Improve simple tag query performance by enabling the A-cost query pos…
  • Loading branch information
John Sanda committed Mar 22, 2018
2 parents 164c5e2 + 9f340e1 commit 24c5ae9
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ <T> Observable<ResultSet> updateRetentionsIndex(String tenantId, MetricType<T> t

Observable<Row> findMetricsByTagName(String tenantId, String tag);

Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String tvalue);
Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String ... tvalues);

Observable<Row> findAllMetricsFromTagsIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hawkular.metrics.model.MetricType.STRING;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -510,7 +511,7 @@ protected void initPreparedStatements() {
findMetricsByTagNameValue = session.prepare(
"SELECT tenant_id, type, metric " +
"FROM metrics_tags_idx " +
"WHERE tenant_id = ? AND tname = ? AND tvalue = ?");
"WHERE tenant_id = ? AND tname = ? AND tvalue IN ?");
}

@Override
Expand Down Expand Up @@ -954,8 +955,8 @@ public Observable<Row> findMetricsByTagName(String tenantId, String tag) {
}

@Override
public Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String tvalue) {
return rxSession.executeAndFetch(findMetricsByTagNameValue.bind(tenantId, tag, tvalue));
public Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String ... tvalues) {
return rxSession.executeAndFetch(findMetricsByTagNameValue.bind(tenantId, tag, Arrays.asList(tvalues)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public int hashCode() {
/**
* Tools that do tagQueryParsing and execution
*/
private boolean disableACostOptimization;
private TagQueryParser tagQueryParser;

private int defaultTTL = Duration.standardDays(7).toStandardSeconds().getSeconds();
Expand Down Expand Up @@ -288,7 +289,7 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c
setDefaultTTL(session, keyspace);
initMetrics();

tagQueryParser = new TagQueryParser(this.dataAccess, this);
tagQueryParser = new TagQueryParser(this.dataAccess, this, disableACostOptimization);
}

void loadDataRetentions() {
Expand Down Expand Up @@ -351,6 +352,7 @@ private void initConfiguration(Session session) {
log.infoInsertRetryConfig(insertMaxRetries, insertRetryMaxDelay);

defaultPageSize = Integer.parseInt(configuration.get("page-size", "5000"));
disableACostOptimization = Boolean.parseBoolean(configuration.get("disable.parser.optimization", "false"));
}

private void setDefaultTTL(Session session, String keyspace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,39 @@ public class TagQueryParser {

private DataAccess dataAccess;
private MetricsService metricsService;
private boolean enableACostQueries;

public TagQueryParser(DataAccess dataAccess, MetricsService metricsService) {
public TagQueryParser(DataAccess dataAccess, MetricsService metricsService, boolean disableACostQueries) {
this.dataAccess = dataAccess;
this.metricsService = metricsService;
this.enableACostQueries = !disableACostQueries;
}

static class Query {
private String tagName;
private String tagValueMatcher;
private String[] tagValues;

public Query(String tagName, String tagValueMatcher, String... tagValues) {
this.tagName = tagName;
if(tagValues.length > 0) {
this.tagValues = tagValues;
} else {
this.tagValueMatcher = tagValueMatcher;
}
}

public String getTagName() {
return tagName;
}

public String getTagValueMatcher() {
return tagValueMatcher;
}

public String[] getTagValues() {
return tagValues;
}
}

// Arrange the queries:
Expand All @@ -62,65 +91,192 @@ public TagQueryParser(DataAccess dataAccess, MetricsService metricsService) {

static class QueryOptimizer {

enum RegExpOptimizer {OR_SINGLE_SEEK, NONE }

public static final long GROUP_A_COST = 10;
public static final long GROUP_A_OR_COST = 11;
public static final long GROUP_B_COST = 50;
public static final long GROUP_C_COST = 99;

public static String IS_REGEXP = "^.*[]?+*{}()\\[^$|\\\\]+.*$|^[!].*";
public static Pattern MATCH_REGEXP = Pattern.compile(IS_REGEXP);

/**
* Sorts the query language parameters based on their query cost.
*
* @param tagsQuery User's TagQL
* @return TreeMap with Long key indicating query cost (lower is better)
*/
public static Map<Long, List<Map.Entry<String, String>>> reOrderTagsQuery(Map<String, String> tagsQuery) {
Map<Long, List<Map.Entry<String, String>>> costSortedMap = new TreeMap<>();
public static Map<Long, List<Query>> reOrderTagsQuery(Map<String, String> tagsQuery,
boolean enableACostQueries) {
Map<Long, List<Query>> costSortedMap = new TreeMap<>();
costSortedMap.put(GROUP_A_COST, new ArrayList<>());
costSortedMap.put(GROUP_A_OR_COST, new ArrayList<>());
costSortedMap.put(GROUP_B_COST, new ArrayList<>());
costSortedMap.put(GROUP_C_COST, new ArrayList<>());

for (Map.Entry<String, String> tagQuery : tagsQuery.entrySet()) {
if(tagQuery.getKey().startsWith("!")) {
// In-memory sorted query, requires fetching all the definitions
List<Map.Entry<String, String>> entries = costSortedMap.get(GROUP_C_COST);
entries.add(tagQuery);
costSortedMap.get(GROUP_C_COST).add(new Query(tagQuery.getKey(), tagQuery.getValue()));
} else if(enableACostQueries && !isRegExp(tagQuery.getValue())) {
costSortedMap.get(GROUP_A_COST).add(new Query(tagQuery.getKey(), tagQuery.getValue()));
} else {
// TODO How to filter exact matching from regexp matching?
List<Map.Entry<String, String>> entries = costSortedMap.get(GROUP_B_COST);
entries.add(tagQuery);
RegExpOptimizer strategy = optimalStrategy(tagQuery.getValue());
switch(strategy) {
case OR_SINGLE_SEEK:
String[] queries = tagQuery.getValue().split("\\|");
costSortedMap.get(GROUP_A_OR_COST).add(new Query(tagQuery.getKey(), null, queries));
break;
default:
costSortedMap.get(GROUP_B_COST).add(new Query(tagQuery.getKey(), tagQuery.getValue()));
}
}
}

return costSortedMap;
}

/**
* Not a perfect matching way (regexp can't detect a regexp), but we'll try to match the most common queries to
* tags processor.
*/
public static boolean isRegExp(String tagValuesQuery) {
return MATCH_REGEXP.matcher(tagValuesQuery).matches();
}

/**
* Detect optimized Cassandra query strategy for regexp query
*/
public static QueryOptimizer.RegExpOptimizer optimalStrategy(String tagValuesQuery) {
String[] orParts = tagValuesQuery.split("\\|");
if(orParts.length > 1) {
// We have multiple smaller queries
for (String orPart : orParts) {
if(isRegExp(orPart)) {
return RegExpOptimizer.NONE;
}
}
// All subqueries were single match queries
return RegExpOptimizer.OR_SINGLE_SEEK;
}

return RegExpOptimizer.NONE;
}
}

public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId, MetricType<?> metricType,
Map<String, String> tagsQueries) {

Map<Long, List<Map.Entry<String, String>>> costSortedMap = QueryOptimizer.reOrderTagsQuery(tagsQueries);
Map<Long, List<Query>> costSortedMap =
QueryOptimizer.reOrderTagsQuery(tagsQueries, enableACostQueries);

List<Query> groupAEntries = costSortedMap.get(QueryOptimizer.GROUP_A_COST);
List<Query> groupAOREntries = costSortedMap.get(QueryOptimizer.GROUP_A_OR_COST);
List<Query> groupBEntries = costSortedMap.get(QueryOptimizer.GROUP_B_COST);
List<Query> groupCEntries = costSortedMap.get(QueryOptimizer.GROUP_C_COST);

Observable<MetricId<?>> groupMetrics = null;

/*
Potential candidates:
List<Map.Entry<String, String>> groupBEntries = costSortedMap.get(QueryOptimizer.GROUP_B_COST);
List<Map.Entry<String, String>> groupCEntries = costSortedMap.get(QueryOptimizer.GROUP_C_COST);
Build a tree, only the first will fetch from Cassandra - rest will at most enrich
A
A+B+C
A+B
A+C
B
B+C
C
*/
if(!groupAEntries.isEmpty() || !groupAOREntries.isEmpty()) {
// Option A
if(!groupAEntries.isEmpty()) {
groupMetrics = Observable.from(groupAEntries)
.flatMap(e -> dataAccess.findMetricsByTagNameValue(tenantId, e.getTagName(), e.getTagValueMatcher())
.compose(new TagsIndexRowTransformer<>(metricType))
.compose(new ItemsToSetTransformer<>()))
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
})
.flatMap(Observable::from);
}

Observable<MetricId<?>> groupMetrics;
// It might be more costly to request n*findMetric from Cassandra, so we'll do extra Cassandra queries
// for OR queries also - better worst case performance

// Fetch everything from the tagsQueries
groupMetrics = Observable.from(groupBEntries)
.flatMap(e -> dataAccess.findMetricsByTagName(tenantId, e.getKey())
.filter(tagValueFilter(e.getValue(), 3))
// Option AOR
if(!groupAOREntries.isEmpty()) {
Observable<MetricId<?>> groupAORMetrics = Observable.from(groupAOREntries)
.flatMap(e -> dataAccess.findMetricsByTagNameValue(tenantId, e.getTagName(), e.getTagValues())
.compose(new TagsIndexRowTransformer<>(metricType))
.compose(new ItemsToSetTransformer<>())
.reduce((s1, s2) -> {
s1.addAll(s2);
return s1;
}))
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
})
.flatMap(Observable::from);

// There might not be any metrics fetched yet.. if this is the only query
if(groupBEntries.isEmpty() && !groupCEntries.isEmpty()) {
if(groupMetrics == null) {
groupMetrics = groupAORMetrics;
} else {
Observable<HashSet<MetricId<?>>> groupAPart = groupMetrics.toList().map(HashSet::new);
Observable<HashSet<MetricId<?>>> groupAORPart = groupAORMetrics.toList().map(HashSet::new);

groupMetrics = groupAPart.mergeWith(groupAORPart)
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
})
.flatMap(Observable::from);
}
}

// Options A+B, A+B+C and A+C
if(!groupBEntries.isEmpty() || !groupCEntries.isEmpty()) {
Observable<Metric<?>> enrichedMetrics = groupMetrics
.flatMap(metricsService::findMetric);

// Option +B (A+B and A+B+C)
if(!groupBEntries.isEmpty()) {
enrichedMetrics = applyBFilters(enrichedMetrics, groupBEntries);
}

// Options +C (A+B+C and A+C)
if(!groupCEntries.isEmpty()) {
enrichedMetrics = applyCFilters(enrichedMetrics, groupCEntries);
}
groupMetrics = enrichedMetrics.map(Metric::getMetricId);
}

} else if(!groupBEntries.isEmpty()) {
// Options B
// Fetch everything from the tagsQueries
groupMetrics = Observable.from(groupBEntries)
.flatMap(e -> dataAccess.findMetricsByTagName(tenantId, e.getTagName())
.filter(tagValueFilter(e.getTagValueMatcher(), 3))
.compose(new TagsIndexRowTransformer<>(metricType))
.compose(new ItemsToSetTransformer<>())
.reduce((s1, s2) -> {
s1.addAll(s2);
return s1;
}))
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
})
.flatMap(Observable::from);

// Option B+C
if(!groupCEntries.isEmpty()) {
groupMetrics = applyCFilters(groupMetrics.flatMap(metricsService::findMetric), groupCEntries)
.map(Metric::getMetricId);
}
} else {
// Option C
// Fetch all the available metrics for this tenant
Observable<? extends MetricId<?>> tagsMetrics = dataAccess.findAllMetricsFromTagsIndex()
.compose(new TagsIndexRowTransformer<>(metricType))
Expand All @@ -130,21 +286,33 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,
.filter(m -> m.getTenantId().equals(tenantId))
.filter(metricTypeFilter(metricType));

groupMetrics = Observable.concat(tagsMetrics, dataMetrics).distinct();
groupMetrics = applyCFilters(
Observable
.concat(tagsMetrics, dataMetrics)
.distinct()
.flatMap(metricsService::findMetric),
groupCEntries)
.map(Metric::getMetricId);
}

// Group C processing, everything outside Cassandra
if (groupCEntries.size() > 0) {
// TODO zipWith or something here instead of this monstrosity
Observable<Metric<?>> metrics = groupMetrics.flatMap(metricsService::findMetric);
return groupMetrics;
}

for (Map.Entry<String, String> groupCQuery : groupCEntries) {
metrics = metrics.filter(tagNotExistsFilter(groupCQuery.getKey().substring(1)));
}
groupMetrics = metrics.map(Metric::getMetricId);
private Observable<Metric<?>> applyBFilters(Observable<Metric<?>> metrics, List<Query> groupBEntries) {
for (Query groupBQuery : groupBEntries) {
metrics = metrics
.filter(tagValueFilter(groupBQuery.getTagValueMatcher(), groupBQuery.getTagName()));
}
return metrics;
}

return groupMetrics;
private Observable<Metric<?>> applyCFilters(Observable<Metric<?>> metrics, List<Query> groupCEntries) {
// TODO zipWith or something here instead of this monstrosity
for (Query groupCQuery : groupCEntries) {
metrics = metrics
.filter(tagNotExistsFilter(groupCQuery.getTagName().substring(1)));
}
return metrics;
}

public Observable<Map<String, Set<String>>> getTagValues(String tenantId, MetricType<?> metricType,
Expand Down Expand Up @@ -213,6 +381,19 @@ private Func1<Row, Boolean> tagValueFilter(String regexp, int index) {
return r -> positive == p.matcher(r.getString(index)).matches(); // XNOR
}

private Func1<Metric<?>, Boolean> tagValueFilter(String regexp, String tagName) {
// If no such tagName -> no match
return tMetric -> {
String tagValue = tMetric.getTags().get(tagName);
if(tagValue != null) {
boolean positive = (!regexp.startsWith("!"));
Pattern p = PatternUtil.filterPattern(regexp);
return positive == p.matcher(tagValue).matches(); // XNOR
}
return false;
};
}

public Func1<Row, Boolean> typeFilter(MetricType<?> type) {
return row -> {
MetricType<?> metricType = MetricType.fromCode(row.getByte(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ public Observable<Row> findMetricsByTagName(String tenantId, String tag) {
}

@Override
public Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String tvalue) {
return delegate.findMetricsByTagNameValue(tenantId, tag, tvalue);
public Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String ... tvalues) {
return delegate.findMetricsByTagNameValue(tenantId, tag, tvalues);
}

@Override
Expand Down

0 comments on commit 24c5ae9

Please sign in to comment.