Skip to content

Commit

Permalink
Merge pull request #493 from hawkular/HWKMETRICS-148
Browse files Browse the repository at this point in the history
[HWKMETRICS-148]
  • Loading branch information
jsanda committed Apr 28, 2016
2 parents c855b45 + 7da0cb9 commit a4882a4
Show file tree
Hide file tree
Showing 18 changed files with 310 additions and 84 deletions.
Expand Up @@ -104,6 +104,9 @@ public class AvailabilityHandler {
public void createAvailabilityMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<AvailabilityType> metric,
@ApiParam(value = "Overwrite previously created metric configuration if it exists. "
+ "Only data retention and tags are overwriten; existing data points are unnafected. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -116,7 +119,7 @@ public void createAvailabilityMetric(
metric = new Metric<>(
new MetricId<>(tenantId, AVAILABILITY, metric.getMetricId().getName()), metric.getTags(),
metric.getDataRetention());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Expand Up @@ -109,6 +109,9 @@ public class CounterHandler {
public void createCounter(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<Long> metric,
@ApiParam(value = "Overwrite previously created metric configuration if it exists. "
+ "Only data retention and tags are overwriten; existing data points are unnafected. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -121,7 +124,7 @@ public void createCounter(
metric = new Metric<>(new MetricId<>(tenantId, COUNTER, metric.getId()),
metric.getTags(), metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/counters/{id}").build(metric.getMetricId().getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Expand Up @@ -106,6 +106,9 @@ public class GaugeHandler {
public void createGaugeMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<Double> metric,
@ApiParam(value = "Overwrite previously created metric configuration if it exists. "
+ "Only data retention and tags are overwriten; existing data points are unnafected. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -117,7 +120,7 @@ public void createGaugeMetric(
metric = new Metric<>(new MetricId<>(tenantId, GAUGE, metric.getId()), metric.getTags(),
metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/gauges/{id}").build(metric.getMetricId().getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Expand Up @@ -31,6 +31,7 @@

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -100,6 +101,8 @@ public class MetricHandler {
public <T> void createMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<T> metric,
@ApiParam(value = "Overwrite previously created metric if it exists. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() == null || !metric.getType().isUserType()) {
Expand All @@ -109,7 +112,7 @@ public <T> void createMetric(
metric = new Metric<>(id, metric.getTags(), metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/{type}/{id}").build(MetricTypeTextConverter.getLongForm(id
.getType()), id.getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -25,10 +25,12 @@

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
Expand Down Expand Up @@ -73,10 +75,14 @@ public class TenantsHandler {
public void createTenant(
@Suspended AsyncResponse asyncResponse,
@ApiParam(required = true) TenantDefinition tenantDefinition,
@ApiParam(value = "Overwrite previously created tenant configuration if it exists. "
+ "Only data retention settings are overwriten; existing metrics and data points are unnafected. "
+ "Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
URI location = uriInfo.getBaseUriBuilder().path("/tenants").build();
metricsService.createTenant(tenantDefinition.toTenant())
metricsService.createTenant(tenantDefinition.toTenant(), overwrite)
.subscribe(new TenantCreatedObserver(asyncResponse, location));
}

Expand Down
Expand Up @@ -37,15 +37,14 @@
* @author John Sanda
*/
public interface DataAccess {
Observable<ResultSet> insertTenant(String tenantId);

Observable<ResultSet> insertTenant(Tenant tenant);
Observable<ResultSet> insertTenant(Tenant tenant, boolean overwrite);

Observable<Row> findAllTenantIds();

Observable<Row> findTenant(String id);

<T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric);
<T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite);

<T> Observable<Row> findMetric(MetricId<T> id);

Expand Down
Expand Up @@ -63,7 +63,7 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement insertTenant;

private PreparedStatement insertTenantId;
private PreparedStatement insertTenantOverwrite;

private PreparedStatement findAllTenantIds;

Expand All @@ -73,6 +73,8 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement insertIntoMetricsIndex;

private PreparedStatement insertIntoMetricsIndexOverwrite;

private PreparedStatement findMetric;

private PreparedStatement getMetricTags;
Expand Down Expand Up @@ -146,11 +148,12 @@ public DataAccessImpl(Session session) {
}

protected void initPreparedStatements() {
insertTenantId = session.prepare("INSERT INTO tenants (id) VALUES (?)");

insertTenant = session.prepare(
"INSERT INTO tenants (id, retentions) VALUES (?, ?) IF NOT EXISTS");

insertTenantOverwrite = session.prepare(
"INSERT INTO tenants (id, retentions) VALUES (?, ?)");

findAllTenantIds = session.prepare("SELECT DISTINCT id FROM tenants");

findAllTenantIdsFromMetricsIdx = session.prepare("SELECT DISTINCT tenant_id, type FROM metrics_idx");
Expand Down Expand Up @@ -180,6 +183,10 @@ protected void initPreparedStatements() {
"VALUES (?, ?, ?, ?, ?) " +
"IF NOT EXISTS");

insertIntoMetricsIndexOverwrite = session.prepare(
"INSERT INTO metrics_idx (tenant_id, type, metric, data_retention, tags) " +
"VALUES (?, ?, ?, ?, ?) ");

updateMetricsIndex = session.prepare(
"INSERT INTO metrics_idx (tenant_id, type, metric) VALUES (?, ?, ?)");

Expand Down Expand Up @@ -332,14 +339,15 @@ protected void initPreparedStatements() {
"WHERE tenant_id = ? AND tname = ? AND tvalue = ?");
}

@Override public Observable<ResultSet> insertTenant(String tenantId) {
return rxSession.execute(insertTenantId.bind(tenantId));
}

@Override
public Observable<ResultSet> insertTenant(Tenant tenant) {
public Observable<ResultSet> insertTenant(Tenant tenant, boolean overwrite) {
Map<String, Integer> retentions = tenant.getRetentionSettings().entrySet().stream()
.collect(toMap(entry -> entry.getKey().getText(), Map.Entry::getValue));

if (overwrite) {
return rxSession.execute(insertTenantOverwrite.bind(tenant.getId(), retentions));
}

return rxSession.execute(insertTenant.bind(tenant.getId(), retentions));
}

Expand All @@ -355,8 +363,15 @@ public Observable<Row> findTenant(String id) {
}

@Override
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric) {
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite) {
MetricId<T> metricId = metric.getMetricId();

if (overwrite) {
return session.executeAsync(
insertIntoMetricsIndexOverwrite.bind(metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName(), metric.getDataRetention(), metric.getTags()));
}

return session.executeAsync(insertIntoMetricsIndex.bind(metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName(), metric.getDataRetention(), metric.getTags()));
}
Expand Down
Expand Up @@ -57,11 +57,12 @@ public interface MetricsService {
*
* @param tenant
* The {@link Tenant tenant} to create
* @param overwrite Flag to force overwrite previous tenant definition if it exists
* @return void
* @throws org.hawkular.metrics.core.api.exception.TenantAlreadyExistsException
* tenant already exists
*/
Observable<Void> createTenant(Tenant tenant);
Observable<Void> createTenant(Tenant tenant, boolean overwrite);

Observable<Tenant> getTenants();

Expand All @@ -84,13 +85,14 @@ public interface MetricsService {
* </p>
*
* @param metric The metric to create
* @param overwrite Flag to force overwrite previous metric definition if it exists
*
* @return This method only has side effects and does not return any data. As such,
* {@link rx.Observer#onNext(Object) onNext} is not called. {@link rx.Observer#onCompleted()} onCompleted}
* is called when the operation completes successfully, and {@link rx.Observer#onError(Throwable)} onError}
* is called when it fails.
*/
Observable<Void> createMetric(Metric<?> metric);
Observable<Void> createMetric(Metric<?> metric, boolean overwrite);

<T> Observable<Metric<T>> findMetric(MetricId<T> id);

Expand Down
Expand Up @@ -341,9 +341,9 @@ public void setDefaultTTL(int defaultTTL) {
}

@Override
public Observable<Void> createTenant(final Tenant tenant) {
public Observable<Void> createTenant(final Tenant tenant, boolean overwrite) {
return Observable.create(subscriber -> {
Observable<Void> updates = dataAccess.insertTenant(tenant).flatMap(resultSet -> {
Observable<Void> updates = dataAccess.insertTenant(tenant, overwrite).flatMap(resultSet -> {
if (!resultSet.wasApplied()) {
throw new TenantAlreadyExistsException(tenant.getId());
}
Expand Down Expand Up @@ -382,19 +382,18 @@ private List<String> loadTenantIds() {
}

@Override
public Observable<Void> createMetric(Metric<?> metric) {
public Observable<Void> createMetric(Metric<?> metric, boolean overwrite) {
MetricType<?> metricType = metric.getMetricId().getType();
if (!metricType.isUserType()) {
throw new IllegalArgumentException(metric + " cannot be created. " + metricType + " metrics are " +
"internally generated metrics and cannot be created by clients.");
}

ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric);
ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric, overwrite);
Observable<ResultSet> indexUpdated = ListenableFutureObservable.from(future, metricsTasks);
return Observable.create(subscriber -> indexUpdated.subscribe(resultSet -> {
if (!resultSet.wasApplied()) {
if (!overwrite && !resultSet.wasApplied()) {
subscriber.onError(new MetricAlreadyExistsException(metric));

} else {
// TODO Need error handling if either of the following updates fail
// If adding tags/retention fails, then we want to report the error to the
Expand Down
Expand Up @@ -78,9 +78,9 @@ public void insertAndFindTenant() throws Exception {
Tenant tenant2 = new Tenant("tenant-2", ImmutableMap.of(GAUGE, 14));


dataAccess.insertTenant(tenant1).toBlocking().lastOrDefault(null);
dataAccess.insertTenant(tenant1, false).toBlocking().lastOrDefault(null);

dataAccess.insertTenant(tenant2).toBlocking().lastOrDefault(null);
dataAccess.insertTenant(tenant2, false).toBlocking().lastOrDefault(null);

Tenant actual = dataAccess.findTenant(tenant1.getId())
.map(Functions::getTenant)
Expand All @@ -90,8 +90,8 @@ public void insertAndFindTenant() throws Exception {

@Test
public void doNotAllowDuplicateTenants() throws Exception {
dataAccess.insertTenant(new Tenant("tenant-1")).toBlocking().lastOrDefault(null);
ResultSet resultSet = dataAccess.insertTenant(new Tenant("tenant-1"))
dataAccess.insertTenant(new Tenant("tenant-1"), false).toBlocking().lastOrDefault(null);
ResultSet resultSet = dataAccess.insertTenant(new Tenant("tenant-1"), false)
.toBlocking()
.lastOrDefault(null);
assertFalse(resultSet.wasApplied(), "Tenants should not be overwritten");
Expand Down
Expand Up @@ -45,13 +45,8 @@ public DelegatingDataAccess(DataAccess delegate) {
}

@Override
public Observable<ResultSet> insertTenant(String tenantId) {
return delegate.insertTenant(tenantId);
}

@Override
public Observable<ResultSet> insertTenant(Tenant tenant) {
return delegate.insertTenant(tenant);
public Observable<ResultSet> insertTenant(Tenant tenant, boolean overwrite) {
return delegate.insertTenant(tenant, overwrite);
}

@Override
Expand All @@ -65,8 +60,8 @@ public Observable<Row> findTenant(String id) {
}

@Override
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric) {
return delegate.insertMetricInMetricsIndex(metric);
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite) {
return delegate.insertMetricInMetricsIndex(metric, overwrite);
}

@Override
Expand Down
Expand Up @@ -79,9 +79,9 @@ public void generateRates() {
Metric<Long> c2 = new Metric<>(new MetricId<>(tenant, COUNTER, "C2"));
Metric<Long> c3 = new Metric<>(new MetricId<>(tenant, COUNTER, "C3"));

doAction(() -> metricsService.createMetric(c1));
doAction(() -> metricsService.createMetric(c2));
doAction(() -> metricsService.createMetric(c3));
doAction(() -> metricsService.createMetric(c1, false));
doAction(() -> metricsService.createMetric(c2, false));
doAction(() -> metricsService.createMetric(c3, false));

doAction(() -> metricsService.addDataPoints(COUNTER, Observable.from(asList(
new Metric<>(c1.getMetricId(), asList(new DataPoint<>(start.getMillis(), 10L),
Expand Down

0 comments on commit a4882a4

Please sign in to comment.