Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HWKMETRICS-148] #493

Merged
merged 5 commits into from
Apr 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public class AvailabilityHandler {
public void createAvailabilityMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<AvailabilityType> metric,
@ApiParam(value = "Overwrite previously created metric configuration if it exists. "
+ "Only data retention and tags are overwriten; existing data points are unnafected. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -116,7 +119,7 @@ public void createAvailabilityMetric(
metric = new Metric<>(
new MetricId<>(tenantId, AVAILABILITY, metric.getMetricId().getName()), metric.getTags(),
metric.getDataRetention());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ public class CounterHandler {
public void createCounter(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<Long> metric,
@ApiParam(value = "Overwrite previously created metric configuration if it exists. "
+ "Only data retention and tags are overwriten; existing data points are unnafected. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -121,7 +124,7 @@ public void createCounter(
metric = new Metric<>(new MetricId<>(tenantId, COUNTER, metric.getId()),
metric.getTags(), metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/counters/{id}").build(metric.getMetricId().getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public class GaugeHandler {
public void createGaugeMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<Double> metric,
@ApiParam(value = "Overwrite previously created metric configuration if it exists. "
+ "Only data retention and tags are overwriten; existing data points are unnafected. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() != null
Expand All @@ -117,7 +120,7 @@ public void createGaugeMetric(
metric = new Metric<>(new MetricId<>(tenantId, GAUGE, metric.getId()), metric.getTags(),
metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/gauges/{id}").build(metric.getMetricId().getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -100,6 +101,8 @@ public class MetricHandler {
public <T> void createMetric(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(required = true) Metric<T> metric,
@ApiParam(value = "Overwrite previously created metric if it exists. Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
if (metric.getType() == null || !metric.getType().isUserType()) {
Expand All @@ -109,7 +112,7 @@ public <T> void createMetric(
metric = new Metric<>(id, metric.getTags(), metric.getDataRetention());
URI location = uriInfo.getBaseUriBuilder().path("/{type}/{id}").build(MetricTypeTextConverter.getLongForm(id
.getType()), id.getName());
metricsService.createMetric(metric).subscribe(new MetricCreatedObserver(asyncResponse, location));
metricsService.createMetric(metric, overwrite).subscribe(new MetricCreatedObserver(asyncResponse, location));
}

@GET
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -25,10 +25,12 @@

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
Expand Down Expand Up @@ -73,10 +75,14 @@ public class TenantsHandler {
public void createTenant(
@Suspended AsyncResponse asyncResponse,
@ApiParam(required = true) TenantDefinition tenantDefinition,
@ApiParam(value = "Overwrite previously created tenant configuration if it exists. "
+ "Only data retention settings are overwriten; existing metrics and data points are unnafected. "
+ "Defaults to false.",
required = false) @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@Context UriInfo uriInfo
) {
URI location = uriInfo.getBaseUriBuilder().path("/tenants").build();
metricsService.createTenant(tenantDefinition.toTenant())
metricsService.createTenant(tenantDefinition.toTenant(), overwrite)
.subscribe(new TenantCreatedObserver(asyncResponse, location));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@
* @author John Sanda
*/
public interface DataAccess {
Observable<ResultSet> insertTenant(String tenantId);

Observable<ResultSet> insertTenant(Tenant tenant);
Observable<ResultSet> insertTenant(Tenant tenant, boolean overwrite);

Observable<Row> findAllTenantIds();

Observable<Row> findTenant(String id);

<T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric);
<T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite);

<T> Observable<Row> findMetric(MetricId<T> id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement insertTenant;

private PreparedStatement insertTenantId;
private PreparedStatement insertTenantOverwrite;

private PreparedStatement findAllTenantIds;

Expand All @@ -73,6 +73,8 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement insertIntoMetricsIndex;

private PreparedStatement insertIntoMetricsIndexOverwrite;

private PreparedStatement findMetric;

private PreparedStatement getMetricTags;
Expand Down Expand Up @@ -146,11 +148,12 @@ public DataAccessImpl(Session session) {
}

protected void initPreparedStatements() {
insertTenantId = session.prepare("INSERT INTO tenants (id) VALUES (?)");

insertTenant = session.prepare(
"INSERT INTO tenants (id, retentions) VALUES (?, ?) IF NOT EXISTS");

insertTenantOverwrite = session.prepare(
"INSERT INTO tenants (id, retentions) VALUES (?, ?)");

findAllTenantIds = session.prepare("SELECT DISTINCT id FROM tenants");

findAllTenantIdsFromMetricsIdx = session.prepare("SELECT DISTINCT tenant_id, type FROM metrics_idx");
Expand Down Expand Up @@ -180,6 +183,10 @@ protected void initPreparedStatements() {
"VALUES (?, ?, ?, ?, ?) " +
"IF NOT EXISTS");

insertIntoMetricsIndexOverwrite = session.prepare(
"INSERT INTO metrics_idx (tenant_id, type, metric, data_retention, tags) " +
"VALUES (?, ?, ?, ?, ?) ");

updateMetricsIndex = session.prepare(
"INSERT INTO metrics_idx (tenant_id, type, metric) VALUES (?, ?, ?)");

Expand Down Expand Up @@ -332,14 +339,15 @@ protected void initPreparedStatements() {
"WHERE tenant_id = ? AND tname = ? AND tvalue = ?");
}

@Override public Observable<ResultSet> insertTenant(String tenantId) {
return rxSession.execute(insertTenantId.bind(tenantId));
}

@Override
public Observable<ResultSet> insertTenant(Tenant tenant) {
public Observable<ResultSet> insertTenant(Tenant tenant, boolean overwrite) {
Map<String, Integer> retentions = tenant.getRetentionSettings().entrySet().stream()
.collect(toMap(entry -> entry.getKey().getText(), Map.Entry::getValue));

if (overwrite) {
return rxSession.execute(insertTenantOverwrite.bind(tenant.getId(), retentions));
}

return rxSession.execute(insertTenant.bind(tenant.getId(), retentions));
}

Expand All @@ -355,8 +363,15 @@ public Observable<Row> findTenant(String id) {
}

@Override
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric) {
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite) {
MetricId<T> metricId = metric.getMetricId();

if (overwrite) {
return session.executeAsync(
insertIntoMetricsIndexOverwrite.bind(metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName(), metric.getDataRetention(), metric.getTags()));
}

return session.executeAsync(insertIntoMetricsIndex.bind(metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName(), metric.getDataRetention(), metric.getTags()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ public interface MetricsService {
*
* @param tenant
* The {@link Tenant tenant} to create
* @param overwrite Flag to force overwrite previous tenant definition if it exists
* @return void
* @throws org.hawkular.metrics.core.api.exception.TenantAlreadyExistsException
* tenant already exists
*/
Observable<Void> createTenant(Tenant tenant);
Observable<Void> createTenant(Tenant tenant, boolean overwrite);

Observable<Tenant> getTenants();

Expand All @@ -84,13 +85,14 @@ public interface MetricsService {
* </p>
*
* @param metric The metric to create
* @param overwrite Flag to force overwrite previous metric definition if it exists
*
* @return This method only has side effects and does not return any data. As such,
* {@link rx.Observer#onNext(Object) onNext} is not called. {@link rx.Observer#onCompleted()} onCompleted}
* is called when the operation completes successfully, and {@link rx.Observer#onError(Throwable)} onError}
* is called when it fails.
*/
Observable<Void> createMetric(Metric<?> metric);
Observable<Void> createMetric(Metric<?> metric, boolean overwrite);

<T> Observable<Metric<T>> findMetric(MetricId<T> id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,9 @@ public void setDefaultTTL(int defaultTTL) {
}

@Override
public Observable<Void> createTenant(final Tenant tenant) {
public Observable<Void> createTenant(final Tenant tenant, boolean overwrite) {
return Observable.create(subscriber -> {
Observable<Void> updates = dataAccess.insertTenant(tenant).flatMap(resultSet -> {
Observable<Void> updates = dataAccess.insertTenant(tenant, overwrite).flatMap(resultSet -> {
if (!resultSet.wasApplied()) {
throw new TenantAlreadyExistsException(tenant.getId());
}
Expand Down Expand Up @@ -382,19 +382,18 @@ private List<String> loadTenantIds() {
}

@Override
public Observable<Void> createMetric(Metric<?> metric) {
public Observable<Void> createMetric(Metric<?> metric, boolean overwrite) {
MetricType<?> metricType = metric.getMetricId().getType();
if (!metricType.isUserType()) {
throw new IllegalArgumentException(metric + " cannot be created. " + metricType + " metrics are " +
"internally generated metrics and cannot be created by clients.");
}

ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric);
ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric, overwrite);
Observable<ResultSet> indexUpdated = ListenableFutureObservable.from(future, metricsTasks);
return Observable.create(subscriber -> indexUpdated.subscribe(resultSet -> {
if (!resultSet.wasApplied()) {
if (!overwrite && !resultSet.wasApplied()) {
subscriber.onError(new MetricAlreadyExistsException(metric));

} else {
// TODO Need error handling if either of the following updates fail
// If adding tags/retention fails, then we want to report the error to the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ public void insertAndFindTenant() throws Exception {
Tenant tenant2 = new Tenant("tenant-2", ImmutableMap.of(GAUGE, 14));


dataAccess.insertTenant(tenant1).toBlocking().lastOrDefault(null);
dataAccess.insertTenant(tenant1, false).toBlocking().lastOrDefault(null);

dataAccess.insertTenant(tenant2).toBlocking().lastOrDefault(null);
dataAccess.insertTenant(tenant2, false).toBlocking().lastOrDefault(null);

Tenant actual = dataAccess.findTenant(tenant1.getId())
.map(Functions::getTenant)
Expand All @@ -90,8 +90,8 @@ public void insertAndFindTenant() throws Exception {

@Test
public void doNotAllowDuplicateTenants() throws Exception {
dataAccess.insertTenant(new Tenant("tenant-1")).toBlocking().lastOrDefault(null);
ResultSet resultSet = dataAccess.insertTenant(new Tenant("tenant-1"))
dataAccess.insertTenant(new Tenant("tenant-1"), false).toBlocking().lastOrDefault(null);
ResultSet resultSet = dataAccess.insertTenant(new Tenant("tenant-1"), false)
.toBlocking()
.lastOrDefault(null);
assertFalse(resultSet.wasApplied(), "Tenants should not be overwritten");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,8 @@ public DelegatingDataAccess(DataAccess delegate) {
}

@Override
public Observable<ResultSet> insertTenant(String tenantId) {
return delegate.insertTenant(tenantId);
}

@Override
public Observable<ResultSet> insertTenant(Tenant tenant) {
return delegate.insertTenant(tenant);
public Observable<ResultSet> insertTenant(Tenant tenant, boolean overwrite) {
return delegate.insertTenant(tenant, overwrite);
}

@Override
Expand All @@ -65,8 +60,8 @@ public Observable<Row> findTenant(String id) {
}

@Override
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric) {
return delegate.insertMetricInMetricsIndex(metric);
public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean overwrite) {
return delegate.insertMetricInMetricsIndex(metric, overwrite);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public void generateRates() {
Metric<Long> c2 = new Metric<>(new MetricId<>(tenant, COUNTER, "C2"));
Metric<Long> c3 = new Metric<>(new MetricId<>(tenant, COUNTER, "C3"));

doAction(() -> metricsService.createMetric(c1));
doAction(() -> metricsService.createMetric(c2));
doAction(() -> metricsService.createMetric(c3));
doAction(() -> metricsService.createMetric(c1, false));
doAction(() -> metricsService.createMetric(c2, false));
doAction(() -> metricsService.createMetric(c3, false));

doAction(() -> metricsService.addDataPoints(COUNTER, Observable.from(asList(
new Metric<>(c1.getMetricId(), asList(new DataPoint<>(start.getMillis(), 10L),
Expand Down