Skip to content

Commit

Permalink
[HWKMETRICS-74] update methods for fetching metric to use observables
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 26, 2015
1 parent 06e6883 commit 4ebac92
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import javax.inject.Inject;
Expand Down Expand Up @@ -115,8 +114,6 @@ public void createAvailabilityMetric(
metric.setTenantId(tenantId);
Observable<Void> metricCreated = metricsService.createMetric(metric);
URI location = uriInfo.getBaseUriBuilder().path("/availability/{id}").build(metric.getId().getName());
// MetricCreatedCallback metricCreatedCallback = new MetricCreatedCallback(asyncResponse, created);
// Futures.addCallback(future, metricCreatedCallback);
metricCreated.subscribe(
nullArg -> {},
t -> {
Expand All @@ -143,14 +140,14 @@ public void createAvailabilityMetric(
@ApiResponse(code = 204, message = "Query was successful, but no metrics definition is set."),
@ApiResponse(code = 500, message = "Unexpected error occurred while fetching metric's definition.",
response = ApiError.class) })
public void getAvailabilityMetric(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id) {
executeAsync(
asyncResponse,
() -> {
ListenableFuture<Optional<Metric<?>>> future = metricsService.findMetric(tenantId,
MetricType.AVAILABILITY, new MetricId(id));
return Futures.transform(future, ApiUtils.MAP_VALUE);
});
public void getAvailabilityMetric(@Suspended final AsyncResponse asyncResponse,
@HeaderParam("tenantId") String tenantId, @PathParam("id") String id) {

metricsService.findMetric(tenantId, MetricType.AVAILABILITY, new MetricId(id))
.subscribe(
optional -> asyncResponse.resume(ApiUtils.valueToResponse(optional)),
t -> asyncResponse.resume(ApiUtils.serverError(t))
);
}

@GET
Expand Down Expand Up @@ -389,4 +386,4 @@ public void findTaggedAvailabilityData(
});
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

Expand Down Expand Up @@ -131,9 +130,10 @@ public void createGaugeMetric(
"already exists";
asyncResponse.resume(Response.status(Status.CONFLICT).entity(new ApiError(message)).build());
} else {

String message = "Failed to create metric due to an unexpected error: "
+ Throwables.getRootCause(t).getMessage();
asyncResponse.resume(Response.serverError().entity(new ApiError(message)).build());
asyncResponse.resume(ApiUtils.serverError(t, message));
}
},
() -> asyncResponse.resume(Response.created(location).build())
Expand All @@ -148,14 +148,14 @@ public void createGaugeMetric(
@ApiResponse(code = 204, message = "Query was successful, but no metrics definition is set."),
@ApiResponse(code = 500, message = "Unexpected error occurred while fetching metric's definition.",
response = ApiError.class) })
public void getGaugeMetric(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id) {
executeAsync(
asyncResponse,
() -> {
ListenableFuture<Optional<Metric<?>>> future = metricsService.findMetric(tenantId, MetricType.GAUGE,
new MetricId(id));
return Futures.transform(future, ApiUtils.MAP_VALUE);
});
public void getGaugeMetric(@Suspended final AsyncResponse asyncResponse,
@HeaderParam("tenantId") String tenantId, @PathParam("id") String id) {

metricsService.findMetric(tenantId, MetricType.GAUGE, new MetricId(id))
.subscribe(
optional -> asyncResponse.resume(ApiUtils.valueToResponse(optional)),
t -> asyncResponse.resume(ApiUtils.serverError(t))
);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public static Response serverError(Throwable t) {
return Response.serverError().entity(new ApiError(Throwables.getRootCause(t).getMessage())).build();
}

public static Response valueToResponse(Optional<?> optional) {
return optional.map(value -> Response.ok(value).build()).orElse(noContent());
}

public static final Function<Void, Response> MAP_VOID = v -> Response.ok().build();

public static final Function<List<Void>, Response> MAP_LIST_VOID = v -> Response.ok().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public enum State {

Observable<Void> createMetric(Metric<?> metric);

ListenableFuture<Optional<Metric<?>>> findMetric(String tenantId, MetricType type, MetricId id);
Observable<Optional<? extends Metric<? extends MetricData>>> findMetric(String tenantId, MetricType type,
MetricId id);

Observable<Metric<?>> findMetrics(String tenantId, MetricType type);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface DataAccess {

ResultSetFuture insertMetricInMetricsIndex(Metric<?> metric);

ResultSetFuture findMetric(String tenantId, MetricType type, MetricId id, long dpart);
Observable<ResultSet> findMetric(String tenantId, MetricType type, MetricId id, long dpart);

ResultSetFuture addTagsAndDataRetention(Metric<?> metric);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,9 @@ private Map<String, String> getTags(Metric<? extends MetricData> metric) {
}

@Override
public ResultSetFuture findMetric(String tenantId, MetricType type, MetricId id, long dpart) {
return session.executeAsync(findMetric.bind(tenantId, type.getCode(), id.getName(),
id.getInterval().toString(), dpart));
public Observable<ResultSet> findMetric(String tenantId, MetricType type, MetricId id, long dpart) {
return rxSession.execute(findMetric.bind(tenantId, type.getCode(), id.getName(), id.getInterval().toString(),
dpart));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,22 +390,37 @@ public Observable<Void> createMetric(final Metric<?> metric) {
}

@Override
public ListenableFuture<Optional<Metric<?>>> findMetric(final String tenantId, final MetricType type,
final MetricId id) {
ResultSetFuture future = dataAccess.findMetric(tenantId, type, id, Metric.DPART);
return Futures.transform(future, (ResultSet resultSet) -> {
if (resultSet.isExhausted()) {
return Optional.empty();
}
Row row = resultSet.one();
if (type == MetricType.GAUGE) {
return Optional.of(new Gauge(tenantId, id, row.getMap(5, String.class, String.class),
row.getInt(6)));
} else {
return Optional.of(new Availability(tenantId, id, row.getMap(5, String.class, String.class),
row.getInt(6)));
}
}, metricsTasks);
public Observable<Optional<? extends Metric<? extends MetricData>>> findMetric(final String tenantId,
final MetricType type, final MetricId id) {

return dataAccess.findMetric(tenantId, type, id, Metric.DPART)
.flatMap(Observable::from)
.map(row -> {
if (type == MetricType.GAUGE) {
return Optional.of(new Gauge(tenantId, id, row.getMap(5, String.class, String.class),
row.getInt(6)));
} else {
return Optional.of(new Availability(tenantId, id, row.getMap(5, String.class, String.class),
row.getInt(6)));
}
})
.defaultIfEmpty(Optional.empty());


// ResultSetFuture future = dataAccess.findMetric(tenantId, type, id, Metric.DPART);
// return Futures.transform(future, (ResultSet resultSet) -> {
// if (resultSet.isExhausted()) {
// return Optional.empty();
// }
// Row row = resultSet.one();
// if (type == MetricType.GAUGE) {
// return Optional.of(new Gauge(tenantId, id, row.getMap(5, String.class, String.class),
// row.getInt(6)));
// } else {
// return Optional.of(new Availability(tenantId, id, row.getMap(5, String.class, String.class),
// row.getInt(6)));
// }
// }, metricsTasks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public ResultSetFuture insertMetricInMetricsIndex(Metric metric) {
}

@Override
public ResultSetFuture findMetric(String tenantId, MetricType type, MetricId id, long dpart) {
public Observable<ResultSet> findMetric(String tenantId, MetricType type, MetricId id, long dpart) {
return delegate.findMetric(tenantId, type, id, dpart);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricAlreadyExistsException;
import org.hawkular.metrics.core.api.MetricData;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.Retention;
Expand Down Expand Up @@ -155,8 +156,8 @@ public void createTenants() throws Exception {

@Test
public void createAndFindMetrics() throws Exception {
Optional<Metric<?>> result = getUninterruptibly(metricsService.findMetric("t1", GAUGE,
new MetricId("does-not-exist")));
Optional<? extends Metric<? extends MetricData>> result = metricsService.findMetric("t1", GAUGE,
new MetricId("does-not-exist")).toBlocking().last();
assertNotNull(result, "null should not be returned when metric is not found");
assertFalse(result.isPresent(), "Did not expect a value when the metric is not found");

Expand All @@ -165,17 +166,16 @@ public void createAndFindMetrics() throws Exception {
24);
metricsService.createMetric(m1).toBlocking().lastOrDefault(null);

ListenableFuture<Optional<Metric<?>>> queryFuture = metricsService.findMetric(m1.getTenantId(), m1.getType(),
m1.getId());
Metric actual = getUninterruptibly(queryFuture).get();
Gauge actual = (Gauge) metricsService.findMetric(m1.getTenantId(), m1.getType(), m1.getId())
.toBlocking().last().get();
assertEquals(actual, m1, "The metric does not match the expected value");

Availability m2 = new Availability("t1", new MetricId("m2"), ImmutableMap.of("a3", "3", "a4", "3"));
metricsService.createMetric(m2).toBlocking().lastOrDefault(null);

queryFuture = metricsService.findMetric(m2.getTenantId(), m2.getType(), m2.getId());
actual = getUninterruptibly(queryFuture).get();
assertEquals(actual, m2, "The metric does not match the expected value");
Availability actualAvail = (Availability) metricsService.findMetric(m2.getTenantId(), m2.getType(), m2.getId())
.toBlocking().last().get();
assertEquals(actualAvail, m2, "The metric does not match the expected value");

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
Expand Down Expand Up @@ -223,9 +223,8 @@ public void updateMetricTags() throws Exception {
ListenableFuture<Void> deleteFuture = metricsService.deleteTags(metric, deletions);
getUninterruptibly(deleteFuture);

ListenableFuture<Optional<Metric<?>>> queryFuture = metricsService.findMetric(metric.getTenantId(), GAUGE,
metric.getId());
Metric<?> updatedMetric = getUninterruptibly(queryFuture).get();
Metric<? extends MetricData> updatedMetric = metricsService.findMetric(metric.getTenantId(), GAUGE,
metric.getId()).toBlocking().last().get();

assertEquals(updatedMetric.getTags(), ImmutableMap.of("a2", "two", "a3", "3"),
"The updated meta data does not match the expected values");
Expand Down Expand Up @@ -960,7 +959,7 @@ private void assertMetricEquals(Metric actual, Metric expected) {
private void assertMetricIndexMatches(String tenantId, MetricType type, List<? extends Metric> expected)
throws Exception {
List<Metric<?>> actualIndex = ImmutableList.copyOf(metricsService.findMetrics(tenantId, type).toBlocking()
.toIterable());
.toIterable());
assertEquals(actualIndex, expected, "The metrics index results do not match");
}

Expand Down

0 comments on commit 4ebac92

Please sign in to comment.