Skip to content

Commit

Permalink
Remove MetricId -> Metric enrichment from places where it is not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Aug 3, 2017
1 parent 69eeb41 commit 50cd560
Show file tree
Hide file tree
Showing 23 changed files with 157 additions and 159 deletions.
Expand Up @@ -53,7 +53,6 @@
import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.model.BucketPoint;
import org.hawkular.metrics.model.Buckets;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.hawkular.metrics.model.Percentile;
Expand Down Expand Up @@ -591,8 +590,7 @@ private <T> Observable<MetricId<T>> findMetricsByNameOrTags(String tenantId, Lis
}

// Tags case
return metricsService.findMetricsWithFilters(tenantId, type, tags)
.map(Metric::getMetricId);
return metricsService.findMetricIdentifiersWithFilters(tenantId, type, tags);
}
}

Expand Down
Expand Up @@ -394,7 +394,8 @@ public void run() {
for (String dataId : dataIds) {
String tagQuery = tags.get(dataId);
List<Metric<Object>> dataIdMetrics = metricsService
.findMetricsWithFilters(mgt.getTenantId(), null, tagQuery)
.findMetricIdentifiersWithFilters(mgt.getTenantId(), null, tagQuery)
.flatMap(metricsService::findMetric)
.toList()
.toBlocking().firstOrDefault(Collections.emptyList());
dataIdMap.put(dataId, new HashSet<>(dataIdMetrics));
Expand Down
Expand Up @@ -146,7 +146,8 @@ public void getMetrics(

Observable<Metric<AvailabilityType>> metricObservable = null;
if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), AVAILABILITY, tags);
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), AVAILABILITY, tags)
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), AVAILABILITY);
}
Expand Down Expand Up @@ -509,8 +510,7 @@ public void getRawDataByTag(
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {
metricsService.findMetricsWithFilters(getTenant(), AVAILABILITY, tags)
.map(Metric::getMetricId)
metricsService.findMetricIdentifiersWithFilters(getTenant(), AVAILABILITY, tags)
.toList()
.flatMap(metricIds -> TimeAndSortParams.<AvailabilityType>deferredBuilder(start, end)
.fromEarliest(fromEarliest, metricIds, this::findTimeRange)
Expand Down
Expand Up @@ -152,7 +152,8 @@ public void getMetrics(

Observable<Metric<Long>> metricObservable = null;
if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), COUNTER, tags);
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER, tags)
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), COUNTER);
}
Expand Down Expand Up @@ -898,8 +899,7 @@ public void getRawDataByTag(
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {
metricsService.findMetricsWithFilters(getTenant(), COUNTER, tags)
.map(Metric::getMetricId)
metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER, tags)
.toList()
.flatMap(metricIds -> TimeAndSortParams.<Long>deferredBuilder(start, end)
.fromEarliest(fromEarliest, metricIds, this::findTimeRange)
Expand Down
Expand Up @@ -149,7 +149,8 @@ public void getMetrics(

Observable<Metric<Double>> metricObservable = null;
if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), GAUGE, tags);
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, tags)
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), GAUGE);
}
Expand Down Expand Up @@ -907,8 +908,7 @@ public void getRawDataByTag(
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {
metricsService.findMetricsWithFilters(getTenant(), GAUGE, tags)
.map(Metric::getMetricId)
metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, tags)
.toList()
.flatMap(metricIds -> TimeAndSortParams.<Double>deferredBuilder(start, end)
.fromEarliest(fromEarliest, metricIds, this::findTimeRange)
Expand Down
Expand Up @@ -212,10 +212,9 @@ public <T> void findMetrics(
Observable<Metric<T>> metricObservable;

if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), metricType, tags);
if (!Strings.isNullOrEmpty(id)) {
metricObservable = metricObservable.filter(metricsService.idFilter(id));
}
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), metricType, tags)
.filter(metricsService.idFilter(id))
.flatMap(metricsService::findMetric);
} else {
if(!Strings.isNullOrEmpty(id)) {
// HWKMETRICS-461
Expand All @@ -226,7 +225,7 @@ public <T> void findMetrics(
String[] ids = id.split("\\|");
metricObservable = Observable.from(ids)
.map(idPart -> new MetricId<>(getTenant(), metricType, idPart))
.flatMap(mId -> metricsService.findMetric(mId));
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), metricType);
}
Expand Down Expand Up @@ -406,49 +405,45 @@ private Observable<Map<String, Map<String, List<? extends BucketPoint>>>> doStat
namedBucketPoints.bucketPoints));
}
} else {
Observable<Metric<Double>> gauges;
Observable<Metric<Long>> counters;
Observable<MetricId<Double>> gauges;
Observable<MetricId<Long>> counters;

if (types.isEmpty()) {
gaugeStats = getGaugeStatsFromTags(bucketsConfig, percentiles, query.getTags());
counterStats = getCounterStatsFromTags(bucketsConfig, percentiles, query.getTags());
availabilityStats = getAvailabilityStatsFromTags(bucketsConfig, query.getTags());
} else {
if (types.contains(GAUGE) && types.contains(GAUGE_RATE)) {
gauges = metricsService.findMetricsWithFilters(getTenant(), GAUGE, query.getTags()).cache();
gauges = metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, query.getTags()).cache();
gaugeStats = gauges.flatMap(gauge ->
metricsService.findGaugeStats(gauge.getMetricId(), bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(gauge.getMetricId().getName(),
metricsService.findGaugeStats(gauge, bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(gauge.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
Observable<MetricId<Double>> ids = gauges.map(Metric::getMetricId);
gaugeRateStats = getRateStats(ids, bucketsConfig, percentiles);
gaugeRateStats = getRateStats(gauges, bucketsConfig, percentiles);
} else if (types.contains(GAUGE)) {
gaugeStats = getGaugeStatsFromTags(bucketsConfig, percentiles, query.getTags());
} else {
gauges = metricsService.findMetricsWithFilters(getTenant(), GAUGE, query.getTags());
Observable<MetricId<Double>> ids = gauges.map(Metric::getMetricId);
gaugeRateStats = getRateStats(ids, bucketsConfig, percentiles);
gauges = metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, query.getTags());
gaugeRateStats = getRateStats(gauges, bucketsConfig, percentiles);
}

if (types.contains(COUNTER) && types.contains(COUNTER_RATE)) {
counters = metricsService.findMetricsWithFilters(getTenant(), COUNTER, query.getTags())
counters = metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER, query.getTags())
.cache();
counterStats = counters.flatMap(counter ->
metricsService.findCounterStats(counter.getMetricId(), bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(counter.getMetricId().getName(),
metricsService.findCounterStats(counter, bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(counter.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
Observable<MetricId<Long>> ids = counters.map(Metric::getMetricId);
counterRateStats = getRateStats(ids, bucketsConfig, percentiles);
counterRateStats = getRateStats(counters, bucketsConfig, percentiles);
} else if (types.contains(COUNTER)) {
counterStats = getCounterStatsFromTags(bucketsConfig, percentiles, query.getTags());
} else {
counters = metricsService.findMetricsWithFilters(getTenant(), COUNTER, query.getTags());
Observable<MetricId<Long>> ids = counters.map(Metric::getMetricId);
counterRateStats = getRateStats(ids, bucketsConfig, percentiles);
counters = metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER, query.getTags());
counterRateStats = getRateStats(counters, bucketsConfig, percentiles);
}

if (types.contains(AVAILABILITY)) {
Expand Down Expand Up @@ -491,35 +486,36 @@ private void checkRequiredParams(StatsQueryRequest query) {

private Observable<Map<String, List<? extends BucketPoint>>> getCounterStatsFromTags(BucketConfig bucketsConfig,
List<Percentile> percentiles, String tags) {
Observable<Metric<Long>> counters = metricsService.findMetricsWithFilters(getTenant(), COUNTER, tags);
Observable<MetricId<Long>> counters = metricsService.findMetricIdentifiersWithFilters(getTenant(), COUNTER,
tags);
return counters.flatMap(counter ->
metricsService.findCounterStats(counter.getMetricId(), bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(counter.getMetricId().getName(),
metricsService.findCounterStats(counter, bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(counter.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
}

private Observable<Map<String, List<? extends BucketPoint>>> getGaugeStatsFromTags(BucketConfig bucketsConfig,
List<Percentile> percentiles, String tags) {
Observable<Metric<Double>> gauges = metricsService.findMetricsWithFilters(getTenant(), GAUGE, tags);
Observable<MetricId<Double>> gauges = metricsService.findMetricIdentifiersWithFilters(getTenant(), GAUGE, tags);
return gauges.flatMap(gauge ->
metricsService.findGaugeStats(gauge.getMetricId(), bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(gauge.getMetricId().getName(),
metricsService.findGaugeStats(gauge, bucketsConfig, percentiles)
.map(bucketPoints -> new NamedBucketPoints<>(gauge.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
}

private Observable<Map<String, List<? extends BucketPoint>>> getAvailabilityStatsFromTags(
BucketConfig bucketsConfig, String tags) {
Observable<Metric<AvailabilityType>> availabilities = metricsService.findMetricsWithFilters(getTenant(),
AVAILABILITY, tags);
Observable<MetricId<AvailabilityType>> availabilities = metricsService.findMetricIdentifiersWithFilters
(getTenant(), AVAILABILITY, tags);
Observable<Map<String, List<? extends BucketPoint>>> availabilityStats;
availabilityStats = availabilities.flatMap(availability -> metricsService.findAvailabilityStats(
availability.getMetricId(), bucketsConfig.getTimeRange().getStart(),
availability, bucketsConfig.getTimeRange().getStart(),
bucketsConfig.getTimeRange().getEnd(), bucketsConfig.getBuckets())
.map(bucketPoints -> new NamedBucketPoints<>(availability.getMetricId().getName(),
.map(bucketPoints -> new NamedBucketPoints<>(availability.getName(),
bucketPoints)))
.collect(HashMap::new, (statsMap, namedBucketPoints) ->
statsMap.put(namedBucketPoints.id, namedBucketPoints.bucketPoints));
Expand Down
Expand Up @@ -73,8 +73,7 @@ <T> Observable<MetricId<T>> findMetricsByNameOrTag(List<String> metricNames, Str
.map(id -> new MetricId<>(getTenant(), type, id));
}

return metricsService.findMetricsWithFilters(getTenant(), type, tags)
.map(Metric::getMetricId);
return metricsService.findMetricIdentifiersWithFilters(getTenant(), type, tags);
}

<T> Observable<TimeRange> findTimeRange(String start, String end, Boolean fromEarliest,
Expand Down
Expand Up @@ -135,7 +135,8 @@ public void getMetrics(

Observable<Metric<String>> metricObservable = null;
if (tags != null) {
metricObservable = metricsService.findMetricsWithFilters(getTenant(), STRING, tags);
metricObservable = metricsService.findMetricIdentifiersWithFilters(getTenant(), STRING, tags)
.flatMap(metricsService::findMetric);
} else {
metricObservable = metricsService.findMetrics(getTenant(), STRING);
}
Expand Down Expand Up @@ -371,8 +372,7 @@ public void getRawDataByTag(
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {
metricsService.findMetricsWithFilters(getTenant(), STRING, tags)
.map(Metric::getMetricId)
metricsService.findMetricIdentifiersWithFilters(getTenant(), STRING, tags)
.toList()
.flatMap(metricIds -> TimeAndSortParams.<String>deferredBuilder(start, end)
.fromEarliest(fromEarliest, metricIds, this::findTimeRange)
Expand Down
Expand Up @@ -121,8 +121,7 @@ public Completable call(JobDetails jobDetails) {
Stopwatch stopwatch = Stopwatch.createStarted();
logger.info("Starting compression of timestamps (UTC) between " + startOfSlice + " - " + endOfSlice);

Observable<? extends MetricId<?>> metricIds = metricsService.findAllMetrics()
.map(Metric::getMetricId)
Observable<? extends MetricId<?>> metricIds = metricsService.findAllMetricIdentifiers()
.filter(m -> (m.getType() == GAUGE || m.getType() == COUNTER || m.getType() == AVAILABILITY));

PublishSubject<Metric<?>> subject = PublishSubject.create();
Expand Down
Expand Up @@ -78,7 +78,7 @@ public interface DataAccess {

Observable<ResultSet> dropTempTable(long timestamp);

Observable<Row> findAllMetricsInData();
Observable<Row> findAllMetricIdentifiersInData();

<T> Observable<Integer> insertData(Observable<Metric<T>> metrics);

Expand Down
Expand Up @@ -881,7 +881,7 @@ private Observable<PreparedStatement> getPrepForAllTempTables(TempStatement ts)
}

@Override
public Observable<Row> findAllMetricsInData() {
public Observable<Row> findAllMetricIdentifiersInData() {
return getPrepForAllTempTables(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
.map(PreparedStatement::bind)
.flatMap(b -> rxSession.executeAndFetch(b))
Expand Down
Expand Up @@ -98,7 +98,7 @@ public interface MetricsService {
*/
Observable<Void> createMetric(Metric<?> metric, boolean overwrite);

Observable<Metric<?>> findAllMetrics();
Observable<MetricId<?>> findAllMetricIdentifiers();

<T> Observable<Metric<T>> findMetric(MetricId<T> id);

Expand All @@ -125,9 +125,9 @@ public interface MetricsService {
* @param tenantId The id of the tenant to which the metrics belong
* @param type If type is null, no type filtering is used
* @param tagsQuery If tagsQueries is empty, empty Observable is returned, use findMetrics(tenantId, type) instead
* @return Metric's that are filtered with given conditions
* @return MetricIds that are filtered with given conditions
*/
<T> Observable<Metric<T>> findMetricsWithFilters(String tenantId, MetricType<T> type, String tags);
<T> Observable<MetricId<T>> findMetricIdentifiersWithFilters(String tenantId, MetricType<T> type, String tags);

/**
* Returns distinct tag values for a given tag query (using the same query format as {@link
Expand Down Expand Up @@ -366,7 +366,7 @@ Observable<List<NumericBucketPoint>> findRateStats(MetricId<? extends Number> id
*/
Observable<Metric<?>> insertedDataEvents();

<T> Func1<Metric<T>, Boolean> idFilter(String regexp);
<T> Func1<MetricId<T>, Boolean> idFilter(String regexp);

<T> Observable<Void> updateMetricExpiration(MetricId<T> metric);
}

0 comments on commit 50cd560

Please sign in to comment.