Skip to content

Commit

Permalink
Migrating MetricsServiceCassandra Availability and tag related functions
Browse files Browse the repository at this point in the history
to RxJava
  • Loading branch information
Michael Burman committed May 21, 2015
1 parent 27be701 commit 7e36093
Show file tree
Hide file tree
Showing 14 changed files with 536 additions and 556 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.api.jaxrs;

import com.datastax.driver.core.ResultSet;

import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;

import rx.Observer;

/**
* Observer that returns empty 200 if everything went alright and ApiError if there was an exception.
*
* @author miburman
*/
public class ResultSetObserver implements Observer<ResultSet> {

private AsyncResponse asyncResponse;

public ResultSetObserver(AsyncResponse asyncResponse) {
this.asyncResponse = asyncResponse;
}

@Override
public void onCompleted() {
asyncResponse.resume(Response.ok().build());
}

@Override
public void onError(Throwable t) {
asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build());
}

@Override
public void onNext(ResultSet rows) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
Expand All @@ -50,6 +49,7 @@
import javax.ws.rs.core.UriInfo;

import org.hawkular.metrics.api.jaxrs.ApiError;
import org.hawkular.metrics.api.jaxrs.ResultSetObserver;
import org.hawkular.metrics.api.jaxrs.param.Duration;
import org.hawkular.metrics.api.jaxrs.param.Tags;
import org.hawkular.metrics.api.jaxrs.request.TagRequest;
Expand All @@ -64,6 +64,7 @@
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.MetricsService;

import com.datastax.driver.core.ResultSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.wordnik.swagger.annotations.Api;
Expand All @@ -72,6 +73,8 @@
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;

import rx.Observable;

