Skip to content

Commit

Permalink
Merge pull request #496 from jsanda/hwkmetrics-384
Browse files Browse the repository at this point in the history
[HWKMETRICS-384] Add string metric type
  • Loading branch information
Stefan Negrea committed Apr 29, 2016
2 parents 771aaf3 + 16fec29 commit 02b9def
Show file tree
Hide file tree
Showing 14 changed files with 738 additions and 3 deletions.
Expand Up @@ -25,6 +25,7 @@
import static org.hawkular.metrics.model.MetricType.AVAILABILITY;
import static org.hawkular.metrics.model.MetricType.COUNTER;
import static org.hawkular.metrics.model.MetricType.GAUGE;
import static org.hawkular.metrics.model.MetricType.STRING;

import java.net.URI;
import java.util.Map;
Expand Down Expand Up @@ -215,10 +216,13 @@ public void addMetricsData(
metricsRequest.getAvailabilities(), AVAILABILITY);
Observable<Metric<Long>> counters = Functions.metricToObservable(tenantId, metricsRequest.getCounters(),
COUNTER);
Observable<Metric<String>> strings = Functions.metricToObservable(tenantId, metricsRequest.getStrings(),
STRING);

metricsService.addDataPoints(GAUGE, gauges)
.mergeWith(metricsService.addDataPoints(AVAILABILITY, availabilities))
.mergeWith(metricsService.addDataPoints(COUNTER, counters))
.mergeWith(metricsService.addDataPoints(STRING, strings))
.subscribe(
aVoid -> {
},
Expand Down
@@ -0,0 +1,169 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.api.jaxrs.handler;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import static org.hawkular.metrics.api.jaxrs.filter.TenantFilter.TENANT_HEADER_NAME;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.badRequest;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.serverError;
import static org.hawkular.metrics.model.MetricType.STRING;

import java.util.List;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

import org.hawkular.metrics.api.jaxrs.handler.observer.ResultSetObserver;
import org.hawkular.metrics.api.jaxrs.util.ApiUtils;
import org.hawkular.metrics.core.service.Functions;
import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.core.service.Order;
import org.hawkular.metrics.model.ApiError;
import org.hawkular.metrics.model.DataPoint;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.param.TimeRange;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import rx.Observable;

/**
* @author jsanda
*/
@Path("/strings")
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Api(tags = "String", description = "This resource is experimental and changes may be made to it in subsequent " +
"releases that are not backwards compatible.")
public class StringHandler {

@Inject
private MetricsService metricsService;

@HeaderParam(TENANT_HEADER_NAME)
private String tenantId;

@POST
@Path("/{id}/raw")
@ApiOperation(value = "Add data for a single string metric.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Adding data succeeded."),
@ApiResponse(code = 400, message = "Missing or invalid payload", response = ApiError.class),
@ApiResponse(code = 500, message = "Unexpected error happened while storing the data",
response = ApiError.class)
})
public void addStringForMetric(
@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id,
@ApiParam(value = "List of string datapoints", required = true) List<DataPoint<String>> data
) {
Observable<Metric<String>> metrics = Functions.dataPointToObservable(tenantId, id, data,
STRING);
Observable<Void> observable = metricsService.addDataPoints(STRING, metrics);
observable.subscribe(new ResultSetObserver(asyncResponse));
}

@POST
@Path("/raw")
@ApiOperation(value = "Add metric data for multiple string metrics in a single call.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Adding data succeeded."),
@ApiResponse(code = 400, message = "Missing or invalid payload", response = ApiError.class),
@ApiResponse(code = 500, message = "Unexpected error happened while storing the data",
response = ApiError.class)
})
public void addStringData(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "List of string metrics", required = true)
@JsonDeserialize()
List<Metric<String>> availabilities
) {
Observable<Metric<String>> metrics = Functions.metricToObservable(tenantId, availabilities,
STRING);
Observable<Void> observable = metricsService.addDataPoints(STRING, metrics);
observable.subscribe(new ResultSetObserver(asyncResponse));
}

