Skip to content

Commit

Permalink
[HWKMETRICS-204] perform system initialization at start up
Browse files Browse the repository at this point in the history
In MetricsServiceImpl.startUp(), we now initializae some state that is shared
across all h-metrics instances. We create an internal, system tenant and
schedule the tenant creation job.
  • Loading branch information
John Sanda committed Aug 13, 2015
1 parent f131ad3 commit 10594ab
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class CreateTenants implements Action1<Task2> {

private static final Logger logger = LoggerFactory.getLogger(CreateTenants.class);

public static final String TASK_NAME = "create-tenants";

private TenantsService tenantsService;

private DataAccess dataAccess;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class GenerateRate implements Action1<Task2> {

private static final Logger logger = LoggerFactory.getLogger(GenerateRate.class);

public static final String TASK_NAME = "generate-rates";

private MetricsService metricsService;

public GenerateRate(MetricsService metricsService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.hawkular.metrics.core.impl;

import static java.util.Collections.emptyMap;
import static java.util.Comparator.comparingLong;
import static java.util.concurrent.TimeUnit.MINUTES;

import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;
Expand All @@ -26,7 +28,6 @@
import static org.hawkular.metrics.core.impl.Functions.makeSafe;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -38,7 +39,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -98,6 +98,8 @@ public class MetricsServiceImpl implements MetricsService, TenantsService {
*/
public static final int DEFAULT_TTL = Duration.standardDays(7).toStandardSeconds().getSeconds();

public static final String SYSTEM_TENANT_ID = makeSafe("system");

private static class DataRetentionKey {
private final MetricId metricId;

Expand Down Expand Up @@ -195,6 +197,8 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c
this.metricRegistry = metricRegistry;
initMetrics();

initSystemTenant();

GenerateRate generateRates = new GenerateRate(this);
taskScheduler.subscribe(generateRates);
}
Expand Down Expand Up @@ -227,6 +231,30 @@ void unloadDataRetentions() {
dataRetentions.clear();
}

private void initSystemTenant() {
CountDownLatch latch = new CountDownLatch(1);
Trigger trigger = new RepeatingTrigger.Builder().withDelay(1, MINUTES).withInterval(30, MINUTES).build();
dataAccess.insertTenant(new Tenant(SYSTEM_TENANT_ID))
.filter(ResultSet::wasApplied)
.map(row -> taskScheduler.scheduleTask(CreateTenants.TASK_NAME, SYSTEM_TENANT_ID, 100, emptyMap(),
trigger))
.subscribe(
task -> logger.debug("Scheduled {}", task),
t -> {
logger.error("Failed to initialize system tenant", t)
latch.countDown();
},
() -> {
logger.debug("Successfully initialized system tenant");
latch.countDown();
}
);
try {
latch.await();
} catch (InterruptedException e) {
}
}

private void initMetrics() {
gaugeInserts = metricRegistry.meter("gauge-inserts");
availabilityInserts = metricRegistry.meter("availability-inserts");
Expand All @@ -242,7 +270,7 @@ private static class MergeDataPointTagsFunction<T extends DataPoint> implements
@Override
public Map<MetricId, Set<T>> call(List<Map<MetricId, Set<T>>> taggedDataMaps) {
if (taggedDataMaps.isEmpty()) {
return Collections.emptyMap();
return emptyMap();
}
if (taggedDataMaps.size() == 1) {
return taggedDataMaps.get(0);
Expand Down Expand Up @@ -324,8 +352,8 @@ public Observable<Void> createTenant(final Tenant tenant) {
}

Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.withDelay(1, MINUTES)
.withInterval(1, MINUTES)
.build();
Map<String, String> params = ImmutableMap.of("tenant", tenant.getId());
Observable<Void> ratesScheduled = taskScheduler.scheduleTask("generate-rates", tenant.getId(),
Expand All @@ -338,16 +366,17 @@ public Observable<Void> createTenant(final Tenant tenant) {

return ratesScheduled.concatWith(retentionUpdates);
});
updates.subscribe(resultSet -> {}, subscriber::onError, subscriber::onCompleted);
updates.subscribe(resultSet -> {
}, subscriber::onError, subscriber::onCompleted);
});
}

@Override
public Observable<Void> createTenants(long creationTime, Observable<String> tenantIds) {
return tenantIds.flatMap(tenantId -> dataAccess.insertTenant(tenantId).flatMap(resultSet -> {
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.withDelay(1, MINUTES)
.withInterval(1, MINUTES)
.build();
Map<String, String> params = ImmutableMap.of(
"tenant", tenantId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl;

import static java.util.Collections.singletonList;

import static org.hawkular.metrics.core.impl.MetricsServiceImpl.SYSTEM_TENANT_ID;
import static org.joda.time.Duration.standardMinutes;
import static org.testng.Assert.assertEquals;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.tasks.api.AbstractTrigger;
import org.hawkular.metrics.tasks.impl.Queries;
import org.hawkular.metrics.tasks.impl.TaskSchedulerImpl;
import org.joda.time.DateTime;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;

import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;

/**
* @author jsanda
*/
public class MetricsInitializationITest extends MetricsITest {

private MetricsServiceImpl metricsService;

private TaskSchedulerImpl taskScheduler;

private DateTimeService dateTimeService;

private TestScheduler tickScheduler;

private String keyspace;

@BeforeClass
public void initClass() {
initSession();

resetDB();

dateTimeService = new DateTimeService();
DateTime startTime = dateTimeService.getTimeSlice(DateTime.now(), standardMinutes(1)).minusMinutes(20);

tickScheduler = Schedulers.test();
tickScheduler.advanceTimeTo(startTime.getMillis(), TimeUnit.MILLISECONDS);

taskScheduler = new TaskSchedulerImpl(rxSession, new Queries(session));
taskScheduler.setTickScheduler(tickScheduler);

AbstractTrigger.now = tickScheduler::now;

metricsService = new MetricsServiceImpl();
metricsService.setTaskScheduler(taskScheduler);

taskScheduler.start();

keyspace = System.getProperty("keyspace", "hawkulartest");
metricsService.startUp(session, keyspace, false, new MetricRegistry());
}

protected void resetDB() {
session.execute("TRUNCATE tenants");
session.execute("TRUNCATE leases");
session.execute("TRUNCATE task_queue");
session.execute("TRUNCATE tasks");
}

@Test
public void systemTenantShouldBeCreated() {
List<Tenant> actual = getOnNextEvents(() ->
metricsService.getTenants().filter(tenant -> tenant.getId().equals(SYSTEM_TENANT_ID)));
List<Tenant> expected = singletonList(new Tenant(SYSTEM_TENANT_ID));

assertEquals(actual, expected, "Expected to find only the system tenant");
}

@Test(dependsOnMethods = "systemTenantShouldBeCreated")
public void tenantCreationJobShouldBeCreated() {
ResultSet resultSet = session.execute("select group_key, name from tasks");
List<Row> rows = resultSet.all();

assertEquals(rows.size(), 1, "Expected to find one task");
assertEquals(rows.get(0).getString(0), SYSTEM_TENANT_ID, "The job group key does not match");
assertEquals(rows.get(0).getString(1), CreateTenants.TASK_NAME, "The job name does not match");
}

@Test(dependsOnMethods = "tenantCreationJobShouldBeCreated")
public void doSystemInitializationOnlyOnce() {
MetricsServiceImpl service = new MetricsServiceImpl();
service.setTaskScheduler(taskScheduler);
service.startUp(session, keyspace, false, new MetricRegistry());

List<Tenant> actual = getOnNextEvents(() ->
service.getTenants().filter(tenant -> tenant.getId().equals(SYSTEM_TENANT_ID)));

assertEquals(actual.size(), 1, "Expected to find only the system tenant but found " + actual);

ResultSet resultSet = session.execute("select group_key, name from tasks");
List<Row> rows = resultSet.all();

assertEquals(rows.size(), 1, "There should only be one scheduled job");
}

}

0 comments on commit 10594ab

Please sign in to comment.