Skip to content

Commit

Permalink
[HWKMETRICS=52] initial support for automatic lease renewal
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 12, 2015
1 parent d68d873 commit b07119c
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,58 @@
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author jsanda
*/
public class LeaseManager {

private static final Logger logger = LoggerFactory.getLogger(LeaseManager.class);

public static final int DEFAULT_LEASE_TTL = 180;

public static final int DEFAULT_RENEWAL_RATE = 60;

public static final Function<ResultSet, Void> TO_VOID = resultSet -> null;

private Session session;

private Queries queries;

private ScheduledExecutorService renewals = Executors.newScheduledThreadPool(1);

private int ttl = DEFAULT_LEASE_TTL;

private int renewalRate = DEFAULT_RENEWAL_RATE;

public LeaseManager(Session session, Queries queries) {
this.session = session;
this.queries = queries;
}

void setTTL(int ttl) {
this.ttl = ttl;
}

void setRenewalRate(int renewalRate) {
this.renewalRate = renewalRate;
}

public ListenableFuture<List<Lease>> findUnfinishedLeases(DateTime timeSlice) {
ResultSetFuture future = session.executeAsync(queries.findLeases.bind(timeSlice.toDate()));
return Futures.transform(future, (ResultSet resultSet) -> StreamSupport.stream(resultSet.spliterator(), false)
Expand All @@ -56,7 +80,7 @@ public ListenableFuture<List<Lease>> findUnfinishedLeases(DateTime timeSlice) {
}

public ListenableFuture<Boolean> acquire(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.acquireLease.bind(DEFAULT_LEASE_TTL, lease.getOwner(),
ResultSetFuture future = session.executeAsync(queries.acquireLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset()));
return Futures.transform(future, ResultSet::wasApplied);
}
Expand All @@ -68,12 +92,52 @@ public ListenableFuture<Boolean> acquire(Lease lease, int ttl) {
}

public ListenableFuture<Boolean> renew(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(DEFAULT_LEASE_TTL, lease.getOwner(),
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset(),
lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
}

/**
* Schedules the lease to be automatically renewed every {@link #DEFAULT_RENEWAL_RATE} seconds in a background
* thread. Renewals will stop once the lease is set to finished. If the lease cannot be renewed, then the lease
* owner, i.e., the calling thread, will be interrupted. It therefore important for lease owners to handle
* InterruptedExceptions appropriately.
*/
public void autoRenew(Lease lease) {
autoRenew(lease, Thread.currentThread());
}

private void autoRenew(Lease lease, Thread leaseOwner) {
renewals.schedule(createRenewRunnable(lease, leaseOwner), renewalRate, TimeUnit.SECONDS);
}

private Runnable createRenewRunnable(Lease lease, Thread leaseOwner) {
return () -> {
if (lease.isFinished()) {
return;
}
ListenableFuture<Boolean> renewedFuture = renew(lease);
Futures.addCallback(renewedFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean renewed) {
if (renewed) {
autoRenew(lease, leaseOwner);
} else {
logger.info("Failed to renew " + lease);
leaseOwner.interrupt();
}
}

@Override
public void onFailure(Throwable t) {
logger.warn("Failed to renew " + lease + " for " + leaseOwner);
// TODO figure out what to do in this scenario
}
});
};
}

public ListenableFuture<Boolean> renew(Lease lease, int ttl) {
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,51 @@
*/
package org.hawkular.metrics.tasks.impl;

import static java.util.Collections.singletonList;
import static org.joda.time.DateTime.now;
import static org.joda.time.Duration.standardMinutes;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import com.datastax.driver.core.PreparedStatement;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.hawkular.metrics.tasks.BaseTest;
import org.joda.time.DateTime;
import org.joda.time.Minutes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
* @author jsanda
*/
public class LeaseManagerTest extends BaseTest {

// private static final long FUTURE_TIMEOUT = 3;
//
// private Session session;
//
// private DateTimeService dateTimeService;
//
// private Queries queries;
//
private static Logger logger = LoggerFactory.getLogger(LeaseManagerTest.class);

private LeaseManager leaseManager;
//

private PreparedStatement createdFinishedLease;
//

@BeforeClass
public void initClass() {
leaseManager = new LeaseManager(session, queries);
createdFinishedLease = session.prepare(
"INSERT INTO leases (time_slice, task_type, segment_offset, finished) VALUES (?, ?, ?, ?)");
}
//
// @BeforeMethod
// public void resetDB() {
// session.execute("TRUNCATE leases");
// }

@BeforeMethod
public void init() {
leaseManager = new LeaseManager(session, queries);
}

@Test
public void findUnfinishedLeases() throws Exception {
Expand All @@ -74,7 +75,7 @@ public void findUnfinishedLeases() throws Exception {
session.execute(queries.createLease.bind(timeSlice.toDate(), taskType3, 0));
session.execute(queries.createLease.bind(timeSlice.toDate(), taskType4, 0));

session.execute(createdFinishedLease.bind(timeSlice.toDate(), taskType3, 0, true));
session.execute(createdFinishedLease.bind(timeSlice.toDate(), taskType3, 0, true));
session.execute(createdFinishedLease.bind(timeSlice.toDate(), taskType4, 0, true));

ListenableFuture<List<Lease>> future = leaseManager.findUnfinishedLeases(timeSlice);
Expand Down Expand Up @@ -154,4 +155,114 @@ public void renewLeaseBeforeItExpires() throws Exception {
assertTrue(renewed, "Expected lease to be renewed");
}

@Test
public void autoRenewLease() throws Exception {
int ttl = 3;
int renewalRate = 1;

DateTime timeSlice = dateTimeService.getTimeSlice(now(), standardMinutes(1));
Lease lease = new Lease(timeSlice, "autoRenew", 0, null, false);

session.execute(queries.createLease.bind(lease.getTimeSlice().toDate(), lease.getTaskType(),
lease.getSegmentOffset()));

leaseManager.setTTL(ttl);
leaseManager.setRenewalRate(renewalRate);

lease.setOwner("server1");
Boolean acquired = getUninterruptibly(leaseManager.acquire(lease));
assertTrue(acquired, "Expected to acquire " + lease);

executeInNewThread(() -> {
leaseManager.autoRenew(lease);
List<Lease> expectedLeases = singletonList(lease);
try {
for (int i = 0; i < 5; ++i) {
Thread.sleep(ttl * 1000);
List<Lease> remainingLeases = getUninterruptibly(leaseManager.findUnfinishedLeases(timeSlice));
logger.info("Remaining leases = {}", remainingLeases);
assertEquals(remainingLeases, expectedLeases,
"The unfinished leases do not match expected values");
}
Boolean finished = getUninterruptibly(leaseManager.finish(lease));
assertTrue(finished, "Expected " + lease + " to be finished");
} catch (Exception e) {
fail("There was an unexpected error", e);
}
}, 20000);

List<Lease> unfinishedLeases = getUninterruptibly(leaseManager.findUnfinishedLeases(timeSlice));
assertTrue(unfinishedLeases.isEmpty(), "There should not be any unfinished leases but found " +
unfinishedLeases);
}

@Test
public void failToRenewLease() throws Exception {
int ttl = 2;
int renewalRate = 10;
DateTime timeSlice = dateTimeService.getTimeSlice(now(), standardMinutes(1));
Lease lease = new Lease(timeSlice, "failToRenew", 0, null, false);

session.execute(queries.createLease.bind(lease.getTimeSlice().toDate(), lease.getTaskType(),
lease.getSegmentOffset()));

leaseManager.setTTL(ttl);
leaseManager.setRenewalRate(renewalRate);

lease.setOwner("server1");
Boolean acquired = getUninterruptibly(leaseManager.acquire(lease));
assertTrue(acquired, "Expected to acquire " + lease);

executeInNewThread(() -> {
leaseManager.autoRenew(lease);

try {
lease.setOwner("server2");
Thread.sleep(2000);
Boolean acquiredByNewOwner = getUninterruptibly(leaseManager.acquire(lease));
assertTrue(acquiredByNewOwner, "Expected " + lease + " to be acquired by new owner");
} catch (Exception e) {
fail("There was an unexpected error trying to acquire " + lease, e);
}

InterruptedException exception = null;
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
exception = e;
}
assertNotNull(exception, "Expected an " + InterruptedException.class.getSimpleName() +
" to be thrown.");
}, 20000);
}

/**
* Executes the runnable in a new thread. If an AssertionError or any other exception is
* thrown in the new thread, it will be caught and rethrown in the calling thread as an
* AssertionError.
*
* @param runnable
* @param wait
* @throws InterruptedException
*/
private void executeInNewThread(Runnable runnable, long wait) throws InterruptedException {
AtomicReference<AssertionError> errorRef = new AtomicReference<>();
Runnable wrappedRunnable = () -> {
try {
runnable.run();
} catch (AssertionError e) {
errorRef.set(e);
} catch (Throwable t) {
errorRef.set(new AssertionError("There was an unexpected exception", t));
}
};
Thread thread = new Thread(wrappedRunnable);
thread.start();
thread.join(wait);

if (errorRef.get() != null) {
throw errorRef.get();
}
}

}

0 comments on commit b07119c

Please sign in to comment.