Skip to content

Commit

Permalink
Merge pull request #918 from rubenvp8510/HWKMETRICS-757
Browse files Browse the repository at this point in the history
Remove MetricId -> Metric enrichment from places where it is not needed
  • Loading branch information
John Sanda committed Mar 12, 2018
2 parents 0461671 + 5c67069 commit 13bd91d
Show file tree
Hide file tree
Showing 24 changed files with 171 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.model.BucketPoint;
import org.hawkular.metrics.model.Buckets;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.hawkular.metrics.model.Percentile;
Expand Down Expand Up @@ -591,8 +590,7 @@ private <T> Observable<MetricId<T>> findMetricsByNameOrTags(String tenantId, Lis
}

// Tags case
return metricsService.findMetricsWithFilters(tenantId, type, tags)
.map(Metric::getMetricId);
return metricsService.findMetricIdentifiersWithFilters(tenantId, type, tags);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ public void run() {
for (String dataId : dataIds) {
String tagQuery = tags.get(dataId);
List<Metric<Object>> dataIdMetrics = metricsService
.findMetricsWithFilters(mgt.getTenantId(), null, tagQuery)
.findMetricIdentifiersWithFilters(mgt.getTenantId(), null, tagQuery)
.flatMap(metricsService::findMetric)
.toList()
.toBlocking().firstOrDefault(Collections.emptyList());
dataIdMap.put(dataId, new HashSet<>(dataIdMetrics));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public void getMetrics(

Observable<Metric<AvailabilityType>> metricObservable = null;
if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), AVAILABILITY, tags);
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), AVAILABILITY, tags)
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), AVAILABILITY);
}
Expand Down Expand Up @@ -509,8 +510,7 @@ public void getRawDataByTag(
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {
metricsService.findMetricsWithFilters(getTenant(), AVAILABILITY, tags)
.map(Metric::getMetricId)
metricsService.findMetricIdentifiersWithFilters(getTenant(), AVAILABILITY, tags)
.toList()
.flatMap(metricIds -> TimeAndSortParams.<AvailabilityType>deferredBuilder(start, end)
.fromEarliest(fromEarliest, metricIds, this::findTimeRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public void getMetrics(

Observable<Metric<Long>> metricObservable = null;
if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), COUNTER, tags);
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER, tags)
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), COUNTER);
}
Expand Down Expand Up @@ -898,8 +899,7 @@ public void getRawDataByTag(
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {
metricsService.findMetricsWithFilters(getTenant(), COUNTER, tags)
.map(Metric::getMetricId)
metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER, tags)
.toList()
.flatMap(metricIds -> TimeAndSortParams.<Long>deferredBuilder(start, end)
.fromEarliest(fromEarliest, metricIds, this::findTimeRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public void getMetrics(

Observable<Metric<Double>> metricObservable = null;
if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), GAUGE, tags);
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, tags)
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), GAUGE);
}
Expand Down Expand Up @@ -907,8 +908,7 @@ public void getRawDataByTag(
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {
metricsService.findMetricsWithFilters(getTenant(), GAUGE, tags)
.map(Metric::getMetricId)
metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, tags)
.toList()
.flatMap(metricIds -> TimeAndSortParams.<Double>deferredBuilder(start, end)
.fromEarliest(fromEarliest, metricIds, this::findTimeRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,9 @@ public <T> void findMetrics(
Observable<Metric<T>> metricObservable;

if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), metricType, tags);
if (!Strings.isNullOrEmpty(id)) {
metricObservable = metricObservable.filter(metricsService.idFilter(id));
}
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), metricType, tags)
.filter(metricsService.idFilter(id))
.flatMap(metricsService::findMetric);
} else {
if(!Strings.isNullOrEmpty(id)) {
// HWKMETRICS-461
Expand All @@ -226,7 +225,7 @@ public <T> void findMetrics(
String[] ids = id.split("\\|");
metricObservable = Observable.from(ids)
.map(idPart -> new MetricId<>(getTenant(), metricType, idPart))
.flatMap(mId -> metricsService.findMetric(mId));
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), metricType);
}
Expand Down Expand Up @@ -406,49 +405,45 @@ private Observable<Map<String, Map<String, List<? extends BucketPoint>>>> doStat
namedBucketPoints.bucketPoints));
}
} else {
Observable<Metric<Double>> gauges;
Observable<Metric<Long>> counters;
Observable<MetricId<Double>> gauges;
Observable<MetricId<Long>> counters;

if (types.isEmpty()) {
gaugeStats = getGaugeStatsFromTags(bucketsConfig, percentiles, query.getTags());
counterStats = getCounterStatsFromTags(bucketsConfig, percentiles, query.getTags());
availabilityStats = getAvailabilityStatsFromTags(bucketsConfig, query.getTags());
} else {
if (types.contains(GAUGE) && types.contains(GAUGE_RATE)) {
gauges = metricsService.findMetricsWithFilters(getTenant(), GAUGE, query.getTags()).cache();
gauges = metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, query.getTags()).cache();
gaugeStats = gauges.flatMap(gauge ->
metricsService.findGaugeStats(gauge.getMetricId(), bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(gauge.getMetricId().getName(),
metricsService.findGaugeStats(gauge, bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(gauge.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
Observable<MetricId<Double>> ids = gauges.map(Metric::getMetricId);
gaugeRateStats = getRateStats(ids, bucketsConfig, percentiles);
gaugeRateStats = getRateStats(gauges, bucketsConfig, percentiles);
} else if (types.contains(GAUGE)) {
gaugeStats = getGaugeStatsFromTags(bucketsConfig, percentiles, query.getTags());
} else {
gauges = metricsService.findMetricsWithFilters(getTenant(), GAUGE, query.getTags());
Observable<MetricId<Double>> ids = gauges.map(Metric::getMetricId);
gaugeRateStats = getRateStats(ids, bucketsConfig, percentiles);
gauges = metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, query.getTags());
gaugeRateStats = getRateStats(gauges, bucketsConfig, percentiles);
}

if (types.contains(COUNTER) && types.contains(COUNTER_RATE)) {
counters = metricsService.findMetricsWithFilters(getTenant(), COUNTER, query.getTags())
counters = metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER, query.getTags())
.cache();
counterStats = counters.flatMap(counter ->
metricsService.findCounterStats(counter.getMetricId(), bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(counter.getMetricId().getName(),
metricsService.findCounterStats(counter, bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(counter.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
Observable<MetricId<Long>> ids = counters.map(Metric::getMetricId);
counterRateStats = getRateStats(ids, bucketsConfig, percentiles);
counterRateStats = getRateStats(counters, bucketsConfig, percentiles);
} else if (types.contains(COUNTER)) {
counterStats = getCounterStatsFromTags(bucketsConfig, percentiles, query.getTags());
} else {
counters = metricsService.findMetricsWithFilters(getTenant(), COUNTER, query.getTags());
Observable<MetricId<Long>> ids = counters.map(Metric::getMetricId);
counterRateStats = getRateStats(ids, bucketsConfig, percentiles);
counters = metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER, query.getTags());
counterRateStats = getRateStats(counters, bucketsConfig, percentiles);
}

if (types.contains(AVAILABILITY)) {
Expand Down Expand Up @@ -491,35 +486,36 @@ private void checkRequiredParams(StatsQueryRequest query) {

private Observable<Map<String, List<? extends BucketPoint>>> getCounterStatsFromTags(BucketConfig bucketsConfig,
List<Percentile> percentiles, String tags) {
Observable<Metric<Long>> counters = metricsService.findMetricsWithFilters(getTenant(), COUNTER, tags);
Observable<MetricId<Long>> counters = metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER,
tags);
return counters.flatMap(counter ->
metricsService.findCounterStats(counter.getMetricId(), bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(counter.getMetricId().getName(),
metricsService.findCounterStats(counter, bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(counter.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
}

private Observable<Map<String, List<? extends BucketPoint>>> getGaugeStatsFromTags(BucketConfig bucketsConfig,
List<Percentile> percentiles, String tags) {
Observable<Metric<Double>> gauges = metricsService.findMetricsWithFilters(getTenant(), GAUGE, tags);
Observable<MetricId<Double>> gauges = metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, tags);
return gauges.flatMap(gauge ->
metricsService.findGaugeStats(gauge.getMetricId(), bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(gauge.getMetricId().getName(),
metricsService.findGaugeStats(gauge, bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(gauge.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
}

private Observable<Map<String, List<? extends BucketPoint>>> getAvailabilityStatsFromTags(
BucketConfig bucketsConfig, String tags) {
Observable<Metric<AvailabilityType>> availabilities = metricsService.findMetricsWithFilters(getTenant(),
AVAILABILITY, tags);
Observable<MetricId<AvailabilityType>> availabilities = metricsService.findMetricIdentifiersWithFilters
(getTenant(), AVAILABILITY, tags);
Observable<Map<String, List<? extends BucketPoint>>> availabilityStats;
availabilityStats = availabilities.flatMap(availability -> metricsService.findAvailabilityStats(
availability.getMetricId(), bucketsConfig.getTimeRange().getStart(),
availability, bucketsConfig.getTimeRange().getStart(),
bucketsConfig.getTimeRange().getEnd(), bucketsConfig.getBuckets())
.map(bucketPoints -> new NamedBucketPoints<>(availability.getMetricId().getName(),
.map(bucketPoints -> new NamedBucketPoints<>(availability.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ <T> Observable<MetricId<T>> findMetricsByNameOrTag(List<String> metricNames, Str
.map(id -> new MetricId<>(getTenant(), type, id));
}

return metricsService.findMetricsWithFilters(getTenant(), type, tags)
.map(Metric::getMetricId);
return metricsService.findMetricIdentifiersWithFilters(getTenant(), type, tags);
}

<T> Observable<TimeRange> findTimeRange(String start, String end, Boolean fromEarliest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public void getMetrics(

Observable<Metric<String>> metricObservable = null;
if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), STRING, tags);
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), STRING, tags)
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), STRING);
}
Expand Down Expand Up @@ -371,8 +372,7 @@ public void getRawDataByTag(
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {
metricsService.findMetricsWithFilters(getTenant(), STRING, tags)
.map(Metric::getMetricId)
metricsService.findMetricIdentifiersWithFilters(getTenant(), STRING, tags)
.toList()
.flatMap(metricIds -> TimeAndSortParams.<String>deferredBuilder(start, end)
.fromEarliest(fromEarliest, metricIds, this::findTimeRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ public Completable call(JobDetails jobDetails) {
Stopwatch stopwatch = Stopwatch.createStarted();
logger.info("Starting compression of timestamps (UTC) between " + startOfSlice + " - " + endOfSlice);

Observable<? extends MetricId<?>> metricIds = metricsService.findAllMetrics()
.map(Metric::getMetricId)
Observable<? extends MetricId<?>> metricIds = metricsService.findAllMetricIdentifiers()
.filter(m -> (m.getType() == GAUGE || m.getType() == COUNTER || m.getType() == AVAILABILITY));

PublishSubject<Metric<?>> subject = PublishSubject.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ public interface DataAccess {

<T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType<T> type);

Observable<Row> findAllMetricsInData();

Observable<Integer> insertGaugeDatas(Observable<Metric<Double>> gauges,
Function<MetricId<Double>, Integer> ttlFunc);

Observable<Integer> insertGaugeData(Metric<Double> metric);

Observable<Row> findAllMetricIdentifiersInData();

Observable<Integer> insertGaugeData(Metric<Double> metric, int ttl);

Observable<Integer> insertStringDatas(Observable<Metric<String>> strings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType


@Override
public Observable<Row> findAllMetricsInData() {
public Observable<Row> findAllMetricIdentifiersInData() {
return rxSession.executeAndFetch(findAllMetricsInData.bind())
.concatWith(rxSession.executeAndFetch(findAllMetricsInDataCompressed.bind()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public interface MetricsService {
*/
Observable<Void> createMetric(Metric<?> metric, boolean overwrite);

Observable<Metric<?>> findAllMetrics();
Observable<MetricId<?>> findAllMetricIdentifiers();

<T> Observable<Metric<T>> findMetric(MetricId<T> id);

Expand All @@ -124,9 +124,9 @@ public interface MetricsService {
* @param tenantId The id of the tenant to which the metrics belong
* @param type If type is null, no type filtering is used
* @param tagsQuery If tagsQueries is empty, empty Observable is returned, use findMetrics(tenantId, type) instead
* @return Metric's that are filtered with given conditions
* @return MetricIds that are filtered with given conditions
*/
<T> Observable<Metric<T>> findMetricsWithFilters(String tenantId, MetricType<T> type, String tags);
<T> Observable<MetricId<T>> findMetricIdentifiersWithFilters(String tenantId, MetricType<T> type, String tags);

/**
* Returns distinct tag values for a given tag query (using the same query format as {@link
Expand Down Expand Up @@ -354,7 +354,7 @@ Observable<List<NumericBucketPoint>> findRateStats(MetricId<? extends Number> id
*/
Observable<Metric<?>> insertedDataEvents();

<T> Func1<Metric<T>, Boolean> idFilter(String regexp);
<T> Func1<MetricId<T>, Boolean> idFilter(String regexp);

<T> Observable<Void> updateMetricExpiration(Metric<T> metric);
}

0 comments on commit 13bd91d

Please sign in to comment.