Skip to content

Commit

Permalink
Implementation of statistics computation based on RxJava
Browse files Browse the repository at this point in the history
This is a combination of 8 commits.

The first commit's message is:
Introduce BucketedOutputOperator and children

This is the 2nd commit message:

Update MetricsServiceImpl to use BucketedOutputOperator

This is the 3rd commit message:

Gauges buckets computation without custom operator

This is to show what the process could look like, code for avail has not
been updated, and code for counters not implemented.

In this attempt, no custom operator is used. Instead, N(=number of
buckets) requests for data are sent in parallel. Then the resulting
observables are flat mapped to form an observable of bucket points.

The bucket point observable emits just one value. The value is created
by processing data points as they arrive, instead of buffering
all the points for the bucket in an array.
This has a huge implication on stats computation. Some stats algos are
"storeless", which means you can get the exact result without buffering
the data. Examples are min, max, avg. But some are not, like percentile.

As a consequence, for median and 95th percentile, we need to use
an approximative algorithm, provided by
org.apache.commons.math3.stat.descriptive.rank.PSquarePercentile.
See http://www.cse.wustl.edu/~jain/papers/psqr.htm for a description of
the algorithm.
On the upside, this algorithm makes it possible to compute stats when the number of
data points in the bucket is VERY large, with low memory consumption.
On the downside, the result might look a bit inaccurate when there are VERY
few points in the buckets. It's also sensitive to the order of value
submission.

Note that the observable of bucket points will often be incorrectly
ordered, as there's no guarantee in the order the responses will be processed.

Side work:
- log throwable at TRACE level for 500 responses
- overloaded assertDoubleEquals so that caller can pass a failure
  message

This is the 4th commit message:

Refactor to use #collect for readability

This is the 5th commit message:

Refactor to use a single request to C*

Sending N requests to C* might be bad wrt performance, especially if N(=number of buckets) is high

This implementation uses #groupBy. Not sure how to make that work with #window

This is the 6th commit message:

Remove unused code

This is the 7th commit message:

Apply logic to avail metrics

This is the 8th commit message:

Final step, refactoring for better readability
  • Loading branch information