@GET
@Path("/{id}/raw")
@ApiOperation(value = "Retrieve string data.", response = DataPoint.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Successfully fetched string data."),
@ApiResponse(code = 204, message = "No string data was found."),
@ApiResponse(code = 500, message = "Unexpected error occurred while fetching string data.",
response = ApiError.class)
})
public void findRawStringData(
@Suspended AsyncResponse asyncResponse,
@PathParam("id") String id,
@ApiParam(value = "Defaults to now - 8 hours") @QueryParam("start") Long start,
@ApiParam(value = "Defaults to now") @QueryParam("end") Long end,
@ApiParam(value = "Set to true to return only distinct, contiguous values")
@QueryParam("distinct") @DefaultValue("false") Boolean distinct,
@ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
@ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
) {

TimeRange timeRange = new TimeRange(start, end);
if (!timeRange.isValid()) {
asyncResponse.resume(badRequest(new ApiError(timeRange.getProblem())));
return;
}

MetricId<String> metricId = new MetricId<>(tenantId, STRING, id);
if (limit != null) {
if (order == null) {
if (start == null && end != null) {
order = Order.DESC;
} else if (start != null && end == null) {
order = Order.ASC;
} else {
order = Order.DESC;
}
}
} else {
limit = 0;
}

if (order == null) {
order = Order.DESC;
}

metricsService
.findStringData(metricId, timeRange.getStart(), timeRange.getEnd(), distinct, limit, order)
.toList()
.map(ApiUtils::collectionToResponse)
.subscribe(asyncResponse::resume, t -> asyncResponse.resume(serverError(t)));
}

}
Expand Up @@ -62,12 +62,16 @@ public interface DataAccess {

Observable<Integer> insertGaugeData(Metric<Double> metric, int ttl);

Observable<Integer> insertStringData(Metric<String> metric, int ttl, int maxSize);

Observable<Integer> insertCounterData(Metric<Long> counter, int ttl);

Observable<Row> findCounterData(MetricId<Long> id, long startTime, long endTime, int limit, Order order);

Observable<Row> findGaugeData(MetricId<Double> id, long startTime, long endTime, int limit, Order order);

Observable<Row> findStringData(MetricId<String> id, long startTime, long endTime, int limit, Order order);

Observable<Row> findAvailabilityData(MetricId<AvailabilityType> id, long startTime, long endTime, int limit,
Order order);

Expand Down
Expand Up @@ -23,6 +23,7 @@
import static org.hawkular.metrics.model.MetricType.AVAILABILITY;
import static org.hawkular.metrics.model.MetricType.COUNTER;
import static org.hawkular.metrics.model.MetricType.GAUGE;
import static org.hawkular.metrics.model.MetricType.STRING;

import java.nio.ByteBuffer;
import java.util.Map;
Expand Down Expand Up @@ -89,6 +90,10 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement insertCounterDataWithTags;

private PreparedStatement insertStringData;

private PreparedStatement insertStringDataWithTags;

private PreparedStatement findCounterDataExclusive;

private PreparedStatement findCounterDataExclusiveWithLimit;
Expand All @@ -105,6 +110,14 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement findGaugeDataByDateRangeExclusiveWithLimitASC;

private PreparedStatement findStringDataByDateRangeExclusive;

private PreparedStatement findStringDataByDateRangeExclusiveWithLimit;

private PreparedStatement findStringDataByDateRangeExclusiveASC;

private PreparedStatement findStringDataByDateRangeExclusiveWithLimitASC;

private PreparedStatement findAvailabilityByDateRangeInclusive;

private PreparedStatement deleteGaugeMetric;
Expand Down Expand Up @@ -217,6 +230,18 @@ protected void initPreparedStatements() {
"SET n_value = ?, tags = ? " +
"WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = ? AND time = ? ");

insertStringData = session.prepare(
"UPDATE data " +
"USING TTL ? " +
"SET s_value = ? " +
"WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = ? AND time = ?");

insertStringDataWithTags = session.prepare(
"UPDATE data " +
"USING TTL ? " +
"SET s_value = ?, tags = ? " +
"WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = ? AND time = ? ");

insertCounterData = session.prepare(
"UPDATE data " +
"USING TTL ? " +
Expand Down Expand Up @@ -249,6 +274,26 @@ protected void initPreparedStatements() {
" AND time < ? ORDER BY time ASC" +
" LIMIT ?");

findStringDataByDateRangeExclusive = session.prepare(
"SELECT time, data_retention, s_value, tags FROM data " +
"WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = ? AND time >= ? AND time < ?");

findStringDataByDateRangeExclusiveWithLimit = session.prepare(
"SELECT time, data_retention, s_value, tags FROM data " +
" WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = ? AND time >= ? AND time < ?" +
" LIMIT ?");

findStringDataByDateRangeExclusiveASC = session.prepare(
"SELECT time, data_retention, s_value, tags FROM data " +
"WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = ? AND time >= ?" +
" AND time < ? ORDER BY time ASC");

findStringDataByDateRangeExclusiveWithLimitASC = session.prepare(
"SELECT time, data_retention, s_value, tags FROM data" +
" WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = ? AND time >= ?" +
" AND time < ? ORDER BY time ASC" +
" LIMIT ?");

findCounterDataExclusive = session.prepare(
"SELECT time, data_retention, l_value, tags FROM data " +
" WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = ? AND time >= ? AND time < ? ");
Expand Down Expand Up @@ -443,6 +488,26 @@ public Observable<Integer> insertGaugeData(Metric<Double> gauge, int ttl) {
.flatMap(batch -> rxSession.execute(batch).map(resultSet -> batch.size()));
}

@Override public Observable<Integer> insertStringData(Metric<String> metric, int ttl, int maxSize) {
return Observable.from(metric.getDataPoints())
.map(dataPoint -> {
if (maxSize != -1 && dataPoint.getValue().length() > maxSize) {
throw new IllegalArgumentException(dataPoint + " exceeds max string length of " + maxSize +
" characters");
}

if (dataPoint.getTags().isEmpty()) {
return bindDataPoint(insertStringData, metric, dataPoint.getValue(), dataPoint.getTimestamp(),
ttl);
} else {
return bindDataPoint(insertStringDataWithTags, metric, dataPoint.getValue(),
dataPoint.getTags(), dataPoint.getTimestamp(), ttl);
}
})
.compose(new BatchStatementTransformer())
.flatMap(batch -> rxSession.execute(batch).map(resultSet -> batch.size()));
}

@Override
public Observable<Integer> insertCounterData(Metric<Long> counter, int ttl) {
return Observable.from(counter.getDataPoints())
Expand Down Expand Up @@ -522,6 +587,29 @@ public Observable<Row> findGaugeData(MetricId<Double> id, long startTime, long e
}
}

@Override
public Observable<Row> findStringData(MetricId<String> id, long startTime, long endTime, int limit, Order order) {
if (order == Order.ASC) {
if (limit <= 0) {
return rxSession.executeAndFetch(findStringDataByDateRangeExclusiveASC.bind(id.getTenantId(),
STRING.getCode(), id.getName(), DPART, getTimeUUID(startTime), getTimeUUID(endTime)));
} else {
return rxSession.executeAndFetch(findStringDataByDateRangeExclusiveWithLimitASC.bind(
id.getTenantId(), GAUGE.getCode(), id.getName(), DPART, getTimeUUID(startTime),
getTimeUUID(endTime), limit));
}
} else {
if (limit <= 0) {
return rxSession.executeAndFetch(findStringDataByDateRangeExclusive.bind(id.getTenantId(),
STRING.getCode(), id.getName(), DPART, getTimeUUID(startTime), getTimeUUID(endTime)));
} else {
return rxSession.executeAndFetch(findStringDataByDateRangeExclusiveWithLimit.bind(id.getTenantId(),
STRING.getCode(), id.getName(), DPART, getTimeUUID(startTime), getTimeUUID(endTime),
limit));
}
}
}

@Override
public Observable<Row> findAvailabilityData(MetricId<AvailabilityType> id, long startTime, long endTime,
int limit, Order order) {
Expand Down
Expand Up @@ -61,6 +61,13 @@ private enum AVAILABILITY_COLS {
TAGS
}

private enum STRING_COLS {
TIME,
DATA_RETENTION,
VALUE,
TAGS
}

private Functions() {
}

Expand All @@ -80,6 +87,13 @@ public static DataPoint<Long> getCounterDataPoint(Row row) {
row.getMap(COUNTER_COLS.TAGS.ordinal(), String.class, String.class));
}

public static DataPoint<String> getStringDataPoint(Row row) {
return new DataPoint<>(
UUIDs.unixTimestamp(row.getUUID(STRING_COLS.TIME.ordinal())),
row.getString(STRING_COLS.VALUE.ordinal()),
row.getMap(STRING_COLS.TAGS.ordinal(), String.class, String.class));
}

public static DataPoint<AvailabilityType> getAvailabilityDataPoint(Row row) {
return new DataPoint<>(
UUIDs.unixTimestamp(row.getUUID(AVAILABILITY_COLS.TIME.ordinal())),
Expand Down
Expand Up @@ -233,6 +233,9 @@ Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<Availabili
Observable<List<AvailabilityBucketPoint>> findAvailabilityStats(MetricId<AvailabilityType> metricId, long start,
long end, Buckets buckets);

Observable<DataPoint<String>> findStringData(MetricId<String> id, long start, long end, boolean distinct,
int limit, Order order);

/**
* Check if a metric has been stored in the system.
*/
Expand Down

0 comments on commit 02b9def

Please sign in to comment.