Skip to content

Commit

Permalink
Merge pull request #964 from jsanda/hwkmetrics-787-0.30.0
Browse files Browse the repository at this point in the history
[HWKMETRICS-787] release/0.30.0 branch
  • Loading branch information
John Sanda committed May 4, 2018
2 parents 9101b39 + eaf9964 commit ebbaf4b
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 6 deletions.
Expand Up @@ -581,7 +581,7 @@ public <T> Observable<MetricId<T>> findMetricIdentifiersWithFilters(String tenan
results = tagQueryParser.findMetricIdentifiersWithFilters(tenantId, metricType, parsedSimpleTagQuery.getTags())
.map(m -> (MetricId<T>) m);
} catch (Exception e2) {
results = Observable.error(new RuntimeApiError("Unparseable tag query expression."));
results = Observable.error(new RuntimeApiError("Unparseable tag query expression.", e2));
}
}
return results.doOnCompleted(context::stop);
Expand Down
Expand Up @@ -16,6 +16,8 @@
*/
package org.hawkular.metrics.core.service.tags;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -40,6 +42,7 @@

import com.datastax.driver.core.Row;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;

import rx.Observable;
import rx.functions.Func1;
Expand Down Expand Up @@ -118,6 +121,10 @@ static class QueryOptimizer {

enum RegExpOptimizer {OR_SINGLE_SEEK, NONE }

// This is to restrict the amount of rows fetched, we know the cost so it is a constant complexity operation
public static final String OPENSHIFT_OPTIMIZED_TAG_QUERY = "pod_id";

public static final long GROUP_OPENSHIFT_OPTIMIZATION = 01;
public static final long GROUP_A_COST = 10;
public static final long GROUP_A_OR_COST = 11;
public static final long GROUP_B_COST = 50;
Expand All @@ -134,8 +141,12 @@ enum RegExpOptimizer {OR_SINGLE_SEEK, NONE }
*/
public static Map<Long, List<Query>> reOrderTagsQuery(Map<String, String> tagsQuery,
boolean enableACostQueries) {

ArrayList<Query> groupASortedList = new ArrayList<>();
ArrayList<Query> openshiftQuery = new ArrayList<>(1);
Map<Long, List<Query>> costSortedMap = new TreeMap<>();
costSortedMap.put(GROUP_A_COST, new ArrayList<>());
costSortedMap.put(GROUP_OPENSHIFT_OPTIMIZATION, openshiftQuery);
costSortedMap.put(GROUP_A_COST, groupASortedList);
costSortedMap.put(GROUP_A_OR_COST, new ArrayList<>());
costSortedMap.put(GROUP_B_COST, new ArrayList<>());
costSortedMap.put(GROUP_C_COST, new ArrayList<>());
Expand All @@ -159,9 +170,37 @@ public static Map<Long, List<Query>> reOrderTagsQuery(Map<String, String> tagsQu
}
}

// There can only be a single occurrence of a tag key in the map of tags passed to
// findMetricIdentifiersWithFilters; so, if and when we find the pod_id tag, we can stop searching. The
// pod_id tag query can have a single or multiple values, so we have to search both GROUP_A_COST and
// GROUP_A_OR_COST.
Query podIdQuery = getPodIdQuery(groupASortedList);
if (podIdQuery == null) {
podIdQuery = getPodIdQuery(costSortedMap.get(GROUP_A_OR_COST));
if (podIdQuery != null) {
openshiftQuery.add(podIdQuery);
}
} else {
openshiftQuery.add(new Query(podIdQuery.tagName, null, podIdQuery.tagValueMatcher));
}

return costSortedMap;
}

private static Query getPodIdQuery(List<Query> queries) {
Query podIdQuery = null;
Iterator<Query> iterator = queries.iterator();
while (iterator.hasNext()) {
Query next = iterator.next();
if (OPENSHIFT_OPTIMIZED_TAG_QUERY.equals(next.tagName)) {
podIdQuery = next;
iterator.remove();
break;
}
}
return podIdQuery;
}

