Skip to content

Commit

Permalink
[HWKMETRICS-204] initial commit for task to create tenants
Browse files Browse the repository at this point in the history
We query the metrics_idx table to determine whether or not a metric exists.
Metrics are implicitly created when inserting data points because we write to
both the data and the metrics_idx tables. We need to implicitly create tenants
in order to set up various background jobs like generating rates and deleting
data (see HWKMETRICS-191 for details).
  • Loading branch information
John Sanda committed Aug 13, 2015
1 parent 807e094 commit 1044a12
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface MetricsService {
*/
Observable<Void> createTenant(Tenant tenant);

Observable<Void> createTenants(Observable<Tenant> tenants);

Observable<Tenant> getTenants();

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl;

import java.util.concurrent.CountDownLatch;

import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.tasks.api.Task2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.functions.Action1;

/**
* @author jsanda
*/
public class CreateTenants implements Action1<Task2> {

private static final Logger logger = LoggerFactory.getLogger(CreateTenants.class);

private MetricsService metricsService;

private DataAccess dataAccess;

public CreateTenants(MetricsService metricsService, DataAccess dataAccess) {
this.metricsService = metricsService;
this.dataAccess = dataAccess;
}

@Override public void call(Task2 task) {
long bucket = task.getTrigger().getTriggerTime();
CountDownLatch latch = new CountDownLatch(1);

Observable<Tenant> tenants = dataAccess.findTenantIds(bucket)
.flatMap(Observable::from)
.map(row -> row.getString(0))
.flatMap(tenantId -> getTenant(tenantId).map(tenant -> tenant == null ? tenantId : tenant.getId()))
.filter(tenantId -> tenantId != null)
.map(Tenant::new);

metricsService.createTenants(tenants).subscribe(
aVoid -> {},
t -> {
logger.warn("Tenant creation failed", t);
latch.countDown();
},
() -> {
dataAccess.deleteTenantsBucket(bucket).subscribe(
resultSet -> {},
t -> {
logger.warn("Failed to delete tenants bucket [" + bucket + "]", t);
latch.countDown();
},
latch::countDown
);
}
);
try {
latch.await();
} catch (InterruptedException e) {
}
}

private Observable<Tenant> getTenant(String tenantId) {
return dataAccess.findTenant(tenantId)
.flatMap(Observable::from)
.map(Functions::getTenant);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import java.util.Map;
import java.util.Set;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.Tenant;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;

import rx.Observable;

/**
Expand All @@ -39,8 +41,12 @@ public interface DataAccess {

Observable<ResultSet> findAllTenantIds();

Observable<ResultSet> findTenantIds(long time);

Observable<ResultSet> findTenant(String id);

Observable<ResultSet> deleteTenantsBucket(long time);

ResultSetFuture insertMetricInMetricsIndex(Metric metric);

Observable<ResultSet> findMetric(MetricId id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.datastax.driver.core.BatchStatement.Type.UNLOGGED;

import java.nio.ByteBuffer;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -65,8 +66,12 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement findAllTenantIds;

private PreparedStatement findTenantIdsByTime;

private PreparedStatement findTenant;

private PreparedStatement deleteTenantsBucket;

private PreparedStatement insertIntoMetricsIndex;

private PreparedStatement findMetric;
Expand Down Expand Up @@ -149,7 +154,11 @@ protected void initPreparedStatements() {

findAllTenantIds = session.prepare("SELECT DISTINCT id FROM tenants");

findTenant = session.prepare("SELECT id, retentions FROM tenants WHERE id = ?");
findTenantIdsByTime = session.prepare("SELECT tenant FROM tenants_by_time WHERE bucket = ?");

findTenant = session.prepare("SELECT id, retentions FROM tenants WHERE id = ?");

deleteTenantsBucket = session.prepare("DELETE FROM tenants_by_time WHERE bucket = ?");

findMetric = session.prepare(
"SELECT metric, interval, tags, data_retention " +
Expand Down Expand Up @@ -338,11 +347,19 @@ public Observable<ResultSet> findAllTenantIds() {
return rxSession.execute(findAllTenantIds.bind());
}

@Override public Observable<ResultSet> findTenantIds(long time) {
return rxSession.execute(findTenantIdsByTime.bind(new Date(time)));
}

@Override
public Observable<ResultSet> findTenant(String id) {
return rxSession.execute(findTenant.bind(id));
}

@Override public Observable<ResultSet> deleteTenantsBucket(long time) {
return rxSession.execute(deleteTenantsBucket.bind(new Date(time)));
}

@Override
public ResultSetFuture insertMetricInMetricsIndex(Metric metric) {
return session.executeAsync(insertIntoMetricsIndex.bind(metric.getTenantId(), metric.getType().getCode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,32 @@ public Observable<Void> createTenant(final Tenant tenant) {

return ratesScheduled.concatWith(retentionUpdates);
});
updates.subscribe(resultSet -> {}, subscriber::onError, subscriber::onCompleted);
updates.subscribe(resultSet -> {
}, subscriber::onError, subscriber::onCompleted);
});
}

@Override public Observable<Void> createTenants(Observable<Tenant> tenants) {
return tenants.flatMap(tenant -> dataAccess.insertTenant(tenant).flatMap(resultSet -> {
if (!resultSet.wasApplied()) {
throw new TenantAlreadyExistsException(tenant.getId());
}
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.build();
Map<String, String> params = ImmutableMap.of("tenant", tenant.getId());
Observable<Void> ratesScheduled = taskScheduler.scheduleTask("generate-rates", tenant.getId(),
100, params, trigger).map(task -> null);
Observable<Void> retentionUpdates = Observable.from(tenant.getRetentionSettings().entrySet())
.flatMap(entry -> dataAccess.updateRetentionsIndex(tenant.getId(), entry.getKey(),
ImmutableMap.of(makeSafe(entry.getKey().getText()), entry.getValue())))
.map(rs -> null);

return ratesScheduled.concatWith(retentionUpdates);
}));
}

@Override
public Observable<Tenant> getTenants() {
return dataAccess.findAllTenantIds()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl;

import static java.util.Collections.emptyMap;
import static java.util.UUID.randomUUID;

import static org.joda.time.Duration.standardMinutes;
import static org.testng.Assert.assertEquals;

import java.util.List;

import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.tasks.api.SingleExecutionTrigger;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.metrics.tasks.impl.Task2Impl;
import org.joda.time.DateTime;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;

/**
* @author jsanda
*/
public class CreateTenantsITest extends MetricsITest {

private MetricsServiceImpl metricsService;

private DateTimeService dateTimeService;

@BeforeClass
public void initClass() {
initSession();

dateTimeService = new DateTimeService();

metricsService = new MetricsServiceImpl();
metricsService.setTaskScheduler(new FakeTaskScheduler());

metricsService.startUp(session, getKeyspace(), false, new MetricRegistry());
}

@BeforeMethod
public void initMethod() {
session.execute("TRUNCATE tenants");
session.execute("TRUNCATE tenants_by_time");
}

@Test
public void executeWhenThereAreNoTenantsForBucket() {
DateTime bucket = dateTimeService.getTimeSlice(DateTime.now(), standardMinutes(1));

Tenant t1 = new Tenant("T1");
doAction(() -> metricsService.createTenant(t1));

Trigger trigger = new SingleExecutionTrigger(bucket.getMillis());
Task2Impl taskDetails = new Task2Impl(randomUUID(), "$system", 0, "create-tenants", emptyMap(), trigger);

CreateTenants task = new CreateTenants(metricsService, metricsService.getDataAccess());
task.call(taskDetails);

List<Tenant> actual = getOnNextEvents(metricsService::getTenants);
List<Tenant> expected = ImmutableList.of(t1);

assertEquals(actual, expected, "No new tenants should have been created.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import java.util.Map;
import java.util.Set;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.Tenant;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;

import rx.Observable;

/**
Expand Down Expand Up @@ -57,6 +59,14 @@ public Observable<ResultSet> findTenant(String id) {
return delegate.findTenant(id);
}

@Override public Observable<ResultSet> findTenantIds(long time) {
return delegate.findTenantIds(time);
}

@Override public Observable<ResultSet> deleteTenantsBucket(long time) {
return delegate.deleteTenantsBucket(time);
}

@Override
public ResultSetFuture insertMetricInMetricsIndex(Metric metric) {
return delegate.insertMetricInMetricsIndex(metric);
Expand Down
10 changes: 9 additions & 1 deletion schema-manager/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@ CREATE TYPE ${keyspace}.aggregation_template (
CREATE TABLE ${keyspace}.tenants (
id text PRIMARY KEY,
retentions map<text, int>
);
) WITH compaction = { 'class': 'LeveledCompactionStrategy' };

-- #

CREATE TABLE ${keyspace}.tenants_by_time (
bucket timestamp,
tenant text,
PRIMARY KEY (bucket)
) WITH compaction = { 'class': 'LeveledCompactionStrategy' };

-- #

Expand Down

0 comments on commit 1044a12

Please sign in to comment.