Skip to content

Commit

Permalink
[HWKMETRICS-148] Update metrics creation to allow users to overwrite …
Browse files Browse the repository at this point in the history
…previously created metric definitions. The flag defaults to false and is not required.
  • Loading branch information
Stefan Negrea committed Apr 28, 2016
1 parent c855b45 commit 0fe8267
Show file tree
Hide file tree
Showing 15 changed files with 221 additions and 41 deletions.
Expand Up @@ -104,6 +104,8 @@ public class AvailabilityHandler {
public void createAvailabilityMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<AvailabilityType> metric,
@ApiParam(value = "Overwrite previously created metric if it exists. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -116,7 +118,7 @@ public void createAvailabilityMetric(
metric = new Metric<>(
new MetricId<>(tenantId, AVAILABILITY, metric.getMetricId().getName()), metric.getTags(),
metric.getDataRetention());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Expand Up @@ -109,6 +109,8 @@ public class CounterHandler {
public void createCounter(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<Long> metric,
@ApiParam(value = "Overwrite previously created metric if it exists. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -121,7 +123,7 @@ public void createCounter(
metric = new Metric<>(new MetricId<>(tenantId, COUNTER, metric.getId()),
metric.getTags(), metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/counters/{id}").build(metric.getMetricId().getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Expand Up @@ -106,6 +106,8 @@ public class GaugeHandler {
public void createGaugeMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<Double> metric,
@ApiParam(value = "Overwrite previously created metric if it exists. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -117,7 +119,7 @@ public void createGaugeMetric(
metric = new Metric<>(new MetricId<>(tenantId, GAUGE, metric.getId()), metric.getTags(),
metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/gauges/{id}").build(metric.getMetricId().getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Expand Up @@ -31,6 +31,7 @@

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -100,6 +101,8 @@ public class MetricHandler {
public <T> void createMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<T> metric,
@ApiParam(value = "Overwrite previously created metric if it exists. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() == null || !metric.getType().isUserType()) {
Expand All @@ -109,7 +112,7 @@ public <T> void createMetric(
metric = new Metric<>(id, metric.getTags(), metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/{type}/{id}").build(MetricTypeTextConverter.getLongForm(id
.getType()), id.getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Expand Up @@ -45,7 +45,7 @@ public interface DataAccess {

Observable<Row> findTenant(String id);

<T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric);
<T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite);

<T> Observable<Row> findMetric(MetricId<T> id);

Expand Down
Expand Up @@ -73,6 +73,8 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement insertIntoMetricsIndex;

private PreparedStatement insertIntoMetricsIndexOverwrite;

private PreparedStatement findMetric;

private PreparedStatement getMetricTags;
Expand Down Expand Up @@ -180,6 +182,10 @@ protected void initPreparedStatements() {
"VALUES (?, ?, ?, ?, ?) " +
"IF NOT EXISTS");

insertIntoMetricsIndexOverwrite = session.prepare(
"INSERT INTO metrics_idx (tenant_id, type, metric, data_retention, tags) " +
"VALUES (?, ?, ?, ?, ?) ");

updateMetricsIndex = session.prepare(
"INSERT INTO metrics_idx (tenant_id, type, metric) VALUES (?, ?, ?)");

Expand Down Expand Up @@ -355,8 +361,15 @@ public Observable<Row> findTenant(String id) {
}

@Override
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric) {
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite) {
MetricId<T> metricId = metric.getMetricId();

if (overwrite) {
return session.executeAsync(
insertIntoMetricsIndexOverwrite.bind(metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName(), metric.getDataRetention(), metric.getTags()));
}

return session.executeAsync(insertIntoMetricsIndex.bind(metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName(), metric.getDataRetention(), metric.getTags()));
}
Expand Down
Expand Up @@ -84,13 +84,14 @@ public interface MetricsService {
* </p>
*
* @param metric The metric to create
* @param overwrite Flag to force overwrite previous metric definition if it exists
*
* @return This method only has side effects and does not return any data. As such,
* {@link rx.Observer#onNext(Object) onNext} is not called. {@link rx.Observer#onCompleted()} onCompleted}
* is called when the operation completes successfully, and {@link rx.Observer#onError(Throwable)} onError}
* is called when it fails.
*/
Observable<Void> createMetric(Metric<?> metric);
Observable<Void> createMetric(Metric<?> metric, boolean overwrite);

<T> Observable<Metric<T>> findMetric(MetricId<T> id);

Expand Down
Expand Up @@ -382,19 +382,18 @@ private List<String> loadTenantIds() {
}

@Override
public Observable<Void> createMetric(Metric<?> metric) {
public Observable<Void> createMetric(Metric<?> metric, boolean overwrite) {
MetricType<?> metricType = metric.getMetricId().getType();
if (!metricType.isUserType()) {
throw new IllegalArgumentException(metric + " cannot be created. " + metricType + " metrics are " +
"internally generated metrics and cannot be created by clients.");
}

ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric);
ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric, overwrite);
Observable<ResultSet> indexUpdated = ListenableFutureObservable.from(future, metricsTasks);
return Observable.create(subscriber -> indexUpdated.subscribe(resultSet -> {
if (!resultSet.wasApplied()) {
if (!overwrite && !resultSet.wasApplied()) {
subscriber.onError(new MetricAlreadyExistsException(metric));

} else {
// TODO Need error handling if either of the following updates fail
// If adding tags/retention fails, then we want to report the error to the
Expand Down
Expand Up @@ -65,8 +65,8 @@ public Observable<Row> findTenant(String id) {
}

@Override
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric) {
return delegate.insertMetricInMetricsIndex(metric);
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite) {
return delegate.insertMetricInMetricsIndex(metric, overwrite);
}

@Override
Expand Down
Expand Up @@ -79,9 +79,9 @@ public void generateRates() {
Metric<Long> c2 = new Metric<>(new MetricId<>(tenant, COUNTER, "C2"));
Metric<Long> c3 = new Metric<>(new MetricId<>(tenant, COUNTER, "C3"));

doAction(() -> metricsService.createMetric(c1));
doAction(() -> metricsService.createMetric(c2));
doAction(() -> metricsService.createMetric(c3));
doAction(() -> metricsService.createMetric(c1, false));
doAction(() -> metricsService.createMetric(c2, false));
doAction(() -> metricsService.createMetric(c3, false));

doAction(() -> metricsService.addDataPoints(COUNTER, Observable.from(asList(
new Metric<>(c1.getMetricId(), asList(new DataPoint<>(start.getMillis(), 10L),
Expand Down
Expand Up @@ -181,7 +181,7 @@ public void createTenants() throws Exception {
@Test
public void createMetricsIdxTenants() throws Exception {
Metric<Double> em1 = new Metric<>(new MetricId<>("t123", GAUGE, "em1"));
metricsService.createMetric(em1).toBlocking().lastOrDefault(null);
metricsService.createMetric(em1, false).toBlocking().lastOrDefault(null);

Metric<Double> actual = metricsService.<Double> findMetric(em1.getMetricId())
.toBlocking()
Expand All @@ -203,7 +203,7 @@ public void createMetricsIdxTenants() throws Exception {
@Test
public void createAndFindMetrics() throws Exception {
Metric<Double> em1 = new Metric<>(new MetricId<>("t1", GAUGE, "em1"));
metricsService.createMetric(em1).toBlocking().lastOrDefault(null);
metricsService.createMetric(em1, false).toBlocking().lastOrDefault(null);
Metric<Double> actual = metricsService.<Double> findMetric(em1.getMetricId())
.toBlocking()
.lastOrDefault(null);
Expand All @@ -212,14 +212,14 @@ public void createAndFindMetrics() throws Exception {
assertEquals(actual, em2, "The metric does not match the expected value");

Metric<Double> m1 = new Metric<>(new MetricId<>("t1", GAUGE, "m1"), ImmutableMap.of("a1", "1", "a2", "2"), 24);
metricsService.createMetric(m1).toBlocking().lastOrDefault(null);
metricsService.createMetric(m1, false).toBlocking().lastOrDefault(null);

actual = metricsService.<Double> findMetric(m1.getMetricId()).toBlocking().last();
assertEquals(actual, m1, "The metric does not match the expected value");

Metric<AvailabilityType> m2 = new Metric<>(new MetricId<>("t1", AVAILABILITY, "m2"),
ImmutableMap.of("a3", "3", "a4", "3"), DEFAULT_TTL);
metricsService.createMetric(m2).toBlocking().lastOrDefault(null);
metricsService.createMetric(m2, false).toBlocking().lastOrDefault(null);

// Find definitions with given tags
Map<String, String> tagMap = new HashMap<>();
Expand All @@ -229,7 +229,7 @@ public void createAndFindMetrics() throws Exception {
// Test that distinct filtering does not remove same name from different types
Metric<Double> gm2 = new Metric<>(new MetricId<>("t1", GAUGE, "m2"),
ImmutableMap.of("a3", "3", "a4", "3"), null);
metricsService.createMetric(gm2).toBlocking().lastOrDefault(null);
metricsService.createMetric(gm2, false).toBlocking().lastOrDefault(null);

Metric<AvailabilityType> actualAvail = metricsService.<AvailabilityType> findMetric(m2.getMetricId())
.toBlocking()
Expand All @@ -238,7 +238,7 @@ public void createAndFindMetrics() throws Exception {

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
metricsService.createMetric(m1).subscribe(
metricsService.createMetric(m1, false).subscribe(
nullArg -> {
},
t -> {
Expand All @@ -251,11 +251,11 @@ public void createAndFindMetrics() throws Exception {
"Expected a " + MetricAlreadyExistsException.class.getSimpleName() + " to be thrown");

Metric<Double> m3 = new Metric<>(new MetricId<>("t1", GAUGE, "m3"), emptyMap(), 24);
metricsService.createMetric(m3).toBlocking().lastOrDefault(null);
metricsService.createMetric(m3, false).toBlocking().lastOrDefault(null);

Metric<Double> m4 = new Metric<>(new MetricId<>("t1", GAUGE, "m4"),
ImmutableMap.of("a1", "A", "a2", ""), null);
metricsService.createMetric(m4).toBlocking().lastOrDefault(null);
metricsService.createMetric(m4, false).toBlocking().lastOrDefault(null);

assertMetricIndexMatches("t1", GAUGE, asList(new Metric<>(em1.getMetricId(), 7), m1,
new Metric<>(gm2.getMetricId(), gm2.getTags(), 7),
Expand Down Expand Up @@ -314,7 +314,8 @@ private List<Metric> createTagMetrics(String tenantId) throws Exception {
}

// Insert gauges
Observable.from(metricsToAdd).subscribe(m -> metricsService.createMetric(m).toBlocking().lastOrDefault(null));
Observable.from(metricsToAdd)
.subscribe(m -> metricsService.createMetric(m, false).toBlocking().lastOrDefault(null));

return metricsToAdd;
}
Expand Down Expand Up @@ -448,7 +449,7 @@ public void createBasicCounterMetric() throws Exception {
MetricId<Long> id = new MetricId<>(tenantId, COUNTER, name);

Metric<Long> counter = new Metric<>(id);
metricsService.createMetric(counter).toBlocking().lastOrDefault(null);
metricsService.createMetric(counter, false).toBlocking().lastOrDefault(null);

Metric<Long> actual = metricsService.<Long> findMetric(id).toBlocking().lastOrDefault(null);

Expand All @@ -464,7 +465,7 @@ public void createCounterWithTags() throws Exception {
Map<String, String> tags = ImmutableMap.of("x", "1", "y", "2");

Metric<Long> counter = new Metric<>(id, tags, null);
metricsService.createMetric(counter).toBlocking().lastOrDefault(null);
metricsService.createMetric(counter, false).toBlocking().lastOrDefault(null);

Metric<Long> actual = metricsService.findMetric(id).toBlocking().lastOrDefault(null);
Metric<Long> ec = new Metric<>(counter.getMetricId(), counter.getTags(), 7);
Expand All @@ -482,7 +483,7 @@ public void createCounterWithDataRetention() throws Exception {
Integer retention = 100;

Metric<Long> counter = new Metric<>(id, emptyMap(), retention);
metricsService.createMetric(counter).toBlocking().lastOrDefault(null);
metricsService.createMetric(counter, false).toBlocking().lastOrDefault(null);

Metric<Long> actual = metricsService.<Long> findMetric(id).toBlocking().lastOrDefault(null);
assertEquals(actual, counter, "The counter metric does not match");
Expand All @@ -493,7 +494,8 @@ public void createCounterWithDataRetention() throws Exception {

@Test(expectedExceptions = IllegalArgumentException.class)
public void doNotAllowCreationOfCounterRateMetric() {
metricsService.createMetric(new Metric<>(new MetricId<>("test", COUNTER_RATE, "counter-rate"))).toBlocking()
metricsService.createMetric(new Metric<>(new MetricId<>("test", COUNTER_RATE, "counter-rate")), false)
.toBlocking()
.lastOrDefault(null);
}

Expand All @@ -516,7 +518,7 @@ public void doNotAllowEmptyMetricTags() {
public void updateMetricTags() throws Exception {
Metric<Double> metric = new Metric<>(new MetricId<>("t1", GAUGE, "m1"),
ImmutableMap.of("a1", "1", "a2", "2"), DEFAULT_TTL);
metricsService.createMetric(metric).toBlocking().lastOrDefault(null);
metricsService.createMetric(metric, false).toBlocking().lastOrDefault(null);

Map<String, String> additions = ImmutableMap.of("a2", "two", "a3", "3");
metricsService.addTags(metric, additions).toBlocking().lastOrDefault
Expand Down Expand Up @@ -1623,7 +1625,7 @@ public void addGaugeDataForMultipleMetrics() throws Exception {
new DataPoint<>(start.plusSeconds(30).getMillis(), 55.5),
new DataPoint<>(end.getMillis(), 66.6)
));
metricsService.createMetric(m4).toBlocking().lastOrDefault(null);
metricsService.createMetric(m4, false).toBlocking().lastOrDefault(null);

metricsService.addDataPoints(GAUGE, Observable.just(m1, m2, m3, m4)).toBlocking().lastOrDefault(null);

Expand Down Expand Up @@ -1691,7 +1693,7 @@ public void addAvailabilityForMultipleMetrics() throws Exception {
asList(
new DataPoint<>(start.plusMinutes(2).getMillis(), UP),
new DataPoint<>(end.plusMinutes(2).getMillis(), UP)));
metricsService.createMetric(m4).toBlocking().lastOrDefault(null);
metricsService.createMetric(m4, false).toBlocking().lastOrDefault(null);

metricsService.addDataPoints(AVAILABILITY, Observable.just(m4)).toBlocking().lastOrDefault(null);

Expand Down Expand Up @@ -1853,6 +1855,45 @@ public void shouldReceiveInsertedDataNotifications() throws Exception {
assertTrue(actual.containsAll(expected));
}

@Test
public void createCounterMetricExists() throws Exception {
String tenantId = "metric-exists";
String name = "basic-metric";
int dataRetention = 20;
MetricId<Long> id = new MetricId<>(tenantId, COUNTER, name);

Metric<Long> metric = new Metric<>(id, dataRetention);
metricsService.createMetric(metric, false).toBlocking().lastOrDefault(null);

Metric<Long> actual = metricsService.<Long> findMetric(id).toBlocking().last();

assertEquals(actual, new Metric<>(metric.getMetricId(), dataRetention), "The counter metric does not match");
assertMetricIndexMatches(tenantId, COUNTER, singletonList(new Metric<>(metric.getMetricId(), dataRetention)));

try {
metricsService.createMetric(metric, false).toBlocking().lastOrDefault(null);
fail("Metrics should already be stored and not overwritten.");
} catch (MetricAlreadyExistsException e) {

}

dataRetention = 100;
Map<String, String> tags = new HashMap<String, String>();
tags.put("test", "test2");
metric = new Metric<>(id, tags, dataRetention);
try {
metricsService.createMetric(metric, true).toBlocking().lastOrDefault(null);
} catch (MetricAlreadyExistsException e) {
fail("Metrics should already be stored and not overwritten.");
}

actual = metricsService.<Long> findMetric(id).toBlocking().last();
assertEquals(actual, new Metric<>(metric.getMetricId(), tags, dataRetention),
"The counter metric does not match");
assertMetricIndexMatches(tenantId, COUNTER,
singletonList(new Metric<>(metric.getMetricId(), tags, dataRetention)));
}

private <T> List<T> toList(Observable<T> observable) {
return ImmutableList.copyOf(observable.toBlocking().toIterable());
}
Expand Down
Expand Up @@ -120,9 +120,9 @@ public void generateRates() {

doAction(() -> metricsService.createTenant(new Tenant(tenant)));

doAction(() -> metricsService.createMetric(c1));
doAction(() -> metricsService.createMetric(c2));
doAction(() -> metricsService.createMetric(c3));
doAction(() -> metricsService.createMetric(c1, false));
doAction(() -> metricsService.createMetric(c2, false));
doAction(() -> metricsService.createMetric(c3, false));

doAction(() -> metricsService.addDataPoints(COUNTER, Observable.from(asList(
new Metric<>(c1.getMetricId(), asList(new DataPoint<>(start.getMillis(), 10L),
Expand Down

0 comments on commit 0fe8267

Please sign in to comment.