Skip to content

Commit

Permalink
[HWKMETRICS-130] refactor task execution
Browse files Browse the repository at this point in the history
Prior to this commit the TaskType class provided a factory for producing an
object to exectue tasks. There are better ways to do this with RxJava. Now
clients register a subscriber with TaskService, and that subscriber is notified
of tasks to execute.

The code for computing rates has been pulled out of MetricsServiceImpl and put
into a new class, GenerateRate. This decouples MetricsServiceInmpl from the
computation of rates which will make it a lot easier to execute these various
components in isolation.
  • Loading branch information
John Sanda committed Jun 19, 2015
1 parent 89bf6b3 commit 657cf13
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 205 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl;

import static java.util.Collections.singletonList;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;

import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.tasks.api.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;

/**
* @author jsanda
*/
public class GenerateRate implements Action1<Task> {

private static final Logger logger = LoggerFactory.getLogger(GenerateRate.class);

private MetricsService metricsService;

public GenerateRate(MetricsService metricsService) {
this.metricsService = metricsService;
}

@Override
public void call(Task task) {
logger.info("Generating rate for {}", task);
MetricId id = new MetricId(task.getSources().iterator().next());
long end = task.getTimeSlice().getMillis();
long start = task.getTimeSlice().minusSeconds(task.getWindow()).getMillis();
logger.debug("start = {}, end = {}", start, end);
metricsService.findCounterData(task.getTenantId(), id, start, end)
.take(1)
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(task.getTenantId(), COUNTER_RATE, id,
singletonList(new DataPoint<>(start, rate))))
.flatMap(metric -> metricsService.addGaugeData(Observable.just(metric)))
.subscribe(
aVoid -> {},
t -> logger.warn("Failed to persist rate data", t),
() -> logger.debug("Successfully persisted rate data")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,7 @@ public int hashCode() {

private DataAccess dataAccess;

private List<TaskType> taskTypes = singletonList(
new TaskType()
.setName("counter-rate")
.setSegments(10)
.setSegmentOffsets(10)
.setInterval(5)
.setWindow(5)
.setFactory(this::generateRate));
private List<TaskType> taskTypes;

private TaskService taskService;

Expand Down Expand Up @@ -329,11 +322,8 @@ void setDataAccess(DataAccess dataAccess) {
this.dataAccess = dataAccess;
}

/**
* This is a test hook
*/
List<TaskType> getTaskTypes() {
return taskTypes;
void setTaskTypes(List<TaskType> taskTypes) {
this.taskTypes = taskTypes;
}

public void setTaskService(TaskService taskService) {
Expand Down Expand Up @@ -833,4 +823,5 @@ private Consumer<Task> generateRate() {
);
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.Retention;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.tasks.api.TaskType;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.testng.annotations.BeforeClass;
Expand All @@ -88,8 +89,15 @@ public class MetricsServiceITest extends MetricsITest {
@BeforeClass
public void initClass() {
initSession();

metricsService = new MetricsServiceImpl();
metricsService.setTaskService(new FakeTaskService());
metricsService.setTaskTypes(singletonList(new TaskType()
.setName("counter-rate")
.setSegments(10)
.setSegmentOffsets(10)
.setInterval(5)
.setWindow(5)));
metricsService.startUp(session, getKeyspace(), false, new MetricRegistry());
dataAccess = metricsService.getDataAccess();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.hawkular.metrics.core.impl;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.joda.time.DateTime.now;
import static org.joda.time.Duration.standardSeconds;
Expand All @@ -30,6 +31,7 @@
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskServiceBuilder;
import org.hawkular.metrics.tasks.api.TaskType;
import org.hawkular.metrics.tasks.impl.TaskServiceImpl;
import org.joda.time.DateTime;
import org.testng.annotations.AfterClass;
Expand All @@ -55,21 +57,30 @@ public void initClass() {

dateTimeService = new DateTimeService();

metricsService = new MetricsServiceImpl();

String keyspace = "hawkulartest";
System.setProperty("keyspace", keyspace);
TaskType taskType =new TaskType()
.setName("counter-rate")
.setSegments(10)
.setSegmentOffsets(10)
.setInterval(5)
.setWindow(5);

taskService = new TaskServiceBuilder()
.withSession(session)
.withTimeUnit(TimeUnit.SECONDS)
.withTaskTypes(metricsService.getTaskTypes())
.withTaskTypes(singletonList(taskType))
.build();
((TaskServiceImpl) taskService).setTimeUnit(TimeUnit.SECONDS);
taskService.start();


metricsService = new MetricsServiceImpl();
metricsService.setTaskTypes(singletonList(taskType));
metricsService.setTaskService(taskService);

((TaskServiceImpl) taskService).subscribe(taskType, new GenerateRate(metricsService));

String keyspace = "hawkulartest";
System.setProperty("keyspace", keyspace);

taskService.start();
metricsService.startUp(session, keyspace, false, new MetricRegistry());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.hawkular.metrics.tasks.impl.TaskImpl;

Expand All @@ -35,8 +33,6 @@ public class TaskType {

private String name;

private Supplier<Consumer<Task>> factory;

// TODO This should be an implementation detail of the taks service
private int segments;

Expand All @@ -59,18 +55,6 @@ public TaskType setName(String name) {
return this;
}

/**
* A function that produces functions that carry out task execution.
*/
public Supplier<Consumer<Task>> getFactory() {
return factory;
}

public TaskType setFactory(Supplier<Consumer<Task>> factory) {
this.factory = factory;
return this;
}

/**
* The number of partitions in the database to use for storing tasks. The segment is determined by,
* <code>task.target.hashCode() % segments</code>
Expand Down

0 comments on commit 657cf13

Please sign in to comment.