Skip to content

Commit

Permalink
Merge pull request #221 from tsegismont/grafana_testing_rx-migration
Browse files Browse the repository at this point in the history
Merging now that PR #220 has been merged into master.
  • Loading branch information
tsegismont committed May 22, 2015
2 parents 7e36093 + 4754206 commit 36467c8
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ booleanExpression: operand '=' operand #eqExpression
;

operand: prefix? name #nameOperand
| TIMESPAN #absoluteMomentOperand
| ID '(' ')' DASH TIMESPAN #pastMomentOperand
| ID '(' ')' PLUS TIMESPAN #futureMomentOperand
| ID '(' ')' #presentMomentOperand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package org.hawkular.metrics.api.jaxrs.influx;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Transfer object which is returned by Influx queries
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class InfluxObject {
private final String name;
private final List<String> columns;
Expand All @@ -35,7 +38,7 @@ private InfluxObject(@JsonProperty("name") String name, @JsonProperty("columns")
@JsonProperty("points") List<List<?>> points) {
this.name = name;
this.columns = columns;
this.points = points;
this.points = points == null ? Collections.emptyList() : points;
}

public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.badRequest;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.serverError;
import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -47,7 +48,6 @@
import javax.ws.rs.core.Response.Status;

import org.antlr.v4.runtime.tree.ParseTreeWalker;
import org.hawkular.metrics.api.jaxrs.ApiError;
import org.hawkular.metrics.api.jaxrs.influx.query.InfluxQueryParseTreeWalker;
import org.hawkular.metrics.api.jaxrs.influx.query.parse.InfluxQueryParser;
import org.hawkular.metrics.api.jaxrs.influx.query.parse.InfluxQueryParser.QueryContext;
Expand All @@ -70,7 +70,6 @@
import org.hawkular.metrics.api.jaxrs.influx.query.validation.QueryValidator;
import org.hawkular.metrics.api.jaxrs.influx.write.validation.InfluxObjectValidator;
import org.hawkular.metrics.api.jaxrs.influx.write.validation.InvalidObjectException;
import org.hawkular.metrics.api.jaxrs.util.StringValue;
import org.hawkular.metrics.core.api.Gauge;
import org.hawkular.metrics.core.api.GaugeData;
import org.hawkular.metrics.core.api.Metric;
Expand All @@ -82,9 +81,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import rx.Observable;
import rx.Observer;

/**
* Some support for InfluxDB clients like Grafana.
Expand Down Expand Up @@ -119,26 +121,21 @@ public void write(
) {

if (influxObjects == null) {
asyncResponse.resume(badRequest(new ApiError("Null objects")));
asyncResponse.resume(errorResponse(BAD_REQUEST, "Null objects"));
return;
}

try {
objectValidator.validateInfluxObjects(influxObjects);
} catch (InvalidObjectException e) {
asyncResponse.resume(badRequest(new ApiError(e.getMessage())));
asyncResponse.resume(errorResponse(BAD_REQUEST, Throwables.getRootCause(e).getMessage()));
return;
}

Observable<Gauge> input = Observable.from(influxObjects)
.map(InfluxSeriesHandler::influxObjectToGauge)
.map(gauge -> (Gauge) gauge.setTenantId(tenantId));
metricsService.addGaugeData(input).subscribe(
(aVoid) -> {
},
t -> asyncResponse.resume(serverError(t)),
() -> asyncResponse.resume(Response.ok().build())
);
metricsService.addGaugeData(input).subscribe(new WriteObserver(asyncResponse));
}

private static Gauge influxObjectToGauge(InfluxObject influxObject) {
Expand Down Expand Up @@ -168,19 +165,19 @@ public void query(
) {

if (queryString == null || queryString.isEmpty()) {
asyncResponse.resume(Response.status(Status.BAD_REQUEST).entity("Missing query").build());
asyncResponse.resume(errorResponse(BAD_REQUEST, "Missing query"));
return;
}

InfluxQueryParser queryParser = parserFactory.newInstanceForQuery(queryString);
QueryContext queryContext = queryParser.query();

QueryContext queryContext;
QueryType queryType;
try {
queryContext = queryParser.query();
queryType = new QueryTypeVisitor().visit(queryContext);
} catch (QueryParseException e) {
StringValue errMsg = new StringValue("Syntactically incorrect query: " + e.getMessage());
asyncResponse.resume(Response.status(Status.BAD_REQUEST).entity(errMsg).build());
asyncResponse.resume(errorResponse(BAD_REQUEST, "Syntactically incorrect query: " + e.getMessage()));
return;
} catch (Exception e) {
asyncResponse.resume(e);
Expand All @@ -195,29 +192,25 @@ public void query(
select(asyncResponse, tenantId, queryContext.selectQuery());
break;
default:
StringValue errMsg = new StringValue("Query not yet supported: " + queryString);
asyncResponse.resume(Response.status(Status.BAD_REQUEST).entity(errMsg).build());
asyncResponse.resume(errorResponse(BAD_REQUEST, "Query not yet supported: " + queryString));
}
}

private void listSeries(AsyncResponse asyncResponse, String tenantId) {
metricsService.findMetrics(tenantId, MetricType.GAUGE)
.map(InfluxSeriesHandler::metricToInfluxObject)
.toList()
.subscribe(
objects -> asyncResponse.resume(Response.ok(objects).build()),
asyncResponse::resume
);
.map(InfluxSeriesHandler::metricsListToListSeries)
.subscribe(new ReadObserver(asyncResponse));
}

private static InfluxObject metricToInfluxObject(Metric<?> metric) {
List<String> columns = new ArrayList<>(2);
columns.add("time");
columns.add("sequence_number");
columns.add("val");
return new InfluxObject.Builder(metric.getId().getName(), columns)
.withForeseenPoints(0)
.createInfluxObject();
private static List<InfluxObject> metricsListToListSeries(List<Metric<?>> metrics) {
List<String> columns = ImmutableList.of("time", "name");
InfluxObject.Builder builder = new InfluxObject.Builder("list_series_result", columns)
.withForeseenPoints(metrics.size());
for (Metric metric : metrics) {
builder.addPoint(ImmutableList.of(0, metric.getId().getName()));
}
return ImmutableList.of(builder.createInfluxObject());
}

private void select(AsyncResponse asyncResponse, String tenantId, SelectQueryContext selectQueryContext) {
Expand All @@ -230,8 +223,7 @@ private void select(AsyncResponse asyncResponse, String tenantId, SelectQueryCon
try {
queryValidator.validateSelectQuery(queryDefinitions);
} catch (IllegalQueryException e) {
StringValue errMsg = new StringValue("Illegal query: " + e.getMessage());
asyncResponse.resume(Response.status(Status.BAD_REQUEST).entity(errMsg).build());
asyncResponse.resume(errorResponse(BAD_REQUEST, "Illegal query: " + e.getMessage()));
return;
}

Expand All @@ -244,8 +236,7 @@ private void select(AsyncResponse asyncResponse, String tenantId, SelectQueryCon
timeInterval = toIntervalTranslator.toInterval(whereClause);
}
if (timeInterval == null) {
StringValue errMsg = new StringValue("Invalid time interval");
asyncResponse.resume(Response.status(Status.BAD_REQUEST).entity(errMsg).build());
asyncResponse.resume(errorResponse(BAD_REQUEST, "Invalid time interval"));
return;
}
String columnName = getColumnName(queryDefinitions);
Expand Down Expand Up @@ -318,8 +309,8 @@ tenantId, new MetricId(metric),
).subscribe(
objects -> {
if (objects == null) {
StringValue val = new StringValue("Metric with id [" + metric + "] not found. ");
asyncResponse.resume(Response.status(404).entity(val).build());
String msg = "Metric with id [" + metric + "] not found. ";
asyncResponse.resume(errorResponse(NOT_FOUND, msg));
} else {
ResponseBuilder builder = Response.ok(objects);
asyncResponse.resume(builder.build());
Expand Down Expand Up @@ -527,4 +518,53 @@ private double quantil(List<GaugeData> in, double val) {
return bla.get((int) Math.ceil(x - 1));
}
}

private Response errorResponse(Status status, String message) {
return Response.status(status).entity(message).type(TEXT_PLAIN_TYPE).build();
}

private class WriteObserver implements Observer<Void> {
private final AsyncResponse asyncResponse;

public WriteObserver(AsyncResponse asyncResponse) {
this.asyncResponse = asyncResponse;
}

@Override
public void onCompleted() {
asyncResponse.resume(Response.ok().build());
}

@Override
public void onError(Throwable t) {
asyncResponse.resume(errorResponse(INTERNAL_SERVER_ERROR, Throwables.getRootCause(t).getMessage()));
}

@Override
public void onNext(Void aVoid) {
}
}

private class ReadObserver implements Observer<List<InfluxObject>> {
private final AsyncResponse asyncResponse;

public ReadObserver(AsyncResponse asyncResponse) {
this.asyncResponse = asyncResponse;
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable t) {
asyncResponse.resume(errorResponse(INTERNAL_SERVER_ERROR, Throwables.getRootCause(t).getMessage()));
}

@Override
public void onNext(List<InfluxObject> influxObjects) {
asyncResponse.resume(Response.ok(influxObjects).build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.antlr.v4.runtime.misc.NotNull;
import org.hawkular.metrics.api.jaxrs.influx.query.parse.InfluxQueryBaseListener;
import org.hawkular.metrics.api.jaxrs.influx.query.parse.InfluxQueryParser;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -256,6 +258,18 @@ public void exitNameOperand(@NotNull NameOperandContext ctx) {
name = null;
}

@Override
public void exitAbsoluteMomentOperand(InfluxQueryParser.AbsoluteMomentOperandContext ctx) {
String timespan = ctx.TIMESPAN().getText();
int amount = Integer.parseInt(timespan.substring(0, timespan.length() - 1));
char unitId = timespan.charAt(timespan.length() - 1);
InfluxTimeUnit unit = InfluxTimeUnit.findById(unitId);
if (unit == null) {
throw new RuntimeException("Unknown time unit: " + unitId);
}
operandQueue.addLast(new DateOperand(new Instant(unit.convertTo(TimeUnit.MILLISECONDS, amount))));
}

@Override
public void exitPastMomentOperand(@NotNull PastMomentOperandContext ctx) {
String functionName = ctx.ID().getText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ select mean(a.value) from c as a where time < now() - 50w
select a.value as b from c as a where time < '2011-07-28' and time > now() + 50w
select a.value as b from c as a where '2011-07-28' < a.time and now() + 50w > a.time
select a.value as b from c as a where '2011-07-28' > a.time and now() + 50w < a.time
select * from test where time > 1501560s and time < 4560546h
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ select percentile("d", .95) from a
SeleCt dup.time as now, _agjsqdha(clui) as p from z as bi group by time(15s)
SeleCt dup.time as now, _agjsqdha(clui) as p from z as bi group by time(15s) where time > now() -15s
SeleCt dup.time as now, _agjsqdha(clui, -5, -33.1) as p from z as bi group by time(15s) where time > now() -15s and a=-.33
select * from test where time > 1501560s and time < 4560546h
SeleCt dup.a fROm "z" as bi order desc
SeleCt dup.b fROm "z" as bi order asc
2 changes: 1 addition & 1 deletion clients/ptranslator/log4j-dev.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
</appender>

<root>
<level value="TRACE"/>
<level value="DEBUG"/>
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</root>
Expand Down

0 comments on commit 36467c8

Please sign in to comment.