Skip to content

Commit

Permalink
[HWKMETRICS-74] initial port of metric creation
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 14, 2015
1 parent bfcc952 commit 7632ceb
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,15 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import org.hawkular.metrics.api.jaxrs.ApiError;
import org.hawkular.metrics.api.jaxrs.callback.MetricCreatedCallback;
import org.hawkular.metrics.api.jaxrs.param.Duration;
import org.hawkular.metrics.api.jaxrs.param.Tags;
import org.hawkular.metrics.api.jaxrs.request.TagRequest;
Expand All @@ -59,17 +66,11 @@
import org.hawkular.metrics.core.api.BucketedOutput;
import org.hawkular.metrics.core.api.Buckets;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricAlreadyExistsException;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.MetricsService;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import rx.Observable;

/**
* @author Stefan Negrea
Expand Down Expand Up @@ -105,10 +106,26 @@ public void createAvailabilityMetric(@Suspended final AsyncResponse asyncRespons
return;
}
metric.setTenantId(tenantId);
ListenableFuture<Void> future = metricsService.createMetric(metric);
URI created = uriInfo.getBaseUriBuilder().path("/availability/{id}").build(metric.getId().getName());
MetricCreatedCallback metricCreatedCallback = new MetricCreatedCallback(asyncResponse, created);
Futures.addCallback(future, metricCreatedCallback);
Observable<Void> metricCreated = metricsService.createMetric(metric);
URI location = uriInfo.getBaseUriBuilder().path("/availability/{id}").build(metric.getId().getName());
// MetricCreatedCallback metricCreatedCallback = new MetricCreatedCallback(asyncResponse, created);
// Futures.addCallback(future, metricCreatedCallback);
metricCreated.subscribe(
nullArg -> {},
t -> {
if (t instanceof MetricAlreadyExistsException) {
MetricAlreadyExistsException exception = (MetricAlreadyExistsException) t;
String message = "A metric with name [" + exception.getMetric().getId().getName() + "] " +
"already exists";
asyncResponse.resume(Response.status(Status.CONFLICT).entity(new ApiError(message)).build());
} else {
String message = "Failed to create metric due to an unexpected error: "
+ Throwables.getRootCause(t).getMessage();
asyncResponse.resume(Response.serverError().entity(new ApiError(message)).build());
}
},
() -> asyncResponse.resume(Response.created(location).build())
);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import javax.ws.rs.core.UriInfo;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.wordnik.swagger.annotations.Api;
Expand All @@ -59,7 +60,6 @@
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import org.hawkular.metrics.api.jaxrs.ApiError;
import org.hawkular.metrics.api.jaxrs.callback.MetricCreatedCallback;
import org.hawkular.metrics.api.jaxrs.param.Duration;
import org.hawkular.metrics.api.jaxrs.param.Tags;
import org.hawkular.metrics.api.jaxrs.request.TagRequest;
Expand All @@ -70,6 +70,7 @@
import org.hawkular.metrics.core.api.GaugeBucketDataPoint;
import org.hawkular.metrics.core.api.GaugeData;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricAlreadyExistsException;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.MetricsService;
Expand Down Expand Up @@ -110,10 +111,26 @@ public void createGaugeMetric(@Suspended final AsyncResponse asyncResponse,
return;
}
metric.setTenantId(tenantId);
ListenableFuture<Void> future = metricsService.createMetric(metric);
URI created = uriInfo.getBaseUriBuilder().path("/gauges/{id}").build(metric.getId().getName());
MetricCreatedCallback metricCreatedCallback = new MetricCreatedCallback(asyncResponse, created);
Futures.addCallback(future, metricCreatedCallback);
Observable<Void> metricCreated = metricsService.createMetric(metric);
URI location = uriInfo.getBaseUriBuilder().path("/gauges/{id}").build(metric.getId().getName());
// MetricCreatedCallback metricCreatedCallback = new MetricCreatedCallback(asyncResponse, created);
// Futures.addCallback(future, metricCreatedCallback);
metricCreated.subscribe(
nullArg -> {},
t -> {
if (t instanceof MetricAlreadyExistsException) {
MetricAlreadyExistsException exception = (MetricAlreadyExistsException) t;
String message = "A metric with name [" + exception.getMetric().getId().getName() + "] " +
"already exists";
asyncResponse.resume(Response.status(Status.CONFLICT).entity(new ApiError(message)).build());
} else {
String message = "Failed to create metric due to an unexpected error: "
+ Throwables.getRootCause(t).getMessage();
asyncResponse.resume(Response.serverError().entity(new ApiError(message)).build());
}
},
() -> asyncResponse.resume(Response.created(location).build())
);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public interface MetricsService {

ListenableFuture<List<Tenant>> getTenants();

ListenableFuture<Void> createMetric(Metric<?> metric);
Observable<Void> createMetric(Metric<?> metric);

ListenableFuture<Optional<Metric<?>>> findMetric(String tenantId, MetricType type, MetricId id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,36 +453,49 @@ private List<String> loadTenantIds() {
}

@Override
public ListenableFuture<Void> createMetric(final Metric<?> metric) {
public Observable<Void> createMetric(final Metric<?> metric) {
ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric);
return Futures.transform(future, new AsyncFunction<ResultSet, Void>() {
@Override
public ListenableFuture<Void> apply(ResultSet resultSet) {
if (!resultSet.wasApplied()) {
throw new MetricAlreadyExistsException(metric);
}
// TODO Need error handling if either of the following updates fail
// If adding tags/retention fails, then we want to report the error to the
// client. Updating the retentions_idx table could also fail. We need to
// report that failure as well.
ResultSetFuture metadataFuture = dataAccess.addTagsAndDataRetention(metric);
ResultSetFuture tagsFuture = dataAccess.insertIntoMetricsTagsIndex(metric, metric.getTags());

List<ResultSetFuture> futures = new ArrayList<>();
futures.add(metadataFuture);
futures.add(tagsFuture);

if (metric.getDataRetention() != null) {
ResultSetFuture dataRetentionFuture = dataAccess.updateRetentionsIndex(metric);
dataRetentions.put(new DataRetentionKey(metric), metric.getDataRetention());
futures.add(dataRetentionFuture);
}

ListenableFuture<List<ResultSet>> insertsFuture = Futures.allAsList(futures);

return Futures.transform(insertsFuture, Functions.TO_VOID);
}
}, metricsTasks);
Observable<ResultSet> indexUpdated = RxUtil.from(future, metricsTasks);
return Observable.create(subscriber -> {
indexUpdated.subscribe(
resultSet -> {
if (!resultSet.wasApplied()) {
subscriber.onError(new MetricAlreadyExistsException(metric));

} else {
// TODO Need error handling if either of the following updates fail
// If adding tags/retention fails, then we want to report the error to the
// client. Updating the retentions_idx table could also fail. We need to
// report that failure as well.
ResultSetFuture metadataFuture = dataAccess.addTagsAndDataRetention(metric);
Observable<ResultSet> metadataUpdated = RxUtil.from(metadataFuture, metricsTasks);
ResultSetFuture tagsFuture = dataAccess.insertIntoMetricsTagsIndex(metric,
metric.getTags());
Observable<ResultSet> tagsUpdated = RxUtil.from(tagsFuture, metricsTasks);
Observable<ResultSet> metricUpdates;

if (metric.getDataRetention() != null) {
ResultSetFuture dataRetentionFuture = dataAccess.updateRetentionsIndex(metric);
Observable<ResultSet> dataRetentionUpdated = RxUtil.from(dataRetentionFuture,
metricsTasks);
dataRetentions.put(new DataRetentionKey(metric), metric.getDataRetention());
metricUpdates = Observable.merge(metadataUpdated, tagsUpdated, dataRetentionUpdated);
} else {
metricUpdates = Observable.merge(metadataUpdated, tagsUpdated);
}

metricUpdates.subscribe(
resultSets -> {},
// The error handling is the same as it was with Guava futures. That is, if any
// future fails, we treat the entire client request as a failure. We probably
// eventually want to implement more fine-grained error handling where we can
// notify the subscriber of what exactly fails.
subscriber::onError,
subscriber::onCompleted
);
}
});
});
}

@Override
Expand Down Expand Up @@ -549,7 +562,7 @@ public Observable<Void> addGaugeData(Observable<Gauge> gaugeObservable) {
// This is a first, rough cut at an RxJava implementation for storing metric data. This code will change but
// for now the goal is 1) replace ListenableFuture in the API with Observable and 2) make sure tests still
// pass. This means that the behavior basically remains the same as before. The idea here is to implement a
// pub/sub workflow.
// pub/sub workflow.

return Observable.create(subscriber -> {
Map<Gauge, ResultSetFuture> insertsMap = new HashMap<>();
Expand All @@ -565,7 +578,8 @@ public Observable<Void> addGaugeData(Observable<Gauge> gaugeObservable) {
ListenableFuture<List<ResultSet>> insertsFuture = Futures.allAsList(inserts);
Observable<List<ResultSet>> insertsObservable = RxUtil.from(insertsFuture, metricsTasks);
insertsObservable.subscribe(
resultSets -> {},
resultSets -> {
},
subscriber::onError,
subscriber::onCompleted
);
Expand Down

0 comments on commit 7632ceb

Please sign in to comment.