Skip to content

Commit

Permalink
[HWKMETRICS-52] set failed_time_slices when rescheduling failed task
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 12, 2015
1 parent f83a979 commit ba4e4fa
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class Queries {

public PreparedStatement createTask;

public PreparedStatement createTaskWithFailures;

public PreparedStatement findTasks;

public PreparedStatement deleteTasks;
Expand Down Expand Up @@ -75,6 +77,10 @@ public Queries(Session session) {
"INSERT INTO task_queue (task_type, time_slice, segment, target, sources, interval, window) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)");

createTaskWithFailures = session.prepare(
"INSERT INTO task_queue (task_type, time_slice, segment, target, sources, interval, window, " +
"failed_time_slices) VALUES (?, ?, ?, ?, ?, ?, ?, ?)");

findTasks = session.prepare(
"SELECT target, sources, interval, window, failed_time_slices " +
"FROM task_queue " +
Expand Down
13 changes: 2 additions & 11 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ public Task(TaskType taskType, String target, Set<String> sources, int interval,
this.failedTimeSlices.addAll(failedTimeSlices);
}

// public Task(TaskType taskType, String target, String source, int interval, int window,
// Collection<Date> failedTimeSlices) {
// this.taskType = taskType;
// this.target = target;
// this.sources = ImmutableSet.of(source);
// this.interval = minutes(interval).toStandardDuration();
// this.window = minutes(window).toStandardDuration();
// this.failedTimeSlices.addAll(failedTimeSlices.stream().map(DateTime::new).collect(toList()));
// }

public TaskType getTaskType() {
return taskType;
}
Expand Down Expand Up @@ -124,7 +114,8 @@ public boolean equals(Object o) {
Objects.equals(target, task.target) &&
Objects.equals(sources, task.sources) &&
Objects.equals(interval, task.interval) &&
Objects.equals(window, task.window);
Objects.equals(window, task.window) &&
Objects.equals(failedTimeSlices, task.failedTimeSlices);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
package org.hawkular.metrics.tasks;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.joda.time.DateTime.now;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -125,6 +128,9 @@ public void start() {
}

public void shutdown() {
// ticker.shutdownNow();
// scheduler.shutdownNow();
// List<Runnable> droppedTasks = workers.shutdownNow()
}

public ListenableFuture<List<Task>> findTasks(String type, DateTime timeSlice, int segment) {
Expand All @@ -149,19 +155,45 @@ public ListenableFuture<DateTime> scheduleTask(DateTime time, Task task) {
DateTime currentTimeSlice = dateTimeService.getTimeSlice(time, timeSliceDuration);
DateTime timeSlice = currentTimeSlice.plus(task.getInterval());

return scheduleTaskAt(timeSlice, task, taskType);
return scheduleTaskAt(timeSlice, task);
}

private ListenableFuture<DateTime> rescheduleTask(DateTime currentTimeSlice, Task task) {
DateTime nextTimeSlice = currentTimeSlice.plus(task.getInterval());
return scheduleTaskAt(nextTimeSlice, task, task.getTaskType());
return scheduleTaskAt(nextTimeSlice, task);
}

private ListenableFuture<DateTime> scheduleTaskAt(DateTime time, Task task, TaskType taskType) {
private ListenableFuture<DateTime> rescheduleFailedTask(DateTime currentTimeSlice, Task task) {
TaskType taskType = task.getTaskType();
int segment = Math.abs(task.getTarget().hashCode() % taskType.getSegments());
int segmentsPerOffset = taskType.getSegments() / taskType.getSegmentOffsets();
int segmentOffset = (segment / segmentsPerOffset) * segmentsPerOffset;

task.getFailedTimeSlices().add(currentTimeSlice);

DateTime timeSlice = currentTimeSlice.plus(task.getInterval());

ResultSetFuture queueFuture = session.executeAsync(queries.createTaskWithFailures.bind(taskType.getName(),
timeSlice.toDate(), segment, task.getTarget(), task.getSources(), (int) task.getInterval()
.getStandardMinutes(), (int) task.getWindow().getStandardMinutes(),
toDates(task.getFailedTimeSlices())));
ResultSetFuture leaseFuture = session.executeAsync(queries.createLease.bind(timeSlice.toDate(),
taskType.getName(), segmentOffset));
ListenableFuture<List<ResultSet>> futures = Futures.allAsList(queueFuture, leaseFuture);

return Futures.transform(futures, (List<ResultSet> resultSets) -> timeSlice);
}

private Set<Date> toDates(Set<DateTime> times) {
return times.stream().map(DateTime::toDate).collect(toSet());
}

private ListenableFuture<DateTime> scheduleTaskAt(DateTime time, Task task) {
TaskType taskType = task.getTaskType();
int segment = Math.abs(task.getTarget().hashCode() % taskType.getSegments());
int segmentsPerOffset = taskType.getSegments() / taskType.getSegmentOffsets();
int segmentOffset = (segment / segmentsPerOffset) * segmentsPerOffset;

ResultSetFuture queueFuture = session.executeAsync(queries.createTask.bind(taskType.getName(),
time.toDate(), segment, task.getTarget(), task.getSources(), (int) task.getInterval()
.getStandardMinutes(), (int) task.getWindow().getStandardMinutes()));
Expand Down Expand Up @@ -220,7 +252,7 @@ public void onSuccess(Boolean acquired) {
ListenableFuture<List<Task>> tasksFuture = findTasks(lease.getTaskType(),
timeSlice, i);
ListenableFuture<ExecutionResults> resultsFuture =
Futures.transform(tasksFuture, executeTaskSegment1(timeSlice,
Futures.transform(tasksFuture, executeTaskSegment(timeSlice,
taskType, i), workers);
ListenableFuture<List<DateTime>> nextExecutionsFuture = Futures.transform(
resultsFuture, scheduleNextExecution, workers);
Expand Down Expand Up @@ -263,7 +295,7 @@ public void onFailure(Throwable t) {
}
}

private Function<List<Task>, ExecutionResults> executeTaskSegment1(DateTime timeSlice, TaskType taskType,
private Function<List<Task>, ExecutionResults> executeTaskSegment(DateTime timeSlice, TaskType taskType,
int segment) {
return tasks -> {
ExecutionResults results = new ExecutionResults(timeSlice, taskType, segment);
Expand All @@ -283,8 +315,13 @@ private Function<List<Task>, ExecutionResults> executeTaskSegment1(DateTime time

private AsyncFunction<ExecutionResults, List<DateTime>> scheduleNextExecution = results -> {
List<ListenableFuture<DateTime>> scheduledFutures = new ArrayList<>();
results.getExecutedTasks().forEach(task ->
scheduledFutures.add(rescheduleTask(results.getTimeSlice(), task)));
results.getExecutedTasks().forEach(task -> {
if (task.succeeded()) {
scheduledFutures.add(rescheduleTask(results.getTimeSlice(), task));
} else {
scheduledFutures.add(rescheduleFailedTask(results.getTimeSlice(), task));
}
});
return Futures.allAsList(scheduledFutures);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,17 @@ private Function<DateTime, Lease> newLease(TaskType taskType, Integer segmentOff
return timeSlice -> new Lease(timeSlice, taskType.getName(), segmentOffset, null, false);
}

/**
* The compiler requires that a method with @SafeVarargs be either static or final.
* checkstyle fails when making a private method final, so this method is protected to
* avoid any more wasted time with checkstyle/
* @param timeSlice
* @param leaseFns
* @throws Exception
*/
@SafeVarargs
private final void assertLeasesCreated(DateTime timeSlice, Function<DateTime, Lease>... leaseFns) throws Exception {
protected final void assertLeasesCreated(DateTime timeSlice, Function<DateTime, Lease>... leaseFns)
throws Exception {
Lease[] leases = Arrays.stream(leaseFns)
.map((Function<DateTime, Lease> fn) -> fn.apply(timeSlice))
.toArray(Lease[]::new);
Expand Down

0 comments on commit ba4e4fa

Please sign in to comment.