Skip to content

Commit

Permalink
flatMap to the rescue and some javadocs
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Jun 22, 2015
1 parent 9a22c77 commit 02bb2e6
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,29 @@
* @author jay shaughnessy
*/
public interface Aggregate {
Func1<Observable<DataPoint<Double>>, Observable<Double>> Average = data -> {
return MathObservable.averageDouble(data.map(DataPoint::getValue));
};

Func1<Observable<DataPoint<Double>>, Observable<Double>> Max = data -> {
return MathObservable.max(data.map(DataPoint::getValue));
};
/**
* A function that emits a single item, the average of {@link DataPoint data points} as a double.
*/
Func1<Observable<DataPoint<Double>>, Observable<Double>> Average = data ->
MathObservable.averageDouble(data.map(DataPoint::getValue));

Func1<Observable<DataPoint<Double>>, Observable<Double>> Min = data -> {
return MathObservable.min(data.map(DataPoint::getValue));
};
/**
* A function that emits a single item, the max of {@link DataPoint data points} as a double.
*/
Func1<Observable<DataPoint<Double>>, Observable<Double>> Max = data ->
MathObservable.max(data.map(DataPoint::getValue));

/**
* A function that emits a single item, the min of {@link DataPoint data points} as a double.
*/
Func1<Observable<DataPoint<Double>>, Observable<Double>> Min = data ->
MathObservable.min(data.map(DataPoint::getValue));

/**
* A function that emits a single item, the sum of {@link DataPoint data points} as a double.
*/
Func1<Observable<DataPoint<Double>>, Observable<Double>> Sum = data ->
MathObservable.sumDouble(data.map(DataPoint::getValue));

Func1<Observable<DataPoint<Double>>, Observable<Double>> Sum = data -> {
return MathObservable.sumDouble(data.map(DataPoint::getValue));
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,27 @@ public interface MetricsService {

Observable<Void> addGaugeData(Observable<Metric<Double>> gaugeObservable);

/**
* Fetches data points for a gauge metric.
*
* @param tenantId The tenant to which the metric belongs
* @param id The metric name
* @param start The start time inclusive as aUnix timestamp in milliseconds
* @param end The end time exclusive as a Unix timestamp in milliseconds
* @return an {@link Observable} that emits {@link DataPoint data points}
*/
Observable<DataPoint<Double>> findGaugeData(String tenantId, MetricId id, Long start, Long end);

/**
* @param tenantId the tenantId
* @param id the metricId
* @param start time window start, in ms
* @param end time window end, in ms
* This method applies one or more functions to an Observable that emits data points of a gauge metric. The data
* points Observable is asynchronous. The functions however, are applied serially in the order specified.
*
* @param tenantId The tenant to which the metric belongs
* @param id The metric name
* @param start The start time inclusive as a Unix timestamp in milliseconds
* @param end The end time exclusive as a Unix timestamp in milliseconds
* @param funcs one or more functions to operate on the fetched gauge data
* @return the <code>Observable</code> emits in the same ordering as <code>funcs<funcs>
* @return An {@link Observable} that emits the results with the same ordering as <code>funcs<code>
* @see Aggregate
*/
<T> Observable<T> findGaugeData(String tenantId, MetricId id, Long start, Long end,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,15 +597,7 @@ public <T> Observable<T> findGaugeData(String tenantId, MetricId id, Long start,
Func1<Observable<DataPoint<Double>>, Observable<T>>... funcs) {

Observable<DataPoint<Double>> dataCache = findGaugeData(tenantId, id, start, end).cache();
Observable<T> result = null;

for (Func1<Observable<DataPoint<Double>>, Observable<T>> func : funcs) {
result = (null == result) ?
func.call(dataCache) :
result.concatWith(func.call(dataCache));
}

return result;
return Observable.from(funcs).flatMap(fn -> fn.call(dataCache));
}

@Override
Expand Down

0 comments on commit 02bb2e6

Please sign in to comment.