Skip to content

Commit

Permalink
Merge pull request #289 from burmanm/hwkmetrics-180
Browse files Browse the repository at this point in the history
[HWKMETRICS-180] findMetricsWithTags supports now MatchType (ANY / ALL)
  • Loading branch information
jsanda committed Aug 4, 2015
2 parents 6371f34 + a61f66a commit 257d051
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 50 deletions.
Expand Up @@ -29,6 +29,7 @@
import java.util.Collection;
import java.util.List;

import java.util.regex.PatternSyntaxException;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -96,21 +97,27 @@ public void findMetrics(
required = false,
allowableValues = "[gauge, availability, counter]")
@QueryParam("type") MetricType metricType,
@ApiParam(value = "List of tags", required = false) @QueryParam("tags") Tags tags) {
@ApiParam(value = "List of tags filters", required = false) @QueryParam("tags") Tags tags) {

if (metricType != null && !MetricType.userTypes().contains(metricType)) {
asyncResponse.resume(badRequest(new ApiError("Incorrect type param")));
asyncResponse.resume(badRequest(new ApiError("Incorrect type param " + metricType.toString())));
return;
}

Observable<Metric> metricObservable = (tags == null) ? metricsService.findMetrics(tenantId, metricType)
: metricsService.findMetricsWithTags(tenantId, tags.getTags(), metricType);
: metricsService.findMetricsWithFilters(tenantId, tags.getTags(), metricType);

metricObservable
.map(MetricDefinition::new)
.toList()
.map(ApiUtils::collectionToResponse)
.subscribe(asyncResponse::resume, t -> asyncResponse.resume(ApiUtils.serverError(t)));
.subscribe(asyncResponse::resume, t -> {
if(t instanceof PatternSyntaxException) {
asyncResponse.resume(ApiUtils.badRequest(t));
} else {
asyncResponse.resume(ApiUtils.serverError(t));
}
});
}

@POST
Expand Down
Expand Up @@ -74,8 +74,8 @@ public static List<DataPoint<Double>> requestToGaugeDataPoints(List<GaugeDataPoi
}

public static Observable<Metric<Double>> requestToGauges(String tenantId, List<Gauge> gauges) {
return Observable.from(gauges).map(g ->
new Metric<>(new MetricId(tenantId, GAUGE, g.getId()), requestToGaugeDataPoints(g.getData())));
return Observable.from(gauges).map(g -> new Metric<>(new MetricId(tenantId, GAUGE, g.getId()),
requestToGaugeDataPoints(g.getData())));
}

