Skip to content

Commit

Permalink
Some gauge methods converted to Observables
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Burman authored and John Sanda committed May 26, 2015
1 parent cfd8093 commit 19613fd
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,61 +346,56 @@ public void findPeriods(
@ApiParam(value = "A threshold against which values are compared", required = true)
@QueryParam("threshold") double threshold,
@ApiParam(value = "A comparison operation to perform between values and the threshold."
+ " Supported operations include ge, gte, lt, lte, and eq", required = true)
+ " Supported operations include ge, gte, lt, lte, and eq", required = true,
allowableValues = "[ge, gte, lt, lte, eq]")
@QueryParam("op") String operator
) {
executeAsync(
asyncResponse,
() -> {
long now = System.currentTimeMillis();
Long startTime = start;
Long endTime = end;
if (start == null) {
startTime = now - EIGHT_HOURS;
}
if (end == null) {
endTime = now;
}
long now = System.currentTimeMillis();
Long startTime = start;
Long endTime = end;
if (start == null) {
startTime = now - EIGHT_HOURS;
}
if (end == null) {
endTime = now;
}

Predicate<Double> predicate;
switch (operator) {
case "lt":
predicate = d -> d < threshold;
break;
case "lte":
predicate = d -> d <= threshold;
break;
case "eq":
predicate = d -> d == threshold;
break;
case "neq":
predicate = d -> d != threshold;
break;
case "gt":
predicate = d -> d > threshold;
break;
case "gte":
predicate = d -> d >= threshold;
break;
default:
predicate = null;
}
Predicate<Double> predicate;
switch (operator) { // Why not enum?
case "lt":
predicate = d -> d < threshold;
break;
case "lte":
predicate = d -> d <= threshold;
break;
case "eq":
predicate = d -> d == threshold;
break;
case "neq":
predicate = d -> d != threshold;
break;
case "gt":
predicate = d -> d > threshold;
break;
case "gte":
predicate = d -> d >= threshold;
break;
default:
predicate = null;
}

if (predicate == null) {
return Futures.immediateFuture(
badRequest(
new ApiError(
"Invalid value for op parameter. Supported values are lt, "
+ "lte, eq, gt, gte."
)
)
);
} else {
ListenableFuture<List<long[]>> future = metricsService.getPeriods(tenantId, new MetricId(id),
predicate, startTime, endTime);
return Futures.transform(future, ApiUtils.MAP_COLLECTION);
}
});
if (predicate == null) {
asyncResponse.resume(badRequest(
new ApiError(
"Invalid value for op parameter. Supported values are lt, "
+ "lte, eq, gt, gte."
)
));
} else {
metricsService.getPeriods(tenantId, new MetricId(id), predicate, startTime, endTime)
.map(ApiUtils::collectionToResponse)
.subscribe(asyncResponse::resume, t -> asyncResponse.resume(ApiUtils.serverError(t)));
}
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private void select(AsyncResponse asyncResponse, String tenantId, SelectQueryCon
}
String columnName = getColumnName(queryDefinitions);

Observable.from(metricsService.idExists(metric))
metricsService.idExists(metric)
.flatMap(
idExists -> {
if (idExists != Boolean.TRUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Observable<BucketedOutput<AvailabilityBucketDataPoint>> findAvailabilityStats(
ListenableFuture<List<Counter>> findCounters(String group, List<String> counterNames);

/** Check if a metric with the passed {id} has been stored in the system */
ListenableFuture<Boolean> idExists(String id);
Observable<Boolean> idExists(String id);

Observable<ResultSet> tagGaugeData(Gauge metric, Map<String, String> tags,
long start, long end);
Expand Down Expand Up @@ -160,6 +160,6 @@ Observable<Map<MetricId, Set<AvailabilityData>>> findAvailabilityByTags(String t
* @return Each element in the list is a two element array. The first element is the start time inclusive for which
* the predicate matches, and the second element is the end time inclusive for which the predicate matches.
*/
ListenableFuture<List<long[]>> getPeriods(String tenantId, MetricId id, Predicate<Double> predicate, long start,
long end);
Observable<List<long[]>> getPeriods(String tenantId, MetricId id, Predicate<Double> predicate, long start,
long end);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Observable<ResultSet> updateTagsInMetricsIndex(Metric<?> metric, Map<String, Str

Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime);

ResultSetFuture findData(Gauge metric, long startTime, long endTime, Order order);
Observable<ResultSet> findData(Gauge metric, long startTime, long endTime, Order order);

Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime,
boolean includeWriteTime);
Expand All @@ -85,9 +85,9 @@ Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, lon

Observable<ResultSet> findData(Availability metric, long timestamp);

ResultSetFuture deleteGuageMetric(String tenantId, String metric, Interval interval, long dpart);
Observable<ResultSet> deleteGaugeMetric(String tenantId, String metric, Interval interval, long dpart);

ResultSetFuture findAllGuageMetrics();
Observable<ResultSet> findAllGaugeMetrics();

Observable<ResultSet> insertGaugeTag(String tag, String tagValue, Gauge metric, Observable<GaugeData> data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,15 +479,15 @@ public Observable<ResultSet> findData(String tenantId, MetricId id, long startTi
}

@Override
public ResultSetFuture findData(Gauge metric, long startTime, long endTime, Order order) {
public Observable<ResultSet> findData(Gauge metric, long startTime, long endTime, Order order) {
if (order == Order.ASC) {
return session.executeAsync(findGaugeDataByDateRangeExclusiveASC.bind(metric.getTenantId(),
MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(),
metric.getDpart(), TimeUUIDUtils.getTimeUUID(startTime), TimeUUIDUtils.getTimeUUID(endTime)));
return rxSession.execute(findGaugeDataByDateRangeExclusiveASC.bind(metric.getTenantId(),
MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(),
metric.getDpart(), TimeUUIDUtils.getTimeUUID(startTime), TimeUUIDUtils.getTimeUUID(endTime)));
} else {
return session.executeAsync(findGaugeDataByDateRangeExclusive.bind(metric.getTenantId(),
MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(),
metric.getDpart(), TimeUUIDUtils.getTimeUUID(startTime), TimeUUIDUtils.getTimeUUID(endTime)));
return rxSession.execute(findGaugeDataByDateRangeExclusive.bind(metric.getTenantId(),
MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(),
metric.getDpart(), TimeUUIDUtils.getTimeUUID(startTime), TimeUUIDUtils.getTimeUUID(endTime)));
}
}

Expand Down Expand Up @@ -545,14 +545,14 @@ public Observable<ResultSet> findData(Availability metric, long timestamp) {
}

@Override
public ResultSetFuture deleteGuageMetric(String tenantId, String metric, Interval interval, long dpart) {
return session.executeAsync(deleteGaugeMetric.bind(tenantId, MetricType.GAUGE.getCode(), metric,
interval.toString(), dpart));
public Observable<ResultSet> deleteGaugeMetric(String tenantId, String metric, Interval interval, long dpart) {
return rxSession.execute(deleteGaugeMetric.bind(tenantId, MetricType.GAUGE.getCode(), metric,
interval.toString(), dpart));
}

@Override
public ResultSetFuture findAllGuageMetrics() {
return session.executeAsync(findGaugeMetrics.bind());
public Observable<ResultSet> findAllGaugeMetrics() {
return rxSession.execute(findGaugeMetrics.bind());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -596,19 +595,11 @@ public Observable<BucketedOutput<AvailabilityBucketDataPoint>> findAvailabilityS
}

@Override
public ListenableFuture<Boolean> idExists(final String id) {
ResultSetFuture future = dataAccess.findAllGuageMetrics();
return Futures.transform(future, new Function<ResultSet, Boolean>() {
@Override
public Boolean apply(ResultSet resultSet) {
for (Row row : resultSet) {
if (id.equals(row.getString(2))) {
return true;
}
}
return false;
}
}, metricsTasks);
public Observable<Boolean> idExists(final String id) {
return dataAccess.findAllGaugeMetrics().flatMap(Observable::from)
.filter(row -> id.equals(row.getString(2)))
.map(r -> Boolean.TRUE)
.defaultIfEmpty(Boolean.FALSE);
}

@Override
Expand Down Expand Up @@ -707,13 +698,11 @@ public Observable<Map<MetricId, Set<AvailabilityData>>> findAvailabilityByTags(S
}

@Override
public ListenableFuture<List<long[]>> getPeriods(String tenantId, MetricId id, Predicate<Double> predicate,
long start, long end) {
ResultSetFuture resultSetFuture = dataAccess.findData(new Gauge(tenantId, id), start, end, Order.ASC);
ListenableFuture<List<GaugeData>> dataFuture = Futures.transform(resultSetFuture,
Functions.MAP_GAUGE_DATA, metricsTasks);

return Futures.transform(dataFuture, (List<GaugeData> data) -> {
public Observable<List<long[]>> getPeriods(String tenantId, MetricId id, Predicate<Double> predicate,
long start, long end) {
return dataAccess.findData(new Gauge(tenantId, id), start, end, Order.ASC).flatMap(Observable::from).map
(Functions::getGaugeData)
.toList().map(data -> {
List<long[]> periods = new ArrayList<>(data.size());
long[] period = null;
GaugeData previous = null;
Expand All @@ -735,9 +724,8 @@ public ListenableFuture<List<long[]>> getPeriods(String tenantId, MetricId id, P
period[1] = previous.getTimestamp();
periods.add(period);
}

return periods;
}, metricsTasks);
});
}

private int getTTL(Metric<?> metric) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public Observable<ResultSet> findData(String tenantId, MetricId id, long startTi
}

@Override
public ResultSetFuture findData(Gauge metric, long startTime, long endTime, Order order) {
public Observable<ResultSet> findData(Gauge metric, long startTime, long endTime, Order order) {
return delegate.findData(metric, startTime, endTime, order);
}

Expand Down Expand Up @@ -158,13 +158,13 @@ public Observable<ResultSet> findData(Availability metric, long timestamp) {
}

@Override
public ResultSetFuture deleteGuageMetric(String tenantId, String metric, Interval interval, long dpart) {
return delegate.deleteGuageMetric(tenantId, metric, interval, dpart);
public Observable<ResultSet> deleteGaugeMetric(String tenantId, String metric, Interval interval, long dpart) {
return delegate.deleteGaugeMetric(tenantId, metric, interval, dpart);
}

@Override
public ResultSetFuture findAllGuageMetrics() {
return delegate.findAllGuageMetrics();
public Observable<ResultSet> findAllGaugeMetrics() {
return delegate.findAllGaugeMetrics();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,8 @@ public void getPeriodsAboveThreshold() throws Exception {

metricsService.addGaugeData(Observable.just(m1)).toBlocking().lastOrDefault(null);

List<long[]> actual = getUninterruptibly(metricsService.getPeriods(tenantId, m1.getId(),
value -> value > threshold, start.getMillis(), now().getMillis()));
List<long[]> actual = metricsService.getPeriods(tenantId, m1.getId(),
value -> value > threshold, start.getMillis(), now().getMillis()).toBlocking().lastOrDefault(null);
List<long[]> expected = asList(
new long[] {start.plusMinutes(2).getMillis(), start.plusMinutes(3).getMillis()},
new long[] {start.plusMinutes(6).getMillis(), start.plusMinutes(6).getMillis()},
Expand Down

0 comments on commit 19613fd

Please sign in to comment.