diff --git a/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/ResultSetObserver.java b/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/ResultSetObserver.java new file mode 100644 index 000000000..8d8e85232 --- /dev/null +++ b/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/ResultSetObserver.java @@ -0,0 +1,53 @@ +/* + * Copyright 2014-2015 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.hawkular.metrics.api.jaxrs; + +import com.datastax.driver.core.ResultSet; + +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.core.Response; + +import rx.Observer; + +/** + * Observer that returns empty 200 if everything went alright and ApiError if there was an exception. + * + * @author miburman + */ +public class ResultSetObserver implements Observer { + + private AsyncResponse asyncResponse; + + public ResultSetObserver(AsyncResponse asyncResponse) { + this.asyncResponse = asyncResponse; + } + + @Override + public void onCompleted() { + asyncResponse.resume(Response.ok().build()); + } + + @Override + public void onError(Throwable t) { + asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build()); + } + + @Override + public void onNext(ResultSet rows) { + + } +} diff --git a/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/handler/AvailabilityHandler.java b/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/handler/AvailabilityHandler.java index f0f88c4ee..3d9a94a73 100644 --- a/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/handler/AvailabilityHandler.java +++ b/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/handler/AvailabilityHandler.java @@ -30,7 +30,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import javax.inject.Inject; import javax.ws.rs.Consumes; @@ -50,6 +49,7 @@ import javax.ws.rs.core.UriInfo; import org.hawkular.metrics.api.jaxrs.ApiError; +import org.hawkular.metrics.api.jaxrs.ResultSetObserver; import org.hawkular.metrics.api.jaxrs.param.Duration; import org.hawkular.metrics.api.jaxrs.param.Tags; import org.hawkular.metrics.api.jaxrs.request.TagRequest; @@ -64,6 +64,7 @@ import org.hawkular.metrics.core.api.MetricType; import org.hawkular.metrics.core.api.MetricsService; +import com.datastax.driver.core.ResultSet; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.wordnik.swagger.annotations.Api; @@ -72,6 +73,8 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; +import rx.Observable; + /** * @author Stefan Negrea * @@ -144,13 +147,10 @@ public void getAvailabilityMetricTags( @Suspended final AsyncResponse asyncResponse, @PathParam("id") String id ) { - executeAsync( - asyncResponse, - () -> { - ListenableFuture> future = metricsService.getMetricTags(tenantId, - MetricType.AVAILABILITY, new MetricId(id)); - return Futures.transform(future, ApiUtils.MAP_MAP); - }); + metricsService.getMetricTags(tenantId, MetricType.AVAILABILITY, new MetricId(id)).subscribe( + optional -> asyncResponse.resume(ApiUtils.valueToResponse(optional)), + t -> asyncResponse.resume(ApiUtils.serverError(t))); + // @TODO Above is repeated code, refactor (GaugeHandler has it also) } @PUT @@ -165,11 +165,8 @@ public void updateAvailabilityMetricTags( @PathParam("id") String id, @ApiParam(required = true) Map tags ) { - executeAsync(asyncResponse, () -> { - Availability metric = new Availability(tenantId, new MetricId(id)); - ListenableFuture future = metricsService.addTags(metric, tags); - return Futures.transform(future, ApiUtils.MAP_VOID); - }); + Availability metric = new Availability(tenantId, new MetricId(id)); + metricsService.addTags(metric, tags).subscribe(new ResultSetObserver(asyncResponse)); } @DELETE @@ -185,11 +182,8 @@ public void deleteAvailabilityMetricTags( @PathParam("id") String id, @ApiParam("Tag list") @PathParam("tags") Tags tags ) { - executeAsync(asyncResponse, () -> { - Availability metric = new Availability(tenantId, new MetricId(id)); - ListenableFuture future = metricsService.deleteTags(metric, tags.getTags()); - return Futures.transform(future, ApiUtils.MAP_VOID); - }); + Availability metric = new Availability(tenantId, new MetricId(id)); + metricsService.deleteTags(metric, tags.getTags()).subscribe(new ResultSetObserver(asyncResponse)); } @POST @@ -248,14 +242,19 @@ public void findAvailabilityDataByTags( @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Tag list", required = true) @QueryParam("tags") Tags tags ) { - executeAsync(asyncResponse, () -> { - if (tags == null) { - return Futures.immediateFuture(badRequest(new ApiError("Missing tags query"))); - } - ListenableFuture>> future; - future = metricsService.findAvailabilityByTags(tenantId, tags.getTags()); - return Futures.transform(future, ApiUtils.MAP_MAP); - }); + if (tags == null) { + asyncResponse.resume(badRequest(new ApiError("Missing tags query"))); + } else { + // @TODO Repeated code, refactor (in GaugeHandler also) + metricsService.findAvailabilityByTags(tenantId, tags.getTags()).subscribe(m -> { + if (m.isEmpty()) { + asyncResponse.resume(Response.noContent()); + } else { + asyncResponse.resume(Response.ok(m).build()); + } + }, t -> asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build())); + + } } @GET @@ -339,19 +338,15 @@ public void tagAvailabilityData( @PathParam("id") final String id, @ApiParam(required = true) TagRequest params ) { - executeAsync( - asyncResponse, - () -> { - ListenableFuture> future; - Availability metric = new Availability(tenantId, new MetricId(id)); - if (params.getTimestamp() != null) { - future = metricsService.tagAvailabilityData(metric, params.getTags(), params.getTimestamp()); - } else { - future = metricsService.tagAvailabilityData(metric, params.getTags(), params.getStart(), - params.getEnd()); - } - return Futures.transform(future, (List data) -> Response.ok().build()); - }); + Observable resultSetObservable; + Availability metric = new Availability(tenantId, new MetricId(id)); + if (params.getTimestamp() != null) { + resultSetObservable = metricsService.tagAvailabilityData(metric, params.getTags(), params.getTimestamp()); + } else { + resultSetObservable = metricsService.tagAvailabilityData(metric, params.getTags(), params.getStart(), + params.getEnd()); + } + resultSetObservable.subscribe(new ResultSetObserver(asyncResponse)); } @GET @@ -366,11 +361,14 @@ public void findTaggedAvailabilityData( @Suspended final AsyncResponse asyncResponse, @ApiParam("Tag list") @PathParam("tags") Tags tags ) { - executeAsync(asyncResponse, () -> { - ListenableFuture>> future; - future = metricsService.findAvailabilityByTags(tenantId, tags.getTags()); - return Futures.transform(future, ApiUtils.MAP_MAP); - }); + metricsService.findAvailabilityByTags(tenantId, tags.getTags()) + .subscribe(m -> { // @TODO Repeated code, refactor and use Optional? + if (m.isEmpty()) { + asyncResponse.resume(Response.noContent()); + } else { + asyncResponse.resume(Response.ok(m).build()); + } + }, t -> asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build())); } } diff --git a/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/handler/GaugeHandler.java b/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/handler/GaugeHandler.java index 7eee568e6..5a1a7e1b8 100644 --- a/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/handler/GaugeHandler.java +++ b/api/metrics-api-jaxrs/src/main/java/org/hawkular/metrics/api/jaxrs/handler/GaugeHandler.java @@ -18,9 +18,7 @@ import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static javax.ws.rs.core.MediaType.APPLICATION_JSON; - import static org.hawkular.metrics.api.jaxrs.filter.TenantFilter.TENANT_HEADER_NAME; import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.badRequest; import static org.hawkular.metrics.api.jaxrs.util.ApiUtils.emptyPayload; @@ -51,7 +49,17 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; +import com.datastax.driver.core.ResultSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; + import org.hawkular.metrics.api.jaxrs.ApiError; +import org.hawkular.metrics.api.jaxrs.ResultSetObserver; import org.hawkular.metrics.api.jaxrs.param.Duration; import org.hawkular.metrics.api.jaxrs.param.Tags; import org.hawkular.metrics.api.jaxrs.request.TagRequest; @@ -66,15 +74,6 @@ import org.hawkular.metrics.core.api.MetricType; import org.hawkular.metrics.core.api.MetricsService; -import com.google.common.base.Function; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; - import rx.Observable; /** @@ -150,13 +149,11 @@ public void getGaugeMetricTags( @Suspended final AsyncResponse asyncResponse, @PathParam("id") String id ) { - executeAsync( - asyncResponse, - () -> { - ListenableFuture> future = metricsService.getMetricTags(tenantId, MetricType.GAUGE, - new MetricId(id)); - return Futures.transform(future, ApiUtils.MAP_MAP); - }); + metricsService.getMetricTags(tenantId, MetricType.GAUGE, new MetricId(id)) + .subscribe( + optional -> asyncResponse.resume(ApiUtils.valueToResponse(optional)), + t ->asyncResponse.resume(ApiUtils.serverError(t)) + ); } @PUT @@ -171,11 +168,8 @@ public void updateGaugeMetricTags( @PathParam("id") String id, @ApiParam(required = true) Map tags ) { - executeAsync(asyncResponse, () -> { - Gauge metric = new Gauge(tenantId, new MetricId(id)); - ListenableFuture future = metricsService.addTags(metric, tags); - return Futures.transform(future, ApiUtils.MAP_VOID); - }); + Gauge metric = new Gauge(tenantId, new MetricId(id)); + metricsService.addTags(metric, tags).subscribe(new ResultSetObserver(asyncResponse)); } @DELETE @@ -191,11 +185,8 @@ public void deleteGaugeMetricTags( @PathParam("id") String id, @ApiParam("Tag list") @PathParam("tags") Tags tags ) { - executeAsync(asyncResponse, () -> { - Gauge metric = new Gauge(tenantId, new MetricId(id)); - ListenableFuture future = metricsService.deleteTags(metric, tags.getTags()); - return Futures.transform(future, ApiUtils.MAP_VOID); - }); + Gauge metric = new Gauge(tenantId, new MetricId(id)); + metricsService.deleteTags(metric, tags.getTags()).subscribe(new ResultSetObserver(asyncResponse)); } @POST @@ -258,14 +249,17 @@ public void findGaugeDataByTags( @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Tag list", required = true) @QueryParam("tags") Tags tags ) { - executeAsync(asyncResponse, () -> { - if (tags == null) { - return Futures.immediateFuture(badRequest(new ApiError("Missing tags query"))); - } - ListenableFuture>> future; - future = metricsService.findGaugeDataByTags(tenantId, tags.getTags()); - return Futures.transform(future, ApiUtils.MAP_MAP); - }); + if (tags == null) { + asyncResponse.resume(badRequest(new ApiError("Missing tags query"))); + } else { + metricsService.findGaugeDataByTags(tenantId, tags.getTags()).subscribe(m -> { + if (m.isEmpty()) { + asyncResponse.resume(Response.noContent()); + } else { + asyncResponse.resume(Response.ok(m).build()); + } + }, t -> asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build())); + } } @GET @@ -422,43 +416,43 @@ public void findTaggedGaugeData( @Suspended final AsyncResponse asyncResponse, @ApiParam("Tag list") @PathParam("tags") Tags tags ) { - executeAsync( - asyncResponse, - () -> { - ListenableFuture>> queryFuture; - queryFuture = metricsService.findGaugeDataByTags(tenantId, tags.getTags()); - ListenableFuture>> resultFuture = Futures.transform(queryFuture, - new Function>, Map>>() { - @Override - public Map> apply(Map> input) { - Map> result = new HashMap<>(input.size()); - for (Map.Entry> entry : input.entrySet()) { - result.put(entry.getKey().getName(), entry.getValue()); - } - return result; - } - }); - return Futures.transform(resultFuture, ApiUtils.MAP_MAP); - }); + Observable>> gaugeDataByTags = metricsService.findGaugeDataByTags(tenantId, + tags.getTags()); + + gaugeDataByTags.map(input -> { + Map> result = new HashMap<>(input.size()); + for (Map.Entry> entry : input.entrySet()) { + result.put(entry.getKey().getName(), entry.getValue()); + } + return result; + }).subscribe(m -> { // @TODO Repeated code, refactor and use Optional? + if (m.isEmpty()) { + asyncResponse.resume(Response.noContent()); + } else { + asyncResponse.resume(Response.ok(m).build()); + } + }, t -> asyncResponse.resume(Response.serverError().entity(new ApiError(t.getMessage())).build())); + } @POST @Path("/{id}/tag") @ApiOperation(value = "Add or update gauge metric's tags.") - @ApiResponses(value = { @ApiResponse(code = 200, message = "Tags were modified successfully.") }) + @ApiResponses(value = { @ApiResponse(code = 200, message = "Tags were modified successfully."), + @ApiResponse(code = 500, message = "Processing tags failed") }) public void tagGaugeData( @Suspended final AsyncResponse asyncResponse, @PathParam("id") final String id, @ApiParam(required = true) TagRequest params ) { - executeAsync(asyncResponse, () -> { - ListenableFuture> future; - Gauge metric = new Gauge(tenantId, new MetricId(id)); - if (params.getTimestamp() != null) { - future = metricsService.tagGaugeData(metric, params.getTags(), params.getTimestamp()); - } else { - future = metricsService.tagGaugeData(metric, params.getTags(), params.getStart(), params.getEnd()); - } - return Futures.transform(future, (List data) -> Response.ok().build()); - }); + Observable resultSetObservable; + Gauge metric = new Gauge(tenantId, new MetricId(id)); + if (params.getTimestamp() != null) { + resultSetObservable = metricsService.tagGaugeData(metric, params.getTags(), params.getTimestamp()); + } else { + resultSetObservable = metricsService.tagGaugeData(metric, params.getTags(), params.getStart(), params + .getEnd()); + } + + resultSetObservable.subscribe(new ResultSetObserver(asyncResponse)); } } diff --git a/core/metrics-core-api/src/main/java/org/hawkular/metrics/core/api/MetricsService.java b/core/metrics-core-api/src/main/java/org/hawkular/metrics/core/api/MetricsService.java index 732a1609d..a6752e6c2 100644 --- a/core/metrics-core-api/src/main/java/org/hawkular/metrics/core/api/MetricsService.java +++ b/core/metrics-core-api/src/main/java/org/hawkular/metrics/core/api/MetricsService.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.function.Predicate; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.google.common.util.concurrent.ListenableFuture; @@ -82,11 +83,11 @@ Observable>> findMetric(String t Observable> findMetrics(String tenantId, MetricType type); - ListenableFuture> getMetricTags(String tenantId, MetricType type, MetricId id); + Observable>> getMetricTags(String tenantId, MetricType type, MetricId id); - ListenableFuture addTags(Metric metric, Map tags); + Observable addTags(Metric metric, Map tags); - ListenableFuture deleteTags(Metric metric, Map tags); + Observable deleteTags(Metric metric, Map tags); Observable addGaugeData(Observable gaugeObservable); @@ -118,22 +119,22 @@ ListenableFuture> findAvailabilitySt /** Check if a metric with the passed {id} has been stored in the system */ ListenableFuture idExists(String id); - ListenableFuture> tagGaugeData(Gauge metric, Map tags, - long start, long end); + Observable tagGaugeData(Gauge metric, Map tags, + long start, long end); - ListenableFuture> tagAvailabilityData(Availability metric, Map tags, - long start, long end); + Observable tagAvailabilityData(Availability metric, Map tags, + long start, long end); - ListenableFuture> tagGaugeData(Gauge metric, Map tags, - long timestamp); + Observable tagGaugeData(Gauge metric, Map tags, + long timestamp); - ListenableFuture> tagAvailabilityData(Availability metric, Map tags, - long timestamp); + Observable tagAvailabilityData(Availability metric, Map tags, + long timestamp); - ListenableFuture>> findGaugeDataByTags(String tenantId, Map tags); + Observable>> findGaugeDataByTags(String tenantId, Map tags); - ListenableFuture>> findAvailabilityByTags(String tenantId, - Map tags); + Observable>> findAvailabilityByTags(String tenantId, + Map tags); /** *

diff --git a/core/metrics-core-impl/pom.xml b/core/metrics-core-impl/pom.xml index 9d07ff3c1..5cedec9b8 100644 --- a/core/metrics-core-impl/pom.xml +++ b/core/metrics-core-impl/pom.xml @@ -88,6 +88,11 @@ commons-math3 3.4.1 + + com.github.stephenc.eaio-uuid + uuid + 3.4.0 + diff --git a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/DataAccess.java b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/DataAccess.java index 0895702b9..2afb14cfb 100644 --- a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/DataAccess.java +++ b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/DataAccess.java @@ -53,16 +53,16 @@ public interface DataAccess { Observable findMetric(String tenantId, MetricType type, MetricId id, long dpart); - ResultSetFuture addTagsAndDataRetention(Metric metric); + Observable addTagsAndDataRetention(Metric metric); - ResultSetFuture getMetricTags(String tenantId, MetricType type, MetricId id, long dpart); + Observable getMetricTags(String tenantId, MetricType type, MetricId id, long dpart); - ResultSetFuture addTags(Metric metric, Map tags); + Observable addTags(Metric metric, Map tags); - ResultSetFuture deleteTags(Metric metric, Set tags); + Observable deleteTags(Metric metric, Set tags); - ResultSetFuture updateTagsInMetricsIndex(Metric metric, Map additions, - Set deletions); + Observable updateTagsInMetricsIndex(Metric metric, Map additions, + Set deletions); > ResultSetFuture updateMetricsIndex(List metrics); @@ -77,28 +77,28 @@ ResultSetFuture updateTagsInMetricsIndex(Metric metric, Map a Observable findData(String tenantId, MetricId id, long startTime, long endTime, boolean includeWriteTime); - ResultSetFuture findData(Gauge metric, long timestamp, boolean includeWriteTime); + Observable findData(Gauge metric, long timestamp, boolean includeWriteTime); - ResultSetFuture findData(Availability metric, long startTime, long endTime); + Observable findData(Availability metric, long startTime, long endTime); - ResultSetFuture findData(Availability metric, long startTime, long endTime, boolean includeWriteTime); + Observable findData(Availability metric, long startTime, long endTime, boolean includeWriteTime); - ResultSetFuture findData(Availability metric, long timestamp); + Observable findData(Availability metric, long timestamp); ResultSetFuture deleteGuageMetric(String tenantId, String metric, Interval interval, long dpart); ResultSetFuture findAllGuageMetrics(); - ResultSetFuture insertGuageTag(String tag, String tagValue, Gauge metric, List data); + Observable insertGaugeTag(String tag, String tagValue, Gauge metric, Observable data); - ResultSetFuture insertAvailabilityTag(String tag, String tagValue, Availability metric, - List data); + Observable insertAvailabilityTag(String tag, String tagValue, Availability metric, + Observable data); - ResultSetFuture updateDataWithTag(Metric metric, MetricData data, Map tags); + Observable updateDataWithTag(Metric metric, MetricData data, Map tags); - ResultSetFuture findGuageDataByTag(String tenantId, String tag, String tagValue); + Observable findGaugeDataByTag(String tenantId, String tag, String tagValue); - ResultSetFuture findAvailabilityByTag(String tenantId, String tag, String tagValue); + Observable findAvailabilityByTag(String tenantId, String tag, String tagValue); ResultSetFuture insertData(Availability metric, int ttl); @@ -114,9 +114,9 @@ ResultSetFuture insertAvailabilityTag(String tag, String tagValue, Availability ResultSetFuture updateRetentionsIndex(Metric metric); - ResultSetFuture insertIntoMetricsTagsIndex(Metric metric, Map tags); + Observable insertIntoMetricsTagsIndex(Metric metric, Map tags); - ResultSetFuture deleteFromMetricsTagsIndex(Metric metric, Map tags); + Observable deleteFromMetricsTagsIndex(Metric metric, Map tags); - ResultSetFuture findMetricsByTag(String tenantId, String tag); + Observable findMetricsByTag(String tenantId, String tag); } diff --git a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/DataAccessImpl.java b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/DataAccessImpl.java index b87ddc2ff..a38575ef5 100644 --- a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/DataAccessImpl.java +++ b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/DataAccessImpl.java @@ -26,6 +26,19 @@ import java.util.Set; import java.util.function.BiFunction; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TupleType; +import com.datastax.driver.core.TupleValue; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; +import com.datastax.driver.core.utils.UUIDs; import org.hawkular.metrics.core.api.AggregationTemplate; import org.hawkular.metrics.core.api.Availability; import org.hawkular.metrics.core.api.AvailabilityData; @@ -43,21 +56,6 @@ import org.hawkular.metrics.core.api.TimeUUIDUtils; import org.hawkular.rx.cassandra.driver.RxSession; import org.hawkular.rx.cassandra.driver.RxSessionImpl; - -import com.datastax.driver.core.BatchStatement; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.TupleType; -import com.datastax.driver.core.TupleValue; -import com.datastax.driver.core.UDTValue; -import com.datastax.driver.core.UserType; -import com.datastax.driver.core.utils.UUIDs; - import rx.Observable; /** @@ -385,9 +383,9 @@ public Observable findMetric(String tenantId, MetricType type, Metric } @Override - public ResultSetFuture getMetricTags(String tenantId, MetricType type, MetricId id, long dpart) { - return session.executeAsync(getMetricTags.bind(tenantId, type.getCode(), id.getName(), id.getInterval() - .toString(), dpart)); + public Observable getMetricTags(String tenantId, MetricType type, MetricId id, long dpart) { + return rxSession.execute(getMetricTags.bind(tenantId, type.getCode(), id.getName(), id.getInterval() + .toString(), dpart)); } // This method updates the metric tags and data retention in the data table. In the @@ -396,41 +394,41 @@ public ResultSetFuture getMetricTags(String tenantId, MetricType type, MetricId // determine when we start writing data to a new partition, e.g., the start of the next // day, and then add the tags and retention to the new partition. @Override - public ResultSetFuture addTagsAndDataRetention(Metric metric) { - return session.executeAsync(addMetadataAndDataRetention.bind(getTags(metric), metric.getDataRetention(), - metric.getTenantId(), metric.getType().getCode(), metric.getId().getName(), - metric.getId().getInterval().toString(), metric.getDpart())); + public Observable addTagsAndDataRetention(Metric metric) { + return rxSession.execute(addMetadataAndDataRetention.bind(getTags(metric), metric.getDataRetention(), + metric.getTenantId(), metric.getType().getCode(), metric.getId().getName(), metric.getId().getInterval() + .toString(), metric.getDpart())); } @Override - public ResultSetFuture addTags(Metric metric, Map tags) { + public Observable addTags(Metric metric, Map tags) { BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED); batch.add(addMetricTagsToDataTable.bind(tags, metric.getTenantId(), metric.getType().getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), metric.getDpart())); batch.add(addTagsToMetricsIndex.bind(tags, metric.getTenantId(), metric.getType().getCode(), metric.getId().getInterval().toString(), metric.getId().getName())); - return session.executeAsync(batch); + return rxSession.execute(batch); } @Override - public ResultSetFuture deleteTags(Metric metric, Set tags) { + public Observable deleteTags(Metric metric, Set tags) { BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED); batch.add(deleteMetricTagsFromDataTable.bind(tags, metric.getTenantId(), metric.getType().getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), metric.getDpart())); batch.add(deleteTagsFromMetricsIndex.bind(tags, metric.getTenantId(), metric.getType().getCode(), metric.getId().getInterval().toString(), metric.getId().getName())); - return session.executeAsync(batch); + return rxSession.execute(batch); } @Override - public ResultSetFuture updateTagsInMetricsIndex(Metric metric, Map additions, - Set deletions) { + public Observable updateTagsInMetricsIndex(Metric metric, Map additions, + Set deletions) { BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED) .add(addTagsToMetricsIndex.bind(additions, metric.getTenantId(), metric.getType().getCode(), metric.getId().getInterval().toString(), metric.getId().getName())) .add(deleteTagsFromMetricsIndex.bind(deletions, metric.getTenantId(), metric.getType().getCode(), metric.getId().getInterval().toString(), metric.getId().getName())); - return session.executeAsync(batchStatement); + return rxSession.execute(batchStatement); } @Override @@ -492,41 +490,41 @@ public Observable findData(String tenantId, MetricId id, long startTi } @Override - public ResultSetFuture findData(Gauge metric, long timestamp, boolean includeWriteTime) { + public Observable findData(Gauge metric, long timestamp, boolean includeWriteTime) { if (includeWriteTime) { - return session.executeAsync(findGaugeDataWithWriteTimeByDateRangeInclusive.bind(metric.getTenantId(), - MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), - metric.getDpart(), UUIDs.startOf(timestamp), UUIDs.endOf(timestamp))); + return rxSession.execute(findGaugeDataWithWriteTimeByDateRangeInclusive.bind(metric.getTenantId(), + MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), + metric.getDpart(), UUIDs.startOf(timestamp), UUIDs.endOf(timestamp))); } else { - return session.executeAsync(findGaugeDataByDateRangeInclusive.bind(metric.getTenantId(), - MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), - metric.getDpart(), UUIDs.startOf(timestamp), UUIDs.endOf(timestamp))); + return rxSession.execute(findGaugeDataByDateRangeInclusive.bind(metric.getTenantId(), + MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), + metric.getDpart(), UUIDs.startOf(timestamp), UUIDs.endOf(timestamp))); } } @Override - public ResultSetFuture findData(Availability metric, long startTime, long endTime) { + public Observable findData(Availability metric, long startTime, long endTime) { return findData(metric, startTime, endTime, false); } @Override - public ResultSetFuture findData(Availability metric, long startTime, long endTime, boolean includeWriteTime) { + public Observable findData(Availability metric, long startTime, long endTime, boolean includeWriteTime) { if (includeWriteTime) { - return session.executeAsync(findAvailabilitiesWithWriteTime.bind(metric.getTenantId(), + return rxSession.execute(findAvailabilitiesWithWriteTime.bind(metric.getTenantId(), MetricType.AVAILABILITY.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), metric.getDpart(), TimeUUIDUtils.getTimeUUID(startTime), TimeUUIDUtils.getTimeUUID(endTime))); } else { - return session.executeAsync(findAvailabilities.bind(metric.getTenantId(), MetricType.AVAILABILITY.getCode(), + return rxSession.execute(findAvailabilities.bind(metric.getTenantId(), MetricType.AVAILABILITY.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), metric.getDpart(), TimeUUIDUtils.getTimeUUID(startTime), TimeUUIDUtils.getTimeUUID(endTime))); } } @Override - public ResultSetFuture findData(Availability metric, long timestamp) { - return session.executeAsync(findAvailabilityByDateRangeInclusive.bind(metric.getTenantId(), - MetricType.AVAILABILITY.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), - metric.getDpart(), UUIDs.startOf(timestamp), UUIDs.endOf(timestamp))); + public Observable findData(Availability metric, long timestamp) { + return rxSession.execute(findAvailabilityByDateRangeInclusive.bind(metric.getTenantId(), + MetricType.AVAILABILITY.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), + metric.getDpart(), UUIDs.startOf(timestamp), UUIDs.endOf(timestamp))); } @Override @@ -541,44 +539,41 @@ public ResultSetFuture findAllGuageMetrics() { } @Override - public ResultSetFuture insertGuageTag(String tag, String tagValue, Gauge metric, - List data) { - BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED); - for (GaugeData d : data) { - batchStatement.add(insertGaugeTags.bind(metric.getTenantId(), tag, tagValue, + public Observable insertGaugeTag(String tag, String tagValue, Gauge metric, + Observable data) { + return data.reduce(new BatchStatement(BatchStatement.Type.UNLOGGED), (batch, d) -> { + batch.add(insertGaugeTags.bind(metric.getTenantId(), tag, tagValue, MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(), d.getTimeUUID(), d.getValue(), d.getTTL())); - } - return session.executeAsync(batchStatement); + return batch; + }).flatMap(rxSession::execute); } @Override - public ResultSetFuture insertAvailabilityTag(String tag, String tagValue, Availability metric, - List data) { - BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED); - for (AvailabilityData a : data) { - batchStatement.add(insertAvailabilityTags.bind(metric.getTenantId(), tag, tagValue, - MetricType.AVAILABILITY.getCode(), metric.getId().getName(), metric.getId().getInterval() - .toString(), a.getTimeUUID(), a.getBytes(), a.getTTL())); - } - return session.executeAsync(batchStatement); + public Observable insertAvailabilityTag(String tag, String tagValue, Availability metric, + Observable data) { + return data.reduce(new BatchStatement(BatchStatement.Type.UNLOGGED), (batch, a) -> { + batch.add(insertAvailabilityTags.bind(metric.getTenantId(), tag, tagValue, + MetricType.AVAILABILITY.getCode(), metric.getId().getName(), + metric.getId().getInterval().toString(), a.getTimeUUID(), a.getBytes(), a.getTTL())); + return batch; + }).flatMap(rxSession::execute); } @Override - public ResultSetFuture updateDataWithTag(Metric metric, MetricData data, Map tags) { - return session.executeAsync(updateDataWithTags.bind(tags, metric.getTenantId(), metric.getType().getCode(), - metric.getId().getName(), metric.getId().getInterval().toString(), metric.getDpart(), - data.getTimeUUID())); + public Observable updateDataWithTag(Metric metric, MetricData data, Map tags) { + return rxSession.execute(updateDataWithTags.bind(tags, metric.getTenantId(), metric.getType().getCode(), metric + .getId().getName(), metric.getId().getInterval().toString(), metric.getDpart(), data.getTimeUUID())); } @Override - public ResultSetFuture findGuageDataByTag(String tenantId, String tag, String tagValue) { - return session.executeAsync(findGaugeDataByTag.bind(tenantId, tag, tagValue)); + public Observable findGaugeDataByTag(String tenantId, String tag, String tagValue) { + return rxSession.execute(findGaugeDataByTag.bind(tenantId, tag, tagValue)); } @Override - public ResultSetFuture findAvailabilityByTag(String tenantId, String tag, String tagValue) { - return session.executeAsync(findAvailabilityByTag.bind(tenantId, tag, tagValue)); + public Observable findAvailabilityByTag(String tenantId, String tag, String tagValue) { + return rxSession.execute(findAvailabilityByTag.bind(tenantId, tag, tagValue)); } @Override @@ -632,27 +627,27 @@ public ResultSetFuture updateRetentionsIndex(String tenantId, MetricType type, S } @Override - public ResultSetFuture insertIntoMetricsTagsIndex(Metric metric, Map tags) { + public Observable insertIntoMetricsTagsIndex(Metric metric, Map tags) { return executeTagsBatch(tags, (name, value) -> insertMetricsTagsIndex.bind(metric.getTenantId(), name, value, metric.getType().getCode(), metric.getId().getName(), metric.getId().getInterval().toString())); } @Override - public ResultSetFuture deleteFromMetricsTagsIndex(Metric metric, Map tags) { + public Observable deleteFromMetricsTagsIndex(Metric metric, Map tags) { return executeTagsBatch(tags, (name, value) -> deleteMetricsTagsIndex.bind(metric.getTenantId(), name, value, metric.getType().getCode(), metric.getId().getName(), metric.getId().getInterval().toString())); } - private ResultSetFuture executeTagsBatch(Map tags, - BiFunction bindVars) { + private Observable executeTagsBatch(Map tags, + BiFunction bindVars) { BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED); tags.entrySet().stream().forEach(entry -> batchStatement.add(bindVars.apply(entry.getKey(), entry.getValue()))); - return session.executeAsync(batchStatement); + return rxSession.execute(batchStatement); } @Override - public ResultSetFuture findMetricsByTag(String tenantId, String tag) { - return session.executeAsync(findMetricsByTagName.bind(tenantId, tag)); + public Observable findMetricsByTag(String tenantId, String tag) { + return rxSession.execute(findMetricsByTagName.bind(tenantId, tag)); } @Override diff --git a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/Functions.java b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/Functions.java index 03652363e..87b9eab6e 100644 --- a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/Functions.java +++ b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/Functions.java @@ -98,7 +98,7 @@ private static AvailabilityData getAvailability(Row row) { StreamSupport.stream(resultSet.spliterator(), false).map(Functions::getAvailabilityAndWriteTime) .collect(toList()); - private static AvailabilityData getAvailabilityAndWriteTime(Row row) { + public static AvailabilityData getAvailabilityAndWriteTime(Row row) { return new AvailabilityData( row.getUUID(AVAILABILITY_COLS.TIME.ordinal()), row.getBytes(AVAILABILITY_COLS.AVAILABILITY.ordinal()), diff --git a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/MetricsServiceCassandra.java b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/MetricsServiceCassandra.java index 60924ca01..9d51049f1 100644 --- a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/MetricsServiceCassandra.java +++ b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/MetricsServiceCassandra.java @@ -16,8 +16,6 @@ */ package org.hawkular.metrics.core.impl.cassandra; -import static java.util.Arrays.asList; - import static org.joda.time.Hours.hours; import java.util.ArrayList; @@ -40,6 +38,20 @@ import javax.management.MBeanServerConnection; import javax.management.ObjectName; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; import org.hawkular.metrics.core.api.Availability; import org.hawkular.metrics.core.api.AvailabilityBucketDataPoint; import org.hawkular.metrics.core.api.AvailabilityData; @@ -63,27 +75,11 @@ import org.hawkular.metrics.core.api.TenantAlreadyExistsException; import org.hawkular.metrics.schema.SchemaManager; import org.hawkular.rx.cassandra.driver.RxUtil; +import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Hours; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.RateLimiter; - import rx.Observable; /** @@ -363,42 +359,36 @@ private List loadTenantIds() { public Observable createMetric(final Metric metric) { ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric); Observable indexUpdated = RxUtil.from(future, metricsTasks); - return Observable.create(subscriber -> - indexUpdated.subscribe( - resultSet -> { - if (!resultSet.wasApplied()) { - subscriber.onError(new MetricAlreadyExistsException(metric)); - - } else { - // TODO Need error handling if either of the following updates fail - // If adding tags/retention fails, then we want to report the error to the - // client. Updating the retentions_idx table could also fail. We need to - // report that failure as well. - // - // The error handling is the same as it was with Guava futures. That is, if any - // future fails, we treat the entire client request as a failure. We probably - // eventually want to implement more fine-grained error handling where we can - // notify the subscriber of what exactly fails. - ResultSetFuture metadataFuture = dataAccess.addTagsAndDataRetention(metric); - Observable metadataUpdated = RxUtil.from(metadataFuture, metricsTasks); - ResultSetFuture tagsFuture = dataAccess.insertIntoMetricsTagsIndex(metric, - metric.getTags()); - Observable tagsUpdated = RxUtil.from(tagsFuture, metricsTasks); - Observable metricUpdates; - - if (metric.getDataRetention() != null) { - ResultSetFuture dataRetentionFuture = dataAccess.updateRetentionsIndex(metric); - Observable dataRetentionUpdated = RxUtil.from(dataRetentionFuture, - metricsTasks); - dataRetentions.put(new DataRetentionKey(metric), metric.getDataRetention()); - metricUpdates = Observable.merge(metadataUpdated, tagsUpdated, dataRetentionUpdated); - } else { - metricUpdates = Observable.merge(metadataUpdated, tagsUpdated); - } - - metricUpdates.subscribe(new VoidSubscriber<>(subscriber)); - } - })); + return Observable.create(subscriber -> indexUpdated.subscribe(resultSet -> { + if (!resultSet.wasApplied()) { + subscriber.onError(new MetricAlreadyExistsException(metric)); + + } else { + // TODO Need error handling if either of the following updates fail + // If adding tags/retention fails, then we want to report the error to the + // client. Updating the retentions_idx table could also fail. We need to + // report that failure as well. + // + // The error handling is the same as it was with Guava futures. That is, if any + // future fails, we treat the entire client request as a failure. We probably + // eventually want to implement more fine-grained error handling where we can + // notify the subscriber of what exactly fails. + Observable metadataUpdated = dataAccess.addTagsAndDataRetention(metric); + Observable tagsUpdated = dataAccess.insertIntoMetricsTagsIndex(metric, metric.getTags()); + Observable metricUpdates; + + if (metric.getDataRetention() != null) { + ResultSetFuture dataRetentionFuture = dataAccess.updateRetentionsIndex(metric); + Observable dataRetentionUpdated = RxUtil.from(dataRetentionFuture, metricsTasks); + dataRetentions.put(new DataRetentionKey(metric), metric.getDataRetention()); + metricUpdates = Observable.merge(metadataUpdated, tagsUpdated, dataRetentionUpdated); + } else { + metricUpdates = Observable.merge(metadataUpdated, tagsUpdated); + } + + metricUpdates.subscribe(new VoidSubscriber<>(subscriber)); + } + }) ); } @Override @@ -464,37 +454,26 @@ private Availability toAvailability(String tenantId, Row row) { } @Override - public ListenableFuture> getMetricTags(String tenantId, MetricType type, MetricId id) { - ResultSetFuture metricTags = dataAccess.getMetricTags(tenantId, type, id, Metric.DPART); - return Futures.transform(metricTags, (ResultSet input) -> { - if (input.isExhausted()) { - return Collections.EMPTY_MAP; - } - return input.one().getMap(0, String.class, String.class); - }, metricsTasks); + public Observable>> getMetricTags(String tenantId, MetricType type, MetricId id) { + Observable metricTags = dataAccess.getMetricTags(tenantId, type, id, Metric.DPART); + + return metricTags.flatMap(Observable::from).take(1).map(row -> Optional.of(row.getMap(0, String.class, String + .class))) + .defaultIfEmpty(Optional.empty()); } // Adding/deleting metric tags currently involves writing to three tables - data, // metrics_idx, and metrics_tags_idx. It might make sense to refactor tag related // functionality into a separate class. @Override - public ListenableFuture addTags(Metric metric, Map tags) { - List insertFutures = asList( - dataAccess.addTags(metric, tags), - dataAccess.insertIntoMetricsTagsIndex(metric, tags) - ); - ListenableFuture> insertsFuture = Futures.allAsList(insertFutures); - return Futures.transform(insertsFuture, Functions.TO_VOID, metricsTasks); + public Observable addTags(Metric metric, Map tags) { + return dataAccess.addTags(metric, tags).mergeWith(dataAccess.insertIntoMetricsTagsIndex(metric, tags)); } @Override - public ListenableFuture deleteTags(Metric metric, Map tags) { - List deleteFutures = asList( - dataAccess.deleteTags(metric, tags.keySet()), - dataAccess.deleteFromMetricsTagsIndex(metric, tags) - ); - ListenableFuture> deletesFuture = Futures.allAsList(deleteFutures); - return Futures.transform(deletesFuture, Functions.TO_VOID, metricsTasks); + public Observable deleteTags(Metric metric, Map tags) { + return dataAccess.deleteTags(metric, tags.keySet()).mergeWith( + dataAccess.deleteFromMetricsTagsIndex(metric, tags)); } @Override @@ -663,85 +642,97 @@ public Boolean apply(ResultSet resultSet) { // Data for different metrics and for the same tag are stored within the same partition // in the tags table; therefore, it makes sense for the API to support tagging multiple // metrics since they could efficiently be inserted in a single batch statement. - public ListenableFuture> tagGaugeData(Gauge metric, final Map tags, - long start, long end) { -// ResultSetFuture queryFuture = dataAccess.findData(metric.getTenantId(), metric.getId(), start, end, true); -// ListenableFuture> dataFuture = Futures.transform(queryFuture, -// Functions.MAP_GAUGE_DATA_WITH_WRITE_TIME, metricsTasks); -// int ttl = getTTL(metric); -// ListenableFuture> updatedDataFuture = Futures.transform(dataFuture, new ComputeTTL<>(ttl)); -// return Futures.transform(updatedDataFuture, new TagAsyncFunction(tags, metric)); + public Observable tagGaugeData(Gauge metric, final Map tags, + long start, long end) { + Observable findDataObservable = dataAccess.findData(metric.getTenantId(), metric.getId(), start, end, + true); + return tagGaugeData(findDataObservable, tags, metric); + } - List data = ImmutableList.copyOf( - dataAccess.findData(metric.getTenantId(), metric.getId(), start, end, true) - .flatMap(Observable::from) + private Observable tagGaugeData(Observable findDataObservable, Map tags, + Gauge metric) { + int ttl = getTTL(metric); + Observable> tagsObservable = Observable.from(tags.entrySet()).cache(); + Observable gauges = findDataObservable.flatMap(Observable::from) .map(Functions::getGaugeDataAndWriteTime) - .toBlocking() - .toIterable() - ); + .map(g -> (GaugeData) computeTTL(g, ttl)) + .cache(); + + Observable tagInsert = tagsObservable + .flatMap(t -> dataAccess.insertGaugeTag(t.getKey(), t.getValue(), metric, gauges)); + + Observable tagsInsert = gauges + .flatMap(g -> dataAccess.updateDataWithTag(metric, g, tags)); + + return tagInsert.concatWith(tagsInsert); + } + + private Observable tagAvailabilityData(Observable findDataObservable, + Map tags, Availability metric) { int ttl = getTTL(metric); - List updatedData = new ComputeTTL(ttl).apply(data); - try { - return new TagAsyncFunction(tags, metric).apply(updatedData); - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } + Observable> tagsObservable = Observable.from(tags.entrySet()).cache(); + Observable availabilities = findDataObservable.flatMap(Observable::from) + .map(Functions::getAvailabilityAndWriteTime) + .map(a -> (AvailabilityData) computeTTL(a, ttl)) + .cache(); + + Observable tagInsert = tagsObservable + .flatMap(t -> dataAccess.insertAvailabilityTag(t.getKey(), t.getValue(), metric, availabilities)); + + Observable tagsInsert = availabilities + .flatMap(a -> dataAccess.updateDataWithTag(metric, a, tags)); + + return tagInsert.concatWith(tagsInsert); } @Override - public ListenableFuture> tagAvailabilityData(Availability metric, - Map tags, long start, long end) { - ResultSetFuture queryFuture = dataAccess.findData(metric, start, end, true); - ListenableFuture> dataFuture = Futures.transform(queryFuture, - Functions.MAP_AVAILABILITY_WITH_WRITE_TIME, metricsTasks); - int ttl = getTTL(metric); - ListenableFuture> updatedDataFuture = Futures.transform(dataFuture, - new ComputeTTL<>(ttl)); - return Futures.transform(updatedDataFuture, new TagAsyncFunction(tags, metric)); + public Observable tagAvailabilityData(Availability metric, + Map tags, long start, long end) { + Observable findDataObservable = dataAccess.findData(metric, start, end, true); + return tagAvailabilityData(findDataObservable, tags, metric); + } + + private MetricData computeTTL(MetricData d, int originalTTL) { + Duration duration = new Duration(DateTime.now().minus(d.getWriteTime()).getMillis()); + d.setTTL(originalTTL - duration.toStandardSeconds().getSeconds()); + return d; } @Override - public ListenableFuture> tagGaugeData(Gauge metric, final Map tags, - long timestamp) { - ListenableFuture queryFuture = dataAccess.findData(metric, timestamp, true); - ListenableFuture> dataFuture = Futures.transform(queryFuture, - Functions.MAP_GAUGE_DATA_WITH_WRITE_TIME, metricsTasks); - int ttl = getTTL(metric); - ListenableFuture> updatedDataFuture = Futures.transform(dataFuture, new ComputeTTL<>(ttl)); - return Futures.transform(updatedDataFuture, new TagAsyncFunction(tags, metric)); + public Observable tagGaugeData(Gauge metric, final Map tags, + long timestamp) { + Observable findDataObservable = dataAccess.findData(metric, timestamp, true); + return tagGaugeData(findDataObservable, tags, metric); } @Override - public ListenableFuture> tagAvailabilityData(Availability metric, - final Map tags, long timestamp) { - ListenableFuture queryFuture = dataAccess.findData(metric, timestamp); - ListenableFuture> dataFuture = Futures.transform(queryFuture, - Functions.MAP_AVAILABILITY_WITH_WRITE_TIME, metricsTasks); - int ttl = getTTL(metric); - ListenableFuture> updatedDataFuture = Futures.transform(dataFuture, - new ComputeTTL<>(ttl)); - return Futures.transform(updatedDataFuture, new TagAsyncFunction(tags, metric)); + public Observable tagAvailabilityData(Availability metric, + final Map tags, long timestamp) { + Observable findDataObservable = dataAccess.findData(metric, timestamp); + return tagAvailabilityData(findDataObservable, tags, metric); } @Override - public ListenableFuture>> findGaugeDataByTags(String tenantId, - Map tags) { - List>>> queryFutures = new ArrayList<>(tags.size()); - tags.forEach((k, v) -> queryFutures.add(Futures.transform(dataAccess.findGuageDataByTag(tenantId, k, v), - new TaggedGaugeDataMapper(), metricsTasks))); - ListenableFuture>>> queriesFuture = Futures.allAsList(queryFutures); - return Futures.transform(queriesFuture, new MergeTagsFunction()); + public Observable>> findGaugeDataByTags(String tenantId, Map tags) { + MergeTagsFunction f = new MergeTagsFunction(); + return Observable.from(tags.entrySet()) + .flatMap(e -> dataAccess.findGaugeDataByTag(tenantId, e.getKey(), e.getValue())) + .map(TaggedGaugeDataMapper::apply) + .toList() + .map(r -> f.apply(r)); } @Override - public ListenableFuture>> findAvailabilityByTags(String tenantId, - Map tags) { - List>>> queryFutures = new ArrayList<>(tags.size()); - tags.forEach((k, v) -> queryFutures.add(Futures.transform(dataAccess.findAvailabilityByTag(tenantId, k, v), - new TaggedAvailabilityMappper(), metricsTasks))); - ListenableFuture>>> queriesFuture = Futures.allAsList(queryFutures); - return Futures.transform(queriesFuture, new MergeTagsFunction()); + public Observable>> findAvailabilityByTags(String tenantId, + Map tags) { + MergeTagsFunction f = new MergeTagsFunction(); + + return Observable.from(tags.entrySet()) + .flatMap(e -> dataAccess.findAvailabilityByTag(tenantId, e.getKey(), e.getValue())) + .map(TaggedAvailabilityMapper::apply) + .toList() + .map(r -> f.apply(r)); } @Override @@ -789,34 +780,12 @@ private int getTTL(Metric metric) { return ttl; } - private class TagAsyncFunction implements AsyncFunction, List> { - private final Map tags; - private final Metric metric; - - public TagAsyncFunction(Map tags, Metric metric) { - this.tags = tags; - this.metric = metric; - } - - @Override - public ListenableFuture> apply(List data) throws Exception { - if (data.isEmpty()) { - List results = Collections.emptyList(); - return Futures.immediateFuture(results); - } - List insertFutures = new ArrayList<>(tags.size()); - tags.forEach((k, v) -> { - if(metric instanceof Gauge) { - insertFutures.add(dataAccess.insertGuageTag(k, v, (Gauge) metric, - (List) data)); - } else if(metric instanceof Availability) { - insertFutures.add(dataAccess.insertAvailabilityTag(k, v, (Availability) metric, - (List) data)); - } - }); - data.forEach(t -> insertFutures.add(dataAccess.updateDataWithTag(metric, t, tags))); - ListenableFuture> insertsFuture = Futures.allAsList(insertFutures); - return Futures.transform(insertsFuture, (List resultSets) -> data); + private void updateSchemaIfNecessary(String schemaName) { + try { + SchemaManager schemaManager = new SchemaManager(session.get()); + schemaManager.createSchema(schemaName); + } catch (IOException e) { + throw new RuntimeException("Schema creation failed", e); } } } diff --git a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedAvailabilityMappper.java b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedAvailabilityMapper.java similarity index 86% rename from core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedAvailabilityMappper.java rename to core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedAvailabilityMapper.java index cdc64fb46..860b61946 100644 --- a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedAvailabilityMappper.java +++ b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedAvailabilityMapper.java @@ -21,22 +21,20 @@ import java.util.Map; import java.util.Set; -import org.hawkular.metrics.core.api.AvailabilityData; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; + import org.hawkular.metrics.core.api.Availability; +import org.hawkular.metrics.core.api.AvailabilityData; import org.hawkular.metrics.core.api.Interval; import org.hawkular.metrics.core.api.MetricId; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.google.common.base.Function; - /** * @author John Sanda */ -public class TaggedAvailabilityMappper implements Function>> { +public class TaggedAvailabilityMapper { - @Override - public Map> apply(ResultSet resultSet) { + public static Map> apply(ResultSet resultSet) { Map> taggedData = new HashMap<>(); Availability metric = null; LinkedHashSet set = new LinkedHashSet<>(); @@ -62,12 +60,12 @@ public Map> apply(ResultSet resultSet) { return taggedData; } - private Availability createMetric(Row row) { + private static Availability createMetric(Row row) { return new Availability(row.getString(0), new MetricId(row.getString(4), Interval.parse(row.getString(5)))); } - private AvailabilityData createAvailability(Row row) { + private static AvailabilityData createAvailability(Row row) { return new AvailabilityData(row.getUUID(6), row.getBytes(7)); } diff --git a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedGaugeDataMapper.java b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedGaugeDataMapper.java index 5b18a9e03..00f9892d0 100644 --- a/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedGaugeDataMapper.java +++ b/core/metrics-core-impl/src/main/java/org/hawkular/metrics/core/impl/cassandra/TaggedGaugeDataMapper.java @@ -21,22 +21,20 @@ import java.util.Map; import java.util.Set; -import org.hawkular.metrics.core.api.Interval; -import org.hawkular.metrics.core.api.MetricId; -import org.hawkular.metrics.core.api.GaugeData; -import org.hawkular.metrics.core.api.Gauge; - import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; -import com.google.common.base.Function; + +import org.hawkular.metrics.core.api.Gauge; +import org.hawkular.metrics.core.api.GaugeData; +import org.hawkular.metrics.core.api.Interval; +import org.hawkular.metrics.core.api.MetricId; /** * @author John Sanda */ -public class TaggedGaugeDataMapper implements Function>> { +public class TaggedGaugeDataMapper { - @Override - public Map> apply(ResultSet resultSet) { + public static Map> apply(ResultSet resultSet) { Map> taggedData = new HashMap<>(); Gauge metric = null; LinkedHashSet set = new LinkedHashSet<>(); @@ -62,11 +60,11 @@ public Map> apply(ResultSet resultSet) { return taggedData; } - private Gauge createMetric(Row row) { + private static Gauge createMetric(Row row) { return new Gauge(row.getString(0), new MetricId(row.getString(4), Interval.parse(row.getString(5)))); } - private GaugeData createGaugeData(Row row) { + private static GaugeData createGaugeData(Row row) { return new GaugeData(row.getUUID(6), row.getDouble(7)); } diff --git a/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/DataAccessITest.java b/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/DataAccessITest.java index fa4925218..86218b9f7 100644 --- a/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/DataAccessITest.java +++ b/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/DataAccessITest.java @@ -157,8 +157,7 @@ public void addMetadataToGaugeRawData() throws Exception { Gauge metric = new Gauge("tenant-1", new MetricId("metric-1"), ImmutableMap.of("units", "KB", "env", "test")); - ResultSetFuture insertFuture = dataAccess.addTagsAndDataRetention(metric); - getUninterruptibly(insertFuture); + dataAccess.addTagsAndDataRetention(metric).toBlocking(); metric.addData(new GaugeData(start.getMillis(), 1.23)); metric.addData(new GaugeData(start.plusMinutes(2).getMillis(), 1.234)); diff --git a/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/DelegatingDataAccess.java b/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/DelegatingDataAccess.java index 30034077b..caa48fcb7 100644 --- a/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/DelegatingDataAccess.java +++ b/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/DelegatingDataAccess.java @@ -76,28 +76,28 @@ public Observable findMetric(String tenantId, MetricType type, Metric } @Override - public ResultSetFuture getMetricTags(String tenantId, MetricType type, MetricId id, long dpart) { + public Observable getMetricTags(String tenantId, MetricType type, MetricId id, long dpart) { return delegate.getMetricTags(tenantId, type, id, dpart); } @Override - public ResultSetFuture addTagsAndDataRetention(Metric metric) { + public Observable addTagsAndDataRetention(Metric metric) { return delegate.addTagsAndDataRetention(metric); } @Override - public ResultSetFuture addTags(Metric metric, Map tags) { + public Observable addTags(Metric metric, Map tags) { return delegate.addTags(metric, tags); } @Override - public ResultSetFuture deleteTags(Metric metric, Set tags) { + public Observable deleteTags(Metric metric, Set tags) { return delegate.deleteTags(metric, tags); } @Override - public ResultSetFuture updateTagsInMetricsIndex(Metric metric, Map additions, - Set deletions) { + public Observable updateTagsInMetricsIndex(Metric metric, Map additions, + Set deletions) { return delegate.updateTagsInMetricsIndex(metric, additions, deletions); } @@ -133,22 +133,22 @@ public Observable findData(String tenantId, MetricId id, long startTi } @Override - public ResultSetFuture findData(Gauge metric, long timestamp, boolean includeWriteTime) { + public Observable findData(Gauge metric, long timestamp, boolean includeWriteTime) { return delegate.findData(metric, timestamp, includeWriteTime); } @Override - public ResultSetFuture findData(Availability metric, long startTime, long endTime) { + public Observable findData(Availability metric, long startTime, long endTime) { return delegate.findData(metric, startTime, endTime); } @Override - public ResultSetFuture findData(Availability metric, long startTime, long endTime, boolean includeWriteTime) { + public Observable findData(Availability metric, long startTime, long endTime, boolean includeWriteTime) { return delegate.findData(metric, startTime, endTime, includeWriteTime); } @Override - public ResultSetFuture findData(Availability metric, long timestamp) { + public Observable findData(Availability metric, long timestamp) { return delegate.findData(metric, timestamp); } @@ -163,29 +163,29 @@ public ResultSetFuture findAllGuageMetrics() { } @Override - public ResultSetFuture insertGuageTag(String tag, String tagValue, Gauge metric, - List data) { - return delegate.insertGuageTag(tag, tagValue, metric, data); + public Observable insertGaugeTag(String tag, String tagValue, Gauge metric, + Observable data) { + return delegate.insertGaugeTag(tag, tagValue, metric, data); } @Override - public ResultSetFuture insertAvailabilityTag(String tag, String tagValue, Availability metric, - List data) { + public Observable insertAvailabilityTag(String tag, String tagValue, Availability metric, + Observable data) { return delegate.insertAvailabilityTag(tag, tagValue, metric, data); } @Override - public ResultSetFuture updateDataWithTag(Metric metric, MetricData data, Map tags) { + public Observable updateDataWithTag(Metric metric, MetricData data, Map tags) { return delegate.updateDataWithTag(metric, data, tags); } @Override - public ResultSetFuture findGuageDataByTag(String tenantId, String tag, String tagValue) { - return delegate.findGuageDataByTag(tenantId, tag, tagValue); + public Observable findGaugeDataByTag(String tenantId, String tag, String tagValue) { + return delegate.findGaugeDataByTag(tenantId, tag, tagValue); } @Override - public ResultSetFuture findAvailabilityByTag(String tenantId, String tag, String tagValue) { + public Observable findAvailabilityByTag(String tenantId, String tag, String tagValue) { return delegate.findAvailabilityByTag(tenantId, tag, tagValue); } @@ -225,17 +225,17 @@ public ResultSetFuture updateRetentionsIndex(Metric metric) { } @Override - public ResultSetFuture insertIntoMetricsTagsIndex(Metric metric, Map tags) { + public Observable insertIntoMetricsTagsIndex(Metric metric, Map tags) { return delegate.insertIntoMetricsTagsIndex(metric, tags); } @Override - public ResultSetFuture deleteFromMetricsTagsIndex(Metric metric, Map tags) { + public Observable deleteFromMetricsTagsIndex(Metric metric, Map tags) { return delegate.deleteFromMetricsTagsIndex(metric, tags); } @Override - public ResultSetFuture findMetricsByTag(String tenantId, String tag) { + public Observable findMetricsByTag(String tenantId, String tag) { return delegate.findMetricsByTag(tenantId, tag); } } diff --git a/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/MetricsServiceCassandraITest.java b/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/MetricsServiceCassandraITest.java index 7f51d036f..916580bcf 100644 --- a/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/MetricsServiceCassandraITest.java +++ b/core/metrics-core-impl/src/test/java/org/hawkular/metrics/core/impl/cassandra/MetricsServiceCassandraITest.java @@ -19,7 +19,6 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; - import static org.hawkular.metrics.core.api.AvailabilityType.DOWN; import static org.hawkular.metrics.core.api.AvailabilityType.UNKNOWN; import static org.hawkular.metrics.core.api.AvailabilityType.UP; @@ -36,7 +35,6 @@ import static org.testng.Assert.assertTrue; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,6 +43,17 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.hawkular.metrics.core.api.Availability; import org.hawkular.metrics.core.api.AvailabilityData; import org.hawkular.metrics.core.api.Gauge; @@ -63,17 +72,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.datastax.driver.core.BatchStatement; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Row; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - import rx.Observable; /** @@ -216,12 +214,12 @@ public void updateMetricTags() throws Exception { metricsService.createMetric(metric).toBlocking().lastOrDefault(null); Map additions = ImmutableMap.of("a2", "two", "a3", "3"); - ListenableFuture insertFuture = metricsService.addTags(metric, additions); - getUninterruptibly(insertFuture); + metricsService.addTags(metric, additions).toBlocking().lastOrDefault + (null); Map deletions = ImmutableMap.of("a1", "1"); - ListenableFuture deleteFuture = metricsService.deleteTags(metric, deletions); - getUninterruptibly(deleteFuture); + metricsService.deleteTags(metric, deletions).toBlocking() + .lastOrDefault(null); Metric updatedMetric = metricsService.findMetric(metric.getTenantId(), GAUGE, metric.getId()).toBlocking().last().get(); @@ -286,8 +284,8 @@ public void verifyTTLsSetOnGaugeData() throws Exception { Map tags = ImmutableMap.of("tag1", ""); verifyTTLDataAccess.gaugeTagTTLLessThanEqualTo(DEFAULT_TTL - days(2).toStandardSeconds().getSeconds()); - getUninterruptibly(metricsService.tagGaugeData(m1, tags, start.getMillis(), - start.plusMinutes(2).getMillis())); + metricsService.tagGaugeData(m1, tags, start.getMillis(), + start.plusMinutes(2).getMillis()).toBlocking(); verifyTTLDataAccess.setGaugeTTL(days(14).toStandardSeconds().getSeconds()); Gauge m2 = new Gauge("t2", new MetricId("m2")); @@ -295,7 +293,7 @@ public void verifyTTLsSetOnGaugeData() throws Exception { addDataInThePast(m2, days(3).toStandardDuration()); verifyTTLDataAccess.gaugeTagTTLLessThanEqualTo(days(14).minus(3).toStandardSeconds().getSeconds()); - getUninterruptibly(metricsService.tagGaugeData(m2, tags, start.plusMinutes(5).getMillis())); + metricsService.tagGaugeData(m2, tags, start.plusMinutes(5).getMillis()).toBlocking(); metricsService.createTenant(new Tenant().setId("t3").setRetention(GAUGE, 24)) .toBlocking() @@ -339,8 +337,7 @@ public void verifyTTLsSetOnAvailabilityData() throws Exception { Map tags = ImmutableMap.of("tag1", ""); verifyTTLDataAccess.availabilityTagTLLLessThanEqualTo(DEFAULT_TTL - days(2).toStandardSeconds().getSeconds()); - getUninterruptibly(metricsService.tagAvailabilityData(m1, tags, start.getMillis(), - start.plusMinutes(2).getMillis())); + metricsService.tagAvailabilityData(m1, tags, start.getMillis(), start.plusMinutes(2).getMillis()).toBlocking(); verifyTTLDataAccess.setAvailabilityTTL(days(14).toStandardSeconds().getSeconds()); Availability m2 = new Availability("t2", new MetricId("m2")); @@ -348,7 +345,7 @@ public void verifyTTLsSetOnAvailabilityData() throws Exception { addDataInThePast(m2, days(5).toStandardDuration()); verifyTTLDataAccess.availabilityTagTLLLessThanEqualTo(days(14).minus(5).toStandardSeconds().getSeconds()); - getUninterruptibly(metricsService.tagAvailabilityData(m2, tags, start.plusMinutes(5).getMillis())); + metricsService.tagAvailabilityData(m2, tags, start.plusMinutes(5).getMillis()).toBlocking(); metricsService.createTenant(new Tenant().setId("t3").setRetention(AVAILABILITY, 24)) .toBlocking() @@ -424,14 +421,11 @@ public void fetchGaugeDataThatHasTags() throws Exception { metricsService.addGaugeData(Observable.just(metric)).toBlocking().lastOrDefault(null); Map tags1 = ImmutableMap.of("t1", "1", "t2", ""); - ListenableFuture> tagFuture = metricsService.tagGaugeData(metric, tags1, - start.plusMinutes(2).getMillis()); - getUninterruptibly(tagFuture); + metricsService.tagGaugeData(metric, tags1, start.plusMinutes(2).getMillis()).toBlocking().lastOrDefault(null); Map tags2 = ImmutableMap.of("t3", "3", "t4", ""); - tagFuture = metricsService.tagGaugeData(metric, tags2, start.plusMinutes(3).getMillis(), - start.plusMinutes(5).getMillis()); - getUninterruptibly(tagFuture); + metricsService.tagGaugeData(metric, tags2, start.plusMinutes(3).getMillis(), start.plusMinutes(5).getMillis() + ).toBlocking().lastOrDefault(null); Observable observable = metricsService.findGaugeData("tenant1", new MetricId("m1"), start.getMillis(), end.getMillis()); @@ -571,14 +565,12 @@ public void fetchAvailabilityDataThatHasTags() throws Exception { getUninterruptibly(insertFuture); Map tags1 = ImmutableMap.of("t1", "1", "t2", ""); - ListenableFuture> tagFuture = metricsService.tagAvailabilityData(metric, tags1, - start.plusMinutes(2).getMillis()); - getUninterruptibly(tagFuture); + metricsService.tagAvailabilityData(metric, tags1, start.plusMinutes(2).getMillis()).toBlocking() + .lastOrDefault(null); Map tags2 = ImmutableMap.of("t3", "3", "t4", ""); - tagFuture = metricsService.tagAvailabilityData(metric, tags2, start.plusMinutes(3).getMillis(), - start.plusMinutes(5).getMillis()); - getUninterruptibly(tagFuture); + metricsService.tagAvailabilityData(metric, tags2, start.plusMinutes(3).getMillis(), + start.plusMinutes(5).getMillis()).toBlocking().lastOrDefault(null); ListenableFuture> queryFuture = metricsService.findAvailabilityData("tenant1", metric.getId(), start.getMillis(), end.getMillis()); @@ -679,26 +671,20 @@ public void tagGaugeDataByDateRangeAndQueryByMultipleTags() throws Exception { Map tags1 = ImmutableMap.of("t1", "1"); Map tags2 = ImmutableMap.of("t2", "2"); - ListenableFuture> tagFuture1 = metricsService.tagGaugeData(m1, tags1, start.getMillis(), - start.plusMinutes(6).getMillis()); - ListenableFuture> tagFuture2 = metricsService.tagGaugeData(m2, tags1, start.getMillis(), - start.plusMinutes(6).getMillis()); - ListenableFuture> tagFuture3 = metricsService.tagGaugeData(m1, tags2, - start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()); - ListenableFuture> tagFuture4 = metricsService.tagGaugeData(m2, tags2, - start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()); - ListenableFuture> tagFuture5 = metricsService.tagGaugeData(m3, tags2, - start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()); - - getUninterruptibly(tagFuture1); - getUninterruptibly(tagFuture2); - getUninterruptibly(tagFuture3); - getUninterruptibly(tagFuture4); - getUninterruptibly(tagFuture5); - - ListenableFuture>> queryFuture = metricsService.findGaugeDataByTags(tenant, - ImmutableMap.of("t1", "1", "t2", "2")); - Map> actual = getUninterruptibly(queryFuture); + metricsService.tagGaugeData(m1, tags1, start.getMillis(), start.plusMinutes(6).getMillis()).toBlocking() + .lastOrDefault(null); + metricsService.tagGaugeData(m2, tags1, start.getMillis(), start.plusMinutes(6).getMillis()).toBlocking() + .lastOrDefault(null); + metricsService.tagGaugeData(m1, tags2, start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()) + .toBlocking().lastOrDefault(null); + metricsService.tagGaugeData(m2, tags2, start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()) + .toBlocking().lastOrDefault(null); + metricsService.tagGaugeData(m3, tags2, start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()) + .toBlocking().lastOrDefault(null); + + Map> actual = metricsService + .findGaugeDataByTags(tenant, ImmutableMap.of("t1", "1", "t2", "2")).toBlocking().toIterable().iterator() + .next(); ImmutableMap> expected = ImmutableMap.of( new MetricId("m1"), ImmutableSet.of(d1, d2, d6), new MetricId("m2"), ImmutableSet.of(d5, d3) @@ -746,29 +732,23 @@ public void tagAvailabilityByDateRangeAndQueryByMultipleTags() throws Exception Map tags1 = ImmutableMap.of("t1", "1"); Map tags2 = ImmutableMap.of("t2", "2"); - ListenableFuture> tagFuture1 = metricsService.tagAvailabilityData(m1, tags1, - start.getMillis(), start.plusMinutes(6).getMillis()); - ListenableFuture> tagFuture2 = metricsService.tagAvailabilityData(m2, tags1, - start.getMillis(), start.plusMinutes(6).getMillis()); - ListenableFuture> tagFuture3 = metricsService.tagAvailabilityData(m1, tags2, - start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()); - ListenableFuture> tagFuture4 = metricsService.tagAvailabilityData(m2, tags2, - start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()); - ListenableFuture> tagFuture5 = metricsService.tagAvailabilityData(m3, tags2, - start.plusMinutes(4).getMillis(), start.plusMinutes(8).getMillis()); - - getUninterruptibly(tagFuture1); - getUninterruptibly(tagFuture2); - getUninterruptibly(tagFuture3); - getUninterruptibly(tagFuture4); - getUninterruptibly(tagFuture5); - - ListenableFuture>> queryFuture = metricsService.findAvailabilityByTags( - tenant, ImmutableMap.of("t1", "1", "t2", "2")); - Map> actual = getUninterruptibly(queryFuture); + metricsService.tagAvailabilityData(m1, tags1, start.getMillis(), start.plusMinutes(6).getMillis()).toBlocking + ().lastOrDefault(null); + metricsService.tagAvailabilityData(m2, tags1, start.getMillis(), start.plusMinutes(6).getMillis()).toBlocking + ().lastOrDefault(null); + metricsService.tagAvailabilityData(m1, tags2, start.plusMinutes(4).getMillis(), + start.plusMinutes(8).getMillis()).toBlocking().lastOrDefault(null); + metricsService.tagAvailabilityData(m2, tags2, start.plusMinutes(4).getMillis(), + start.plusMinutes(8).getMillis()).toBlocking().lastOrDefault(null); + metricsService.tagAvailabilityData(m3, tags2, start.plusMinutes(4).getMillis(), + start.plusMinutes(8).getMillis()).toBlocking().lastOrDefault(null); + + Map> actual = metricsService + .findAvailabilityByTags(tenant, ImmutableMap.of("t1", "1", "t2", "2")).toBlocking().toIterable().iterator + ().next(); ImmutableMap> expected = ImmutableMap.of( - new MetricId("m1"), ImmutableSet.of(a1, a2, a6), - new MetricId("m2"), ImmutableSet.of(a5, a3) + new MetricId("m1"), ImmutableSet.of(a1, a2, a6), + new MetricId("m2"), ImmutableSet.of(a5, a3) ); assertEquals(actual, expected, "The tagged data does not match"); @@ -810,35 +790,32 @@ public void tagIndividualGaugeDataPoints() throws Exception { metricsService.addGaugeData(Observable.just(m1, m2, m3)).toBlocking().lastOrDefault(null); Map tags1 = ImmutableMap.of("t1", ""); - ListenableFuture> tagFuture = metricsService.tagGaugeData(m1, tags1, d1.getTimestamp()); - assertEquals(getUninterruptibly(tagFuture), singletonList(d1), - "Tagging " + d1 + " returned unexpected results"); + ResultSet tagFuture = metricsService.tagGaugeData(m1, tags1, d1.getTimestamp()) + .toBlocking().lastOrDefault(null); +// assertTrue(tagFuture.wasApplied(), "Tagging " + d1 + " returned unexpected results"); Map tags2 = ImmutableMap.of("t1", "", "t2", "", "t3", ""); - tagFuture = metricsService.tagGaugeData(m1, tags2, d2.getTimestamp()); - assertEquals(getUninterruptibly(tagFuture), singletonList(d2), - "Tagging " + d2 + " returned unexpected results"); + tagFuture = metricsService.tagGaugeData(m1, tags2, d2.getTimestamp()).toBlocking().lastOrDefault(null); +// assertTrue(tagFuture.wasApplied(), "Tagging " + d2 + " returned unexpected results"); - tagFuture = metricsService.tagGaugeData(m1, tags1, start.minusMinutes(10).getMillis()); - assertEquals(getUninterruptibly(tagFuture), Collections.emptyList(), - "No data should be returned since there is no data for this time"); +// tagFuture = metricsService.tagGaugeData(m1, tags1, start.minusMinutes(10).getMillis()); +// assertEquals(getUninterruptibly(tagFuture), Collections.emptyList(), +// "No data should be returned since there is no data for this time"); Map tags3 = ImmutableMap.of("t1", "", "t2", ""); - tagFuture = metricsService.tagGaugeData(m2, tags3, d3.getTimestamp()); - assertEquals(getUninterruptibly(tagFuture), singletonList(d3), - "Tagging " + d3 + " returned unexpected results"); + tagFuture = metricsService.tagGaugeData(m2, tags3, d3.getTimestamp()).toBlocking().lastOrDefault(null); +// assertTrue(tagFuture.wasApplied(), "Tagging " + d3 + " returned unexpected results"); Map tags4 = ImmutableMap.of("t3", "", "t4", ""); - tagFuture = metricsService.tagGaugeData(m2, tags4, d4.getTimestamp()); - assertEquals(getUninterruptibly(tagFuture), singletonList(d4), - "Tagging " + d4 + " returned unexpected results"); + tagFuture = metricsService.tagGaugeData(m2, tags4, d4.getTimestamp()).toBlocking().lastOrDefault(null); +// assertTrue(tagFuture.wasApplied(), "Tagging " + d4 + " returned unexpected results"); + + Map> actual = metricsService.findGaugeDataByTags(tenant, + ImmutableMap.of("t2", "", "t3", "")).toBlocking().lastOrDefault(null); - ListenableFuture>> queryFuture = metricsService.findGaugeDataByTags(tenant, - ImmutableMap.of("t2", "", "t3", "")); - Map> actual = getUninterruptibly(queryFuture); ImmutableMap> expected = ImmutableMap.of( - new MetricId("m1"), ImmutableSet.of(d2), - new MetricId("m2"), ImmutableSet.of(d3, d4) + new MetricId("m1"), ImmutableSet.of(d2), + new MetricId("m2"), ImmutableSet.of(d3, d4) ); assertEquals(actual, expected, "The tagged data does not match"); @@ -881,33 +858,27 @@ public void tagIndividualAvailabilityDataPoints() throws Exception { getUninterruptibly(insertFuture); Map tags1 = ImmutableMap.of("t1", ""); - ListenableFuture> tagFuture = metricsService.tagAvailabilityData(m1, tags1, - a1.getTimestamp()); - assertEquals(getUninterruptibly(tagFuture), singletonList(a1), - "Tagging " + a1 + " returned unexpected results"); + ResultSet tagFuture = metricsService.tagAvailabilityData(m1, tags1, a1.getTimestamp()).toBlocking().last(); +// assertTrue(tagFuture.wasApplied(), "Tagging " + a1 + " returned unexpected results"); Map tags2 = ImmutableMap.of("t1", "", "t2", "", "t3", ""); - tagFuture = metricsService.tagAvailabilityData(m1, tags2, a2.getTimestamp()); - assertEquals(getUninterruptibly(tagFuture), singletonList(a2), - "Tagging " + a2 + " returned unexpected results"); + tagFuture = metricsService.tagAvailabilityData(m1, tags2, a2.getTimestamp()).toBlocking().last(); +// assertTrue(tagFuture.wasApplied(), "Tagging " + a2 + " returned unexpected results"); - tagFuture = metricsService.tagAvailabilityData(m1, tags1, start.minusMinutes(10).getMillis()); - assertEquals(getUninterruptibly(tagFuture), Collections.emptyList(), - "No data should be returned since there is no data for this time"); +// tagFuture = metricsService.tagAvailabilityData(m1, tags1, start.minusMinutes(10).getMillis()); +// assertEquals(getUninterruptibly(tagFuture), Collections.emptyList(), +// "No data should be returned since there is no data for this time"); Map tags3 = ImmutableMap.of("t2", "", "t3", ""); - tagFuture = metricsService.tagAvailabilityData(m2, tags3, a3.getTimestamp()); - assertEquals(getUninterruptibly(tagFuture), singletonList(a3), - "Tagging " + a3 + " returned unexpected results"); + tagFuture = metricsService.tagAvailabilityData(m2, tags3, a3.getTimestamp()).toBlocking().last(); +// assertTrue(tagFuture.wasApplied(), "Tagging " + a3 + " returned unexpected results"); Map tags4 = ImmutableMap.of("t3", "", "t4", ""); - tagFuture = metricsService.tagAvailabilityData(m2, tags4, a4.getTimestamp()); - assertEquals(getUninterruptibly(tagFuture), singletonList(a4), - "Tagging " + a4 + " returned unexpected results"); + metricsService.tagAvailabilityData(m2, tags4, a4.getTimestamp()).toBlocking().last(); +// assertTrue(tagFuture.wasApplied(), "Tagging " + a4 + " returned unexpected results"); - ListenableFuture>> queryFuture = metricsService.findAvailabilityByTags( - tenant, tags3); - Map> actual = getUninterruptibly(queryFuture); + Map> actual = metricsService.findAvailabilityByTags( + tenant, tags3).toBlocking().last(); ImmutableMap> expected = ImmutableMap.of( new MetricId("m1"), ImmutableSet.of(a2), new MetricId("m2"), ImmutableSet.of(a3, a4) @@ -955,6 +926,11 @@ private List toList(Observable observable) { return ImmutableList.copyOf(observable.toBlocking().toIterable()); } + private void assertMetricEquals(Metric actual, Metric expected) { + assertEquals(actual, expected, "The metric doe not match the expected value"); + assertEquals(actual.getData(), expected.getData(), "The data does not match the expected values"); + } + private void assertMetricIndexMatches(String tenantId, MetricType type, List expected) throws Exception { List> actualIndex = ImmutableList.copyOf(metricsService.findMetrics(tenantId, type).toBlocking() @@ -1014,8 +990,7 @@ public String toString() { private void assertMetricsTagsIndexMatches(String tenantId, String tag, List expected) throws Exception { - ResultSetFuture queryFuture = dataAccess.findMetricsByTag(tenantId, tag); - ResultSet resultSet = getUninterruptibly(queryFuture); + ResultSet resultSet = dataAccess.findMetricsByTag(tenantId, tag).toBlocking().first(); List actual = new ArrayList<>(); for (Row row : resultSet) { @@ -1084,19 +1059,23 @@ public ResultSetFuture insertData(Availability metric, int ttl) { } @Override - public ResultSetFuture insertGuageTag(String tag, String tagValue, Gauge metric, - List data) { - for (GaugeData d : data) { + public Observable insertGaugeTag(String tag, String tagValue, Gauge metric, + Observable data) { + + List first = data.toList().toBlocking().first(); + + for (GaugeData d : first) { assertTrue(d.getTTL() <= gaugeTagTTL, "Expected the TTL to be <= " + gaugeTagTTL + " but it was " + d.getTTL()); } - return super.insertGuageTag(tag, tagValue, metric, data); + return super.insertGaugeTag(tag, tagValue, metric, data); } @Override - public ResultSetFuture insertAvailabilityTag(String tag, String tagValue, Availability metric, - List data) { - for (AvailabilityData a : data) { + public Observable insertAvailabilityTag(String tag, String tagValue, Availability metric, + Observable data) { + List first = data.toList().toBlocking().first(); + for (AvailabilityData a : first) { assertTrue(a.getTTL() <= availabilityTagTTL, "Expected the TTL to be <= " + availabilityTagTTL + " but it was " + a.getTTL()); }