Skip to content

Commit

Permalink
[HWKMETRICS-52] add shutdown and interrupt handling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 12, 2015
1 parent b07119c commit 3a8a153
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
/**
* @author jsanda
*/
public class LeaseManager {
public class LeaseService {

private static final Logger logger = LoggerFactory.getLogger(LeaseManager.class);
private static final Logger logger = LoggerFactory.getLogger(LeaseService.class);

public static final int DEFAULT_LEASE_TTL = 180;

Expand All @@ -58,11 +58,16 @@ public class LeaseManager {

private int renewalRate = DEFAULT_RENEWAL_RATE;

public LeaseManager(Session session, Queries queries) {
public LeaseService(Session session, Queries queries) {
this.session = session;
this.queries = queries;
}

public void shutdown() {
logger.info("Shutting down");
renewals.shutdownNow();
}

void setTTL(int ttl) {
this.ttl = ttl;
}
Expand Down Expand Up @@ -124,7 +129,7 @@ public void onSuccess(Boolean renewed) {
if (renewed) {
autoRenew(lease, leaseOwner);
} else {
logger.info("Failed to renew " + lease);
logger.info("Failed to renew " + lease + " for " + leaseOwner);
leaseOwner.interrupt();
}
}
Expand All @@ -138,13 +143,6 @@ public void onFailure(Throwable t) {
};
}

public ListenableFuture<Boolean> renew(Lease lease, int ttl) {
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset(),
lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> finish(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.finishLease.bind(lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset(), lease.getOwner()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class TaskContainer implements Iterable<Task> {

private SortedSet<DateTime> failedTimeSlices = new TreeSet<>();

/**
* Creates a copy of the task container, excluding its {@link #getFailedTimeSlices() failedTimeSlices}.
*/
public static TaskContainer copyWithoutFailures(TaskContainer container) {
TaskContainer newContainer = new TaskContainer();
newContainer.taskType = container.taskType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.joda.time.DateTime.now;
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -49,8 +51,9 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.hawkular.metrics.tasks.DateTimeService;
import org.hawkular.metrics.tasks.api.TaskExecutionException;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskExecutionException;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskType;
import org.joda.time.DateTime;
import org.joda.time.Duration;
Expand All @@ -60,31 +63,32 @@
/**
* @author jsanda
*/
public class TaskServiceImpl implements org.hawkular.metrics.tasks.api.TaskService {
public class TaskServiceImpl implements TaskService {

private static final Logger logger = LoggerFactory.getLogger(TaskServiceImpl.class);

private Duration timeSliceDuration;

private Session session;

private Queries queries;

private List<TaskType> taskTypes;

private LeaseManager leaseManager;
private LeaseService leaseService;

/**
* The ticker thread pool is responsible for scheduling task execution every tick on the scheduler thread pool.
*/
private ScheduledExecutorService ticker = Executors.newScheduledThreadPool(1);

/**
* Thread pool that schedules or kicks of task execution. Task execution runs on the workers thread pool. The
* Thread pool that schedules or kicks off task execution. Task execution runs on the workers thread pool. The
* scheduler blocks though until task execution for a time slice is finished.
*/
private ExecutorService scheduler = Executors.newSingleThreadExecutor();

/**
* Thread pool for executing tasks.
*/
private ListeningExecutorService workers = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

/**
Expand All @@ -97,14 +101,20 @@ public class TaskServiceImpl implements org.hawkular.metrics.tasks.api.TaskServi

private DateTimeService dateTimeService;

private TimeUnit tickerUnit = TimeUnit.MINUTES;
/**
* The duration of a time slice for tasks.
*/
private Duration timeSliceDuration = standardMinutes(1);

public TaskServiceImpl(Session session, Queries queries, LeaseManager leaseManager, Duration timeSliceDuration,
List<TaskType> taskTypes) {
/**
* The time units to use for the ticker. This determines the frequency at which jobs are submitted to the scheduler.
*/
private TimeUnit timeUnit = TimeUnit.MINUTES;

public TaskServiceImpl(Session session, Queries queries, LeaseService leaseService, List<TaskType> taskTypes) {
this.session = session;
this.queries = queries;
this.leaseManager = leaseManager;
this.timeSliceDuration = timeSliceDuration;
this.leaseService = leaseService;
this.taskTypes = taskTypes;
try {
owner = InetAddress.getLocalHost().getHostName();
Expand All @@ -114,8 +124,35 @@ public TaskServiceImpl(Session session, Queries queries, LeaseManager leaseManag
dateTimeService = new DateTimeService();
}

void setTickerUnit(TimeUnit tickerUnit) {
this.tickerUnit = tickerUnit;
/**
* <p>
* The time unit determines a couple things. First, it determines the frequency for scheduling jobs. If
* {@link TimeUnit#MINUTES} is used for instance, then jobs are scheduled every minute. In this context, a job
* refers to finding and executing tasks in the queue for a particular time slice, which brings up the second
* thing that <code>timeUnit</code> determines - time slice interval. Time slices are set along fixed intervals,
* e.g., 13:00, 13:01, 13:02, etc.
* </p>
* <p>
* <strong>Note:</strong> This should only be called prior to calling {@link #start()}.
* </p>
*/
void setTimeUnit(TimeUnit timeUnit) {
switch (timeUnit) {
case SECONDS:
this.timeUnit = TimeUnit.SECONDS;
timeSliceDuration = standardSeconds(1);
break;
case MINUTES:
this.timeUnit = TimeUnit.MINUTES;
timeSliceDuration = standardMinutes(1);
break;
case HOURS:
this.timeUnit = TimeUnit.HOURS;
timeSliceDuration = standardMinutes(60);
break;
default:
throw new IllegalArgumentException(timeUnit + " is not a supported time unit");
}
}

@Override
Expand All @@ -124,14 +161,24 @@ public void start() {
DateTime timeSlice = dateTimeService.getTimeSlice(now(), timeSliceDuration);
scheduler.submit(() -> executeTasks(timeSlice));
};
ticker.scheduleAtFixedRate(runnable, 0, 1, tickerUnit);
ticker.scheduleAtFixedRate(runnable, 0, 1, timeUnit);
}

@Override
public void shutdown() {
// ticker.shutdownNow();
// scheduler.shutdownNow();
// List<Runnable> droppedTasks = workers.shutdownNow()
logger.info("Shutting down");

ticker.shutdownNow();
scheduler.shutdownNow();
workers.shutdown();
try {
logger.debug("Waiting for active jobs to finish");
workers.awaitTermination(1, timeUnit);
} catch (InterruptedException e) {
logger.info("The shutdown process has been interrupted. Attempting to forcibly terminate active jobs.");
workers.shutdownNow();
}

}

public ListenableFuture<List<TaskContainer>> findTasks(String type, DateTime timeSlice, int segment) {
Expand Down Expand Up @@ -214,7 +261,7 @@ public void executeTasks(DateTime timeSlice) {
try {
// Execute tasks in order of task types. Once all of the tasks are executed, we delete the lease partition.
taskTypes.forEach(taskType -> executeTasks(timeSlice, taskType));
Uninterruptibles.getUninterruptibly(leaseManager.deleteLeases(timeSlice));
Uninterruptibles.getUninterruptibly(leaseService.deleteLeases(timeSlice));
} catch (ExecutionException e) {
logger.warn("Failed to delete lease partition for time slice " + timeSlice);
}
Expand All @@ -228,7 +275,7 @@ public void executeTasks(DateTime timeSlice) {
*/
private void executeTasks(DateTime timeSlice, TaskType taskType) {
try {
List<Lease> leases = Uninterruptibles.getUninterruptibly(leaseManager.findUnfinishedLeases(timeSlice))
List<Lease> leases = Uninterruptibles.getUninterruptibly(leaseService.findUnfinishedLeases(timeSlice))
.stream().filter(lease -> lease.getTaskType().equals(taskType.getName())).collect(toList());

// A CountDownLatch is used to let us know when to query again for leases. We do not want to query (again)
Expand All @@ -242,11 +289,17 @@ private void executeTasks(DateTime timeSlice, TaskType taskType) {
// that is not finished. When these conditions do not hold, then the leases for the current time slice
// are finished.
while (!(leases.isEmpty() || leases.stream().allMatch(Lease::isFinished))) {
if (Thread.currentThread().isInterrupted()) {
logger.info("Execution of {} tasks for time slice {} has been interrupted.", taskType.getName(),
timeSlice);
break;
}

for (final Lease lease : leases) {
if (lease.getOwner() == null) {
permits.acquire();
lease.setOwner(owner);
ListenableFuture<Boolean> acquiredFuture = leaseManager.acquire(lease);
ListenableFuture<Boolean> acquiredFuture = leaseService.acquire(lease);
Futures.addCallback(acquiredFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean acquired) {
Expand All @@ -268,7 +321,7 @@ public void onSuccess(Boolean acquired) {
ListenableFuture<List<ResultSet>> deletesFuture =
Futures.allAsList(deleteFutures);
ListenableFuture<Boolean> leaseFinishedFuture = Futures.transform(deletesFuture,
(List<ResultSet> resultSets) -> leaseManager.finish(lease), workers);
(List<ResultSet> resultSets) -> leaseService.finish(lease), workers);
Futures.addCallback(leaseFinishedFuture, leaseFinished(lease), workers);
} else {
// someone else has the lease so return the permit and try to
Expand All @@ -288,15 +341,16 @@ public void onFailure(Throwable t) {
}
}
latchRef.get().await();
leases = Uninterruptibles.getUninterruptibly(leaseManager.findUnfinishedLeases(timeSlice))
leases = Uninterruptibles.getUninterruptibly(leaseService.findUnfinishedLeases(timeSlice))
.stream().filter(lease -> lease.getTaskType().equals(taskType.getName())).collect(toList());
latchRef.set(new CountDownLatch(leases.size()));
}

} catch (ExecutionException e) {
logger.warn("Failed to load leases for time slice " + timeSlice, e);
} catch (InterruptedException e) {
logger.warn("There was an interrupt", e);
logger.info("Execution of " + taskType.getName() + " tasks for time slice " + timeSlice +
"was interrupted.", e);
}
}

Expand All @@ -321,20 +375,20 @@ public void onFailure(Throwable t) {
private Function<List<TaskContainer>, List<TaskContainer>> executeTasksSegment = taskContainers -> {
List<TaskContainer> results = new ArrayList<>(taskContainers.size());
taskContainers.forEach(taskContainer -> {
if (Thread.currentThread().isInterrupted()) {
// An interrupt could be due to loss of lease ownership or something else like JVM shutdown. Either
// way, we need to respond to the interrupt which means cancelling task execution.
throw new RuntimeException(Thread.currentThread().getName() + " has been interrupted. Cancelling " +
"task execution");
}
TaskContainer executedTasks = TaskContainer.copyWithoutFailures(taskContainer);
Consumer<Task> taskRunner = taskContainer.getTaskType().getFactory().get();
Consumer<Task> wrappedTaskedRunner = wrapTaskRunner.apply(taskRunner);
try {
taskContainer.forEach(task -> {
wrappedTaskedRunner.accept(task);
taskContainer.getFailedTimeSlices().remove(task.getTimeSlice());
});
taskContainer.forEach(wrappedTaskedRunner::accept);
} catch (TaskExecutionException e) {
logger.warn("Failed to execute " + e.getFailedTask());
executedTasks.getFailedTimeSlices().add(e.getFailedTask().getTimeSlice());
} catch (Throwable t) {
// TODO log details about the specific time slice that failed
logger.error("There was an unexpected error during task execution. This is likely a bug!", t);
}
results.add(executedTasks);
});
Expand Down Expand Up @@ -362,14 +416,12 @@ public void onSuccess(Boolean finished) {

@Override
public void onFailure(Throwable t) {
// We can wind up in the onFailure callback when either any of the task segment deletions fail or when
// marking the lease finished fails with an error. In order to determine what exactly failed, we need
// to register additional callbacks on each future with either Futures.addCallback or
// Futures.withFallback. Neither is particular appealing as it makes the code more complicated. This is
// one of a growing number of reasons I want to prototype a solution using RxJava.

logger.warn("There was an error either while deleting one or more task segments or while attempting "
+ "to mark " + lease + " finished", t);
// There are multiple failure paths including losing lease ownership, failure to delete task segment,
// or failure to mark lease finished. We do not have a good way of determining the exact cause without
// do something like registering additional callbacks with Futures.addCallback or Futures.withFallback.
// Neither is particular appealing as it makes the code more complicated. This is one of a growing
// number of reasons I want to prototype a solution using RxJava.
logger.warn("Failed to process tasks for " + lease, t);
permits.release();
}
};
Expand Down

0 comments on commit 3a8a153

Please sign in to comment.