Skip to content

Commit

Permalink
[HWKMETRICS-76] porting more task service code to RxJava
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 26, 2015
1 parent 82b0211 commit ebb5189
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.hawkular.metrics.tasks.api;

import com.google.common.util.concurrent.ListenableFuture;
import org.joda.time.DateTime;
import rx.Observable;

/**
* The primary API for task scheduling and execution. See {@link TaskServiceBuilder} for details on creating and
Expand Down Expand Up @@ -64,5 +64,5 @@ public interface TaskService {
* @param task The task to schedule for execution
* @return The task with its {@link Task#getTimeSlice() scheduled execution time} set
*/
ListenableFuture<Task> scheduleTask(DateTime time, Task task);
Observable<Task> scheduleTask(DateTime time, Task task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hawkular.metrics.tasks.impl.LeaseService;
import org.hawkular.metrics.tasks.impl.Queries;
import org.hawkular.metrics.tasks.impl.TaskServiceImpl;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;

/**
Expand Down Expand Up @@ -69,8 +70,9 @@ public TaskServiceBuilder withTaskTypes(List<TaskType> taskTypes) {

public TaskService build() {
Queries queries = new Queries(session);
LeaseService leaseService = new LeaseService(new RxSessionImpl(session), queries);
TaskServiceImpl taskService = new TaskServiceImpl(session, queries, leaseService, taskTypes);
RxSession rxSession = new RxSessionImpl(session);
LeaseService leaseService = new LeaseService(rxSession, queries);
TaskServiceImpl taskService = new TaskServiceImpl(rxSession, queries, leaseService, taskTypes);
taskService.setTimeUnit(timeUnit);

return taskService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class LeaseService {

public static final int DEFAULT_RENEWAL_RATE = 60;

private RxSession rxSession;
private RxSession session;

private Queries queries;

Expand All @@ -51,7 +51,7 @@ public class LeaseService {
private int renewalRate = DEFAULT_RENEWAL_RATE;

public LeaseService(RxSession session, Queries queries) {
this.rxSession = session;
this.session = session;
this.queries = queries;
}

Expand All @@ -69,7 +69,7 @@ void setRenewalRate(int renewalRate) {
}

public Observable<? extends List<Lease>> loadLeases(DateTime timeSlice) {
return rxSession.execute(queries.findLeases.bind(timeSlice.toDate()))
return session.execute(queries.findLeases.bind(timeSlice.toDate()))
.flatMap(Observable::from)
.map(row -> new Lease(timeSlice, row.getString(0), row.getInt(1), row.getString(2), row.getBool(3)))
.filter(lease -> !lease.isFinished())
Expand All @@ -95,17 +95,17 @@ public Observable<Lease> findUnfinishedLeases(DateTime timeSlice) {


public Observable<Boolean> acquire(Lease lease) {
return rxSession.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
return session.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset())).map(ResultSet::wasApplied);
}

public Observable<Boolean> acquire(Lease lease, int ttl) {
return rxSession.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
return session.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset())).map(ResultSet::wasApplied);
}

public Observable<Boolean> renew(Lease lease) {
return rxSession.execute(queries.renewLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
return session.execute(queries.renewLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
}

Expand Down Expand Up @@ -145,16 +145,12 @@ private Runnable createRenewRunnable(Lease lease, Thread leaseOwner) {
}

public Observable<Boolean> finish(Lease lease) {
return rxSession.execute(queries.finishLease.bind(lease.getTimeSlice().toDate(), lease.getTaskType(),
return session.execute(queries.finishLease.bind(lease.getTimeSlice().toDate(), lease.getTaskType(),
lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
}

// public Observable<Void> deleteLeases(DateTime timeSlice) {
// return rxSession.execute(queries.deleteLeases.bind(timeSlice.toDate())).flatMap(resultSet -> null);
// }

public void deleteLeases(DateTime timeSlice) {
rxSession.getSession().execute(queries.deleteLeases.bind(timeSlice.toDate()));
public Observable<Void> deleteLeases(DateTime timeSlice) {
return session.execute(queries.deleteLeases.bind(timeSlice.toDate())).flatMap(resultSet -> null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@
import java.util.function.Consumer;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.hawkular.metrics.schema.SchemaManager;
Expand All @@ -49,8 +45,6 @@
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskType;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand All @@ -65,8 +59,6 @@ public class TaskServiceImpl implements TaskService {

private static final Logger logger = LoggerFactory.getLogger(TaskServiceImpl.class);

private Session session;

private RxSession rxSession;

private Queries queries;
Expand Down Expand Up @@ -105,17 +97,16 @@ public class TaskServiceImpl implements TaskService {
*/
private TimeUnit timeUnit = TimeUnit.MINUTES;

public TaskServiceImpl(Session session, Queries queries, LeaseService leaseService, List<TaskType> taskTypes) {
public TaskServiceImpl(RxSession session, Queries queries, LeaseService leaseService, List<TaskType> taskTypes) {
try {
this.session = session;
this.rxSession = session;
this.queries = queries;
this.leaseService = leaseService;
this.taskTypes = taskTypes;
dateTimeService = new DateTimeService();
owner = InetAddress.getLocalHost().getHostName();
rxSession = new RxSessionImpl(session);

SchemaManager schemaManager = new SchemaManager(session);
SchemaManager schemaManager = new SchemaManager(session.getSession());
String keyspace = System.getProperty("keyspace", "hawkular_metrics");
schemaManager.createSchema(keyspace);
} catch (UnknownHostException e) {
Expand Down Expand Up @@ -206,15 +197,13 @@ private TaskType findTaskType(String type) {
}

@Override
public ListenableFuture<Task> scheduleTask(DateTime time, Task task) {
public Observable<Task> scheduleTask(DateTime time, Task task) {
TaskType taskType = findTaskType(task.getTaskType().getName());

DateTime currentTimeSlice = dateTimeService.getTimeSlice(time, task.getInterval());
DateTime timeSlice = currentTimeSlice.plus(task.getInterval());

ListenableFuture<DateTime> timeFuture = scheduleTaskAt(timeSlice, task);
return Futures.transform(timeFuture, (DateTime scheduledTime) -> new TaskImpl(task.getTaskType(),
scheduledTime,
return scheduleTaskAt(timeSlice, task).map(scheduledTime -> new TaskImpl(task.getTaskType(), scheduledTime,
task.getTarget(), task.getSources(), task.getInterval(), task.getWindow()));
}

Expand All @@ -224,55 +213,74 @@ private Observable<TaskContainer> rescheduleTask(TaskContainer taskContainer) {
int segment = Math.abs(taskContainer.getTarget().hashCode() % taskType.getSegments());
int segmentsPerOffset = taskType.getSegments() / taskType.getSegmentOffsets();
int segmentOffset = (segment / segmentsPerOffset) * segmentsPerOffset;
ResultSetFuture queueFuture;
Observable<ResultSet> queueObservable;

if (taskContainer.getFailedTimeSlices().isEmpty()) {
queueFuture = session.executeAsync(queries.createTask.bind(taskType.getName(), nextTimeSlice.toDate(),
queueObservable = rxSession.execute(queries.createTask.bind(taskType.getName(), nextTimeSlice.toDate(),
segment, taskContainer.getTarget(), taskContainer.getSources(),
taskContainer.getInterval().toStandardMinutes().getMinutes(),
taskContainer.getWindow().toStandardMinutes().getMinutes()));
} else {
queueFuture = session.executeAsync(queries.createTaskWithFailures.bind(taskType.getName(),
queueObservable = rxSession.execute(queries.createTaskWithFailures.bind(taskType.getName(),
nextTimeSlice.toDate(), segment, taskContainer.getTarget(), taskContainer.getSources(),
taskContainer.getInterval().toStandardMinutes().getMinutes(),
taskContainer.getWindow().toStandardMinutes().getMinutes(), toDates(
taskContainer.getFailedTimeSlices())));
}
ResultSetFuture leaseFuture = session.executeAsync(queries.createLease.bind(nextTimeSlice.toDate(),
Observable<ResultSet> leaseObservable = rxSession.execute(queries.createLease.bind(nextTimeSlice.toDate(),
taskType.getName(), segmentOffset));
ListenableFuture<List<ResultSet>> futures = Futures.allAsList(queueFuture, leaseFuture);
ListenableFuture<TaskContainer> resultFuture = Futures.transform(futures, (List<ResultSet> resultSets) ->
taskContainer);

return RxUtil.from(resultFuture, workers);
return Observable.create(subscriber ->
queueObservable.concatWith(leaseObservable).subscribe(
resultSet -> {},
subscriber::onError,
() -> {
subscriber.onNext(taskContainer);
subscriber.onCompleted();
})
);
}

private Set<Date> toDates(Set<DateTime> times) {
return times.stream().map(DateTime::toDate).collect(toSet());
}

private ListenableFuture<DateTime> scheduleTaskAt(DateTime time, Task task) {
private Observable<DateTime> scheduleTaskAt(DateTime time, Task task) {
TaskType taskType = task.getTaskType();
int segment = Math.abs(task.getTarget().hashCode() % taskType.getSegments());
int segmentsPerOffset = taskType.getSegments() / taskType.getSegmentOffsets();
int segmentOffset = (segment / segmentsPerOffset) * segmentsPerOffset;

ResultSetFuture queueFuture = session.executeAsync(queries.createTask.bind(taskType.getName(),
Observable<ResultSet> queueObservable = rxSession.execute(queries.createTask.bind(taskType.getName(),
time.toDate(), segment, task.getTarget(), task.getSources(), (int) task.getInterval()
.getStandardMinutes(), (int) task.getWindow().getStandardMinutes()));
ResultSetFuture leaseFuture = session.executeAsync(queries.createLease.bind(time.toDate(),
Observable<ResultSet> leaseObservable = rxSession.execute(queries.createLease.bind(time.toDate(),
taskType.getName(), segmentOffset));
ListenableFuture<List<ResultSet>> futures = Futures.allAsList(queueFuture, leaseFuture);

return Futures.transform(futures, (List<ResultSet> resultSets) -> time);
return Observable.create(subscriber -> queueObservable.concatWith(leaseObservable).subscribe(
resultSet -> {
},
subscriber::onError,
() -> {
subscriber.onNext(time);
subscriber.onCompleted();
}
)
);
}

public void executeTasks(DateTime timeSlice) {
/**
* This method is visible for testing. It is not part of the {@link TaskService} API. It runs on the scheduler
* thread and is blocking. Task execution is done in parallel in the workers thread pool, but the scheduler thread
* executing this method blocks until all tasks for the time slice are finished.
*
* @param timeSlice The time slice to process
*/
void executeTasks(DateTime timeSlice) {
try {
// Execute tasks in order of task types. Once all of the tasks are executed, we delete the lease partition.
taskTypes.forEach(taskType -> executeTasks(timeSlice, taskType));
// leaseService.deleteLeases(timeSlice).toBlocking().lastOrDefault(null);
leaseService.deleteLeases(timeSlice);
leaseService.deleteLeases(timeSlice).toBlocking().lastOrDefault(null);
} catch (Exception e) {
logger.warn("Failed to delete lease partition for time slice " + timeSlice, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.common.util.concurrent.Uninterruptibles;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.impl.Queries;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeSuite;

Expand All @@ -38,6 +40,8 @@ public class BaseTest {

protected static Session session;

protected static RxSession rxSession;

protected static DateTimeService dateTimeService;

protected static Queries queries;
Expand All @@ -47,6 +51,7 @@ public static void initSuite() throws Exception {
Cluster cluster = Cluster.builder().addContactPoints("127.0.0.01").build();
String keyspace = System.getProperty("keyspace", "hawkulartest");
session = cluster.connect("system");
rxSession = new RxSessionImpl(session);

SchemaManager schemaManager = new SchemaManager(session);
schemaManager.createSchema(keyspace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.hawkular.metrics.tasks.impl;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.joda.time.DateTime.now;
import static org.joda.time.Duration.standardSeconds;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -46,7 +47,7 @@ public void initClass() {

@Test
public void startScheduler() throws Exception {
List<TaskType> taskTypes = asList(new TaskType().setName("test").setSegments(1).setSegmentOffsets(1));
List<TaskType> taskTypes = singletonList(new TaskType().setName("test").setSegments(1).setSegmentOffsets(1));

List<DateTime> actualTimeSlices = new ArrayList<>();
List<DateTime> expectedTimeSlices = asList(
Expand All @@ -55,7 +56,7 @@ public void startScheduler() throws Exception {
dateTimeService.getTimeSlice(now().plusSeconds(4), standardSeconds(1))
);

TaskServiceImpl taskService = new TaskServiceImpl(session, queries, leaseService, taskTypes) {
TaskServiceImpl taskService = new TaskServiceImpl(rxSession, queries, leaseService, taskTypes) {
@Override
public void executeTasks(DateTime timeSlice) {
actualTimeSlices.add(timeSlice);
Expand Down
Loading

0 comments on commit ebb5189

Please sign in to comment.