Skip to content

Commit

Permalink
Port changes from PR#220 to rx-migration branch
Browse files Browse the repository at this point in the history
  • Loading branch information
tsegismont authored and John Sanda committed May 26, 2015
1 parent 64a9da2 commit 3f7bdf6
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;

import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.badRequest;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.serverError;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -51,7 +48,6 @@
import javax.ws.rs.core.Response.Status;

import org.antlr.v4.runtime.tree.ParseTreeWalker;
import org.hawkular.metrics.api.jaxrs.ApiError;
import org.hawkular.metrics.api.jaxrs.influx.query.InfluxQueryParseTreeWalker;
import org.hawkular.metrics.api.jaxrs.influx.query.parse.InfluxQueryParser;
import org.hawkular.metrics.api.jaxrs.influx.query.parse.InfluxQueryParser.QueryContext;
Expand All @@ -74,7 +70,6 @@
import org.hawkular.metrics.api.jaxrs.influx.query.validation.QueryValidator;
import org.hawkular.metrics.api.jaxrs.influx.write.validation.InfluxObjectValidator;
import org.hawkular.metrics.api.jaxrs.influx.write.validation.InvalidObjectException;
import org.hawkular.metrics.api.jaxrs.util.StringValue;
import org.hawkular.metrics.core.api.Gauge;
import org.hawkular.metrics.core.api.GaugeData;
import org.hawkular.metrics.core.api.Metric;
Expand All @@ -86,9 +81,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import rx.Observable;
import rx.Observer;