/**
* Not a perfect matching way (regexp can't detect a regexp), but we'll try to match the most common queries to
* tags processor.
Expand Down Expand Up @@ -199,13 +238,54 @@ public Observable<MetricId<?>> findMetricIdentifiersWithFilters(String tenantId,
Map<Long, List<Query>> costSortedMap =
QueryOptimizer.reOrderTagsQuery(tagsQueries, enableACostQueries);

List<Query> openshiftQuery = costSortedMap.get(QueryOptimizer.GROUP_OPENSHIFT_OPTIMIZATION);
List<Query> groupAEntries = costSortedMap.get(QueryOptimizer.GROUP_A_COST);
List<Query> groupAOREntries = costSortedMap.get(QueryOptimizer.GROUP_A_OR_COST);
List<Query> groupBEntries = costSortedMap.get(QueryOptimizer.GROUP_B_COST);
List<Query> groupCEntries = costSortedMap.get(QueryOptimizer.GROUP_C_COST);

Observable<MetricId<?>> groupMetrics = null;

if(openshiftQuery.size() > 0) {
// Filter using this option
Stopwatch stopwatch = Stopwatch.createStarted();
Observable<Metric<?>> enrichedMetrics = Observable.just(openshiftQuery.get(0))
.flatMap(query -> dataAccess.findMetricsByTagNameValue(tenantId, query.getTagName(),
query.getTagValues())
.compose(new TagsIndexRowTransformerFilter<>(metricType))
.compose(new ItemsToSetTransformer<>())
.doOnNext(metricIds -> logQuery(tenantId, query, "pod_id", metricIds)))
.flatMap(Observable::from)
// Here we fetch the full metric definition, namely the tags, for each metric id in a separate
// query. Each query is made against the same partition. I would like fetch the metric definitions
// with a single query using the IN clause; however, CQL does not allow this when one of the columns
// in the SELECT clause is a non-frozen collection. This highlights a problem with the data model.
// If we later decide to introduce a table in which we index on the pod_id, then we can altogether
// avoid querying metrics_idx.
.flatMap(metricId -> {
Stopwatch findMetricStopWatch = Stopwatch.createStarted();
return metricsService.findMetric(metricId)
.doOnCompleted(() -> {
findMetricStopWatch.stop();
logger.debugf("Fetched metric definition for %s in %d ms", metricId,
findMetricStopWatch.elapsed(MILLISECONDS));
});
});

// Use the fetched metrics to filter out the remaining
// No, this isn't pretty.. but it'll suffice
enrichedMetrics = applyBFilters(enrichedMetrics, groupAEntries);
enrichedMetrics = applyBFilters(enrichedMetrics, groupAOREntries);
enrichedMetrics = applyBFilters(enrichedMetrics, groupBEntries);
enrichedMetrics = applyCFilters(enrichedMetrics, groupCEntries);
groupMetrics = enrichedMetrics.map(Metric::getMetricId);
return groupMetrics.doOnCompleted(() -> {
stopwatch.stop();
logger.debugf("Finished fetch metrics using pod_id optimization in %d ms",
stopwatch.elapsed(MILLISECONDS));
});
}

/*
Potential candidates:
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -173,6 +174,119 @@ public void createAndFindMetricsWithTags() throws Exception {
assertEquals(gauges.size(), 8, "Only metrics without a1 and type GAUGE should have been found");
}

@Test
public void findMetricsWithPodIdTagHavingMultipleValues() throws Exception {
String tenantId = "PodIdQueryTestForMultipleIds";
createMetricsWithPodIdTag(tenantId);

String tags =
"pod_id:871497e0-16b6-11e8-985b-54e1ad486be8|941497e0-16b6-11e8-985b-54e1ad486ae2," +
"descriptor_name:memory/usage," +
"type:pod_container," +
"container_name:myservice";
List<MetricId<Double>> actual = metricsService.findMetricIdentifiersWithFilters(tenantId, GAUGE, tags)
.toList()
.toBlocking()
.firstOrDefault(emptyList());
List<MetricId<Double>> expected = ImmutableList.of(
new MetricId<>(tenantId, GAUGE, "M1"),
new MetricId<>(tenantId, GAUGE, "M3")
);

// convert to sets since we do not compare about order
assertEquals(new HashSet<>(actual), new HashSet<>(expected),
"The pod_id query returned unexpected results. Expected " + expected + " but got " + actual);
}

@Test
public void findMetricsWithPodIdTagHavingSingleValue() throws Exception {
String tenantId = "PodIdQueryTestForSingleId";
createMetricsWithPodIdTag(tenantId);

String tags =
"pod_id:871497e0-16b6-11e8-985b-54e1ad486be8," +
"descriptor_name:memory/usage," +
"type:pod_container," +
"container_name:myservice";

List<MetricId<Double>> actual = metricsService.findMetricIdentifiersWithFilters(tenantId, GAUGE, tags)
.toList()
.toBlocking()
.firstOrDefault(emptyList());
List<MetricId<Double>> expected = ImmutableList.of(new MetricId<>(tenantId, GAUGE, "M1"));

// convert to sets since we do not compare about order
assertEquals(new HashSet<>(actual), new HashSet<>(expected),
"The pod_id query returned unexpected results. Expected " + expected + " but got " + actual);
}

private void createMetricsWithPodIdTag(String tenantId) {
List<Metric<Double>> metrics = ImmutableList.of(
new Metric<>(new MetricId<>(tenantId, GAUGE, "M1"), ImmutableMap.of(
"descriptor_name", "memory/usage",
"type", "pod_container",
"pod_id", "871497e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M2"), ImmutableMap.of(
"descriptor_name", "cpu/usage",
"type", "pod_container",
"pod_id", "871497e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M3"), ImmutableMap.of(
"descriptor_name", "memory/usage",
"type", "pod_container",
"pod_id", "941497e0-16b6-11e8-985b-54e1ad486ae2",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M4"), ImmutableMap.of(
"descriptor_name", "cpu/usage",
"type", "pod_container",
"pod_id", "941497e0-16b6-11e8-985b-54e1ad486ae2",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M5"), ImmutableMap.of(
"descriptor_name", "memory/usage",
"type", "pod_container",
"pod_id", "541497e0-16b6-33e8-981c-54e1ad486ae2",
"container_name", "nodeapp"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M6"), ImmutableMap.of(
"descriptor_name", "cpu/usage",
"type", "pod_container",
"pod_id", "541497e0-16b6-33e8-981c-54e1ad486ae2",
"container_name", "nodeapp"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M7"), ImmutableMap.of(
"descriptor_name", "memory/page_faults",
"type", "pod_container",
"pod_id", "871497e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M8"), ImmutableMap.of(
"descriptor_name", "memory/major_page_faults",
"type", "pod_container",
"pod_id", "871497e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M7"), ImmutableMap.of(
"descriptor_name", "cpu/usage",
"type", "deployment",
"pod_id", "93a697e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
)),
new Metric<>(new MetricId<>(tenantId, GAUGE, "M8"), ImmutableMap.of(
"descriptor_name", "memory/usage",
"type", "deployment",
"pod_id", "93a697e0-16b6-11e8-985b-54e1ad486be8",
"container_name", "myservice"
))
);

metrics.forEach(metric -> metricsService.createMetric(metric, true).toBlocking().lastOrDefault(null));
}

@Test
public void tagValueSearch() throws Exception {
String tenantId = "t1tag";
Expand Down
Expand Up @@ -73,7 +73,8 @@ public void testMultiSingleQueriesMatcher() {
public void testReOrder() {
Map<String, String> tagsQuery = Maps.newHashMap();
tagsQuery.put("pod_hit", "norate"); // A
tagsQuery.put("pod_id", "pod1|pod2|pod3"); // AOR
tagsQuery.put("env", "qa|stage|prod"); // AOR
tagsQuery.put("pod_id", "pod1|pod2|pod3"); // openshift pod_id optimization
tagsQuery.put("time", "*"); // B
tagsQuery.put("!seek", ""); // C
tagsQuery.put("pod_name", "pod1|!pod2"); // B
Expand All @@ -84,8 +85,12 @@ public void testReOrder() {

assertEquals(1, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_A_COST).size());
assertEquals(1, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_A_OR_COST).size());
assertEquals(3, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_A_OR_COST).get(0).getTagValues().length);
assertEquals(3, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_A_OR_COST).get(0)
.getTagValues().length);
assertEquals(2, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_B_COST).size());
assertEquals(1, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_C_COST).size());
assertEquals(1, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_OPENSHIFT_OPTIMIZATION).size());
assertEquals(3, entries.get(SimpleTagQueryParser.QueryOptimizer.GROUP_OPENSHIFT_OPTIMIZATION).get(0)
.getTagValues().length);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -254,6 +254,7 @@ class TenantITest extends RESTTest {
}

@Test
@Ignore
void deleteTenantHavingNoMetrics() {
def response = hawkularMetrics.post(path: "tenants",
headers: [
Expand Down

0 comments on commit ebbaf4b

Please sign in to comment.