diff --git a/core/metrics-core-service/src/main/java/org/hawkular/metrics/core/service/MetricsServiceImpl.java b/core/metrics-core-service/src/main/java/org/hawkular/metrics/core/service/MetricsServiceImpl.java index b937816d9..e507bc2d6 100644 --- a/core/metrics-core-service/src/main/java/org/hawkular/metrics/core/service/MetricsServiceImpl.java +++ b/core/metrics-core-service/src/main/java/org/hawkular/metrics/core/service/MetricsServiceImpl.java @@ -827,45 +827,13 @@ public Observable> findNumericStats( checkArgument(metricType == GAUGE || metricType == GAUGE_RATE || metricType == COUNTER || metricType == COUNTER_RATE, "Invalid metric type: %s", metricType); - if (!stacked) { - if (COUNTER == metricType || GAUGE == metricType) { - return findMetricsWithFilters(tenantId, metricType, tagFilters) - .flatMap(metric -> findDataPoints(metric.getMetricId(), start, end, 0, Order.DESC)) - .compose(new NumericBucketPointTransformer(buckets, percentiles)); - } else { - MetricType mtype = metricType == GAUGE_RATE ? GAUGE : COUNTER; - return findMetricsWithFilters(tenantId, mtype, tagFilters) - .flatMap(metric -> findRateData(metric.getMetricId(), start, end, 0, ASC)) - .compose(new NumericBucketPointTransformer(buckets, percentiles)); - } - } else { - Observable> individualStats; - if (COUNTER == metricType || GAUGE == metricType) { - individualStats = findMetricsWithFilters(tenantId, metricType, tagFilters) - .map(metric -> { - return findDataPoints(metric.getMetricId(), start, end, 0, Order.DESC) - .compose(new NumericBucketPointTransformer(buckets, percentiles)) - .flatMap(Observable::from); - }); - } else { - MetricType mtype = metricType == GAUGE_RATE ? GAUGE : COUNTER; - individualStats = findMetricsWithFilters(tenantId, mtype, tagFilters) - .map(metric -> { - return findRateData(metric.getMetricId(), start, end, 0, ASC) - .compose(new NumericBucketPointTransformer(buckets, percentiles)) - .flatMap(Observable::from); - }); - } + final boolean isRate = (COUNTER != metricType && GAUGE != metricType); + final MetricType mType = isRate ? (metricType == GAUGE_RATE ? GAUGE : COUNTER) : metricType; + final Observable> metricIds = findMetricsWithFilters(tenantId, mType, tagFilters) + .map(Metric::getMetricId); - return Observable.merge(individualStats) - .groupBy(BucketPoint::getStart) - .flatMap(group -> group.collect(SumNumericBucketPointCollector::new, - SumNumericBucketPointCollector::increment)) - .map(SumNumericBucketPointCollector::toBucketPoint) - .toMap(NumericBucketPoint::getStart) - .map(pointMap -> NumericBucketPoint.toList(pointMap, buckets)); - } - }; + return findNumericStats(metricIds, start, end, buckets, percentiles, stacked, isRate); + } @Override public Observable> findNumericStats(String tenantId, @@ -876,37 +844,42 @@ public Observable> findNumericStats( checkArgument(metricType == GAUGE || metricType == GAUGE_RATE || metricType == COUNTER || metricType == COUNTER_RATE, "Invalid metric type: %s", metricType); + final boolean isRate = (COUNTER != metricType && GAUGE != metricType); + final MetricType mType = isRate ? (metricType == GAUGE_RATE ? GAUGE : COUNTER) : metricType; + final Observable> metricIds = Observable.from(metrics) + .map(name -> new MetricId<>(tenantId, mType, name)); + + return findNumericStats(metricIds, start, end, buckets, percentiles, stacked, isRate); + } + + private Observable> findNumericStats( + Observable> metrics, long start, long end, Buckets buckets, List + percentiles, boolean stacked, boolean isRate) { + if (!stacked) { - if (COUNTER == metricType || GAUGE == metricType) { - return Observable.from(metrics) - .flatMap(metricName -> findDataPoints(new MetricId<>(tenantId, metricType, metricName), start, - end, 0, Order.DESC)) + if (!isRate) { + return metrics + .flatMap(metricId -> findDataPoints(metricId, start, end, 0, Order.DESC)) .compose(new NumericBucketPointTransformer(buckets, percentiles)); } else { - MetricType mtype = metricType == GAUGE_RATE ? GAUGE : COUNTER; - return Observable.from(metrics) - .flatMap(metricName -> findRateData(new MetricId<>(tenantId, mtype, metricName), start, end, 0, - ASC)) + return metrics + .flatMap(metricId -> findRateData(metricId, start, end, 0, ASC)) .compose(new NumericBucketPointTransformer(buckets, percentiles)); } } else { Observable> individualStats; - if (COUNTER == metricType || GAUGE == metricType) { - individualStats = Observable.from(metrics) - .map(metricName -> { - return findDataPoints(new MetricId<>(tenantId, metricType, metricName), start, end, 0, - Order.DESC) - .compose(new NumericBucketPointTransformer(buckets, percentiles)) - .flatMap(Observable::from); - }); + if (!isRate) { + individualStats = metrics.map(metricId -> { + return findDataPoints(metricId, start, end, 0, Order.DESC) + .compose(new NumericBucketPointTransformer(buckets, percentiles)) + .flatMap(Observable::from); + }); } else { - MetricType mtype = metricType == GAUGE_RATE ? GAUGE : COUNTER; - individualStats = Observable.from(metrics) - .map(metricName -> { - return findRateData(new MetricId<>(tenantId, mtype, metricName), start, end, 0, ASC) - .compose(new NumericBucketPointTransformer(buckets, percentiles)) - .flatMap(Observable::from); - }); + individualStats = metrics.map(metricId -> { + return findRateData(metricId, start, end, 0, ASC) + .compose(new NumericBucketPointTransformer(buckets, percentiles)) + .flatMap(Observable::from); + }); } return Observable.merge(individualStats)