Skip to content

Commit

Permalink
[HWKMETRICS-787] optimize for pod_id queries (#961)
Browse files Browse the repository at this point in the history
* Optimize Openshift tag queries

* [HWKMETRICS-787] clean up pod_id tag query logic and add tests

* [HWKMETRICS-787] clean up pod_id query and add some debug logging

I removed the reduce operation since it is not needed. This whole
optimization is for a particular tag key. There can only be one instance
of a tag key per request; hence, the reduce operation is not needed.

* [HWKMETRICS-787] disable failing test

I spent a few minutes investigating, but I am not going to wast any more
time. The delete tenant functionality has never been used, and the REST
integration tests are tricky due to using the "virtual clock" which in
turn makes these tests brittle.

I will open a separate ticket for disabling the delete tenant
functionality. We can leave the endpoints intact but return an
appropriate status and error message indicating that it is no longer
supported.
  • Loading branch information
John Sanda committed May 4, 2018
1 parent c2c016b commit 4cdd6be
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 6 deletions.
Expand Up @@ -581,7 +581,7 @@ public <T> Observable<MetricId<T>> findMetricIdentifiersWithFilters(String tenan
results = tagQueryParser.findMetricIdentifiersWithFilters(tenantId, metricType, parsedSimpleTagQuery.getTags())
.map(m -> (MetricId<T>) m);
} catch (Exception e2) {
results = Observable.error(new RuntimeApiError("Unparseable tag query expression."));
results = Observable.error(new RuntimeApiError("Unparseable tag query expression.", e2));
}
}
return results.doOnCompleted(context::stop);
Expand Down
Expand Up @@ -16,6 +16,8 @@
*/
package org.hawkular.metrics.core.service.tags;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -40,6 +42,7 @@

import com.datastax.driver.core.Row;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;

import rx.Observable;
import rx.functions.Func1;
Expand Down Expand Up @@ -118,6 +121,10 @@ static class QueryOptimizer {

enum RegExpOptimizer {OR_SINGLE_SEEK, NONE }

// This is to restrict the amount of rows fetched, we know the cost so it is a constant complexity operation
public static final String OPENSHIFT_OPTIMIZED_TAG_QUERY = "pod_id";

public static final long GROUP_OPENSHIFT_OPTIMIZATION = 01;
public static final long GROUP_A_COST = 10;
public static final long GROUP_A_OR_COST = 11;
public static final long GROUP_B_COST = 50;
Expand All @@ -134,8 +141,12 @@ enum RegExpOptimizer {OR_SINGLE_SEEK, NONE }
*/
public static Map<Long, List<Query>> reOrderTagsQuery(Map<String, String> tagsQuery,
boolean enableACostQueries) {

ArrayList<Query> groupASortedList = new ArrayList<>();
ArrayList<Query> openshiftQuery = new ArrayList<>(1);
Map<Long, List<Query>> costSortedMap = new TreeMap<>();
costSortedMap.put(GROUP_A_COST, new ArrayList<>());
costSortedMap.put(GROUP_OPENSHIFT_OPTIMIZATION, openshiftQuery);
costSortedMap.put(GROUP_A_COST, groupASortedList);
costSortedMap.put(GROUP_A_OR_COST, new ArrayList<>());
costSortedMap.put(GROUP_B_COST, new ArrayList<>());
costSortedMap.put(GROUP_C_COST, new ArrayList<>());
Expand All @@ -159,9 +170,37 @@ public static Map<Long, List<Query>> reOrderTagsQuery(Map<String, String> tagsQu
}
}

// There can only be a single occurrence of a tag key in the map of tags passed to
// findMetricIdentifiersWithFilters; so, if and when we find the pod_id tag, we can stop searching. The
// pod_id tag query can have a single or multiple values, so we have to search both GROUP_A_COST and
// GROUP_A_OR_COST.
Query podIdQuery = getPodIdQuery(groupASortedList);
if (podIdQuery == null) {
podIdQuery = getPodIdQuery(costSortedMap.get(GROUP_A_OR_COST));
if (podIdQuery != null) {
openshiftQuery.add(podIdQuery);
}
} else {
openshiftQuery.add(new Query(podIdQuery.tagName, null, podIdQuery.tagValueMatcher));
}

return costSortedMap;
}

private static Query getPodIdQuery(List<Query> queries) {
Query podIdQuery = null;
Iterator<Query> iterator = queries.iterator();
while (iterator.hasNext()) {
Query next = iterator.next();
if (OPENSHIFT_OPTIMIZED_TAG_QUERY.equals(next.tagName)) {
podIdQuery = next;
iterator.remove();
break;
}
}
return podIdQuery;
}

