[HWKMETRICS-168] initial commit for new scheduler and task classes
[HWKMETRICS-168] persist task to queue, update trigger api, and clean up tests

[HWKMETRICS-168] add method/test for scheduling task and update schema

Pretty much all of the code prior to this ticket is commented out because it is
basically being rewritten. Tests will be added back though.

[HWKMETRICS-168] big refactoring to get concurrency right.

The scheduler deals with 4 different thread pools - one for emitting ticks, one
for processing leases, one for executing tasks, and the C* driver's I/O
threads. Making sure things execute on the right thread pool turned out to be
a challenge when chaining various Rx operators together. It is not as obvious as
when you explicitly submit a task to a thread pool.
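
As a rough illustration of why this is easy to get wrong, here is a minimal
sketch in plain java.util.concurrent rather than RxJava. It is not code from
this commit. The pool names "ticks", "leases", and "tasks" are hypothetical
stand-ins for three of the four pools, and the C* driver's I/O threads are left
out. Each stage only lands on the intended pool because an executor is passed
for that stage explicitly; in RxJava the analogous choice is made with
operators such as subscribeOn and observeOn.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadPoolHopSketch {

    // Hypothetical single-threaded pool whose thread carries the pool name.
    private static ExecutorService namedPool(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Runs a three-stage pipeline and returns the name of the thread
    // that executed each stage, in order.
    static String[] runPipeline() {
        ExecutorService ticks = namedPool("ticks");
        ExecutorService leases = namedPool("leases");
        ExecutorService tasks = namedPool("tasks");
        try {
            return CompletableFuture
                    // Stage 1 runs on the tick pool.
                    .supplyAsync(() -> Thread.currentThread().getName(), ticks)
                    // Stage 2 hops to the lease pool because the executor is named here.
                    .thenApplyAsync(seen -> seen + "," + Thread.currentThread().getName(), leases)
                    // Stage 3 hops to the task pool.
                    .thenApplyAsync(seen -> seen + "," + Thread.currentThread().getName(), tasks)
                    .join()
                    .split(",");
        } finally {
            ticks.shutdown();
            leases.shutdown();
            tasks.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runPipeline()));
    }
}
```

Dropping the executor argument from either thenApplyAsync call would move that
stage to the common fork-join pool, which is the kind of silent thread change
described above.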

[HWKMETRICS-168] handle exceptions thrown by tasks

[HWKMETRICS-168] add support for setting number of executions in repeating trigger

This is a generally useful feature, but I added it right now to help with
integration tests.

[HWKMETRICS-168] adding group_key and exec_order columns to task_queue table

These columns help if/when tasks have interdependencies. All tasks having the
same group key will be stored in the same queue, which means that they will be
associated with the same lease. The exec_order column defines an execution
order for tasks within the same group. Tasks with a lower number are executed
first.
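
The grouping and ordering rules can be sketched in memory as follows. This is
only an illustration under stated assumptions, not the scheduler's actual
implementation: the QueuedTask class and the executionPlan method are
hypothetical, and the real data lives in the task_queue table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TaskQueueSketch {

    // Hypothetical in-memory stand-in for a row of the task_queue table.
    static final class QueuedTask {
        final String name;
        final String groupKey;
        final int execOrder;

        QueuedTask(String name, String groupKey, int execOrder) {
            this.name = name;
            this.groupKey = groupKey;
            this.execOrder = execOrder;
        }
    }

    // Puts tasks that share a group key into the same queue, then orders
    // each queue so that lower exec_order values come first.
    static Map<String, List<String>> executionPlan(List<QueuedTask> tasks) {
        Map<String, List<QueuedTask>> queues = new LinkedHashMap<>();
        for (QueuedTask task : tasks) {
            queues.computeIfAbsent(task.groupKey, key -> new ArrayList<>()).add(task);
        }
        Map<String, List<String>> plan = new LinkedHashMap<>();
        for (Map.Entry<String, List<QueuedTask>> entry : queues.entrySet()) {
            List<QueuedTask> queue = entry.getValue();
            queue.sort(Comparator.comparingInt((QueuedTask t) -> t.execOrder));
            List<String> names = new ArrayList<>();
            for (QueuedTask task : queue) {
                names.add(task.name);
            }
            plan.put(entry.getKey(), names);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<QueuedTask> tasks = Arrays.asList(
                new QueuedTask("aggregate", "tenant-a", 20),
                new QueuedTask("cleanup", "tenant-b", 10),
                new QueuedTask("collect", "tenant-a", 10));
        System.out.println(executionPlan(tasks));
    }
}
```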

The commit also adds/updates TaskSchedulerTest to use RxJava's TestScheduler.
It took me a good bit of time over the weekend to understand how to set things
up, but it was well worth the effort. Tests will be much more reliable,
consistent, and faster as they use a virtual clock.
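
To show why a virtual clock makes such tests fast and repeatable, here is a
hand-rolled sketch of the idea. It is not RxJava's TestScheduler and not code
from this commit; the VirtualClockSketch class and its method names are
hypothetical. Time only moves when the test says so, so a repeating action
that fires every minute can be driven through several minutes without any
real waiting.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

final class VirtualClockSketch {

    // A scheduled action with its due time; seq keeps insertion order on ties.
    private static final class Entry {
        final long dueMillis;
        final long seq;
        final Runnable action;

        Entry(long dueMillis, long seq, Runnable action) {
            this.dueMillis = dueMillis;
            this.seq = seq;
            this.action = action;
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
            Comparator.comparingLong((Entry e) -> e.dueMillis).thenComparingLong(e -> e.seq));
    private long nowMillis;
    private long nextSeq;

    long now() {
        return nowMillis;
    }

    void schedule(Runnable action, long delay, TimeUnit unit) {
        queue.add(new Entry(nowMillis + unit.toMillis(delay), nextSeq++, action));
    }

    // Runs the action after initialDelay, then reschedules it every period.
    void schedulePeriodically(Runnable action, long initialDelay, long period, TimeUnit unit) {
        schedule(() -> {
            action.run();
            schedulePeriodically(action, period, period, unit);
        }, initialDelay, unit);
    }

    // Moves virtual time forward, running every action that becomes due on the way.
    void advanceTimeBy(long amount, TimeUnit unit) {
        long target = nowMillis + unit.toMillis(amount);
        while (!queue.isEmpty() && queue.peek().dueMillis <= target) {
            Entry next = queue.poll();
            nowMillis = next.dueMillis;
            next.action.run();
        }
        nowMillis = target;
    }

    public static void main(String[] args) {
        VirtualClockSketch clock = new VirtualClockSketch();
        int[] ticks = {0};
        clock.schedulePeriodically(() -> ticks[0]++, 1, 1, TimeUnit.MINUTES);
        clock.advanceTimeBy(3, TimeUnit.MINUTES);
        System.out.println("ticks after 3 virtual minutes: " + ticks[0]);
    }
}
```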

[HWKMETRICS-168] emit event when scheduler is done with time slice

The getAvailableLeases method now publishes the timestamp when all work is
done. This is basically a test hook so that we can make tests consistent and
repeatable.

[HWKMETRICS-168] more tests

[HWKMETRICS-168] get RatesITest passing again

[HWKMETRICS-168] removing obsolete classes

[HWKMETRICS-168] fix post merge errors and disable integration test

There were some changes needed as a result of HWKMETRICS-114 which made the
tenant id part of MetricId. The REST integration test for fetching counters is
disabled for now. Rates are calculated every minute (as of right now). We
do not want to have a test literally spin for several minutes waiting for rates
to be generated. We need a way to use RxJava's TestScheduler for REST
integration tests. I am going to create a separate ticket for this because I
think it will involve a bit of work.

[HWKMETRICS-168] remove duplicate code in trigger classes and bump RxJava version

[HWKMETRICS-168] add test to verify execution order and fix query

I am adding a test that simulates a long running task. Every task is supposed
to execute every minute. One of the tasks takes 3 minutes to complete. Tasks are
split into two groups. The test verifies that tasks are executed in the
expected order despite the long delay.

This commit also fixes a bug in the query to find available leases. The filter
function needs to also check that the owner is not set in addition to the lease
not being finished.
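
The corrected condition can be sketched as a predicate over lease rows. This
is an assumption-laden illustration, not the query from this commit: the
LeaseRow class, its fields, and the availableLeases method are hypothetical
names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class LeaseFilterSketch {

    // Hypothetical stand-in for a lease row; owner is null when unclaimed.
    static final class LeaseRow {
        final long timeSlice;
        final String owner;
        final boolean finished;

        LeaseRow(long timeSlice, String owner, boolean finished) {
            this.timeSlice = timeSlice;
            this.owner = owner;
            this.finished = finished;
        }
    }

    // A lease is available only if it has no owner AND is not finished.
    // Checking "not finished" alone would also return leases that another
    // scheduler instance has already claimed.
    static final Predicate<LeaseRow> AVAILABLE =
            lease -> lease.owner == null && !lease.finished;

    static List<LeaseRow> availableLeases(List<LeaseRow> leases) {
        List<LeaseRow> result = new ArrayList<>();
        for (LeaseRow lease : leases) {
            if (AVAILABLE.test(lease)) {
                result.add(lease);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<LeaseRow> leases = new ArrayList<>();
        leases.add(new LeaseRow(0L, null, false));          // available
        leases.add(new LeaseRow(60000L, "host-1", false));  // owned, still running
        leases.add(new LeaseRow(120000L, null, true));      // already finished
        System.out.println(availableLeases(leases).size());
    }
}
```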

fix imports after rebase
John Sanda committed Aug 7, 2015
1 parent 257d051 commit bf3cceb
Showing 37 changed files with 2,296 additions and 2,346 deletions.
@@ -16,7 +16,6 @@
*/
package org.hawkular.metrics.api.jaxrs;

import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -30,11 +29,11 @@

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -52,14 +51,15 @@
import org.hawkular.metrics.api.jaxrs.config.ConfigurationProperty;
import org.hawkular.metrics.api.jaxrs.util.Eager;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.impl.GenerateRate;
import org.hawkular.metrics.core.impl.MetricsServiceImpl;
import org.hawkular.metrics.core.impl.TaskTypes;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskServiceBuilder;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.metrics.tasks.impl.Lease;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/**
* Bean created on startup to manage the lifecycle of the {@link MetricsService} instance shared in application scope.
@@ -81,7 +81,7 @@ public enum State {

private MetricsServiceImpl metricsService;

private TaskService taskService;
private TaskScheduler taskScheduler;

private final ScheduledExecutorService lifecycleExecutor;

@@ -191,11 +191,29 @@ private void startMetricsService() {
// will change at some point though because the task scheduling service will
// probably move to the hawkular-commons repo.
initSchema();
initTaskService();

taskScheduler = new TaskScheduler() {
@Override
public Observable<Lease> start() {
LOG.warn("Task scheduling is not yet supported");
return Observable.empty();
}

@Override
public Observable<Task2> scheduleTask(String name, String groupKey, int executionOrder,
Map<String, String> parameters, Trigger trigger) {
LOG.warn("Task scheduling is not yet supported");
return Observable.empty();
}

@Override
public void shutdown() {

}
};

metricsService = new MetricsServiceImpl();
metricsService.setTaskService(taskService);
taskService.subscribe(TaskTypes.COMPUTE_RATE, new GenerateRate(metricsService));
metricsService.setTaskScheduler(taskScheduler);

// TODO Set up a managed metric registry
// We want a managed registry that can be shared by the JAX-RS endpoint and the core. Then we can expose
@@ -257,23 +275,6 @@ private void initSchema() {
session.execute("USE " + keyspace);
}

private void initTaskService() {
LOG.info("Initializing {}", TaskService.class.getSimpleName());
taskService = new TaskServiceBuilder()
.withSession(session)
.withTimeUnit(getTimeUnit())
.withTaskTypes(singletonList(TaskTypes.COMPUTE_RATE))
.build();
taskService.start();
}

private TimeUnit getTimeUnit() {
if ("seconds".equals(timeUnits)) {
return SECONDS;
}
return MINUTES;
}

/**
* @return a {@link MetricsService} instance to share in application scope
*/
@@ -297,7 +298,7 @@ void destroy() {
private void stopMetricsService() {
state = State.STOPPING;
metricsService.shutdown();
taskService.shutdown();
taskScheduler.shutdown();
if (session != null) {
try {
session.close();
@@ -17,16 +17,17 @@
package org.hawkular.metrics.core.impl;

import static java.util.Collections.singletonList;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.tasks.api.Task;
import org.joda.time.Duration;
import org.hawkular.metrics.tasks.api.Task2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -35,7 +36,7 @@
/**
* @author jsanda
*/
public class GenerateRate implements Action1<Task> {
public class GenerateRate implements Action1<Task2> {

private static final Logger logger = LoggerFactory.getLogger(GenerateRate.class);

@@ -46,32 +47,38 @@ public GenerateRate(MetricsService metricsService) {
}

@Override
public void call(Task task) {
public void call(Task2 task) {
logger.info("Generating rate for {}", task);
MetricId id = new MetricId(task.getTenantId(), COUNTER_RATE, task.getSources().iterator().next());
long start = task.getTimeSlice().getMillis();
long end = task.getTimeSlice().plus(getDuration(task.getWindow())).getMillis();
MetricId id = new MetricId(task.getParameters().get("tenantId"), COUNTER, task.getParameters().get("metricId"));
long start = task.getTrigger().getTriggerTime();
long end = start + TimeUnit.MINUTES.toMillis(1);

CountDownLatch latch = new CountDownLatch(1);

logger.debug("start = {}, end = {}", start, end);
metricsService.findCounterData(id, start, end)
.take(1)
.map(dataPoint -> dataPoint.getValue().doubleValue() / (end - start) * 1000)
.map(rate -> new Metric<>(id, singletonList(new DataPoint<>(start, rate))))
.doOnNext(dataPoint -> logger.debug("Data Point = {}", dataPoint))
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(new MetricId(id.getTenantId(), COUNTER_RATE, id.getName()),
singletonList(new DataPoint<>(start, rate))))
.flatMap(metric -> metricsService.addGaugeData(Observable.just(metric)))
.subscribe(
aVoid -> {
},
t -> logger.warn("Failed to persist rate data", t),
() -> logger.debug("Successfully persisted rate data for {}", task)
t -> {
logger.warn("Failed to persist rate data", t);
latch.countDown();
},
() -> {
logger.debug("Successfully persisted rate data");
latch.countDown();
}
);
}

private Duration getDuration(int duration) {
// This is somewhat of a temporary hack until HWKMETRICS-142 is done. The time units
// for tasks are currently scheduled globally with the TaskServiceBuilder class. The
// system property below is the only hook we have right now for specifying and
// checking the time units used.
if ("seconds".equals(System.getProperty("hawkular.scheduler.time-units", "minutes"))) {
return standardSeconds(duration);
try {
latch.await();
} catch (InterruptedException e) {
}
return standardMinutes(duration);
}

}
@@ -17,14 +17,12 @@
package org.hawkular.metrics.core.impl;

import static java.util.Comparator.comparingLong;

import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;
import static org.hawkular.metrics.core.api.MetricType.GAUGE;
import static org.hawkular.metrics.core.impl.Functions.getTTLAvailabilityDataPoint;
import static org.hawkular.metrics.core.impl.Functions.getTTLGaugeDataPoint;
import static org.joda.time.DateTime.now;
import static org.joda.time.Hours.hours;

import java.util.ArrayList;
@@ -40,9 +38,23 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import java.util.regex.Pattern;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.hawkular.metrics.core.api.AvailabilityBucketDataPoint;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.BucketedOutput;
@@ -63,27 +75,14 @@
import org.hawkular.metrics.core.impl.transformers.ItemsToSetTransformer;
import org.hawkular.metrics.core.impl.transformers.TagsIndexRowTransformer;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.Duration;
import org.joda.time.Hours;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
@@ -140,7 +139,7 @@ public int hashCode() {

private DataAccess dataAccess;

private TaskService taskService;
private TaskScheduler taskScheduler;

private MetricRegistry metricRegistry;

@@ -310,8 +309,8 @@ void setDataAccess(DataAccess dataAccess) {
this.dataAccess = dataAccess;
}

public void setTaskService(TaskService taskService) {
this.taskService = taskService;
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}

@Override
@@ -425,9 +424,15 @@ public Observable<Void> createMetric(Metric<?> metric) {
}

if (metric.getType() == COUNTER) {
Task task = TaskTypes.COMPUTE_RATE.createTask(metric.getTenantId(), metric.getId().getName() +
"$rate", metric.getId().getName());
taskService.scheduleTask(now(), task);
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.build();
Map<String, String> params = ImmutableMap.of(
"metricId", metric.getId().getName(),
"tenantId", metric.getTenantId()
);
taskScheduler.scheduleTask("compute-rate", metric.getTenantId(), 1000, params, trigger);
}

Observable.merge(updates).subscribe(new VoidSubscriber<>(subscriber));
@@ -843,4 +848,5 @@ private <T> T time(Timer timer, Callable<T> callable) {
throw new RuntimeException("There was an error during a timed event", e);
}
}

}

This file was deleted.