/**
* @author Stefan Negrea
*
Expand Down Expand Up @@ -144,13 +147,10 @@ public void getAvailabilityMetricTags(
@Suspended final AsyncResponse asyncResponse,
@PathParam("id") String id
) {
executeAsync(
asyncResponse,
() -> {
ListenableFuture<Map<String, String>> future = metricsService.getMetricTags(tenantId,
MetricType.AVAILABILITY, new MetricId(id));
return Futures.transform(future, ApiUtils.MAP_MAP);
});
metricsService.getMetricTags(tenantId, MetricType.AVAILABILITY, new MetricId(id)).subscribe(
optional -> asyncResponse.resume(ApiUtils.valueToResponse(optional)),
t -> asyncResponse.resume(ApiUtils.serverError(t)));
// @TODO Above is repeated code, refactor (GaugeHandler has it also)
}

@PUT
Expand All @@ -165,11 +165,8 @@ public void updateAvailabilityMetricTags(
@PathParam("id") String id,
@ApiParam(required = true) Map<String, String> tags
) {
executeAsync(asyncResponse, () -> {
Availability metric = new Availability(tenantId, new MetricId(id));
ListenableFuture<Void> future = metricsService.addTags(metric, tags);
return Futures.transform(future, ApiUtils.MAP_VOID);
});
Availability metric = new Availability(tenantId, new MetricId(id));
metricsService.addTags(metric, tags).subscribe(new ResultSetObserver(asyncResponse));
}

@DELETE
Expand All @@ -185,11 +182,8 @@ public void deleteAvailabilityMetricTags(
@PathParam("id") String id,
@ApiParam("Tag list") @PathParam("tags") Tags tags
) {
executeAsync(asyncResponse, () -> {
Availability metric = new Availability(tenantId, new MetricId(id));
ListenableFuture<Void> future = metricsService.deleteTags(metric, tags.getTags());
return Futures.transform(future, ApiUtils.MAP_VOID);
});
Availability metric = new Availability(tenantId, new MetricId(id));
metricsService.deleteTags(metric, tags.getTags()).subscribe(new ResultSetObserver(asyncResponse));
}

@POST
Expand Down Expand Up @@ -248,14 +242,19 @@ public void findAvailabilityDataByTags(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Tag list", required = true) @QueryParam("tags") Tags tags
) {
executeAsync(asyncResponse, () -> {
if (tags == null) {
return Futures.immediateFuture(badRequest(new ApiError("Missing tags query")));
}
ListenableFuture<Map<MetricId, Set<AvailabilityData>>> future;
future = metricsService.findAvailabilityByTags(tenantId, tags.getTags());
return Futures.transform(future, ApiUtils.MAP_MAP);
});
if (tags == null) {
asyncResponse.resume(badRequest(new ApiError("Missing tags query")));
} else {
// @TODO Repeated code, refactor (in GaugeHandler also)
metricsService.findAvailabilityByTags(tenantId, tags.getTags()).subscribe(m -> {
if (m.isEmpty()) {
asyncResponse.resume(Response.noContent());
} else {
asyncResponse.resume(Response.ok(m).build());
}
}, t -> asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build()));

}
}

@GET
Expand Down Expand Up @@ -339,19 +338,15 @@ public void tagAvailabilityData(
@PathParam("id") final String id,
@ApiParam(required = true) TagRequest params
) {
executeAsync(
asyncResponse,
() -> {
ListenableFuture<List<AvailabilityData>> future;
Availability metric = new Availability(tenantId, new MetricId(id));
if (params.getTimestamp() != null) {
future = metricsService.tagAvailabilityData(metric, params.getTags(), params.getTimestamp());
} else {
future = metricsService.tagAvailabilityData(metric, params.getTags(), params.getStart(),
params.getEnd());
}
return Futures.transform(future, (List<AvailabilityData> data) -> Response.ok().build());
});
Observable<ResultSet> resultSetObservable;
Availability metric = new Availability(tenantId, new MetricId(id));
if (params.getTimestamp() != null) {
resultSetObservable = metricsService.tagAvailabilityData(metric, params.getTags(), params.getTimestamp());
} else {
resultSetObservable = metricsService.tagAvailabilityData(metric, params.getTags(), params.getStart(),
params.getEnd());
}
resultSetObservable.subscribe(new ResultSetObserver(asyncResponse));
}

@GET
Expand All @@ -366,11 +361,14 @@ public void findTaggedAvailabilityData(
@Suspended final AsyncResponse asyncResponse,
@ApiParam("Tag list") @PathParam("tags") Tags tags
) {
executeAsync(asyncResponse, () -> {
ListenableFuture<Map<MetricId, Set<AvailabilityData>>> future;
future = metricsService.findAvailabilityByTags(tenantId, tags.getTags());
return Futures.transform(future, ApiUtils.MAP_MAP);
});
metricsService.findAvailabilityByTags(tenantId, tags.getTags())
.subscribe(m -> { // @TODO Repeated code, refactor and use Optional?
if (m.isEmpty()) {
asyncResponse.resume(Response.noContent());
} else {
asyncResponse.resume(Response.ok(m).build());
}
}, t -> asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import static org.hawkular.metrics.api.jaxrs.filter.TenantFilter.TENANT_HEADER_NAME;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.badRequest;
import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.emptyPayload;
Expand Down Expand Up @@ -51,7 +49,17 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import com.datastax.driver.core.ResultSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;

import org.hawkular.metrics.api.jaxrs.ApiError;
import org.hawkular.metrics.api.jaxrs.ResultSetObserver;
import org.hawkular.metrics.api.jaxrs.param.Duration;
import org.hawkular.metrics.api.jaxrs.param.Tags;
import org.hawkular.metrics.api.jaxrs.request.TagRequest;
Expand All @@ -66,15 +74,6 @@
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.MetricsService;

import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;

import rx.Observable;

/**
Expand Down Expand Up @@ -150,13 +149,11 @@ public void getGaugeMetricTags(
@Suspended final AsyncResponse asyncResponse,
@PathParam("id") String id
) {
executeAsync(
asyncResponse,
() -> {
ListenableFuture<Map<String, String>> future = metricsService.getMetricTags(tenantId, MetricType.GAUGE,
new MetricId(id));
return Futures.transform(future, ApiUtils.MAP_MAP);
});
metricsService.getMetricTags(tenantId, MetricType.GAUGE, new MetricId(id))
.subscribe(
optional -> asyncResponse.resume(ApiUtils.valueToResponse(optional)),
t ->asyncResponse.resume(ApiUtils.serverError(t))
);
}

@PUT
Expand All @@ -171,11 +168,8 @@ public void updateGaugeMetricTags(
@PathParam("id") String id,
@ApiParam(required = true) Map<String, String> tags
) {
executeAsync(asyncResponse, () -> {
Gauge metric = new Gauge(tenantId, new MetricId(id));
ListenableFuture<Void> future = metricsService.addTags(metric, tags);
return Futures.transform(future, ApiUtils.MAP_VOID);
});
Gauge metric = new Gauge(tenantId, new MetricId(id));
metricsService.addTags(metric, tags).subscribe(new ResultSetObserver(asyncResponse));
}

@DELETE
Expand All @@ -191,11 +185,8 @@ public void deleteGaugeMetricTags(
@PathParam("id") String id,
@ApiParam("Tag list") @PathParam("tags") Tags tags
) {
executeAsync(asyncResponse, () -> {
Gauge metric = new Gauge(tenantId, new MetricId(id));
ListenableFuture<Void> future = metricsService.deleteTags(metric, tags.getTags());
return Futures.transform(future, ApiUtils.MAP_VOID);
});
Gauge metric = new Gauge(tenantId, new MetricId(id));
metricsService.deleteTags(metric, tags.getTags()).subscribe(new ResultSetObserver(asyncResponse));
}

@POST
Expand Down Expand Up @@ -258,14 +249,17 @@ public void findGaugeDataByTags(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Tag list", required = true) @QueryParam("tags") Tags tags
) {
executeAsync(asyncResponse, () -> {
if (tags == null) {
return Futures.immediateFuture(badRequest(new ApiError("Missing tags query")));
}
ListenableFuture<Map<MetricId, Set<GaugeData>>> future;
future = metricsService.findGaugeDataByTags(tenantId, tags.getTags());
return Futures.transform(future, ApiUtils.MAP_MAP);
});
if (tags == null) {
asyncResponse.resume(badRequest(new ApiError("Missing tags query")));
} else {
metricsService.findGaugeDataByTags(tenantId, tags.getTags()).subscribe(m -> {
if (m.isEmpty()) {
asyncResponse.resume(Response.noContent());
} else {
asyncResponse.resume(Response.ok(m).build());
}
}, t -> asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build()));
}
}

@GET
Expand Down Expand Up @@ -422,43 +416,43 @@ public void findTaggedGaugeData(
@Suspended final AsyncResponse asyncResponse,
@ApiParam("Tag list") @PathParam("tags") Tags tags
) {
executeAsync(
asyncResponse,
() -> {
ListenableFuture<Map<MetricId, Set<GaugeData>>> queryFuture;
queryFuture = metricsService.findGaugeDataByTags(tenantId, tags.getTags());
ListenableFuture<Map<String, Set<GaugeData>>> resultFuture = Futures.transform(queryFuture,
new Function<Map<MetricId, Set<GaugeData>>, Map<String, Set<GaugeData>>>() {
@Override
public Map<String, Set<GaugeData>> apply(Map<MetricId, Set<GaugeData>> input) {
Map<String, Set<GaugeData>> result = new HashMap<>(input.size());
for (Map.Entry<MetricId, Set<GaugeData>> entry : input.entrySet()) {
result.put(entry.getKey().getName(), entry.getValue());
}
return result;
}
});
return Futures.transform(resultFuture, ApiUtils.MAP_MAP);
});
Observable<Map<MetricId, Set<GaugeData>>> gaugeDataByTags = metricsService.findGaugeDataByTags(tenantId,
tags.getTags());

gaugeDataByTags.map(input -> {
Map<String, Set<GaugeData>> result = new HashMap<>(input.size());
for (Map.Entry<MetricId, Set<GaugeData>> entry : input.entrySet()) {
result.put(entry.getKey().getName(), entry.getValue());
}
return result;
}).subscribe(m -> { // @TODO Repeated code, refactor and use Optional?
if (m.isEmpty()) {
asyncResponse.resume(Response.noContent());
} else {
asyncResponse.resume(Response.ok(m).build());
}
}, t -> asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build()));

}

@POST
@Path("/{id}/tag")
@ApiOperation(value = "Add or update gauge metric's tags.")
@ApiResponses(value = { @ApiResponse(code = 200, message = "Tags were modified successfully.") })
@ApiResponses(value = { @ApiResponse(code = 200, message = "Tags were modified successfully."),
@ApiResponse(code = 500, message = "Processing tags failed") })
public void tagGaugeData(
@Suspended final AsyncResponse asyncResponse,
@PathParam("id") final String id, @ApiParam(required = true) TagRequest params
) {
executeAsync(asyncResponse, () -> {
ListenableFuture<List<GaugeData>> future;
Gauge metric = new Gauge(tenantId, new MetricId(id));
if (params.getTimestamp() != null) {
future = metricsService.tagGaugeData(metric, params.getTags(), params.getTimestamp());
} else {
future = metricsService.tagGaugeData(metric, params.getTags(), params.getStart(), params.getEnd());
}
return Futures.transform(future, (List<GaugeData> data) -> Response.ok().build());
});
Observable<ResultSet> resultSetObservable;
Gauge metric = new Gauge(tenantId, new MetricId(id));
if (params.getTimestamp() != null) {
resultSetObservable = metricsService.tagGaugeData(metric, params.getTags(), params.getTimestamp());
} else {
resultSetObservable = metricsService.tagGaugeData(metric, params.getTags(), params.getStart(), params
.getEnd());
}

resultSetObservable.subscribe(new ResultSetObserver(asyncResponse));
}
}

0 comments on commit 7e36093

Please sign in to comment.