/**
* Not a perfect matching way (regexp can't detect a regexp), but we'll try to match the most common queries to
* tags processor.
Expand Down Expand Up @@ -199,13 +238,54 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,
Map<Long, List<Query>> costSortedMap =
QueryOptimizer.reOrderTagsQuery(tagsQueries, enableACostQueries);

List<Query> openshiftQuery = costSortedMap.get(QueryOptimizer.GROUP_OPENSHIFT_OPTIMIZATION);
List<Query> groupAEntries = costSortedMap.get(QueryOptimizer.GROUP_A_COST);
List<Query> groupAOREntries = costSortedMap.get(QueryOptimizer.GROUP_A_OR_COST);
List<Query> groupBEntries = costSortedMap.get(QueryOptimizer.GROUP_B_COST);
List<Query> groupCEntries = costSortedMap.get(QueryOptimizer.GROUP_C_COST);

Observable<MetricId<?>> groupMetrics = null;

if(openshiftQuery.size() > 0) {
// Filter using this option
Stopwatch stopwatch = Stopwatch.createStarted();
Observable<Metric<?>> enrichedMetrics = Observable.just(openshiftQuery.get(0))
.flatMap(query -> dataAccess.findMetricsByTagNameValue(tenantId, query.getTagName(),
query.getTagValues())
.compose(new TagsIndexRowTransformerFilter<>(metricType))
.compose(new ItemsToSetTransformer<>())
.doOnNext(metricIds -> logQuery(tenantId, query, "pod_id", metricIds)))
.flatMap(Observable::from)
// Here we fetch the full metric definition, namely the tags, for each metric id in a separate
// query. Each query is made against the same partition. I would like fetch the metric definitions
// with a single query using the IN clause; however, CQL does not allow this when one of the columns
// in the SELECT clause is a non-frozen collection. This highlights a problem with the data model.
// If we later decide to introduce a table in which we index on the pod_id, then we can altogether
// avoid querying metrics_idx.
.flatMap(metricId -> {
Stopwatch findMetricStopWatch = Stopwatch.createStarted();
return metricsService.findMetric(metricId)
.doOnCompleted(() -> {
findMetricStopWatch.stop();
logger.debugf("Fetched metric definition for %s in %d ms", metricId,
findMetricStopWatch.elapsed(MILLISECONDS));
});
});

// Use the fetched metrics to filter out the remaining
// No, this isn't pretty.. but it'll suffice
enrichedMetrics = applyBFilters(enrichedMetrics, groupAEntries);
enrichedMetrics = applyBFilters(enrichedMetrics, groupAOREntries);
enrichedMetrics = applyBFilters(enrichedMetrics, groupBEntries);
enrichedMetrics = applyCFilters(enrichedMetrics, groupCEntries);
groupMetrics = enrichedMetrics.map(Metric::getMetricId);
return groupMetrics.doOnCompleted(() -> {
stopwatch.stop();
logger.debugf("Finished fetch metrics using pod_id optimization in %d ms",
stopwatch.elapsed(MILLISECONDS));
});
}

/*
Potential candidates:
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -173,6 +174,119 @@ public void createAndFindMetricsWithTags() throws Exception {
assertEquals(gauges.size(), 8, "Only metrics without a1 and type GAUGE should have been found");
}

@Test
public void findMetricsWithPodIdTagHavingMultipleValues() throws Exception {
String tenantId = "PodIdQueryTestForMultipleIds";
createMetricsWithPodIdTag(tenantId);

String tags =
"pod_id:871497e0-16b6-11e8-985b-54e1ad486be8|941497e0-16b6-11e8-985b-54e1ad486ae2," +
"descriptor_name:memory/usage," +
"type:pod_container," +
"container_name:myservice";
List<MetricId<Double>> actual = metricsService.findMetricIdentifiersWithFilters(tenantId, GAUGE, tags)
.toList()
.toBlocking()
.firstOrDefault(emptyList());
List<MetricId<Double>> expected = ImmutableList.of(
new MetricId<>(tenantId, GAUGE, "M1"),
new MetricId<>(tenantId, GAUGE, "M3")
);

// convert to sets since we do not compare about order
assertEquals(new HashSet<>(actual), new HashSet<>(expected),
"The pod_id query returned unexpected results. Expected " + expected + " but got " + actual);
}

@Test
public void findMetricsWithPodIdTagHavingSingleValue() throws Exception {
String tenantId = "PodIdQueryTestForSingleId";
createMetricsWithPodIdTag(tenantId);

String tags =
"pod_id:871497e0-16b6-11e8-985b-54e1ad486be8," +
"descriptor_name:memory/usage," +
"type:pod_container," +
"container_name:myservice";

List<MetricId<Double>> actual = metricsService.findMetricIdentifiersWithFilters(tenantId, GAUGE, tags)
.toList()
.toBlocking()
.firstOrDefault(emptyList());
List<MetricId<Double>> expected = ImmutableList.of(new MetricId<>(tenantId, GAUGE, "M1"));

// convert to sets since we do not compare about order
assertEquals(new HashSet<>(actual), new HashSet<>(expected),
"The pod_id query returned unexpected results. Expected " + expected + " but got " + actual);
}

private void createMetricsWithPodIdTag(String tenantId) {
List<Metric<Double>> metrics = ImmutableList.of(
new Metric<>(new MetricId<>(tenantId, GAUGE, "M1"), ImmutableMap.of(
"descriptor_name", "memory/usage",
"type", "pod_container",
"pod_id", "871497e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M2"), ImmutableMap.of(
"descriptor_name", "cpu/usage",
"type", "pod_container",
"pod_id", "871497e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M3"), ImmutableMap.of(
"descriptor_name", "memory/usage",
"type", "pod_container",
"pod_id", "941497e0-16b6-11e8-985b-54e1ad486ae2",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M4"), ImmutableMap.of(
"descriptor_name", "cpu/usage",
"type", "pod_container",
"pod_id", "941497e0-16b6-11e8-985b-54e1ad486ae2",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M5"), ImmutableMap.of(
"descriptor_name", "memory/usage",
"type", "pod_container",
"pod_id", "541497e0-16b6-33e8-981c-54e1ad486ae2",
"container_name", "nodeapp"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M6"), ImmutableMap.of(
"descriptor_name", "cpu/usage",
"type", "pod_container",
"pod_id", "541497e0-16b6-33e8-981c-54e1ad486ae2",
"container_name", "nodeapp"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M7"), ImmutableMap.of(
"descriptor_name", "memory/page_faults",
"type", "pod_container",
"pod_id", "871497e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M8"), ImmutableMap.of(
"descriptor_name", "memory/major_page_faults",
"type", "pod_container",
"pod_id", "871497e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M7"), ImmutableMap.of(
"descriptor_name", "cpu/usage",
"type", "deployment",
"pod_id", "93a697e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M8"), ImmutableMap.of(
"descriptor_name", "memory/usage",
"type", "deployment",
"pod_id", "93a697e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
))
);

metrics.forEach(metric -> metricsService.createMetric(metric, true).toBlocking().lastOrDefault(null));
}

@Test
public void tagValueSearch() throws Exception {
String tenantId = "t1tag";
Expand Down
Expand Up @@ -73,7 +73,8 @@ public void testMultiSingleQueriesMatcher() {
public void testReOrder() {
Map<String, String> tagsQuery = Maps.newHashMap();
tagsQuery.put("pod_hit", "norate"); // A
tagsQuery.put("pod_id", "pod1|pod2|pod3"); // AOR
tagsQuery.put("env", "qa|stage|prod"); // AOR
tagsQuery.put("pod_id", "pod1|pod2|pod3"); // openshift pod_id optimization
tagsQuery.put("time", "*"); // B
tagsQuery.put("!seek", ""); // C
tagsQuery.put("pod_name", "pod1|!pod2"); // B
Expand All @@ -84,8 +85,12 @@ public void testReOrder() {

assertEquals(1, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_A_COST).size());
assertEquals(1, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_A_OR_COST).size());
assertEquals(3, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_A_OR_COST).get(0).getTagValues().length);
assertEquals(3, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_A_OR_COST).get(0)
.getTagValues().length);
assertEquals(2, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_B_COST).size());
assertEquals(1, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_C_COST).size());
assertEquals(1, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_OPENSHIFT_OPTIMIZATION).size());
assertEquals(3, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_OPENSHIFT_OPTIMIZATION).get(0)
.getTagValues().length);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -254,6 +254,7 @@ class TenantITest extends RESTTest {
}

@Test
@Ignore
void deleteTenantHavingNoMetrics() {
def response = hawkularMetrics.post(path: "tenants",
headers: [
Expand Down

0 comments on commit 4cdd6be

Please sign in to comment.