Skip to content

Commit

Permalink
Merge pull request #297 from hawkular/task-scheduler
Browse files Browse the repository at this point in the history
Task scheduler refactoring
  • Loading branch information
burmanm committed Aug 7, 2015
2 parents 257d051 + bf3cceb commit 38ff28c
Show file tree
Hide file tree
Showing 37 changed files with 2,296 additions and 2,346 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.hawkular.metrics.api.jaxrs;

import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -30,11 +29,11 @@

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -52,14 +51,15 @@
import org.hawkular.metrics.api.jaxrs.config.ConfigurationProperty;
import org.hawkular.metrics.api.jaxrs.util.Eager;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.impl.GenerateRate;
import org.hawkular.metrics.core.impl.MetricsServiceImpl;
import org.hawkular.metrics.core.impl.TaskTypes;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskServiceBuilder;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.metrics.tasks.impl.Lease;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/**
* Bean created on startup to manage the lifecycle of the {@link MetricsService} instance shared in application scope.
Expand All @@ -81,7 +81,7 @@ public enum State {

private MetricsServiceImpl metricsService;

private TaskService taskService;
private TaskScheduler taskScheduler;

private final ScheduledExecutorService lifecycleExecutor;

Expand Down Expand Up @@ -191,11 +191,29 @@ private void startMetricsService() {
// will change at some point though because the task scheduling service will
// probably move to the hawkular-commons repo.
initSchema();
initTaskService();

taskScheduler = new TaskScheduler() {
@Override
public Observable<Lease> start() {
LOG.warn("Task scheduling is not yet supported");
return Observable.empty();
}

@Override
public Observable<Task2> scheduleTask(String name, String groupKey, int executionOrder,
Map<String, String> parameters, Trigger trigger) {
LOG.warn("Task scheduling is not yet supported");
return Observable.empty();
}

@Override
public void shutdown() {

}
};

metricsService = new MetricsServiceImpl();
metricsService.setTaskService(taskService);
taskService.subscribe(TaskTypes.COMPUTE_RATE, new GenerateRate(metricsService));
metricsService.setTaskScheduler(taskScheduler);

// TODO Set up a managed metric registry
// We want a managed registry that can be shared by the JAX-RS endpoint and the core. Then we can expose
Expand Down Expand Up @@ -257,23 +275,6 @@ private void initSchema() {
session.execute("USE " + keyspace);
}

private void initTaskService() {
LOG.info("Initializing {}", TaskService.class.getSimpleName());
taskService = new TaskServiceBuilder()
.withSession(session)
.withTimeUnit(getTimeUnit())
.withTaskTypes(singletonList(TaskTypes.COMPUTE_RATE))
.build();
taskService.start();
}

private TimeUnit getTimeUnit() {
if ("seconds".equals(timeUnits)) {
return SECONDS;
}
return MINUTES;
}

/**
* @return a {@link MetricsService} instance to share in application scope
*/
Expand All @@ -297,7 +298,7 @@ void destroy() {
private void stopMetricsService() {
state = State.STOPPING;
metricsService.shutdown();
taskService.shutdown();
taskScheduler.shutdown();
if (session != null) {
try {
session.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
package org.hawkular.metrics.core.impl;

import static java.util.Collections.singletonList;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.tasks.api.Task;
import org.joda.time.Duration;
import org.hawkular.metrics.tasks.api.Task2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
Expand All @@ -35,7 +36,7 @@
/**
* @author jsanda
*/
public class GenerateRate implements Action1<Task> {
public class GenerateRate implements Action1<Task2> {

private static final Logger logger = LoggerFactory.getLogger(GenerateRate.class);

Expand All @@ -46,32 +47,38 @@ public GenerateRate(MetricsService metricsService) {
}

@Override
public void call(Task task) {
public void call(Task2 task) {
logger.info("Generating rate for {}", task);
MetricId id = new MetricId(task.getTenantId(), COUNTER_RATE, task.getSources().iterator().next());
long start = task.getTimeSlice().getMillis();
long end = task.getTimeSlice().plus(getDuration(task.getWindow())).getMillis();
MetricId id = new MetricId(task.getParameters().get("tenantId"), COUNTER, task.getParameters().get("metricId"));
long start = task.getTrigger().getTriggerTime();
long end = start + TimeUnit.MINUTES.toMillis(1);

CountDownLatch latch = new CountDownLatch(1);

logger.debug("start = {}, end = {}", start, end);
metricsService.findCounterData(id, start, end)
.take(1)
.map(dataPoint -> dataPoint.getValue().doubleValue() / (end - start) * 1000)
.map(rate -> new Metric<>(id, singletonList(new DataPoint<>(start, rate))))
.doOnNext(dataPoint -> logger.debug("Data Point = {}", dataPoint))
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(new MetricId(id.getTenantId(), COUNTER_RATE, id.getName()),
singletonList(new DataPoint<>(start, rate))))
.flatMap(metric -> metricsService.addGaugeData(Observable.just(metric)))
.subscribe(
aVoid -> {
},
t -> logger.warn("Failed to persist rate data", t),
() -> logger.debug("Successfully persisted rate data for {}", task)
t -> {
logger.warn("Failed to persist rate data", t);
latch.countDown();
},
() -> {
logger.debug("Successfully persisted rate data");
latch.countDown();
}
);
}

private Duration getDuration(int duration) {
// This is somewhat of a temporary hack until HWKMETRICS-142 is done. The time units
// for tasks are currently scheduled globally with the TaskServiceBuilder class. The
// system property below is the only hook we have right now for specifying and
// checking the time units used.
if ("seconds".equals(System.getProperty("hawkular.scheduler.time-units", "minutes"))) {
return standardSeconds(duration);
try {
latch.await();
} catch (InterruptedException e) {
}
return standardMinutes(duration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
package org.hawkular.metrics.core.impl;

import static java.util.Comparator.comparingLong;

import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;
import static org.hawkular.metrics.core.api.MetricType.GAUGE;
import static org.hawkular.metrics.core.impl.Functions.getTTLAvailabilityDataPoint;
import static org.hawkular.metrics.core.impl.Functions.getTTLGaugeDataPoint;
import static org.joda.time.DateTime.now;
import static org.joda.time.Hours.hours;

import java.util.ArrayList;
Expand All @@ -40,9 +38,23 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import java.util.regex.Pattern;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.hawkular.metrics.core.api.AvailabilityBucketDataPoint;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.BucketedOutput;
Expand All @@ -63,27 +75,14 @@
import org.hawkular.metrics.core.impl.transformers.ItemsToSetTransformer;
import org.hawkular.metrics.core.impl.transformers.TagsIndexRowTransformer;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.Duration;
import org.joda.time.Hours;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
Expand Down Expand Up @@ -140,7 +139,7 @@ public int hashCode() {

private DataAccess dataAccess;

private TaskService taskService;
private TaskScheduler taskScheduler;

private MetricRegistry metricRegistry;

Expand Down Expand Up @@ -310,8 +309,8 @@ void setDataAccess(DataAccess dataAccess) {
this.dataAccess = dataAccess;
}

public void setTaskService(TaskService taskService) {
this.taskService = taskService;
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}

@Override
Expand Down Expand Up @@ -425,9 +424,15 @@ public Observable<Void> createMetric(Metric<?> metric) {
}

if (metric.getType() == COUNTER) {
Task task = TaskTypes.COMPUTE_RATE.createTask(metric.getTenantId(), metric.getId().getName() +
"$rate", metric.getId().getName());
taskService.scheduleTask(now(), task);
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.build();
Map<String, String> params = ImmutableMap.of(
"metricId", metric.getId().getName(),
"tenantId", metric.getTenantId()
);
taskScheduler.scheduleTask("compute-rate", metric.getTenantId(), 1000, params, trigger);
}

Observable.merge(updates).subscribe(new VoidSubscriber<>(subscriber));
Expand Down Expand Up @@ -843,4 +848,5 @@ private <T> T time(Timer timer, Callable<T> callable) {
throw new RuntimeException("There was an error during a timed event", e);
}
}

}

This file was deleted.

0 comments on commit 38ff28c

Please sign in to comment.