public static Observable<Metric<Long>> requestToCounters(String tenantId, List<Counter> counters) {
Expand Down Expand Up @@ -114,4 +114,9 @@ public static Response emptyPayload() {
public static Response badRequest(ApiError error) {
return Response.status(Response.Status.BAD_REQUEST).entity(error).build();
}

public static Response badRequest(Throwable t) {
ApiError error = new ApiError(t.getLocalizedMessage());
return badRequest(error);
}
}
Expand Up @@ -90,13 +90,21 @@ public interface MetricsService {
Observable<Metric> findMetrics(String tenantId, MetricType type);

/**
* Returns tenant's metric definitions. The results can be filtered using a type and tags. Use findMetrics
* if you don't intend to use tags for filtering.
* Find tenant's metrics with filtering abilities. The filtering can take place at the type level or at the
* tag level. The following tags-filtering capabilities are provided in tagsQueries:
*
* @param tags Tag names and values that are used to filter definitions
* @param type Optional MetricType, if null is given all matching metrics are returned
* key: tagName ; value: * -> Find all metrics with tag tagName and any value
* key: tagName ; value: tagValue -> Find all metrics with tag tagName and having value tagValue
* key: tagName ; value: t1|t2|.. -> Find all metrics with tag tagName and having any of the values
* t1 or t2 etc
*
* @param tenantId
* @param tagsQueries If tagsQueries is empty, empty Observable is returned, use findMetrics(tenantId, type) instead
* @param type If type is null, no type filtering is used
* @return Metric's that are filtered with given conditions
*/
Observable<Metric> findMetricsWithTags(String tenantId, Map<String, String> tags, MetricType type);
Observable<Metric> findMetricsWithFilters(String tenantId, Map<String, String> tagsQueries, MetricType
type);

Observable<Optional<Map<String, String>>> getMetricTags(MetricId id);

Expand Down
Expand Up @@ -117,7 +117,7 @@ Observable<ResultSet> insertAvailabilityTag(String tag, String tagValue,

Observable<ResultSet> deleteFromMetricsTagsIndex(Metric metric, Map<String, String> tags);

Observable<ResultSet> findMetricsByTag(String tenantId, String tag);
Observable<ResultSet> findMetricsByTagName(String tenantId, String tag);

Observable<ResultSet> findMetricsFromTagsIndex(String tenantId, Map<String, String> tags);
Observable<ResultSet> findMetricsByTagNameValue(String tenantId, String tag, String tvalue);
}
Expand Up @@ -143,7 +143,7 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement findMetricsByTagName;

private PreparedStatement findMetricsFromTagsIndex;
private PreparedStatement findMetricsByTagNameValue;

public DataAccessImpl(Session session) {
this.session = session;
Expand Down Expand Up @@ -326,11 +326,11 @@ protected void initPreparedStatements() {
"WHERE tenant_id = ? AND tname = ? AND tvalue = ? AND type = ? AND metric = ? AND interval = ?");

findMetricsByTagName = session.prepare(
"SELECT tvalue, type, metric, interval " +
"SELECT type, metric, interval, tvalue " +
"FROM metrics_tags_idx " +
"WHERE tenant_id = ? AND tname = ?");

findMetricsFromTagsIndex = session.prepare(
findMetricsByTagNameValue = session.prepare(
"SELECT type, metric, interval " +
"FROM metrics_tags_idx " +
"WHERE tenant_id = ? AND tname = ? AND tvalue = ?");
Expand Down Expand Up @@ -676,14 +676,13 @@ private Observable<ResultSet> executeTagsBatch(Map<String, String> tags,
}

@Override
public Observable<ResultSet> findMetricsByTag(String tenantId, String tag) {
public Observable<ResultSet> findMetricsByTagName(String tenantId, String tag) {
return rxSession.execute(findMetricsByTagName.bind(tenantId, tag));
}

@Override
public Observable<ResultSet> findMetricsFromTagsIndex(String tenantId, Map<String, String> tags) {
return Observable.from(tags.entrySet())
.flatMap(e -> rxSession.execute(findMetricsFromTagsIndex.bind(tenantId, e.getKey(), e.getValue())));
public Observable<ResultSet> findMetricsByTagNameValue(String tenantId, String tag, String tvalue) {
return rxSession.execute(findMetricsByTagNameValue.bind(tenantId, tag, tvalue));
}

@Override
Expand Down
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.Executors;
import java.util.function.Predicate;

import java.util.regex.Pattern;
import org.hawkular.metrics.core.api.AvailabilityBucketDataPoint;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.BucketedOutput;
Expand All @@ -59,6 +60,8 @@
import org.hawkular.metrics.core.api.RetentionSettings;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.core.api.TenantAlreadyExistsException;
import org.hawkular.metrics.core.impl.transformers.ItemsToSetTransformer;
import org.hawkular.metrics.core.impl.transformers.TagsIndexRowTransformer;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskService;
Expand Down Expand Up @@ -457,18 +460,53 @@ public Observable<Metric> findMetrics(String tenantId, MetricType type) {
return typeObservable.flatMap(t -> dataAccess.findMetricsInMetricsIndex(tenantId, t))
.flatMap(Observable::from)
.map(row -> new Metric(new MetricId(tenantId, type, row.getString(0), Interval.parse(row.getString(1))),
row.getMap(2, String.class, String.class), row.getInt(3)));
row.getMap(2, String.class, String.class), row.getInt(3)));
}

@Override
public Observable<Metric> findMetricsWithTags(String tenantId, Map<String, String> tags, MetricType type) {
return dataAccess.findMetricsFromTagsIndex(tenantId, tags)
public Observable<Metric> findMetricsWithFilters(String tenantId, Map<String, String> tagsQueries, MetricType
type) {

// Fetch everything from the tagsQueries
return Observable.from(tagsQueries.entrySet())
.flatMap(entry -> {
// Special case "*" allowed and ! indicates match shouldn't happen
boolean positive = (!entry.getValue().startsWith("!"));
Pattern p = filterPattern(entry.getValue());

return Observable.just(entry)
.flatMap(e -> dataAccess.findMetricsByTagName(tenantId, e.getKey())
.flatMap(Observable::from)
.filter(r -> positive == p.matcher(r.getString(3)).matches()) // XNOR
.compose(new TagsIndexRowTransformer(tenantId, type))
.compose(new ItemsToSetTransformer<MetricId>())
.reduce((s1, s2) -> {
s1.addAll(s2);
return s1;
}));
})
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
})
.flatMap(Observable::from)
.filter(r -> (type == null && MetricType.userTypes().contains(MetricType.fromCode(r.getInt(0))))
|| MetricType.fromCode(r.getInt(0)) == type)
.distinct(r -> Integer.valueOf(r.getInt(0)).toString() + r.getString(1) + r.getString(2))
.flatMap(r -> findMetric(new MetricId(tenantId, MetricType.fromCode(r.getInt(0)), r.getString
(1), Interval.parse(r.getString(2))))); // We'll need to fetch type here from the Cassandra..
.flatMap(this::findMetric);
}

/**
* Allow special cases to Pattern matching, such as "*" -> ".*" and ! indicating the match shouldn't
* happen. The first ! indicates the rest of the pattern should not match.
*
* @param inputRegexp Regexp given by the user
* @return Pattern modified to allow special cases in the query language
*/
private Pattern filterPattern(String inputRegexp) {
if (inputRegexp.equals("*")) {
inputRegexp = ".*";
} else if(inputRegexp.startsWith("!")) {
inputRegexp = inputRegexp.substring(1);
}
return Pattern.compile(inputRegexp); // Catch incorrect patterns..
}

@Override
Expand Down
@@ -0,0 +1,39 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl.transformers;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import rx.Observable;
import rx.functions.Func1;

/**
* RxJava Composer, transforms emitted items to a Set of items
*
* @author Michael Burman
*/
public class ItemsToSetTransformer<T> implements Observable.Transformer<T, Set<T>> {

@Override
public Observable<Set<T>> call(Observable<T> metricIndexObservable) {
return metricIndexObservable
.toList()
.switchIfEmpty(Observable.from(new HashSet<>()))
.map((Func1<List<T>, HashSet<T>>) HashSet::new);
}
}
@@ -0,0 +1,51 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl.transformers;

import com.datastax.driver.core.Row;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import rx.Observable;

/**
* Transforms ResultSets's Rows from metrics_tags_idx to a MetricId. Requires the following order on select:
* type, metric, interval
*
* @author Michael Burman
*/
public class TagsIndexRowTransformer implements Observable.Transformer<Row, MetricId> {

private MetricType type;
private String tenantId;

public TagsIndexRowTransformer(String tenantId, MetricType type) {
this.type = type;
this.tenantId = tenantId;
}

@Override
public Observable<MetricId> call(Observable<Row> resultSetObservable) {
return resultSetObservable
// .flatMap(Observable::from)
.filter(r -> (type == null
&& MetricType.userTypes().contains(MetricType.fromCode(r.getInt(0))))
|| MetricType.fromCode(r.getInt(0)) == type)
.map(r -> new MetricId(tenantId, MetricType.fromCode(r.getInt(0)), r.getString
(1), Interval.parse(r.getString(2))));
}
}
Expand Up @@ -239,12 +239,12 @@ public Observable<ResultSet> deleteFromMetricsTagsIndex(Metric metric, Map<Strin
}

@Override
public Observable<ResultSet> findMetricsByTag(String tenantId, String tag) {
return delegate.findMetricsByTag(tenantId, tag);
public Observable<ResultSet> findMetricsByTagName(String tenantId, String tag) {
return delegate.findMetricsByTagName(tenantId, tag);
}

@Override
public Observable<ResultSet> findMetricsFromTagsIndex(String tenantId, Map<String, String> tags) {
return delegate.findMetricsFromTagsIndex(tenantId, tags);
public Observable<ResultSet> findMetricsByTagNameValue(String tenantId, String tag, String tvalue) {
return delegate.findMetricsByTagNameValue(tenantId, tag, tvalue);
}
}

0 comments on commit 257d051

Please sign in to comment.