Skip to content

Commit

Permalink
[HWKMETRICS-76] LeaseService returns Observable instead of Listenable…
Browse files Browse the repository at this point in the history
…Future
  • Loading branch information
John Sanda committed May 18, 2015
1 parent 070986c commit 053d4af
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hawkular.metrics.tasks.impl.LeaseService;
import org.hawkular.metrics.tasks.impl.Queries;
import org.hawkular.metrics.tasks.impl.TaskServiceImpl;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;

/**
* A builder for creating and configuring a {@link TaskService} instance.
Expand Down Expand Up @@ -68,7 +69,7 @@ public TaskServiceBuilder withTaskTypes(List<TaskType> taskTypes) {

public TaskService build() {
Queries queries = new Queries(session);
LeaseService leaseService = new LeaseService(session, queries);
LeaseService leaseService = new LeaseService(new RxSessionImpl(session), queries);
TaskServiceImpl taskService = new TaskServiceImpl(session, queries, leaseService, taskTypes);
taskService.setTimeUnit(timeUnit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,16 @@
*/
package org.hawkular.metrics.tasks.impl;

import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/**
* @author jsanda
Expand All @@ -46,9 +38,7 @@ public class LeaseService {

public static final int DEFAULT_RENEWAL_RATE = 60;

public static final Function<ResultSet, Void> TO_VOID = resultSet -> null;

private Session session;
private RxSession rxSession;

private Queries queries;

Expand All @@ -58,8 +48,8 @@ public class LeaseService {

private int renewalRate = DEFAULT_RENEWAL_RATE;

public LeaseService(Session session, Queries queries) {
this.session = session;
public LeaseService(RxSession session, Queries queries) {
this.rxSession = session;
this.queries = queries;
}

Expand All @@ -76,31 +66,26 @@ void setRenewalRate(int renewalRate) {
this.renewalRate = renewalRate;
}

public ListenableFuture<List<Lease>> findUnfinishedLeases(DateTime timeSlice) {
ResultSetFuture future = session.executeAsync(queries.findLeases.bind(timeSlice.toDate()));
return Futures.transform(future, (ResultSet resultSet) -> StreamSupport.stream(resultSet.spliterator(), false)
.map(row->new Lease(timeSlice, row.getString(0), row.getInt(1), row.getString(2), row.getBool(3)))
.filter(lease -> !lease.isFinished())
.collect(toList()));
public Observable<Lease> findUnfinishedLeases(DateTime timeSlice) {
return rxSession.execute(queries.findLeases.bind(timeSlice.toDate()))
.flatMap(Observable::from)
.map(row -> new Lease(timeSlice, row.getString(0), row.getInt(1), row.getString(2), row.getBool(3)))
.filter(lease -> !lease.isFinished());
}

public ListenableFuture<Boolean> acquire(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.acquireLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset()));
return Futures.transform(future, ResultSet::wasApplied);
public Observable<Boolean> acquire(Lease lease) {
return rxSession.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset())).map(ResultSet::wasApplied);
}

public ListenableFuture<Boolean> acquire(Lease lease, int ttl) {
ResultSetFuture future = session.executeAsync(queries.acquireLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset()));
return Futures.transform(future, ResultSet::wasApplied);
public Observable<Boolean> acquire(Lease lease, int ttl) {
return rxSession.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset())).map(ResultSet::wasApplied);
}

public ListenableFuture<Boolean> renew(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset(),
lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
public Observable<Boolean> renew(Lease lease) {
return rxSession.execute(queries.renewLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
}

/**
Expand All @@ -122,36 +107,29 @@ private Runnable createRenewRunnable(Lease lease, Thread leaseOwner) {
if (lease.isFinished()) {
return;
}
ListenableFuture<Boolean> renewedFuture = renew(lease);
Futures.addCallback(renewedFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean renewed) {
if (renewed) {
autoRenew(lease, leaseOwner);
} else {
logger.info("Failed to renew " + lease + " for " + leaseOwner);
leaseOwner.interrupt();
}
}

@Override
public void onFailure(Throwable t) {
logger.warn("Failed to renew " + lease + " for " + leaseOwner);
// TODO figure out what to do in this scenario
}
});
renew(lease).subscribe(
renewed -> {
if (renewed) {
autoRenew(lease, leaseOwner);
} else {
logger.info("Failed to renew " + lease + " for " + leaseOwner);
leaseOwner.interrupt();
}
},
t -> {
logger.warn("Failed to renew " + lease + " for " + leaseOwner);
// TODO figure out what to do in this scenario
});
};
}

public ListenableFuture<Boolean> finish(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.finishLease.bind(lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset(), lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
public Observable<Boolean> finish(Lease lease) {
return rxSession.execute(queries.finishLease.bind(lease.getTimeSlice().toDate(), lease.getTaskType(),
lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
}

public ListenableFuture<Void> deleteLeases(DateTime timeSlice) {
ResultSetFuture future = session.executeAsync(queries.deleteLeases.bind(timeSlice.toDate()));
return Futures.transform(future, TO_VOID);
public Observable<Void> deleteLeases(DateTime timeSlice) {
return rxSession.execute(queries.deleteLeases.bind(timeSlice.toDate())).flatMap(resultSet -> null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -44,28 +43,31 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.DateTimeService;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskExecutionException;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskType;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/**
* @author jsanda
*/
public class TaskServiceImpl implements TaskService {
public class
TaskServiceImpl implements TaskService {

private static final Logger logger = LoggerFactory.getLogger(TaskServiceImpl.class);

Expand Down Expand Up @@ -270,8 +272,9 @@ public void executeTasks(DateTime timeSlice) {
try {
// Execute tasks in order of task types. Once all of the tasks are executed, we delete the lease partition.
taskTypes.forEach(taskType -> executeTasks(timeSlice, taskType));
Uninterruptibles.getUninterruptibly(leaseService.deleteLeases(timeSlice));
} catch (ExecutionException e) {
leaseService.deleteLeases(timeSlice).toBlocking().last();
// Uninterruptibles.getUninterruptibly(leaseService.deleteLeases(timeSlice));
} catch (Exception e) {
logger.warn("Failed to delete lease partition for time slice " + timeSlice);
}
}
Expand All @@ -284,8 +287,10 @@ public void executeTasks(DateTime timeSlice) {
*/
private void executeTasks(DateTime timeSlice, TaskType taskType) {
try {
List<Lease> leases = Uninterruptibles.getUninterruptibly(leaseService.findUnfinishedLeases(timeSlice))
.stream().filter(lease -> lease.getTaskType().equals(taskType.getName())).collect(toList());
List<Lease> leases = ImmutableList.copyOf(leaseService.findUnfinishedLeases(timeSlice)
.filter(lease -> lease.getTaskType().equals(taskType.getName()))
.toBlocking()
.toIterable());

// A CountDownLatch is used to let us know when to query again for leases. We do not want to query (again)
// for leases until we have gone through each one. If a lease already has an owner, then we just count
Expand All @@ -308,55 +313,63 @@ private void executeTasks(DateTime timeSlice, TaskType taskType) {
if (lease.getOwner() == null) {
permits.acquire();
lease.setOwner(owner);
ListenableFuture<Boolean> acquiredFuture = leaseService.acquire(lease);
Futures.addCallback(acquiredFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean acquired) {
latchRef.get().countDown();
if (acquired) {
List<ListenableFuture<ResultSet>> deleteFutures = new ArrayList<>();
TaskType taskType = findTaskType(lease.getTaskType());
for (int i = lease.getSegmentOffset(); i < taskType.getSegments(); ++i) {
ListenableFuture<List<TaskContainer>> tasksFuture =
findTasks(lease.getTaskType(), timeSlice, i);
ListenableFuture<List<TaskContainer>> resultsFuture =
Futures.transform(tasksFuture, executeTasksSegment, workers);
ListenableFuture<List<DateTime>> nextExecutionsFuture = Futures.transform(
resultsFuture, scheduleNextExecution, workers);
ListenableFuture<ResultSet> deleteFuture = Futures.transform(
nextExecutionsFuture, deleteTaskSegment(timeSlice, taskType, i));
deleteFutures.add(deleteFuture);
Observable<Boolean> acquiredObservable = leaseService.acquire(lease);

acquiredObservable.subscribe(
acquired -> {
latchRef.get().countDown();
if (acquired) {
List<ListenableFuture<ResultSet>> deleteFutures = new ArrayList<>();
TaskType type = findTaskType(lease.getTaskType());
for (int i = lease.getSegmentOffset(); i < type.getSegments(); ++i) {
ListenableFuture<List<TaskContainer>> tasksFuture =
findTasks(lease.getTaskType(), timeSlice, i);
ListenableFuture<List<TaskContainer>> resultsFuture =
Futures.transform(tasksFuture, executeTasksSegment, workers);
ListenableFuture<List<DateTime>> nextExecutionsFuture = Futures.transform(
resultsFuture, scheduleNextExecution, workers);
ListenableFuture<ResultSet> deleteFuture = Futures.transform(
nextExecutionsFuture, deleteTaskSegment(timeSlice, type, i));
deleteFutures.add(deleteFuture);
}
ListenableFuture<List<ResultSet>> deletesFuture =
Futures.allAsList(deleteFutures);
RxUtil.from(deletesFuture, workers)
.flatMap(resultSets -> leaseService.finish(lease)).subscribe(
finished -> {
if (!finished) {
logger.warn("All tasks for {} have completed but unable to " +
"set it to finished", lease);
}
permits.release();
},
t -> {
logger.warn("Failed to process tasks for " + lease, t);
permits.release();
});
} else {
// someone else has the lease so return the permit and try to
// acquire another lease
permits.release();
}
ListenableFuture<List<ResultSet>> deletesFuture =
Futures.allAsList(deleteFutures);
ListenableFuture<Boolean> leaseFinishedFuture = Futures.transform(deletesFuture,
(List<ResultSet> resultSets) -> leaseService.finish(lease), workers);
Futures.addCallback(leaseFinishedFuture, leaseFinished(lease), workers);
} else {
// someone else has the lease so return the permit and try to
// acquire another lease
permits.release();
},
t -> {
logger.warn("There was an error trying to acquire a lease", t);
latchRef.get().countDown();
}
}

@Override
public void onFailure(Throwable t) {
logger.warn("There was an error trying to acquire a lease", t);
latchRef.get().countDown();
}
}, workers);
);
} else {
latchRef.get().countDown();
}
}
latchRef.get().await();
leases = Uninterruptibles.getUninterruptibly(leaseService.findUnfinishedLeases(timeSlice))
.stream().filter(lease -> lease.getTaskType().equals(taskType.getName())).collect(toList());
leases = ImmutableList.copyOf(leaseService.findUnfinishedLeases(timeSlice)
.filter(lease -> lease.getTaskType().equals(taskType.getName()))
.toBlocking()
.toIterable());
latchRef.set(new CountDownLatch(leases.size()));
}

} catch (ExecutionException e) {
logger.warn("Failed to load leases for time slice " + timeSlice, e);
} catch (InterruptedException e) {
logger.info("Execution of " + taskType.getName() + " tasks for time slice " + timeSlice +
"was interrupted.", e);
Expand Down

0 comments on commit 053d4af

Please sign in to comment.