Skip to content

Commit

Permalink
Enforce metric type in #addDataPoints
Browse files Browse the repository at this point in the history
  • Loading branch information
tsegismont committed Aug 25, 2015
1 parent e861996 commit e145d97
Show file tree
Hide file tree
Showing 15 changed files with 95 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public Response addAvailabilityForMetric(
requestToAvailabilityDataPoints(data));

try {
metricsService.addDataPoints(Observable.just(metric)).toBlocking().lastOrDefault(null);
metricsService.addDataPoints(AVAILABILITY, Observable.just(metric)).toBlocking().lastOrDefault(null);
return Response.ok().build();
} catch (Exception e) {
return serverError(e);
Expand All @@ -250,7 +250,7 @@ public Response addAvailabilityData(
List<Availability> availabilities
) {
try {
metricsService.addDataPoints(requestToAvailabilities(tenantId, availabilities)).toBlocking()
metricsService.addDataPoints(AVAILABILITY, requestToAvailabilities(tenantId, availabilities)).toBlocking()
.lastOrDefault(null);
return Response.ok().build();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public Response addData(@ApiParam(value = "List of metrics", required = true) Li
) {
Observable<Metric<Long>> metrics = requestToCounters(tenantId, counters);
try {
metricsService.addDataPoints(metrics).toBlocking().lastOrDefault(null);
metricsService.addDataPoints(COUNTER, metrics).toBlocking().lastOrDefault(null);
return Response.ok().build();
} catch (Exception e) {
return ApiUtils.serverError(e);
Expand All @@ -174,7 +174,7 @@ public Response addData(
) {
Metric<Long> metric = new Metric<>(new MetricId<>(tenantId, COUNTER, id), requestToCounterDataPoints(data));
try {
metricsService.addDataPoints(Observable.just(metric)).toBlocking().lastOrDefault(null);
metricsService.addDataPoints(COUNTER, Observable.just(metric)).toBlocking().lastOrDefault(null);
return Response.ok().build();
} catch (Exception e) {
return ApiUtils.serverError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public Response addDataForMetric(
) {
Metric<Double> metric = new Metric<>(new MetricId<>(tenantId, GAUGE, id), requestToGaugeDataPoints(data));
try {
metricsService.addDataPoints(Observable.just(metric)).toBlocking().lastOrDefault(null);
metricsService.addDataPoints(GAUGE, Observable.just(metric)).toBlocking().lastOrDefault(null);
return Response.ok().build();
} catch (Exception e) {
return ApiUtils.serverError(e);
Expand All @@ -240,7 +240,7 @@ public Response addGaugeData(@ApiParam(value = "List of metrics", required = tru
) {
Observable<Metric<Double>> metrics = requestToGauges(tenantId, gauges);
try {
metricsService.addDataPoints(metrics).toBlocking().lastOrDefault(null);
metricsService.addDataPoints(GAUGE, metrics).toBlocking().lastOrDefault(null);
return Response.ok().build();
} catch (Exception e) {
return ApiUtils.serverError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.requestToAvailabilities;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.requestToCounters;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.requestToGauges;
import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.GAUGE;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -144,18 +147,18 @@ public Response addMetricsData(
Collection<Observable<Void>> observables = new ArrayList<>();
if (gauges != null && !gauges.isEmpty()) {
gauges = ImmutableList.copyOf(gauges);
observables.add(metricsService
.addDataPoints(requestToGauges(tenantId, gauges).subscribeOn(Schedulers.computation())));
observables.add(metricsService.addDataPoints(GAUGE, requestToGauges(tenantId, gauges)
.subscribeOn(Schedulers.computation())));
}
if (counters != null && !counters.isEmpty()) {
counters = ImmutableList.copyOf(counters);
observables.add(metricsService
.addDataPoints(requestToCounters(tenantId, counters).subscribeOn(Schedulers.computation())));
observables.add(metricsService.addDataPoints(COUNTER, requestToCounters(tenantId, counters)
.subscribeOn(Schedulers.computation())));
}
if (availabilities != null && !availabilities.isEmpty()) {
availabilities = ImmutableList.copyOf(availabilities);
observables.add(metricsService
.addDataPoints(requestToAvailabilities(tenantId, ImmutableList.copyOf(availabilities))
.addDataPoints(AVAILABILITY, requestToAvailabilities(tenantId, availabilities)
.subscribeOn(Schedulers.computation())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ public void addAvailabilityForMetric(
) {
Metric<AvailabilityType> metric = new Metric<>(new MetricId<>(tenantId, AVAILABILITY, id),
requestToAvailabilityDataPoints(data));
metricsService.addDataPoints(Observable.just(metric)).subscribe(new ResultSetObserver(asyncResponse));
metricsService.addDataPoints(AVAILABILITY, Observable.just(metric))
.subscribe(new ResultSetObserver(asyncResponse));
}

@POST
Expand All @@ -219,7 +220,7 @@ public void addAvailabilityData(
@ApiParam(value = "List of availability metrics", required = true)
List<Availability> availabilities
) {
metricsService.addDataPoints(requestToAvailabilities(tenantId, availabilities))
metricsService.addDataPoints(AVAILABILITY, requestToAvailabilities(tenantId, availabilities))
.subscribe(new ResultSetObserver(asyncResponse));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void addData(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "List of metrics", required = true) List<Counter> counters
) {
Observable<Metric<Long>> metrics = requestToCounters(tenantId, counters);
Observable<Void> observable = metricsService.addDataPoints(metrics);
Observable<Void> observable = metricsService.addDataPoints(COUNTER, metrics);
observable.subscribe(new ResultSetObserver(asyncResponse));
}

Expand All @@ -164,7 +164,7 @@ public void addData(
) {
Metric<Long> metric = new Metric<>(new MetricId<>(tenantId, COUNTER, id),
requestToCounterDataPoints(data));
Observable<Void> observable = metricsService.addDataPoints(Observable.just(metric));
Observable<Void> observable = metricsService.addDataPoints(COUNTER, Observable.just(metric));
observable.subscribe(new ResultSetObserver(asyncResponse));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void addDataForMetric(
List<GaugeDataPoint> data
) {
Metric<Double> metric = new Metric<>(new MetricId<>(tenantId, GAUGE, id), requestToGaugeDataPoints(data));
Observable<Void> observable = metricsService.addDataPoints(Observable.just(metric));
Observable<Void> observable = metricsService.addDataPoints(GAUGE, Observable.just(metric));
observable.subscribe(new ResultSetObserver(asyncResponse));
}

Expand All @@ -215,7 +215,7 @@ public void addGaugeData(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "List of metrics", required = true) List<Gauge> gauges
) {
Observable<Metric<Double>> metrics = requestToGauges(tenantId, gauges);
Observable<Void> observable = metricsService.addDataPoints(metrics);
Observable<Void> observable = metricsService.addDataPoints(GAUGE, metrics);
observable.subscribe(new ResultSetObserver(asyncResponse));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.requestToAvailabilities;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.requestToCounters;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.requestToGauges;
import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.GAUGE;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -147,19 +150,18 @@ public void addMetricsData(
Collection<Observable<Void>> observables = new ArrayList<>();
if (gauges != null && !gauges.isEmpty()) {
gauges = ImmutableList.copyOf(gauges);
observables.add(metricsService
.addDataPoints(requestToGauges(tenantId, gauges).subscribeOn(Schedulers.computation())));
observables.add(metricsService.addDataPoints(GAUGE, requestToGauges(tenantId, gauges)
.subscribeOn(Schedulers.computation())));
}
if (counters != null && !counters.isEmpty()) {
counters = ImmutableList.copyOf(counters);
observables.add(metricsService
.addDataPoints(requestToCounters(tenantId, counters).subscribeOn(Schedulers.computation())));
observables.add(metricsService.addDataPoints(COUNTER, requestToCounters(tenantId, counters)
.subscribeOn(Schedulers.computation())));
}
if (availabilities != null && !availabilities.isEmpty()) {
availabilities = ImmutableList.copyOf(availabilities);
observables.add(metricsService
.addDataPoints(requestToAvailabilities(tenantId, ImmutableList.copyOf(availabilities))
.subscribeOn(Schedulers.computation())));
observables.add(metricsService.addDataPoints(AVAILABILITY,
requestToAvailabilities(tenantId, availabilities)).subscribeOn(Schedulers.computation()));
}

Observable.merge(observables).subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void write(

Observable<Metric<Double>> input = Observable.from(influxObjects)
.map(influxObject -> influxToGauge(tenantId, influxObject));
metricsService.addDataPoints(input).subscribe(new WriteObserver(asyncResponse));
metricsService.addDataPoints(GAUGE, input).subscribe(new WriteObserver(asyncResponse));
}

private static Metric<Double> influxToGauge(String tenantId, InfluxObject influxObject) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ <T> Observable<Metric<T>> findMetricsWithFilters(String tenantId, Map<String, St
/**
* Insert data points for the specified {@code metrics}.
*
*
* @param metricType type of all metrics emitted by {@code metrics}
* @param metrics the sources of data points
*
* @return an {@link Observable} emitting just one item on complete
*/
<T> Observable<Void> addDataPoints(Observable<Metric<T>> metrics);
<T> Observable<Void> addDataPoints(MetricType<T> metricType, Observable<Metric<T>> metrics);

/**
* Fetch data points for a single metric.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void call(Task2 task) {
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(new MetricId<>(tenant, COUNTER_RATE, counter.getId().getName()),
singletonList(new DataPoint<>(start, rate)))));
Observable<Void> updates = metricsService.addDataPoints(rates);
Observable<Void> updates = metricsService.addDataPoints(COUNTER_RATE, rates);

CountDownLatch latch = new CountDownLatch(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.hawkular.metrics.core.impl.Functions.makeSafe;
import static org.joda.time.Duration.standardMinutes;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -90,6 +92,7 @@

import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.Func3;

/**
Expand Down Expand Up @@ -562,7 +565,9 @@ public Observable<Void> deleteTags(Metric<?> metric, Map<String, String> tags) {
}

@Override
public <T> Observable<Void> addDataPoints(Observable<Metric<T>> metrics) {
public <T> Observable<Void> addDataPoints(MetricType<T> metricType, Observable<Metric<T>> metrics) {
checkArgument(metricType != null, "metricType is null");

// We write to both the data and the metrics_idx tables. Each metric can have one or more data points. We
// currently write a separate batch statement for each metric.
//
Expand All @@ -581,38 +586,41 @@ public <T> Observable<Void> addDataPoints(Observable<Metric<T>> metrics) {
// we periodically update it in the background, so we will still be aware of metrics that have not been
// explicitly created, just not necessarily right away.

Meter meter;
Func2<Metric<T>, Integer, Observable<Integer>> inserter;
if (metricType == GAUGE || metricType == COUNTER_RATE) {
meter = gaugeInserts;
inserter = (metric, ttl) -> {
@SuppressWarnings("unchecked")
Metric<Double> gauge = (Metric<Double>) metric;
return dataAccess.insertGaugeData(gauge, ttl);
};
} else if (metricType == AVAILABILITY) {
meter = availabilityInserts;
inserter = (metric, ttl) -> {
@SuppressWarnings("unchecked")
Metric<AvailabilityType> avail = (Metric<AvailabilityType>) metric;
return dataAccess.insertAvailabilityData(avail, ttl);
};
} else if (metricType == COUNTER) {
meter = counterInserts;
inserter = (metric, ttl) -> {
@SuppressWarnings("unchecked")
Metric<Long> counter = (Metric<Long>) metric;
return dataAccess.insertCounterData(counter, ttl);
};
} else {
throw new UnsupportedOperationException(metricType.getText());
}

Observable<Integer> updates = metrics
.filter(metric -> !metric.getDataPoints().isEmpty())
.flatMap(metric -> {
MetricType<T> metricType = metric.getId().getType();
if (metricType == GAUGE || metricType == COUNTER_RATE) {
@SuppressWarnings("unchecked")
Metric<Double> gauge = (Metric<Double>) metric;
return dataAccess.insertGaugeData(gauge, getTTL(gauge.getId()))
.doOnNext(gaugeInserts::mark);
}
if (metricType == AVAILABILITY) {
@SuppressWarnings("unchecked")
Metric<AvailabilityType> avail = (Metric<AvailabilityType>) metric;
return dataAccess.insertAvailabilityData(avail, getTTL(avail.getId()))
.doOnNext(availabilityInserts::mark);
}
if (metricType == COUNTER) {
@SuppressWarnings("unchecked")
Metric<Long> counter = (Metric<Long>) metric;
return dataAccess.insertCounterData(counter, getTTL(counter.getId()))
.doOnNext(counterInserts::mark);
} else {
throw new UnsupportedOperationException(metricType.getText());
}
});
.flatMap(metric -> inserter.call(metric, getTTL(metric.getId())))
.doOnNext(meter::mark);

Observable<Integer> tenantUpdates = updateTenantBuckets(metrics);

// I am intentionally return zero for the number index updates because I want to measure and compare the
// throughput inserting data with and without the index updates. This will give us a better idea of how much
// over there is with the index updates.
Observable<Integer> indexUpdates = dataAccess.updateMetricsIndex(metrics).map(count -> 0);
Observable<Integer> indexUpdates = dataAccess.updateMetricsIndex(metrics);
return Observable.concat(updates, indexUpdates, tenantUpdates)
.takeLast(1)
.map(count -> null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void generateRates() {
doAction(() -> metricsService.createMetric(c2));
doAction(() -> metricsService.createMetric(c3));

doAction(() -> metricsService.addDataPoints(Observable.from(asList(
doAction(() -> metricsService.addDataPoints(COUNTER, Observable.from(asList(
new Metric<>(c1.getId(), asList(new DataPoint<>(start.getMillis(), 10L),
new DataPoint<>(start.plusSeconds(30).getMillis(), 25L))),
new Metric<>(c2.getId(), asList(new DataPoint<>(start.getMillis(), 100L),
Expand Down

0 comments on commit e145d97

Please sign in to comment.