/**
* Some support for InfluxDB clients like Grafana.
Expand Down Expand Up @@ -123,26 +121,21 @@ public void write(
) {

if (influxObjects == null) {
asyncResponse.resume(badRequest(new ApiError("Null objects")));
asyncResponse.resume(errorResponse(BAD_REQUEST, "Null objects"));
return;
}

try {
objectValidator.validateInfluxObjects(influxObjects);
} catch (InvalidObjectException e) {
asyncResponse.resume(badRequest(new ApiError(e.getMessage())));
asyncResponse.resume(errorResponse(BAD_REQUEST, Throwables.getRootCause(e).getMessage()));
return;
}

Observable<Gauge> input = Observable.from(influxObjects)
.map(InfluxSeriesHandler::influxObjectToGauge)
.map(gauge -> (Gauge) gauge.setTenantId(tenantId));
metricsService.addGaugeData(input).subscribe(
(aVoid) -> {
},
t -> asyncResponse.resume(serverError(t)),
() -> asyncResponse.resume(Response.ok().build())
);
metricsService.addGaugeData(input).subscribe(new WriteObserver(asyncResponse));
}

private static Gauge influxObjectToGauge(InfluxObject influxObject) {
Expand Down Expand Up @@ -172,7 +165,7 @@ public void query(
) {

if (queryString == null || queryString.isEmpty()) {
asyncResponse.resume(Response.status(Status.BAD_REQUEST).entity("Missing query").build());
asyncResponse.resume(errorResponse(BAD_REQUEST, "Missing query"));
return;
}

Expand Down Expand Up @@ -205,22 +198,19 @@ public void query(

private void listSeries(AsyncResponse asyncResponse, String tenantId) {
metricsService.findMetrics(tenantId, MetricType.GAUGE)
.map(InfluxSeriesHandler::metricToInfluxObject)
.toList()
.subscribe(
objects -> asyncResponse.resume(Response.ok(objects).build()),
asyncResponse::resume
);
.map(InfluxSeriesHandler::metricsListToListSeries)
.subscribe(new ReadObserver(asyncResponse));
}

private static InfluxObject metricToInfluxObject(Metric<?> metric) {
List<String> columns = new ArrayList<>(2);
columns.add("time");
columns.add("sequence_number");
columns.add("val");
return new InfluxObject.Builder(metric.getId().getName(), columns)
.withForeseenPoints(0)
.createInfluxObject();
private static List<InfluxObject> metricsListToListSeries(List<Metric<?>> metrics) {
List<String> columns = ImmutableList.of("time", "name");
InfluxObject.Builder builder = new InfluxObject.Builder("list_series_result", columns)
.withForeseenPoints(metrics.size());
for (Metric metric : metrics) {
builder.addPoint(ImmutableList.of(0, metric.getId().getName()));
}
return ImmutableList.of(builder.createInfluxObject());
}

private void select(AsyncResponse asyncResponse, String tenantId, SelectQueryContext selectQueryContext) {
Expand All @@ -233,8 +223,7 @@ private void select(AsyncResponse asyncResponse, String tenantId, SelectQueryCon
try {
queryValidator.validateSelectQuery(queryDefinitions);
} catch (IllegalQueryException e) {
StringValue errMsg = new StringValue("Illegal query: " + e.getMessage());
asyncResponse.resume(Response.status(Status.BAD_REQUEST).entity(errMsg).build());
asyncResponse.resume(errorResponse(BAD_REQUEST, "Illegal query: " + e.getMessage()));
return;
}

Expand All @@ -247,8 +236,7 @@ private void select(AsyncResponse asyncResponse, String tenantId, SelectQueryCon
timeInterval = toIntervalTranslator.toInterval(whereClause);
}
if (timeInterval == null) {
StringValue errMsg = new StringValue("Invalid time interval");
asyncResponse.resume(Response.status(Status.BAD_REQUEST).entity(errMsg).build());
asyncResponse.resume(errorResponse(BAD_REQUEST, "Invalid time interval"));
return;
}
String columnName = getColumnName(queryDefinitions);
Expand Down Expand Up @@ -321,8 +309,8 @@ tenantId, new MetricId(metric),
).subscribe(
objects -> {
if (objects == null) {
StringValue val = new StringValue("Metric with id [" + metric + "] not found. ");
asyncResponse.resume(Response.status(404).entity(val).build());
String msg = "Metric with id [" + metric + "] not found. ";
asyncResponse.resume(errorResponse(NOT_FOUND, msg));
} else {
ResponseBuilder builder = Response.ok(objects);
asyncResponse.resume(builder.build());
Expand Down Expand Up @@ -535,39 +523,48 @@ private Response errorResponse(Status status, String message) {
return Response.status(status).entity(message).type(TEXT_PLAIN_TYPE).build();
}

private class WriteCallback implements FutureCallback<Void> {
private class WriteObserver implements Observer<Void> {
private final AsyncResponse asyncResponse;

public WriteCallback(AsyncResponse asyncResponse) {
public WriteObserver(AsyncResponse asyncResponse) {
this.asyncResponse = asyncResponse;
}

@Override
public void onSuccess(Void result) {
public void onCompleted() {
asyncResponse.resume(Response.ok().build());
}

@Override
public void onFailure(Throwable t) {
public void onError(Throwable t) {
asyncResponse.resume(errorResponse(INTERNAL_SERVER_ERROR, Throwables.getRootCause(t).getMessage()));
}

@Override
public void onNext(Void aVoid) {
}
}

private class ReadCallback implements FutureCallback<List<InfluxObject>> {
private class ReadObserver implements Observer<List<InfluxObject>> {
private final AsyncResponse asyncResponse;

public ReadCallback(AsyncResponse asyncResponse) {
public ReadObserver(AsyncResponse asyncResponse) {
this.asyncResponse = asyncResponse;
}

@Override
public void onSuccess(List<InfluxObject> result) {
asyncResponse.resume(Response.ok(result).build());
public void onCompleted() {

}

@Override
public void onFailure(Throwable t) {
public void onError(Throwable t) {
asyncResponse.resume(errorResponse(INTERNAL_SERVER_ERROR, Throwables.getRootCause(t).getMessage()));
}

@Override
public void onNext(List<InfluxObject> influxObjects) {
asyncResponse.resume(Response.ok(influxObjects).build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class InfluxITest extends RESTTest {
[
[
columns: ["time", "mean"],
name: timeseriesName,
name : timeseriesName,
points: []
]
],
Expand Down

0 comments on commit 3f7bdf6

Please sign in to comment.