tsegismont committed Aug 7, 2015
1 parent 38ff28c commit fba9fff
Show file tree
Hide file tree
Showing 20 changed files with 634 additions and 732 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.hawkular.metrics.api.jaxrs.request.TagRequest;
import org.hawkular.metrics.api.jaxrs.util.ApiUtils;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.BucketedOutput;
import org.hawkular.metrics.core.api.Buckets;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
Expand Down Expand Up @@ -278,9 +277,10 @@ public void findAvailabilityData(
Long startTime = start == null ? now - EIGHT_HOURS : start;
Long endTime = end == null ? now : end;

Metric<AvailabilityType> metric = new Metric<>(new MetricId(tenantId, AVAILABILITY, id));
MetricId metricId = new MetricId(tenantId, AVAILABILITY, id);

if (bucketsCount == null && bucketDuration == null) {
metricsService.findAvailabilityData(metric.getId(), startTime, endTime, distinct)
metricsService.findAvailabilityData(metricId, startTime, endTime, distinct)
.map(AvailabilityDataPoint::new)
.toList()
.map(ApiUtils::collectionToResponse)
Expand All @@ -305,8 +305,7 @@ public void findAvailabilityData(
return;
}

metricsService.findAvailabilityStats(metric, startTime, endTime, buckets).map(
BucketedOutput::getData)
metricsService.findAvailabilityStats(metricId, startTime, endTime, buckets)
.map(ApiUtils::collectionToResponse)
.subscribe(asyncResponse::resume, ApiUtils::serverError);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.hawkular.metrics.api.jaxrs.request.MetricDefinition;
import org.hawkular.metrics.api.jaxrs.request.TagRequest;
import org.hawkular.metrics.api.jaxrs.util.ApiUtils;
import org.hawkular.metrics.core.api.BucketedOutput;
import org.hawkular.metrics.core.api.Buckets;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
Expand Down Expand Up @@ -265,30 +264,20 @@ public void findGaugeData(
@ApiParam(value = "Total number of buckets") @QueryParam("buckets") Integer bucketsCount,
@ApiParam(value = "Bucket duration") @QueryParam("bucketDuration") Duration bucketDuration) {

if (bucketsCount == null && bucketDuration == null) {
long now = System.currentTimeMillis();
long startTime = start == null ? now - EIGHT_HOURS : start;
long endTime = end == null ? now : end;
long now = System.currentTimeMillis();
long startTime = start == null ? now - EIGHT_HOURS : start;
long endTime = end == null ? now : end;

metricsService.findGaugeData(new MetricId(tenantId, GAUGE, id), startTime, endTime)
MetricId metricId = new MetricId(tenantId, GAUGE, id);

if (bucketsCount == null && bucketDuration == null) {
metricsService.findGaugeData(metricId, startTime, endTime)
.toList()
.map(ApiUtils::collectionToResponse)
.subscribe(asyncResponse::resume, t -> asyncResponse.resume(ApiUtils.serverError(t)));
} else if (bucketsCount != null && bucketDuration != null) {
asyncResponse.resume(
badRequest(
new ApiError(
"Both buckets and bucketDuration parameters are used"
)
)
);
asyncResponse.resume(badRequest(new ApiError("Both buckets and bucketDuration parameters are used")));
} else {
long now = System.currentTimeMillis();
long startTime = start == null ? now - EIGHT_HOURS : start;
long endTime = end == null ? now : end;

Metric<Double> metric = new Metric<>(new MetricId(tenantId, GAUGE, id));

Buckets buckets;
try {
if (bucketsCount != null) {
Expand All @@ -301,8 +290,7 @@ public void findGaugeData(
return;
}

metricsService.findGaugeStats(metric, startTime, endTime, buckets)
.map(BucketedOutput::getData)
metricsService.findGaugeStats(metricId, startTime, endTime, buckets)
.map(ApiUtils::collectionToResponse)
.subscribe(asyncResponse::resume, t -> asyncResponse.resume(ApiUtils.serverError(t)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.hawkular.metrics.api.jaxrs.util;

import static java.util.stream.Collectors.toList;

import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.GAUGE;
Expand All @@ -27,7 +28,6 @@

import javax.ws.rs.core.Response;

import com.google.common.base.Throwables;
import org.hawkular.metrics.api.jaxrs.ApiError;
import org.hawkular.metrics.api.jaxrs.model.Availability;
import org.hawkular.metrics.api.jaxrs.model.AvailabilityDataPoint;
Expand All @@ -39,12 +39,18 @@
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;

import rx.Observable;

/**
* @author jsanda
*/
public class ApiUtils {
private static final Logger LOG = LoggerFactory.getLogger(ApiUtils.class);

private ApiUtils() {
}
Expand All @@ -54,6 +60,7 @@ public static Response collectionToResponse(Collection<?> collection) {
}

public static Response serverError(Throwable t, String message) {
LOG.trace("Server error response", t);
String errorMsg = message + ": " + Throwables.getRootCause(t).getMessage();
return Response.serverError().entity(new ApiError(errorMsg)).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,102 +19,70 @@
import static java.lang.Double.NaN;
import static java.lang.Double.isNaN;

import java.util.List;
import java.util.Map;

/**
* Statistics for availability data in a time range.
* {@link BucketPoint} for availability metrics.
*
* @author Thomas Segismont
*/
public class AvailabilityBucketDataPoint {
private long start;
private long end;
private long downtimeDuration;
private long lastDowntime;
private double uptimeRatio;
private long downtimeCount;

public AvailabilityBucketDataPoint(
long start,
long end,
long downtimeDuration,
long lastDowntime,
double uptimeRatio,
long downtimeCount
) {
this.start = start;
this.end = end;
public final class AvailabilityBucketPoint extends BucketPoint {
private final long downtimeDuration;
private final long lastDowntime;
private final double uptimeRatio;
private final long downtimeCount;

protected AvailabilityBucketPoint(long start, long end, long downtimeDuration, long lastDowntime, double
uptimeRatio, long downtimeCount) {
super(start, end);
this.downtimeDuration = downtimeDuration;
this.lastDowntime = lastDowntime;
this.uptimeRatio = uptimeRatio;
this.downtimeCount = downtimeCount;
}

public long getStart() {
return start;
}

public void setStart(long start) {
this.start = start;
}

public long getEnd() {
return end;
}

public void setEnd(long end) {
this.end = end;
}

public long getDowntimeDuration() {
return downtimeDuration;
}

public void setDowntimeDuration(long downtimeDuration) {
this.downtimeDuration = downtimeDuration;
}

public long getLastDowntime() {
return lastDowntime;
}

public void setLastDowntime(long lastDowntime) {
this.lastDowntime = lastDowntime;
}

public double getUptimeRatio() {
return uptimeRatio;
}

public void setUptimeRatio(double uptimeRatio) {
this.uptimeRatio = uptimeRatio;
}

public long getDowntimeCount() {
return downtimeCount;
}

public void setDowntimeCount(long downtimeCount) {
this.downtimeCount = downtimeCount;
}

@Override
public boolean isEmpty() {
return isNaN(uptimeRatio);
}

@Override
public String toString() {
return "AvailabilityBucketDataPoint[" +
"start=" + start +
", end=" + end +
", downtimeDuration=" + downtimeDuration +
", lastDowntime=" + lastDowntime +
", uptimeRatio=" + uptimeRatio +
", downtimeCount=" + downtimeCount +
']';
return "AvailabilityBucketPoint[" +
"start=" + getStart() +
", end=" + getEnd() +
", downtimeDuration=" + downtimeDuration +
", lastDowntime=" + lastDowntime +
", uptimeRatio=" + uptimeRatio +
", downtimeCount=" + downtimeCount +
", isEmpty=" + isEmpty() +
']';
}

/**
* Create {@link AvailabilityBucketDataPoint} instances following the builder pattern.
* @see BucketPoint#toList(Map, Buckets, java.util.function.BiFunction)
*/
public static List<AvailabilityBucketPoint> toList(Map<Long, AvailabilityBucketPoint> pointMap, Buckets buckets) {
return BucketPoint.toList(pointMap, buckets, (start, end) -> new Builder(start, end).build());
}

public static class Builder {
private final long start;
private final long end;
Expand All @@ -126,8 +94,8 @@ public static class Builder {
/**
* Creates a builder for an initially empty instance, configurable with the builder setters.
*
* @param start the start timestamp of this bucket data point
* @param end the end timestamp of this bucket data point
* @param start the start timestamp of this bucket point
* @param end the end timestamp of this bucket point
*/
public Builder(long start, long end) {
this.start = start;
Expand All @@ -154,15 +122,8 @@ public Builder setDowntimeCount(long downtimeCount) {
return this;
}

public AvailabilityBucketDataPoint build() {
return new AvailabilityBucketDataPoint(
start,
end,
downtimeDuration,
lastDowntime,
uptimeRatio,
downtimeCount
);
public AvailabilityBucketPoint build() {
return new AvailabilityBucketPoint(start, end, downtimeDuration, lastDowntime, uptimeRatio, downtimeCount);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.api;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/**
* Statistics for metric data in a time range (bucket).
*
* @author Thomas Segismont
*/
public abstract class BucketPoint {
    private final long start;
    private final long end;

    /**
     * Records the bounds of the time range covered by this bucket point.
     *
     * @param start start of the time range
     * @param end   end of the time range
     */
    protected BucketPoint(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * @return the start of the time range, as given at construction
     */
    public long getStart() {
        return start;
    }

    /**
     * @return the end of the time range, as given at construction
     */
    public long getEnd() {
        return end;
    }

    /**
     * @return {@code true} if there was no data to build this bucket
     */
    public abstract boolean isEmpty();

    /**
     * Turns bucket points indexed by bucket start into a list holding exactly one entry per bucket, in bucket
     * index order. When the map has no (non-null) point for a bucket start, the factory is asked for a
     * placeholder covering {@code [bucketStart, bucketStart + step)}.
     *
     * @param pointMap           bucket points keyed by the start of their bucket
     * @param buckets            bucket configuration providing the count, the start of each bucket and the step
     * @param emptyBucketFactory creates the placeholder for a bucket with no point, given its start and end
     * @param <T>                concrete bucket point type
     *
     * @return a new list with {@code buckets.getCount()} elements
     */
    public static <T extends BucketPoint> List<T> toList(
            Map<Long, T> pointMap,
            Buckets buckets,
            BiFunction<Long, Long, T> emptyBucketFactory) {
        List<T> points = new ArrayList<>(buckets.getCount());
        for (int i = 0; i < buckets.getCount(); i++) {
            long bucketStart = buckets.getBucketStart(i);
            T existing = pointMap.get(bucketStart);
            // Fill the gap only when nothing was stored for this bucket start.
            points.add(existing != null
                    ? existing
                    : emptyBucketFactory.apply(bucketStart, bucketStart + buckets.getStep()));
        }
        return points;
    }
}

0 comments on commit fba9fff

Please sign in to comment.