Skip to content

Commit

Permalink
Merge pull request #311 from hawkular/implicit-tenants
Browse files Browse the repository at this point in the history
[HWKMETRICS-204] Implicit tenant creation
  • Loading branch information
Stefan Negrea committed Aug 19, 2015
2 parents b2e5913 + 889a99a commit b05b125
Show file tree
Hide file tree
Showing 29 changed files with 988 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ before_install:
install:
- mvn -version -B
script:
- mvn verify -Dwildfly.logging.console.level=DEBUG -Djboss-as.logging.console.level=DEBUG -B
- mvn verify -Dwildfly.logging.console.level=INFO -Djboss-as.logging.console.level=INFO -B
after_failure: bash .travis.diagnose.sh
after_success:
- PROJECT_VERSION=`mvn org.apache.maven.plugins:maven-help-plugin:2.1.1:evaluate -Dexpression=project.version | grep -v '\['`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.hawkular.metrics.api.jaxrs;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -29,7 +30,9 @@
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.WAIT_FOR_SERVICE;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -46,13 +49,20 @@
import org.hawkular.metrics.api.jaxrs.util.Eager;
import org.hawkular.metrics.api.jaxrs.util.VirtualClock;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.impl.CreateTenants;
import org.hawkular.metrics.core.impl.DataAccess;
import org.hawkular.metrics.core.impl.DataAccessImpl;
import org.hawkular.metrics.core.impl.DateTimeService;
import org.hawkular.metrics.core.impl.GenerateRate;
import org.hawkular.metrics.core.impl.MetricsServiceImpl;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.AbstractTrigger;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.impl.Queries;
import org.hawkular.metrics.tasks.impl.TaskSchedulerImpl;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,6 +73,8 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;

import rx.Subscription;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;

