[HWKMETRICS-292] initial support for aggregating data across multiple metrics

This commit also includes a change to NumericDataPointCollector to facilitate
testing. The order in which values are added affects the percentile results.
This makes testing the new API difficult because we are fetching data points
from multiple metrics concurrently, which means the order of input is
non-deterministic. The test hook allows a different implementation to be used
in which the order of inputs does not matter.
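
A minimal sketch of how the test hook is meant to be used (the createPercentile field and the Percentile interface are introduced in the diff below; the order-insensitive implementation here is only illustrative, not the exact one used in the integration test):

NumericDataPointCollector.createPercentile = p -> new Percentile() {
    private final List<Double> values = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void addValue(double value) {
        values.add(value);
    }

    @Override
    public double getResult() {
        if (values.isEmpty()) {
            return Double.NaN;
        }
        // Nearest-rank percentile over a sorted copy: insertion order cannot matter.
        double[] sorted = values.stream().mapToDouble(Double::doubleValue).sorted().toArray();
        int index = (int) Math.ceil((p / 100.0) * sorted.length) - 1;
        return sorted[Math.max(index, 0)];
    }
};
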
John Sanda committed Oct 2, 2015
1 parent ae6d0db commit e8c1531
Showing 9 changed files with 340 additions and 13 deletions.
@@ -248,6 +248,47 @@ public Response findGaugeData(
}
}

@GET
@Path("/data")
public Response findGaugeData(
@QueryParam("start") final Long start,
@QueryParam("end") final Long end,

stefannegrea (Contributor) commented on Oct 5, 2015:

Can we add support for a list of metric ids too? Not sure if we should do one or the other, or obtain a list of metrics based on both. But I see a use case where somebody wants to run this over a predefined list of metric ids.
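
A hypothetical shape for that suggestion, purely illustrative and not part of this commit (the parameter name and type are assumptions):

// e.g. an additional parameter alongside the existing tags filter:
@QueryParam("metrics") List<String> metricIds,
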

@QueryParam("tags") Tags tags,
@QueryParam("buckets") Integer bucketsCount,
@QueryParam("bucketDuration") Duration bucketDuration) {

long now = System.currentTimeMillis();
long startTime = start == null ? now - EIGHT_HOURS : start;
long endTime = end == null ? now : end;

if (bucketsCount == null && bucketDuration == null) {
return badRequest(new ApiError("Not supported!"));
} else if (bucketsCount != null && bucketDuration != null) {
return badRequest(new ApiError("Both buckets and bucketDuration parameters are used"));
} else {
Buckets buckets;
try {
if (bucketsCount != null) {
buckets = Buckets.fromCount(startTime, endTime, bucketsCount);
} else {
buckets = Buckets.fromStep(startTime, endTime, bucketDuration.toMillis());
}
} catch (IllegalArgumentException e) {
return badRequest(new ApiError("Bucket: " + e.getMessage()));
}

try {
return metricsService
.findGaugeStats(tenantId, tags.getTags(), startTime, endTime, buckets)
.map(ApiUtils::collectionToResponse)
.toBlocking()
.lastOrDefault(null);
} catch (Exception e) {
return ApiUtils.serverError(e);
}
}
}
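
As a usage note, a client might call the new endpoint roughly as follows. This is an illustrative sketch only: the base URL, the resource path, the Hawkular-Tenant header, and the "name:value,name:value" tag encoding are assumptions about the surrounding REST layer, which is not shown in this diff.

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

public class GaugeStatsClientExample {
    public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        // Aggregate all gauges tagged type=cpu_usage on server1 or server2 into a single bucket
        Response response = client.target("http://localhost:8080/hawkular/metrics")
                .path("gauges/data")
                .queryParam("tags", "type:cpu_usage,node:server1|server2")
                .queryParam("buckets", 1)
                .request(MediaType.APPLICATION_JSON)
                .header("Hawkular-Tenant", "my-tenant")
                .get();
        System.out.println(response.readEntity(String.class));
        client.close();
    }
}
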

@GET
@Produces(APPLICATION_JSON)
@Path("/{id}/periods")
@@ -273,6 +273,41 @@ public void findGaugeData(
}
}

@GET
@Path("/data")
public void findGaugeData(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Defaults to now - 8 hours") @QueryParam("start") final Long start,
@ApiParam(value = "Defaults to now") @QueryParam("end") final Long end,
@ApiParam(value = "List of tags filters", required = false) @QueryParam("tags") Tags tags,
@ApiParam(value = "Total number of buckets") @QueryParam("buckets") Integer bucketsCount,
@ApiParam(value = "Bucket duration") @QueryParam("bucketDuration") Duration bucketDuration) {

long now = System.currentTimeMillis();
long startTime = start == null ? now - EIGHT_HOURS : start;
long endTime = end == null ? now : end;

if (bucketsCount == null && bucketDuration == null) {
asyncResponse.resume(badRequest(new ApiError("Not supported!")));
} else if (bucketsCount != null && bucketDuration != null) {
asyncResponse.resume(badRequest(new ApiError("Both buckets and bucketDuration parameters are used")));
} else {
Buckets buckets;
try {
if (bucketsCount != null) {
buckets = Buckets.fromCount(startTime, endTime, bucketsCount);
} else {
buckets = Buckets.fromStep(startTime, endTime, bucketDuration.toMillis());
}
metricsService.findGaugeStats(tenantId, tags.getTags(), startTime, endTime, buckets)
.map(ApiUtils::collectionToResponse)
.subscribe(asyncResponse::resume, t -> asyncResponse.resume(ApiUtils.serverError(t)));
} catch (IllegalArgumentException e) {
asyncResponse.resume(badRequest(new ApiError("Bucket: " + e.getMessage())));
}
}
}

@GET
@Path("/{id}/periods")
@ApiOperation(value = "Retrieve periods for which the condition holds true for each consecutive data point.",
@@ -148,8 +148,11 @@ <T> Observable<Metric<T>> findMetricsWithFilters(String tenantId, Map<String, St
<T> Observable<T> findGaugeData(MetricId<Double> id, Long start, Long end,
Func1<Observable<DataPoint<Double>>, Observable<T>>... funcs);

Observable<List<NumericBucketPoint>> findGaugeStats(MetricId<Double> metricId, long start, long end, Buckets
buckets);
Observable<List<NumericBucketPoint>> findGaugeStats(MetricId<Double> metricId, long start, long end,
Buckets buckets);

Observable<List<NumericBucketPoint>> findGaugeStats(String tenantId, Map<String, String> tagFilters, long start,
long end, Buckets buckets);

Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start, long end,
boolean distinct);
@@ -699,6 +699,13 @@ public Observable<List<NumericBucketPoint>> findGaugeStats(MetricId<Double> metr
return bucketize(findDataPoints(metricId, start, end), buckets);
}

@Override
public Observable<List<NumericBucketPoint>> findGaugeStats(String tenantId, Map<String, String> tagFilters,
long start, long end, Buckets buckets) {
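// Find every gauge metric matching the tag filters, merge their raw data points, and aggregate the merged stream into the requested buckets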
return bucketize(findMetricsWithFilters(tenantId, tagFilters, GAUGE)
.flatMap(metric -> findDataPoints(metric.getId(), start, end)), buckets);
}

private Observable<List<NumericBucketPoint>> bucketize(Observable<? extends DataPoint<? extends Number>> dataPoints,
Buckets buckets) {
return dataPoints
@@ -16,6 +16,8 @@
*/
package org.hawkular.metrics.core.impl;

import java.util.function.Function;

import org.apache.commons.math3.stat.descriptive.moment.Mean;
import org.apache.commons.math3.stat.descriptive.rank.Max;
import org.apache.commons.math3.stat.descriptive.rank.Min;
@@ -30,14 +32,36 @@
* @author Thomas Segismont
*/
final class NumericDataPointCollector {

/**
* This is a test hook. See {@link Percentile} for details.
*/
static Function<Double, Percentile> createPercentile = p -> new Percentile() {

PSquarePercentile percentile = new PSquarePercentile(p);

// Note that these methods need to be synchronized because PSquarePercentile is not synchronized and we need to
// support concurrent access.

@Override
public synchronized void addValue(double value) {
percentile.increment(value);
}

@Override
public synchronized double getResult() {
return percentile.getResult();
}
};

private final Buckets buckets;
private final int bucketIndex;

private Min min = new Min();
private Mean average = new Mean();
private PSquarePercentile median = new PSquarePercentile(50.0);
private Percentile median = createPercentile.apply(50.0);
private Max max = new Max();
private PSquarePercentile percentile95th = new PSquarePercentile(95.0);
private Percentile percentile95th = createPercentile.apply(95.0);

NumericDataPointCollector(Buckets buckets, int bucketIndex) {
this.buckets = buckets;
@@ -48,9 +72,9 @@ void increment(DataPoint<? extends Number> dataPoint) {
Number value = dataPoint.getValue();
min.increment(value.doubleValue());
average.increment(value.doubleValue());
median.increment(value.doubleValue());
median.addValue(value.doubleValue());
max.increment(value.doubleValue());
percentile95th.increment(value.doubleValue());
percentile95th.addValue(value.doubleValue());
}

NumericBucketPoint toBucketPoint() {
@@ -0,0 +1,35 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl;

/**
* We use the org.apache.commons.math3.stat.descriptive.rank.PSquarePercentile class for computing percentiles. The
* order in which values are added to PSquarePercentile can affect the results. This can make automated testing
* difficult in scenarios in which values are added from different threads concurrently. For those scenarios
* org.apache.commons.math3.stat.descriptive.rank.Percentile works better as it stores all values in memory and the
* order in which it consumes values does not affect the result. Unfortunately, these two classes do not share a common
* interface that we can use, so this interface is introduced to facilitate testing.
*
* @author jsanda
*/
public interface Percentile {

void addValue(double value);

double getResult();

}
@@ -64,7 +64,7 @@ public void initSession() {
.addContactPoints(nodeAddresses.split(","))
// Due to JAVA-500 and JAVA-509 we need to explicitly set the protocol to V3.
// These bugs are fixed upstream and will be in version 2.1.3 of the driver.
.withProtocolVersion(ProtocolVersion.V3)
.withProtocolVersion(ProtocolVersion.V4)
.build();
session = cluster.connect(getKeyspace());
rxSession = new RxSessionImpl(session);
@@ -47,6 +47,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.PatternSyntaxException;
import java.util.stream.IntStream;

@@ -98,13 +99,17 @@ public class MetricsServiceITest extends MetricsITest {

private DateTimeService dateTimeService;

private Function<Double, Percentile> defaultCreatePercentile;

@BeforeClass
public void initClass() {
initSession();

dataAccess = new DataAccessImpl(session);
dateTimeService = new DateTimeService();

defaultCreatePercentile = NumericDataPointCollector.createPercentile;

metricsService = new MetricsServiceImpl();
metricsService.setDataAccess(dataAccess);
metricsService.setTaskScheduler(new FakeTaskScheduler());
@@ -135,6 +140,7 @@ public void initMethod() {
session.execute("TRUNCATE metrics_tags_idx");
session.execute("TRUNCATE tenants_by_time");
metricsService.setDataAccess(dataAccess);
NumericDataPointCollector.createPercentile = defaultCreatePercentile;
}

@Test
@@ -690,6 +696,89 @@ public void findRateStats() {
assertNumericBucketsEquals(actual, expected);
}

@Test
public void findGaugeStatsByTags() {
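// Swap in an order-insensitive Percentile implementation so that the non-deterministic order in which
// data points are fetched from multiple metrics cannot change the expected percentile values.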
NumericDataPointCollector.createPercentile = p -> new Percentile() {

List<Double> values = new ArrayList<>();
org.apache.commons.math3.stat.descriptive.rank.Percentile percentile =
new org.apache.commons.math3.stat.descriptive.rank.Percentile(p);

@Override public void addValue(double value) {
values.add(value);
}

@Override public double getResult() {
double[] array = new double[values.size()];
for (int i = 0; i < array.length; ++i) {
array[i] = values.get(i);
}
percentile.setData(array);

return percentile.evaluate();
}
};

String tenantId = "findGaugeStatsByTags";
DateTime start = now().minusMinutes(10);

Metric<Double> m1 = new Metric<>(new MetricId<>(tenantId, GAUGE, "M1"), asList(
new DataPoint<>(start.getMillis(), 12.23),
new DataPoint<>(start.plusMinutes(1).getMillis(), 9.745),
new DataPoint<>(start.plusMinutes(2).getMillis(), 14.01),
new DataPoint<>(start.plusMinutes(3).getMillis(), 16.18),
new DataPoint<>(start.plusMinutes(4).getMillis(), 18.94)
));
doAction(() -> metricsService.addDataPoints(GAUGE, Observable.just(m1)));
doAction(() -> metricsService.addTags(m1, ImmutableMap.of("type", "cpu_usage", "node", "server1")));

Metric<Double> m2 = new Metric<>(new MetricId<>(tenantId, GAUGE, "M2"), asList(
new DataPoint<>(start.getMillis(), 15.47),
new DataPoint<>(start.plusMinutes(1).getMillis(), 8.08),
new DataPoint<>(start.plusMinutes(2).getMillis(), 14.39),
new DataPoint<>(start.plusMinutes(3).getMillis(), 17.76),
new DataPoint<>(start.plusMinutes(4).getMillis(), 17.502)
));
doAction(() -> metricsService.addDataPoints(GAUGE, Observable.just(m2)));
doAction(() -> metricsService.addTags(m2, ImmutableMap.of("type", "cpu_usage", "node", "server2")));

Metric<Double> m3 = new Metric<>(new MetricId<>(tenantId, GAUGE, "M3"), asList(
new DataPoint<>(start.getMillis(), 11.456),
new DataPoint<>(start.plusMinutes(1).getMillis(), 18.32)
));
doAction(() -> metricsService.addDataPoints(GAUGE, Observable.just(m3)));
doAction(() -> metricsService.addTags(m3, ImmutableMap.of("type", "cpu_usage", "node", "server3")));

Buckets buckets = Buckets.fromCount(start.getMillis(), start.plusMinutes(5).getMillis(), 1);
Map<String, String> tagFilters = ImmutableMap.of("type", "cpu_usage", "node", "server1|server2");

List<List<NumericBucketPoint>> actual = getOnNextEvents(() -> metricsService.findGaugeStats(tenantId,
tagFilters, start.getMillis(), start.plusMinutes(5).getMillis(), buckets));

assertEquals(actual.size(), 1);

NumericBucketPoint.Builder builder = new NumericBucketPoint.Builder(start.getMillis(),
start.plusMinutes(5).getMillis());
m1.getDataPoints().forEach(dataPoint -> updateBuilder(builder, dataPoint));

List<NumericBucketPoint> expected = getOnNextEvents(() ->
Observable.concat(Observable.from(m1.getDataPoints()), Observable.from(m2.getDataPoints()))
.collect(() -> new NumericDataPointCollector(buckets, 0), NumericDataPointCollector::increment)
.map(NumericDataPointCollector::toBucketPoint));

assertNumericBucketsEquals(actual.get(0), expected);
}

private static void updateBuilder(NumericBucketPoint.Builder builder, DataPoint<Double> dataPoint) {
builder.setAvg(dataPoint.getValue())
.setMax(dataPoint.getValue())
.setMin(dataPoint.getValue())
.setMedian(dataPoint.getValue())
.setPercentile95th(dataPoint.getValue());
}

private static void assertNumericBucketsEquals(List<NumericBucketPoint> actual, List<NumericBucketPoint> expected) {
String msg = String.format("%nExpected:%n%s%nActual:%n%s%n", expected, actual);
assertEquals(actual.size(), expected.size(), msg);

0 comments on commit e8c1531
