Skip to content

Commit

Permalink
[HWKMETRICS-74] port MetricsService.findMetrics to use observables
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 26, 2015
1 parent 455ec75 commit c31c56f
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.badRequest;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.executeAsync;

import java.util.ArrayList;
import java.util.List;

import javax.inject.Inject;
Expand All @@ -34,20 +35,20 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.base.Throwables;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import org.hawkular.metrics.api.jaxrs.ApiError;
import org.hawkular.metrics.api.jaxrs.request.MixedMetricsRequest;
import org.hawkular.metrics.api.jaxrs.util.ApiUtils;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.MetricsService;
import rx.Observable;


/**
Expand Down Expand Up @@ -79,22 +80,25 @@ public class MetricHandler {
public void findMetrics(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Queried metric type", required = true, allowableValues = "[gauge, availability]")
@QueryParam("type") String type
) {
executeAsync(
asyncResponse, () -> {
MetricType metricType = null;
try {
metricType = MetricType.fromTextCode(type);
} catch (IllegalArgumentException e) {
return badRequest(
new ApiError("[" + type + "] is not a valid type. Accepted values are gauge|avail|log")
);
}
ListenableFuture<List<Metric<?>>> future = metricsService.findMetrics(tenantId, metricType);
return Futures.transform(future, ApiUtils.MAP_COLLECTION);
}
);
@QueryParam("type") String type) {

try {
Observable<Metric<?>> metrics = metricsService.findMetrics(tenantId, MetricType.fromTextCode(type));
metrics.reduce(new ArrayList<>(), (ArrayList<Metric> list, Metric metric) -> {
list.add(metric);
return list;
}).map(list -> list.isEmpty() ? noContent() : Response.ok(list).build()).subscribe(
asyncResponse::resume,
t -> {
String msg = "Failed to perform operation due to an error: " +
Throwables.getRootCause(t).getMessage();
asyncResponse.resume(Response.serverError().entity(new ApiError(msg)).build());
});

} catch (IllegalArgumentException e) {
ApiError error = new ApiError("[" + type + "] is not a valid type. Accepted values are gauge|avail|log");
asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).entity(error).build());
}
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import org.hawkular.metrics.api.jaxrs.influx.query.InfluxQueryParseTreeWalker;
import org.hawkular.metrics.api.jaxrs.influx.query.parse.InfluxQueryParser;
Expand All @@ -67,8 +63,6 @@
import org.hawkular.metrics.api.jaxrs.influx.write.validation.InfluxObjectValidator;
import org.hawkular.metrics.api.jaxrs.util.StringValue;
import org.hawkular.metrics.core.api.GaugeData;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -176,30 +170,35 @@ public void query(
}

private void listSeries(AsyncResponse asyncResponse, String tenantId) {
ListenableFuture<List<Metric<?>>> future = metricsService.findMetrics(tenantId, MetricType.GAUGE);
ListenableFuture<List<InfluxObject>> resultFuture = Futures.transform(
future,
InfluxSeriesHandler::metricsListToListSeries
);
Futures.addCallback(resultFuture, new ReadCallback(asyncResponse));
// ListenableFuture<List<Metric<?>>> future = metricsService.findMetrics(tenantId, MetricType.GAUGE);
// Futures.addCallback(future, new FutureCallback<List<Metric<?>>>() {
// @Override
// public void onSuccess(List<Metric<?>> result) {
// List<InfluxObject> objects = new ArrayList<>(result.size());
//
// for (Metric metric : result) {
// List<String> columns = new ArrayList<>(2);
// columns.add("time");
// columns.add("sequence_number");
// columns.add("val");
// InfluxObject.Builder builder = new InfluxObject.Builder(metric.getId().getName(), columns)
// .withForeseenPoints(0);
// objects.add(builder.createInfluxObject());
// }
//
// ResponseBuilder builder = Response.ok(objects);
//
// asyncResponse.resume(builder.build());
// }
//
// @Override
// public void onFailure(Throwable t) {
// asyncResponse.resume(t);
// }
// });
}

for (Metric metric : result) {
List<String> columns = new ArrayList<>(2);
columns.add("time");
columns.add(columnName);

InfluxObject.Builder builder = new InfluxObject.Builder(metric, columns)
.withForeseenPoints(metrics.size());

for (GaugeData m : metrics) {
List<Object> data = new ArrayList<>();
data.add(m.getTimestamp());
data.add(m.getValue());
builder.addPoint(data);
}

objects.add(builder.createInfluxObject());
ResponseBuilder builder = Response.ok(objects);

asyncResponse.resume(builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public enum State {

ListenableFuture<Optional<Metric<?>>> findMetric(String tenantId, MetricType type, MetricId id);

ListenableFuture<List<Metric<?>>> findMetrics(String tenantId, MetricType type);
Observable<Metric<?>> findMetrics(String tenantId, MetricType type);

ListenableFuture<Map<String, String>> getMetricTags(String tenantId, MetricType type, MetricId id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ ResultSetFuture updateTagsInMetricsIndex(Metric<?> metric, Map<String, String> a

<T extends Metric<?>> ResultSetFuture updateMetricsIndex(List<T> metrics);

ResultSetFuture findMetricsInMetricsIndex(String tenantId, MetricType type);
Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricType type);

ResultSetFuture insertData(Gauge metric, int ttl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,8 @@ public <T extends Metric<?>> ResultSetFuture updateMetricsIndex(List<T> metrics)
}

@Override
public ResultSetFuture findMetricsInMetricsIndex(String tenantId, MetricType type) {
return session.executeAsync(readMetricsIndex.bind(tenantId, type.getCode()));
public Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricType type) {
return rxSession.execute(readMetricsIndex.bind(tenantId, type.getCode()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public ListenableFuture<Void> apply(ResultSet resultSet) {
public ListenableFuture<List<Tenant>> getTenants() {
ListenableFuture<Stream<String>> tenantIdsFuture = Futures.transform(
dataAccess.findAllTenantIds(), (ResultSet input) ->
StreamSupport.stream(input.spliterator(), false).map(row -> row.getString(0)), metricsTasks);
StreamSupport.stream(input.spliterator(), false).map(row -> row.getString(0)), metricsTasks);

return Futures.transform(tenantIdsFuture, (Stream<String> input) ->
Futures.allAsList(input.map(dataAccess::findTenant).map(Functions::getTenant).collect(toList())));
Expand Down Expand Up @@ -409,9 +409,31 @@ public ListenableFuture<Optional<Metric<?>>> findMetric(final String tenantId, f
}

@Override
public ListenableFuture<List<Metric<?>>> findMetrics(String tenantId, MetricType type) {
ResultSetFuture future = dataAccess.findMetricsInMetricsIndex(tenantId, type);
return Futures.transform(future, new MetricsIndexMapper(tenantId, type), metricsTasks);
public Observable<Metric<?>> findMetrics(String tenantId, MetricType type) {
Observable<ResultSet> observable = dataAccess.findMetricsInMetricsIndex(tenantId, type);
if (type == MetricType.GAUGE) {
return observable.flatMap(Observable::from).map(row -> toGauge(tenantId, row));
} else {
return observable.flatMap(Observable::from).map(row -> toAvailability(tenantId, row));
}
}

private Gauge toGauge(String tenantId, Row row) {
return new Gauge(
tenantId,
new MetricId(row.getString(0), Interval.parse(row.getString(1))),
row.getMap(2, String.class, String.class),
row.getInt(3)
);
}

private Availability toAvailability(String tenantId, Row row) {
return new Availability(
tenantId,
new MetricId(row.getString(0), Interval.parse(row.getString(1))),
row.getMap(2, String.class, String.class),
row.getInt(3)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public <T extends Metric<?>> ResultSetFuture updateMetricsIndex(List<T> metrics)
}

@Override
public ResultSetFuture findMetricsInMetricsIndex(String tenantId, MetricType type) {
public Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricType type) {
return delegate.findMetricsInMetricsIndex(tenantId, type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,9 +959,8 @@ private void assertMetricEquals(Metric actual, Metric expected) {

private void assertMetricIndexMatches(String tenantId, MetricType type, List<? extends Metric> expected)
throws Exception {
ListenableFuture<List<Metric<?>>> metricsFuture = metricsService.findMetrics(tenantId, type);
List<Metric<?>> actualIndex = getUninterruptibly(metricsFuture);

List<Metric<?>> actualIndex = ImmutableList.copyOf(metricsService.findMetrics(tenantId, type).toBlocking()
.toIterable());
assertEquals(actualIndex, expected, "The metrics index results do not match");
}

Expand Down

0 comments on commit c31c56f

Please sign in to comment.