Expand Down Expand Up @@ -131,6 +143,10 @@ public enum State {
private int connectionAttempts;
private Session session;

private DataAccess dataAcces;

private Map<? super Action1<Task2>, Subscription> jobs = new HashMap<>();

MetricsServiceLifecycle() {
ThreadFactory threadFactory = r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
Expand Down Expand Up @@ -198,18 +214,23 @@ private void startMetricsService() {
// will change at some point though because the task scheduling service will
// probably move to the hawkular-commons repo.
initSchema();

dataAcces = new DataAccessImpl(session);
initTaskScheduler();

metricsService = new MetricsServiceImpl();
metricsService.setDataAccess(dataAcces);
metricsService.setTaskScheduler(taskScheduler);
metricsService.setDateTimeService(createDateTimeService());

// TODO Set up a managed metric registry
// We want a managed registry that can be shared by the JAX-RS endpoint and the core. Then we can expose
// the registered metrics in various ways such as new REST endpoints, JMX, or via different
// com.codahale.metrics.Reporter instances.
metricsService.startUp(session, keyspace, false, false, new MetricRegistry());
LOG.info("Metrics service started");

initJobs();

state = State.STARTED;
} catch (Exception t) {
LOG.error("An error occurred trying to connect to the Cassandra cluster", t);
Expand Down Expand Up @@ -267,16 +288,32 @@ private void initSchema() {
private void initTaskScheduler() {
taskScheduler = new TaskSchedulerImpl(new RxSessionImpl(session), new Queries(session));
if (Boolean.valueOf(useVirtualClock.toLowerCase())) {
// We do not want to start the task scheduler when we are using the virtual
// clock. Instead we want to wait to start it until a client sets the virtual
// clock; otherwise, we will get a MissingBackpressureException.
TestScheduler scheduler = Schedulers.test();
scheduler.advanceTimeTo(System.currentTimeMillis(), MILLISECONDS);
virtualClock = new VirtualClock(scheduler);
RepeatingTrigger.now = scheduler::now;
AbstractTrigger.now = scheduler::now;
((TaskSchedulerImpl) taskScheduler).setTickScheduler(scheduler);
} else {
taskScheduler.start();

}
taskScheduler.start();
}

private void initJobs() {
GenerateRate generateRates = new GenerateRate(metricsService);
CreateTenants createTenants = new CreateTenants(metricsService, dataAcces);

jobs.put(generateRates, taskScheduler.getTasks().filter(task -> task.getName().equals(GenerateRate.TASK_NAME))
.subscribe(generateRates));
jobs.put(createTenants, taskScheduler.getTasks().filter(task -> task.getName().equals(CreateTenants.TASK_NAME))
.subscribe(createTenants));
}

private DateTimeService createDateTimeService() {
DateTimeService dateTimeService = new DateTimeService();
if (Boolean.valueOf(useVirtualClock.toLowerCase())) {
dateTimeService.now = () -> new DateTime(virtualClock.now());
}
return dateTimeService;
}

/**
Expand Down Expand Up @@ -315,6 +352,7 @@ private void stopMetricsService() {
state = State.STOPPING;
metricsService.shutdown();
taskScheduler.shutdown();
jobs.values().forEach(Subscription::unsubscribe);
if (session != null) {
try {
session.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.hawkular.metrics.api.jaxrs.handler;

import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
Expand Down Expand Up @@ -73,9 +74,6 @@ public Response getTime() {
public Response setTime(Map<String, Object> params) {
Long time = (Long) params.get("time");
virtualClock.advanceTimeTo(time);
if (!taskScheduler.isRunning()) {
taskScheduler.start();
}
return Response.ok().build();
}

Expand All @@ -96,6 +94,8 @@ public Response waitForDuration(@QueryParam("duration") Duration duration) {
.subscribe(timeSlicesSubscriber);

try {
virtualClock.advanceTimeBy(numMinutes, MINUTES);

timeSlicesSubscriber.awaitTerminalEvent(10, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertTerminalEvent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.hawkular.metrics.api.jaxrs;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -29,7 +30,9 @@
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.WAIT_FOR_SERVICE;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -46,13 +49,20 @@
import org.hawkular.metrics.api.jaxrs.util.Eager;
import org.hawkular.metrics.api.jaxrs.util.VirtualClock;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.impl.CreateTenants;
import org.hawkular.metrics.core.impl.DataAccess;
import org.hawkular.metrics.core.impl.DataAccessImpl;
import org.hawkular.metrics.core.impl.DateTimeService;
import org.hawkular.metrics.core.impl.GenerateRate;
import org.hawkular.metrics.core.impl.MetricsServiceImpl;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.AbstractTrigger;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.impl.Queries;
import org.hawkular.metrics.tasks.impl.TaskSchedulerImpl;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,6 +73,8 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;

import rx.Subscription;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;

Expand Down Expand Up @@ -131,6 +143,10 @@ public enum State {
private int connectionAttempts;
private Session session;

private DataAccess dataAcces;

private Map<? super Action1<Task2>, Subscription> jobs = new HashMap<>();

MetricsServiceLifecycle() {
ThreadFactory threadFactory = r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
Expand Down Expand Up @@ -198,18 +214,23 @@ private void startMetricsService() {
// will change at some point though because the task scheduling service will
// probably move to the hawkular-commons repo.
initSchema();

dataAcces = new DataAccessImpl(session);
initTaskScheduler();

metricsService = new MetricsServiceImpl();
metricsService.setDataAccess(dataAcces);
metricsService.setTaskScheduler(taskScheduler);
metricsService.setDateTimeService(createDateTimeService());

// TODO Set up a managed metric registry
// We want a managed registry that can be shared by the JAX-RS endpoint and the core. Then we can expose
// the registered metrics in various ways such as new REST endpoints, JMX, or via different
// com.codahale.metrics.Reporter instances.
metricsService.startUp(session, keyspace, false, false, new MetricRegistry());
LOG.info("Metrics service started");

initJobs();

state = State.STARTED;
} catch (Exception t) {
LOG.error("An error occurred trying to connect to the Cassandra cluster", t);
Expand Down Expand Up @@ -267,16 +288,32 @@ private void initSchema() {
private void initTaskScheduler() {
taskScheduler = new TaskSchedulerImpl(new RxSessionImpl(session), new Queries(session));
if (Boolean.valueOf(useVirtualClock.toLowerCase())) {
// We do not want to start the task scheduler when we are using the virtual
// clock. Instead we want to wait to start it until a client sets the virtual
// clock; otherwise, we will get a MissingBackpressureException.
TestScheduler scheduler = Schedulers.test();
scheduler.advanceTimeTo(System.currentTimeMillis(), MILLISECONDS);
virtualClock = new VirtualClock(scheduler);
RepeatingTrigger.now = scheduler::now;
AbstractTrigger.now = scheduler::now;
((TaskSchedulerImpl) taskScheduler).setTickScheduler(scheduler);
} else {
taskScheduler.start();

}
taskScheduler.start();
}

private void initJobs() {
GenerateRate generateRates = new GenerateRate(metricsService);
CreateTenants createTenants = new CreateTenants(metricsService, dataAcces);

jobs.put(generateRates, taskScheduler.getTasks().filter(task -> task.getName().equals(GenerateRate.TASK_NAME))
.subscribe(generateRates));
jobs.put(createTenants, taskScheduler.getTasks().filter(task -> task.getName().equals(CreateTenants.TASK_NAME))
.subscribe(createTenants));
}

private DateTimeService createDateTimeService() {
DateTimeService dateTimeService = new DateTimeService();
if (Boolean.valueOf(useVirtualClock.toLowerCase())) {
dateTimeService.now = () -> new DateTime(virtualClock.now());
}
return dateTimeService;
}

/**
Expand Down Expand Up @@ -315,6 +352,7 @@ private void stopMetricsService() {
state = State.STOPPING;
metricsService.shutdown();
taskScheduler.shutdown();
jobs.values().forEach(Subscription::unsubscribe);
if (session != null) {
try {
session.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.hawkular.metrics.api.jaxrs.handler;

import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
Expand Down Expand Up @@ -73,9 +74,6 @@ public Response getTime() {
public Response setTime(Map<String, Object> params) {
Long time = (Long) params.get("time");
virtualClock.advanceTimeTo(time);
if (!taskScheduler.isRunning()) {
taskScheduler.start();
}
return Response.ok().build();
}

Expand All @@ -96,6 +94,8 @@ public Response waitForDuration(@QueryParam("duration") Duration duration) {
.subscribe(timeSlicesSubscriber);

try {
virtualClock.advanceTimeBy(numMinutes, MINUTES);

timeSlicesSubscriber.awaitTerminalEvent(10, SECONDS);
timeSlicesSubscriber.assertNoErrors();
timeSlicesSubscriber.assertTerminalEvent();
Expand Down

0 comments on commit b05b125

Please sign in to comment.