Skip to content

Commit

Permalink
Add QueryRpc class for handling timesieres queries
Browse files Browse the repository at this point in the history
Add TestQueryRpc class for unit tests
Add serializer methods parseQueryV1() and formatQueryV1()

Signed-off-by: Chris Larsen <clarsen@euphoriaaudio.com>
  • Loading branch information
manolama committed Apr 14, 2013
1 parent 5f6edb0 commit 8cb99b1
Show file tree
Hide file tree
Showing 7 changed files with 540 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ tsdb_SRC := \
src/tsd/LogsRpc.java \
src/tsd/PipelineFactory.java \
src/tsd/PutDataPointRpc.java \
src/tsd/QueryRpc.java \
src/tsd/RpcHandler.java \
src/tsd/StaticFileRpc.java \
src/tsd/SuggestRpc.java \
Expand Down Expand Up @@ -128,6 +129,7 @@ test_SRC := \
test/tsd/TestHttpJsonSerializer.java \
test/tsd/TestHttpQuery.java \
test/tsd/TestPutRpc.java \
test/tsd/TestQueryRpc.java \
test/tsd/TestSuggestRpc.java \
test/tsd/TestUniqueIdRpc.java \
test/uid/TestNoSuchUniqueId.java \
Expand Down
1 change: 1 addition & 0 deletions src/core/TSQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* to compile the query into {@link Query} objects for processing.
* <b>Note:</b> If using POJO deserialization, make sure to avoid setting the
* {@code start_time} and {@code end_time} fields.
* @since 2.0
*/
public final class TSQuery {

Expand Down
114 changes: 113 additions & 1 deletion src/tsd/HttpJsonSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.tsd;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -20,14 +22,21 @@
import java.util.TreeMap;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.stumbleupon.async.Deferred;

import net.opentsdb.core.DataPoint;
import net.opentsdb.core.DataPoints;
import net.opentsdb.core.IncomingDataPoint;
import net.opentsdb.core.TSDB;
import net.opentsdb.core.TSQuery;
import net.opentsdb.utils.JSON;

/**
Expand All @@ -39,7 +48,9 @@
* @since 2.0
*/
class HttpJsonSerializer extends HttpSerializer {

private static final Logger LOG =
LoggerFactory.getLogger(HttpJsonSerializer.class);

/** Type reference for incoming data points */
private static TypeReference<ArrayList<IncomingDataPoint>> TR_INCOMING =
new TypeReference<ArrayList<IncomingDataPoint>>() {};
Expand Down Expand Up @@ -159,6 +170,26 @@ public HashMap<String, List<String>> parseUidAssignV1() {
}
}

/**
* Parses a timeseries data query
* @return A TSQuery with data ready to validate
* @throws JSONException if parsing failed
* @throws BadRequestException if the content was missing or parsing failed
*/
public TSQuery parseQueryV1() {
final String json = query.getContent();
if (json == null || json.isEmpty()) {
throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
"Missing message content",
"Supply valid JSON formatted data in the body of your request");
}
try {
return JSON.parseToObject(json, TSQuery.class);
} catch (IllegalArgumentException iae) {
throw new BadRequestException("Unable to parse the given JSON", iae);
}
}

/**
* Formats the results of an HTTP data point storage request
* @param results A map of results. The map will consist of:
Expand Down Expand Up @@ -238,6 +269,87 @@ public ChannelBuffer formatUidAssignV1(final
return this.serializeJSON(response);
}

/**
* Format the results from a timeseries data query
* @param data_query The TSQuery object used to fetch the results
* @param results The data fetched from storage
* @return A ChannelBuffer object to pass on to the caller
*/
public ChannelBuffer formatQueryV1(final TSQuery data_query,
final List<DataPoints[]> results) {

final boolean as_arrays = this.query.hasQueryStringParam("arrays");

// todo - this should be streamed at some point since it could be HUGE
final ChannelBuffer response = ChannelBuffers.dynamicBuffer();
final OutputStream output = new ChannelBufferOutputStream(response);
try {
JsonGenerator json = JSON.getFactory().createGenerator(output);
json.writeStartArray();

for (DataPoints[] separate_dps : results) {
for (DataPoints dps : separate_dps) {
json.writeStartObject();

json.writeStringField("metric", dps.metricName());

json.writeFieldName("tags");
json.writeStartObject();
if (dps.getTags() != null) {
for (Map.Entry<String, String> tag : dps.getTags().entrySet()) {
json.writeStringField(tag.getKey(), tag.getValue());
}
}
json.writeEndObject();

json.writeFieldName("aggregated_tags");
json.writeStartArray();
if (dps.getAggregatedTags() != null) {
for (String atag : dps.getAggregatedTags()) {
json.writeString(atag);
}
}
json.writeEndArray();

// now the fun stuff, dump the data
json.writeFieldName("dps");

// default is to write a map, otherwise write arrays
if (as_arrays) {
json.writeStartArray();
for (final DataPoint dp : dps) {
json.writeStartArray();
json.writeNumber(dp.timestamp());
json.writeNumber(
dp.isInteger() ? dp.longValue() : dp.doubleValue());
json.writeEndArray();
}
json.writeEndArray();
} else {
json.writeStartObject();
for (final DataPoint dp : dps) {
json.writeNumberField(Long.toString(dp.timestamp()),
dp.isInteger() ? dp.longValue() : dp.doubleValue());
}
json.writeEndObject();
}

// close the results for this particular query
json.writeEndObject();
}
}

// close
json.writeEndArray();
json.close();

return response;
} catch (IOException e) {
LOG.error("Unexpected exception", e);
throw new RuntimeException(e);
}
}

/**
* Helper object for the format calls to wrap the JSON response in a JSONP
* function if requested. Used for code dedupe.
Expand Down
29 changes: 29 additions & 0 deletions src/tsd/HttpSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@

import com.stumbleupon.async.Deferred;

import net.opentsdb.core.DataPoints;
import net.opentsdb.core.IncomingDataPoint;
import net.opentsdb.core.TSDB;
import net.opentsdb.core.TSQuery;

/**
* Abstract base class for Serializers; plugins that handle converting requests
Expand Down Expand Up @@ -185,6 +187,18 @@ public HashMap<String, List<String>> parseUidAssignV1() {
" has not implemented parseUidAssignV1");
}

/**
* Parses a timeseries data query
* @return A TSQuery with data ready to validate
* @throws BadRequestException if the plugin has not implemented this method
*/
public TSQuery parseQueryV1() {
throw new BadRequestException(HttpResponseStatus.NOT_IMPLEMENTED,
"The requested API endpoint has not been implemented",
this.getClass().getCanonicalName() +
" has not implemented parseQueryV1");
}

/**
* Formats the results of an HTTP data point storage request
* @param results A map of results. The map will consist of:
Expand Down Expand Up @@ -284,6 +298,21 @@ public ChannelBuffer formatUidAssignV1(final
" has not implemented formatUidAssignV1");
}

/**
* Format the results from a timeseries data query
* @param query The TSQuery object used to fetch the results
* @param results The data fetched from storage
* @return A ChannelBuffer object to pass on to the caller
* @throws BadRequestException if the plugin has not implemented this method
*/
public ChannelBuffer formatQueryV1(final TSQuery query,
final List<DataPoints[]> results) {
throw new BadRequestException(HttpResponseStatus.NOT_IMPLEMENTED,
"The requested API endpoint has not been implemented",
this.getClass().getCanonicalName() +
" has not implemented formatQueryV1");
}

/**
* Formats a 404 error when an endpoint or file wasn't found
* <p>
Expand Down
Loading

0 comments on commit 8cb99b1

Please sign in to comment.