Skip to content

Commit

Permalink
Fetch by metric name from Cassandra always and enable regexp filterin…
Browse files Browse the repository at this point in the history
…g for tag values + additional helpers for negation and all queries
  • Loading branch information
Michael Burman committed Jul 31, 2015
1 parent 570d837 commit a8a24e3
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.hawkular.metrics.core.impl;

import static java.util.Comparator.comparingLong;
import static java.util.stream.Collectors.toSet;

import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
Expand All @@ -43,6 +42,7 @@
import java.util.concurrent.Executors;
import java.util.function.Predicate;

import java.util.regex.Pattern;
import org.hawkular.metrics.core.api.AvailabilityBucketDataPoint;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.BucketedOutput;
Expand All @@ -60,8 +60,8 @@
import org.hawkular.metrics.core.api.RetentionSettings;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.core.api.TenantAlreadyExistsException;
import org.hawkular.metrics.core.impl.transformers.TagsIndexResultSetTransformer;
import org.hawkular.metrics.core.impl.transformers.ItemsToSetTransformer;
import org.hawkular.metrics.core.impl.transformers.TagsIndexRowTransformer;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskService;
Expand Down Expand Up @@ -460,57 +460,53 @@ public Observable<Metric> findMetrics(String tenantId, MetricType type) {
return typeObservable.flatMap(t -> dataAccess.findMetricsInMetricsIndex(tenantId, t))
.flatMap(Observable::from)
.map(row -> new Metric(new MetricId(tenantId, type, row.getString(0), Interval.parse(row.getString(1))),
row.getMap(2, String.class, String.class), row.getInt(3)));
row.getMap(2, String.class, String.class), row.getInt(3)));
}

@Override
public Observable<Metric> findMetricsWithFilters(String tenantId, Map<String, String> tagsQueries, MetricType
type) {

// SearchQueries that are fetched with tag name
Set<Map.Entry<String, String>> tnames = tagsQueries.entrySet().stream()
.filter(e -> e.getValue().equals("*")).collect(toSet());

// SearchQueries that are fetched with tag name and value
Set<Map.Entry<String, String>> tvalues = tagsQueries.entrySet().stream()
.filter(e -> !e.getValue().equals("*")).collect(toSet());

// Define queries separately for tname and tname,tvalue Cassandra queries
Observable<Set<MetricId>> nameMatches = Observable.from(tnames)
.flatMap(e -> dataAccess.findMetricsByTagName(tenantId, e.getKey())
.compose(new TagsIndexResultSetTransformer(tenantId, type))
.compose(new ItemsToSetTransformer<MetricId>()));

Observable<Set<MetricId>> valueMatches = Observable.from(tvalues).flatMap(e -> {
String[] values = e.getValue().split("\\|");
return Observable.from(values)
.flatMap(v -> dataAccess.findMetricsByTagNameValue(tenantId, e.getKey(), v)
.compose(new TagsIndexResultSetTransformer(tenantId, type))
.compose(new ItemsToSetTransformer<MetricId>()))
.reduce((s1, s2) -> {
s1.addAll(s2);
return s1;
});
});

// We should not process empty Observables if they were never called (otherwise our intersection is not right)
Observable<Set<MetricId>> indexes;
if (!tvalues.isEmpty() && !tnames.isEmpty()) {
indexes = nameMatches.mergeWith(valueMatches);
} else if (tvalues.isEmpty()) {
indexes = nameMatches;
} else {
indexes = valueMatches;
}

// Take intersection of every processed metric index set and fetch metric definitions
return indexes
// Fetch everything from the tagsQueries
return Observable.from(tagsQueries.entrySet())
.flatMap(entry -> {
// Special case "*" allowed and ! indicates match shouldn't happen
boolean positive = (!entry.getValue().startsWith("!"));
Pattern p = filterPattern(entry.getValue());

return Observable.just(entry)
.flatMap(e -> dataAccess.findMetricsByTagName(tenantId, e.getKey())
.flatMap(Observable::from)
.filter(r -> positive == p.matcher(r.getString(3)).matches()) // XNOR
.compose(new TagsIndexRowTransformer(tenantId, type))
.compose(new ItemsToSetTransformer<MetricId>())
.reduce((s1, s2) -> {
s1.addAll(s2);
return s1;
}));
})
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
})
.flatMap(Observable::from)
.flatMap(i -> findMetric(i));
.flatMap(this::findMetric);
}

/**
* Allow special cases to Pattern matching, such as "*" -> ".*" and ! indicating the match shouldn't
* happen. The first ! indicates the rest of the pattern should not match.
*
* @param inputRegexp Regexp given by the user
* @return Pattern modified to allow special cases in the query language
*/
private Pattern filterPattern(String inputRegexp) {
if (inputRegexp.equals("*")) {
inputRegexp = ".*";
} else if(inputRegexp.startsWith("!")) {
inputRegexp = inputRegexp.substring(1);
}
return Pattern.compile(inputRegexp); // Catch incorrect patterns..
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,32 @@
*/
package org.hawkular.metrics.core.impl.transformers;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import rx.Observable;

/**
* Transforms ResultSets from metrics_tags_idx to a MetricId. Requires the following order on select:
* Transforms ResultSets's Rows from metrics_tags_idx to a MetricId. Requires the following order on select:
* type, metric, interval
*
* @author Michael Burman
*/
public class TagsIndexResultSetTransformer implements Observable.Transformer<ResultSet, MetricId> {
public class TagsIndexRowTransformer implements Observable.Transformer<Row, MetricId> {

private MetricType type;
private String tenantId;

public TagsIndexResultSetTransformer(String tenantId, MetricType type) {
public TagsIndexRowTransformer(String tenantId, MetricType type) {
this.type = type;
this.tenantId = tenantId;
}

@Override
public Observable<MetricId> call(Observable<ResultSet> resultSetObservable) {
public Observable<MetricId> call(Observable<Row> resultSetObservable) {
return resultSetObservable
.flatMap(Observable::from)
// .flatMap(Observable::from)
.filter(r -> (type == null
&& MetricType.userTypes().contains(MetricType.fromCode(r.getInt(0))))
|| MetricType.fromCode(r.getInt(0)) == type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,20 @@ public void createAndFindMetricsWithTags() throws Exception {
.toList().toBlocking().lastOrDefault(null);
assertEquals(metrics.size(), 1, "Only metric m3 should have been returned");
assertEquals(metrics.get(0), m3, "m3 did not match the original inserted metric");

// Test for NOT operator
metrics = metricsService.findMetricsWithFilters("t1", ImmutableMap.of("a2","!4"), GAUGE)
.toList().toBlocking().lastOrDefault(null);
assertEquals(metrics.size(), 2, "Only metrics m3-m4 should have been returned");

metrics = metricsService.findMetricsWithFilters("t1", ImmutableMap.of("a1", "2", "a2","!4"), GAUGE)
.toList().toBlocking().lastOrDefault(null);
assertEquals(metrics.size(), 2, "Only metrics m3-m4 should have been returned");

metrics = metricsService.findMetricsWithFilters("t1", ImmutableMap.of("a2","!4|3"), GAUGE)
.toList().toBlocking().lastOrDefault(null);
assertEquals(metrics.size(), 1, "Only metrics m3 should have been returned");
assertEquals(metrics.get(0), m3, "m3 did not match the original inserted metric");
}

@Test
Expand Down

0 comments on commit a8a24e3

Please sign in to comment.