Skip to content

Commit

Permalink
[HWMETRICS-52] first cut at executing tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 12, 2015
1 parent dbb682c commit 14029f7
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.joda.time.DateTime;
Expand All @@ -37,6 +38,8 @@ public class LeaseManager {

public static final int DEFAULT_LEASE_TTL = 180;

public static final Function<ResultSet, Void> TO_VOID = resultSet -> null;

private Session session;

private Queries queries;
Expand Down Expand Up @@ -86,4 +89,9 @@ public ListenableFuture<Boolean> finish(Lease lease) {
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Void> deleteLeases(DateTime timeSlice) {
ResultSetFuture future = session.executeAsync(queries.deleteLeases.bind(timeSlice.toDate()));
return Futures.transform(future, TO_VOID);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ public class Queries {

public PreparedStatement finishLease;

public PreparedStatement deleteLeases;

public PreparedStatement createTask;

public PreparedStatement findTasks;

public PreparedStatement deleteTasks;

public Queries(Session session) {
createLease = session.prepare(
"INSERT INTO leases (time_slice, task_type, segment_offset) VALUES (?, ?, ?)");
Expand Down Expand Up @@ -67,6 +71,8 @@ public Queries(Session session) {
"WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
"IF owner = ?");

deleteLeases = session.prepare("DELETE FROM leases WHERE time_slice = ?");

createTask = session.prepare(
"INSERT INTO task_queue (task_type, time_slice, segment, target, sources, interval, window) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)");
Expand All @@ -75,6 +81,9 @@ public Queries(Session session) {
"SELECT target, sources, interval, window " +
"FROM task_queue " +
"WHERE task_type = ? AND time_slice = ? AND segment = ?");

deleteTasks = session.prepare(
"DELETE FROM task_queue WHERE task_type = ? AND time_slice = ? AND segment = ?");
}

}
29 changes: 20 additions & 9 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/
public class Task {

private TaskDef taskDef;
private TaskType taskType;

private String target;

Expand All @@ -44,24 +44,24 @@ public class Task {
public Task() {
}

public Task(TaskDef taskDef, String target, Set<String> sources, int interval, int window) {
this.taskDef = taskDef;
public Task(TaskType taskType, String target, Set<String> sources, int interval, int window) {
this.taskType = taskType;
this.target = target;
this.sources = sources;
this.interval = minutes(interval).toStandardDuration();
this.window = minutes(window).toStandardDuration();
}

public Task(TaskDef taskDef, String target, String source, int interval, int window) {
this.taskDef = taskDef;
public Task(TaskType taskType, String target, String source, int interval, int window) {
this.taskType = taskType;
this.target = target;
this.sources = ImmutableSet.of(source);
this.interval = minutes(interval).toStandardDuration();
this.window = minutes(window).toStandardDuration();
}

public TaskDef getTaskDef() {
return taskDef;
public TaskType getTaskType() {
return taskType;
}

public String getTarget() {
Expand All @@ -85,7 +85,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Task task = (Task) o;
return Objects.equals(taskDef, task.taskDef) &&
return Objects.equals(taskType, task.taskType) &&
Objects.equals(target, task.target) &&
Objects.equals(sources, task.sources) &&
Objects.equals(interval, task.interval) &&
Expand All @@ -94,6 +94,17 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(taskDef, target, sources, interval, window);
return Objects.hash(taskType, target, sources, interval, window);
}

@Override
public String toString() {
return com.google.common.base.Objects.toStringHelper(Task.class)
.add("taskDef", taskType.getName())
.add("target", target)
.add("sources", sources)
.add("interval", interval.toStandardMinutes())
.add("window", window.toStandardMinutes())
.toString();
}
}
148 changes: 133 additions & 15 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,185 @@
package org.hawkular.metrics.tasks;

import static java.util.stream.Collectors.toList;
import static org.joda.time.DateTime.now;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author jsanda
*/
public class TaskService {

private static final Logger logger = LoggerFactory.getLogger(TaskService.class);

private Duration timeSliceDuration;

private Session session;

private Queries queries;

private List<TaskDef> taskDefs;
private List<TaskType> taskTypes;

private DateTimeService dateTimeService = new DateTimeService();

public TaskService(Session session, Queries queries, Duration timeSliceDuration, List<TaskDef> taskDefs) {
private LeaseManager leaseManager;

private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

private Semaphore permits = new Semaphore(1);

private String owner;

public TaskService(Session session, Queries queries, LeaseManager leaseManager, Duration timeSliceDuration,
List<TaskType> taskTypes) {
this.session = session;
this.queries = queries;
this.leaseManager = leaseManager;
this.timeSliceDuration = timeSliceDuration;
this.taskDefs = taskDefs;
this.taskTypes = taskTypes;
try {
owner = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new RuntimeException("Failed to initialize owner name", e);
}
}

public ListenableFuture<List<Task>> findTasks(String type, DateTime timeSlice, int segment) {
ResultSetFuture future = session.executeAsync(queries.findTasks.bind(type, timeSlice.toDate(), segment));
TaskDef taskDef = taskDefs.stream()
.filter(t->t.getName().equals(type))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(type + " is not a recognized task type"));
TaskType taskType = findTaskType(type);
return Futures.transform(future, (ResultSet resultSet) -> StreamSupport.stream(resultSet.spliterator(), false)
.map(row -> new Task(taskDef, row.getString(0), row.getSet(1, String.class), row.getInt(2),
.map(row -> new Task(taskType, row.getString(0), row.getSet(1, String.class), row.getInt(2),
row.getInt(3)))
.collect(toList()));
}

public ListenableFuture<DateTime> scheduleTask(Task task) {
DateTime currentTimeSlice = dateTimeService.getTimeSlice(now(), timeSliceDuration);
private TaskType findTaskType(String type) {
return taskTypes.stream()
.filter(t->t.getName().equals(type))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(type + " is not a recognized task type"));
}

public ListenableFuture<DateTime> scheduleTask(DateTime time, Task task) {
DateTime currentTimeSlice = dateTimeService.getTimeSlice(time, timeSliceDuration);
DateTime timeSlice = currentTimeSlice.plus(task.getInterval());
int segment = task.getTaskDef().getName().hashCode() % task.getTaskDef().getSegments();
int segmentOffset = segment / task.getTaskDef().getSegmentOffsets();
int segment = task.getTarget().hashCode() % task.getTaskType().getSegments();
int segmentOffset = segment / task.getTaskType().getSegmentOffsets();

ResultSetFuture queueFuture = session.executeAsync(queries.createTask.bind(task.getTaskDef().getName(),
ResultSetFuture queueFuture = session.executeAsync(queries.createTask.bind(task.getTaskType().getName(),
timeSlice.toDate(), segment, task.getTarget(), task.getSources(), (int) task.getInterval()
.getStandardMinutes(), (int) task.getWindow().getStandardMinutes()));
ResultSetFuture leaseFuture = session.executeAsync(queries.createLease.bind(timeSlice.toDate(),
task.getTaskDef().getName(), segmentOffset));
task.getTaskType().getName(), segmentOffset));
ListenableFuture<List<ResultSet>> futures = Futures.allAsList(queueFuture, leaseFuture);

return Futures.transform(futures, (List<ResultSet> resultSets) -> timeSlice);
}

public void executeTasks(DateTime timeSlice) {
try {
List<Lease> leases = Uninterruptibles.getUninterruptibly(leaseManager.findUnfinishedLeases(timeSlice));
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(new CountDownLatch(leases.size()));
while (!leases.isEmpty() && leases.stream().anyMatch(lease -> !lease.isFinished())) {
for (final Lease lease : leases) {
if (lease.getOwner() == null) {
permits.acquire();
lease.setOwner(owner);
ListenableFuture<Boolean> acquiredFuture = leaseManager.acquire(lease);
Futures.addCallback(acquiredFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean acquired) {
latchRef.get().countDown();
if (acquired) {
TaskType taskType = findTaskType(lease.getTaskType());
for (int i = lease.getSegmentOffset(); i < taskType.getSegments(); ++i) {
ListenableFuture<List<Task>> tasksFuture = findTasks(lease.getTaskType(),
timeSlice, i);
Futures.addCallback(tasksFuture, executeTasksCallback(lease, i), threadPool);
}
} else {
// someone else has the lease so return the permit and try to
// acquire another lease
permits.release();
}
}

@Override
public void onFailure(Throwable t) {
logger.warn("There was an error trying to acquire a lease", t);
latchRef.get().countDown();
}
}, threadPool);
} else {
latchRef.get().countDown();
}
}
latchRef.get().await();
leases = Uninterruptibles.getUninterruptibly(leaseManager.findUnfinishedLeases(timeSlice));
latchRef.set(new CountDownLatch(leases.size()));
}

Uninterruptibles.getUninterruptibly(leaseManager.deleteLeases(timeSlice));
} catch (ExecutionException e) {
logger.warn("Failed to load leases for time slice " + timeSlice, e);
} catch (InterruptedException e) {
logger.warn("There was an interrupt", e);
}
}

private FutureCallback<List<Task>> executeTasksCallback(Lease lease, int segment) {
return new FutureCallback<List<Task>>() {
@Override
public void onSuccess(List<Task> tasks) {
tasks.forEach(task -> {
TaskType taskType = task.getTaskType();
Runnable taskRunner = taskType.getFactory().apply(task);
taskRunner.run();
});

logger.info("deleting tasks for time slice " + lease.getTimeSlice());

session.execute(queries.deleteTasks.bind(lease.getTaskType(), lease.getTimeSlice().toDate(), segment));
ListenableFuture<Boolean> leaseFinished = leaseManager.finish(lease);
Futures.addCallback(leaseFinished, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
permits.release();
}

@Override
public void onFailure(Throwable t) {
logger.warn("Failed to mark lease finished", t);
}
}, threadPool);
}

@Override
public void onFailure(Throwable t) {

}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
package org.hawkular.metrics.tasks;

import java.util.Objects;

import com.google.common.base.Supplier;
import java.util.function.Function;

/**
* @author jsanda
*/
public class TaskDef {
public class TaskType {

private String name;

private Supplier<Runnable> factory;
private Function<Task, Runnable> factory;

private int segments;

Expand All @@ -39,16 +38,16 @@ public String getName() {
return name;
}

public TaskDef setName(String name) {
public TaskType setName(String name) {
this.name = name;
return this;
}

public Supplier<Runnable> getFactory() {
public Function<Task, Runnable> getFactory() {
return factory;
}

public TaskDef setFactory(Supplier<Runnable> factory) {
public TaskType setFactory(Function<Task, Runnable> factory) {
this.factory = factory;
return this;
}
Expand All @@ -57,7 +56,7 @@ public int getSegments() {
return segments;
}

public TaskDef setSegments(int segments) {
public TaskType setSegments(int segments) {
this.segments = segments;
return this;
}
Expand All @@ -66,7 +65,7 @@ public int getSegmentOffsets() {
return segmentOffsets;
}

public TaskDef setSegmentOffsets(int segmentOffsets) {
public TaskType setSegmentOffsets(int segmentOffsets) {
this.segmentOffsets = segmentOffsets;
return this;
}
Expand All @@ -75,8 +74,8 @@ public TaskDef setSegmentOffsets(int segmentOffsets) {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TaskDef taskDef = (TaskDef) o;
return Objects.equals(name, taskDef.name);
TaskType taskType = (TaskType) o;
return Objects.equals(name, taskType.name);
}

@Override
Expand Down

0 comments on commit 14029f7

Please sign in to comment.