Skip to content

Commit

Permalink
HWKMETRICS-144 Support server-side gauge data aggregate fetch
Browse files Browse the repository at this point in the history
- override findGaugeData with a new signature that can apply one or
  more functions to the fetched data (uses a single fetch)
- provide some built in aggregate functions to be used with the new
  findGaugeData signature out-of-box.
  • Loading branch information
jshaughn committed Jun 20, 2015
1 parent 7b0cbc3 commit 9476913
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 22 deletions.
5 changes: 5 additions & 0 deletions core/metrics-core-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
<artifactId>rxjava</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
</dependency>

<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.api;

import rx.Observable;
import rx.functions.Func1;
import rx.observables.MathObservable;

/**
* Predefined aggregate functions usable with
* {@link MetricsService#findGaugeData(String, MetricId, Long, Long, Func1...).
*
* @author john sanda
* @author jay shaughnessy
*/
public interface Aggregate {
Func1<Observable<DataPoint<Double>>, Observable<Double>> Average = data -> {
return MathObservable.averageDouble(data.map(DataPoint::getValue));
};

Func1<Observable<DataPoint<Double>>, Observable<Double>> Max = data -> {
return MathObservable.max(data.map(DataPoint::getValue));
};

Func1<Observable<DataPoint<Double>>, Observable<Double>> Min = data -> {
return MathObservable.min(data.map(DataPoint::getValue));
};

Func1<Observable<DataPoint<Double>>, Observable<Double>> Sum = data -> {
return MathObservable.sumDouble(data.map(DataPoint::getValue));
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.function.Predicate;

import rx.Observable;
import rx.Observer;
import rx.functions.Func1;

/**
* Interface that defines the functionality of the Metrics Service.
Expand Down Expand Up @@ -92,9 +92,20 @@ public interface MetricsService {

Observable<DataPoint<Double>> findGaugeData(String tenantId, MetricId id, Long start, Long end);

/**
* @param tenantId the tenantId
* @param id the metricId
* @param start time window start, in ms
* @param end time window end, in ms
* @param funcs one or more functions to operate on the fetched gauge data
* @return the <code>Observable</code> emits in the same ordering as <code>funcs<funcs>
* @see Aggregate
*/
<T> Observable<T> findGaugeData(String tenantId, MetricId id, Long start, Long end,
Func1<Observable<DataPoint<Double>>, Observable<T>>... funcs);

Observable<BucketedOutput<GaugeBucketDataPoint>> findGaugeStats(Metric<Double> metric, long start, long end,
Buckets buckets
);
Buckets buckets);

Observable<Void> addAvailabilityData(List<Metric<AvailabilityType>> metrics);

Expand All @@ -104,8 +115,7 @@ Observable<DataPoint<AvailabilityType>> findAvailabilityData(String tenantId, Me
boolean distinct);

Observable<BucketedOutput<AvailabilityBucketDataPoint>> findAvailabilityStats(Metric<AvailabilityType> metric,
long start, long end, Buckets buckets
);
long start, long end, Buckets buckets);

/** Check if a metric with the passed {id} has been stored in the system */
Observable<Boolean> idExists(String id);
Expand Down Expand Up @@ -167,5 +177,5 @@ Observable<Map<MetricId, Set<DataPoint<AvailabilityType>>>> findAvailabilityByTa
* the predicate matches, and the second element is the end time inclusive for which the predicate matches.
*/
Observable<List<long[]>> getPeriods(String tenantId, MetricId id, Predicate<Double> predicate, long start,
long end);
long end);
}
5 changes: 0 additions & 5 deletions core/metrics-core-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,22 @@ public Observable<DataPoint<Double>> findGaugeData(String tenantId, MetricId id,
.map(Functions::getGaugeDataPoint));
}

@Override
public <T> Observable<T> findGaugeData(String tenantId, MetricId id, Long start, Long end,
Func1<Observable<DataPoint<Double>>, Observable<T>>... funcs) {

Observable<DataPoint<Double>> dataCache = findGaugeData(tenantId, id, start, end).cache();
Observable<T> result = null;

for (Func1<Observable<DataPoint<Double>>, Observable<T>> func : funcs) {
result = (null == result) ?
func.call(dataCache) :
result.concatWith(func.call(dataCache));
}

return result;
}

@Override
public Observable<BucketedOutput<GaugeBucketDataPoint>> findGaugeStats(
Metric<Double> metric, long start, long end, Buckets buckets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;

import static org.hawkular.metrics.core.api.AvailabilityType.DOWN;
import static org.hawkular.metrics.core.api.AvailabilityType.UNKNOWN;
import static org.hawkular.metrics.core.api.AvailabilityType.UP;
Expand All @@ -45,17 +46,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.hawkular.metrics.core.api.Aggregate;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Interval;
Expand All @@ -70,6 +61,19 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import rx.Observable;

/**
Expand Down Expand Up @@ -342,6 +346,34 @@ public void addAndFetchCounterData() throws Exception {
assertMetricIndexMatches(tenantId, COUNTER, singletonList(counter));
}

@Test
public void addAndFetchGaugeDataAggregates() throws Exception {
DateTime start = now().minusMinutes(30);
DateTime end = start.plusMinutes(20);
String tenantId = "t1";

metricsService.createTenant(new Tenant(tenantId)).toBlocking().lastOrDefault(null);

Metric<Double> m1 = new Metric<>(tenantId, GAUGE, new MetricId("m1"), asList(
new DataPoint<>(start.getMillis(), 10.0),
new DataPoint<>(start.plusMinutes(2).getMillis(), 20.0),
new DataPoint<>(start.plusMinutes(4).getMillis(), 30.0),
new DataPoint<>(end.getMillis(), 40.0)
));

Observable<Void> insertObservable = metricsService.addGaugeData(Observable.just(m1));
insertObservable.toBlocking().lastOrDefault(null);

Observable<Double> observable = metricsService
.findGaugeData(tenantId, new MetricId("m1"), start.getMillis(), (end.getMillis() + 1000),
Aggregate.Min, Aggregate.Max, Aggregate.Average, Aggregate.Sum);
List<Double> actual = toList(observable);
List<Double> expected = asList(10.0, 40.0, 25.0, 100.0);

assertEquals(actual, expected, "The data does not match the expected values");
assertMetricIndexMatches("t1", GAUGE, singletonList(m1));
}

@Test
public void verifyTTLsSetOnGaugeData() throws Exception {
DateTime start = now().minusMinutes(10);
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
<antlr.version>4.5</antlr.version>
<rxjava.version>1.0.10</rxjava.version>
<dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
<rxjava-math.version>1.0.0</rxjava-math.version>

<findbugs.version>3.0.0</findbugs.version>

Expand All @@ -131,6 +132,11 @@
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
<version>${rxjava-math.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down

0 comments on commit 9476913

Please sign in to comment.