Skip to content

Commit

Permalink
[HWKMETRICS-180] findMetricsWithTags supports now MatchType (ANY / ALL)
Browse files Browse the repository at this point in the history
[HWKMETRICS-180] Add tagFiltering query language that allows more complex querying capabilities against tags in the metric definition table
  • Loading branch information
Michael Burman committed Jul 30, 2015
1 parent 531e090 commit 9c13bbb
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ public void findMetrics(
required = false,
allowableValues = "[gauge, availability, counter]")
@QueryParam("type") MetricType metricType,
@ApiParam(value = "List of tags", required = false) @QueryParam("tags") Tags tags) {
@ApiParam(value = "List of tags filters", required = false) @QueryParam("tags") Tags tags) {

if (metricType != null && !MetricType.userTypes().contains(metricType)) {
asyncResponse.resume(badRequest(new ApiError("Incorrect type param")));
return;
}

Observable<Metric> metricObservable = (tags == null) ? metricsService.findMetrics(tenantId, metricType)
: metricsService.findMetricsWithTags(tenantId, tags.getTags(), metricType);
: metricsService.findMetricsWithFilters(tenantId, tags.getTags(), metricType);

metricObservable
.map(MetricDefinition::new)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,21 @@ public interface MetricsService {
Observable<Metric> findMetrics(String tenantId, MetricType type);

/**
* Returns tenant's metric definitions. The results can be filtered using a type and tags. Use findMetrics
* if you don't intend to use tags for filtering.
* Find tenant's metrics with filtering abilities. The filtering can take place at the type level or at the
* tag level. The following tags-filtering capabilities are provided in tagsQueries:
*
* @param tags Tag names and values that are used to filter definitions
* @param type Optional MetricType, if null is given all matching metrics are returned
* key: tagName ; value: * -> Find all metrics with tag tagName and any value
* key: tagName ; value: tagValue -> Find all metrics with tag tagName and having value tagValue
* key: tagName ; value: t1|t2|.. -> Find all metrics with tag tagName and having any of the values
* t1 or t2 etc
*
* @param tenantId
* @param tagsQueries If tagsQueries is empty, empty Observable is returned, use findMetrics(tenantId, type) instead
* @param type If type is null, no type filtering is used
* @return Metric's that are filtered with given conditions
*/
Observable<Metric> findMetricsWithTags(String tenantId, Map<String, String> tags, MetricType type);
Observable<Metric> findMetricsWithFilters(String tenantId, Map<String, String> tagsQueries, MetricType
type);

Observable<Optional<Map<String, String>>> getMetricTags(MetricId id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Observable<ResultSet> insertAvailabilityTag(String tag, String tagValue,

Observable<ResultSet> deleteFromMetricsTagsIndex(Metric metric, Map<String, String> tags);

Observable<ResultSet> findMetricsByTag(String tenantId, String tag);
Observable<ResultSet> findMetricsByTagName(String tenantId, String tag);

Observable<ResultSet> findMetricsFromTagsIndex(String tenantId, Map<String, String> tags);
Observable<ResultSet> findMetricsByTagNameValue(String tenantId, String tag, String tvalue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement findMetricsByTagName;

private PreparedStatement findMetricsFromTagsIndex;
private PreparedStatement findMetricsByTagNameValue;

public DataAccessImpl(Session session) {
this.session = session;
Expand Down Expand Up @@ -326,11 +326,11 @@ protected void initPreparedStatements() {
"WHERE tenant_id = ? AND tname = ? AND tvalue = ? AND type = ? AND metric = ? AND interval = ?");

findMetricsByTagName = session.prepare(
"SELECT tvalue, type, metric, interval " +
"SELECT type, metric, interval, tvalue " +
"FROM metrics_tags_idx " +
"WHERE tenant_id = ? AND tname = ?");

findMetricsFromTagsIndex = session.prepare(
findMetricsByTagNameValue = session.prepare(
"SELECT type, metric, interval " +
"FROM metrics_tags_idx " +
"WHERE tenant_id = ? AND tname = ? AND tvalue = ?");
Expand Down Expand Up @@ -676,14 +676,13 @@ private Observable<ResultSet> executeTagsBatch(Map<String, String> tags,
}

@Override
public Observable<ResultSet> findMetricsByTag(String tenantId, String tag) {
public Observable<ResultSet> findMetricsByTagName(String tenantId, String tag) {
return rxSession.execute(findMetricsByTagName.bind(tenantId, tag));
}

@Override
public Observable<ResultSet> findMetricsFromTagsIndex(String tenantId, Map<String, String> tags) {
return Observable.from(tags.entrySet())
.flatMap(e -> rxSession.execute(findMetricsFromTagsIndex.bind(tenantId, e.getKey(), e.getValue())));
public Observable<ResultSet> findMetricsByTagNameValue(String tenantId, String tag, String tvalue) {
return rxSession.execute(findMetricsByTagNameValue.bind(tenantId, tag, tvalue));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.hawkular.metrics.core.impl;

import static java.util.Comparator.comparingLong;
import static java.util.stream.Collectors.toSet;

import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
Expand Down Expand Up @@ -59,6 +60,9 @@
import org.hawkular.metrics.core.api.RetentionSettings;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.core.api.TenantAlreadyExistsException;
import org.hawkular.metrics.core.impl.tags.MetricIndex;
import org.hawkular.metrics.core.impl.transformers.TagsIndexResultSetTransformer;
import org.hawkular.metrics.core.impl.transformers.ItemsToSetTransformer;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskService;
Expand Down Expand Up @@ -441,6 +445,10 @@ private Observable<ResultSet> updateRetentionsIndex(Metric metric) {
return dataRetentionUpdated;
}

private Observable<Metric> findMetric(final String tenantId, final MetricIndex index) {
return findMetric(tenantId, index.getType(), index.getId());
}

@Override
public Observable<Metric> findMetric(final MetricId id) {
return dataAccess.findMetric(id)
Expand All @@ -461,14 +469,53 @@ public Observable<Metric> findMetrics(String tenantId, MetricType type) {
}

@Override
public Observable<Metric> findMetricsWithTags(String tenantId, Map<String, String> tags, MetricType type) {
return dataAccess.findMetricsFromTagsIndex(tenantId, tags)
public Observable<Metric> findMetricsWithFilters(String tenantId, Map<String, String> tagsQueries, MetricType
type) {

// SearchQueries that are fetched with tag name
Set<Map.Entry<String, String>> tnames = tagsQueries.entrySet().stream()
.filter(e -> e.getValue().equals("*")).collect(toSet());

// SearchQueries that are fetched with tag name and value
Set<Map.Entry<String, String>> tvalues = tagsQueries.entrySet().stream()
.filter(e -> !e.getValue().equals("*")).collect(toSet());

// Define queries separately for tname and tname,tvalue Cassandra queries
Observable<Set<MetricIndex>> nameMatches = Observable.from(tnames)
.flatMap(e -> dataAccess.findMetricsByTagName(tenantId, e.getKey())
.compose(new TagsIndexResultSetTransformer(type))
.compose(new ItemsToSetTransformer<MetricIndex>()));

Observable<Set<MetricIndex>> valueMatches = Observable.from(tvalues).flatMap(e -> {
String[] values = e.getValue().split("\\|");
return Observable.from(values)
.flatMap(v -> dataAccess.findMetricsByTagNameValue(tenantId, e.getKey(), v)
.compose(new TagsIndexResultSetTransformer(type))
.compose(new ItemsToSetTransformer<MetricIndex>()))
.reduce((s1, s2) -> {
s1.addAll(s2);
return s1;
});
});

// We should not process empty Observables if they were never called (otherwise our intersection is not right)
Observable<Set<MetricIndex>> indexes;
if (!tvalues.isEmpty() && !tnames.isEmpty()) {
indexes = nameMatches.mergeWith(valueMatches);
} else if (tvalues.isEmpty()) {
indexes = nameMatches;
} else {
indexes = valueMatches;
}

// Take intersection of every processed metric index set and fetch metric definitions
return indexes
.reduce((s1, s2) -> {
s1.retainAll(s2);
return s1;
})
.flatMap(Observable::from)
.filter(r -> (type == null && MetricType.userTypes().contains(MetricType.fromCode(r.getInt(0))))
|| MetricType.fromCode(r.getInt(0)) == type)
.distinct(r -> Integer.valueOf(r.getInt(0)).toString() + r.getString(1) + r.getString(2))
.flatMap(r -> findMetric(new MetricId(tenantId, MetricType.fromCode(r.getInt(0)), r.getString
(1), Interval.parse(r.getString(2))))); // We'll need to fetch type here from the Cassandra..
.flatMap(i -> findMetric(tenantId, i));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl.tags;

import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;

/**
* Represents required information from MetricIndex to transform later on to Metric.
*
* HWKMETRICS-114 might render this obsolete
*
* @author Michael Burman
*/
public class MetricIndex {
private final MetricType type;
private final MetricId id;

public MetricIndex(MetricType type, MetricId id) {
this.type = type;
this.id = id;
}

public MetricType getType() {
return type;
}

public MetricId getId() {
return id;
}

@Override
public boolean equals(Object obj) {
if(obj instanceof MetricIndex) {
MetricIndex i = (MetricIndex) obj;
return i.getId().getName().equals(this.getId().getName())
&& i.getType().equals(this.getType());
}
return false;
}

@Override
public int hashCode() {
return this.getId().getName().hashCode() + this.getType().hashCode();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl.transformers;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import rx.Observable;
import rx.functions.Func1;

/**
* RxJava Composer, transforms emitted items to a Set of items
*
* @author Michael Burman
*/
public class ItemsToSetTransformer<T> implements Observable.Transformer<T, Set<T>> {

@Override
public Observable<Set<T>> call(Observable<T> metricIndexObservable) {
return metricIndexObservable
.toList()
.switchIfEmpty(Observable.from(new HashSet<>()))
.map((Func1<List<T>, HashSet<T>>) HashSet::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl.transformers;

import com.datastax.driver.core.ResultSet;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.impl.tags.MetricIndex;
import rx.Observable;

/**
* Transforms ResultSets from metrics_tags_idx to a MetricIndex. Requires the following order on select:
* type, metric, interval
*
* HWKMETRICS-114 might require changes to this
*
* @author Michael Burman
*/
public class TagsIndexResultSetTransformer implements Observable.Transformer<ResultSet, MetricIndex> {

private MetricType type;

public TagsIndexResultSetTransformer(MetricType type) {
this.type = type;
}

@Override
public Observable<MetricIndex> call(Observable<ResultSet> resultSetObservable) {
return resultSetObservable
.flatMap(Observable::from)
.filter(r -> (type == null
&& MetricType.userTypes().contains(MetricType.fromCode(r.getInt(0))))
|| MetricType.fromCode(r.getInt(0)) == type)
.map(r -> new MetricIndex(MetricType.fromCode(r.getInt(0)), new MetricId(r.getString
(1), Interval.parse(r.getString(2)))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ public Observable<ResultSet> deleteFromMetricsTagsIndex(Metric metric, Map<Strin
}

@Override
public Observable<ResultSet> findMetricsByTag(String tenantId, String tag) {
return delegate.findMetricsByTag(tenantId, tag);
public Observable<ResultSet> findMetricsByTagName(String tenantId, String tag) {
return delegate.findMetricsByTagName(tenantId, tag);
}

@Override
public Observable<ResultSet> findMetricsFromTagsIndex(String tenantId, Map<String, String> tags) {
return delegate.findMetricsFromTagsIndex(tenantId, tags);
public Observable<ResultSet> findMetricsByTagNameValue(String tenantId, String tag, String tvalue) {
return delegate.findMetricsByTagNameValue(tenantId, tag, tvalue);
}
}

0 comments on commit 9c13bbb

Please sign in to comment.