Skip to content

Commit

Permalink
[HWKMETRICS-76] fix race condition with updating/deleting leases
Browse files Browse the repository at this point in the history
With my previous commit ( 83b6600) it was possible to delete the leases
partition before all leases were marked finished. This would cause the leases
partition to get recreated. I noticed this with sporadic test failures.
  • Loading branch information
John Sanda committed May 21, 2015
1 parent 83b6600 commit 18e53aa
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public boolean isFinished() {
return finished;
}

public Lease setFinished(boolean finished) {
this.finished = finished;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ public Observable<Lease> findUnfinishedLeases(DateTime timeSlice) {
// not want to poll again until there is at least one worker thread (i.e., one of
// the threads executing tasks) free.
return Observable.create(subscriber ->
loadLeases(timeSlice)
.flatMap(Observable::from)
.filter(lease -> lease.getOwner() == null)
.flatMap(lease ->
acquire(lease).map(acquired ->
acquired ? lease : null))
.subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted)
loadLeases(timeSlice)
.flatMap(Observable::from)
.filter(lease -> lease.getOwner() == null)
.flatMap(lease ->
acquire(lease).map(acquired ->
acquired ? lease : null))
.subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted)
);
}

Expand Down Expand Up @@ -149,8 +149,12 @@ public Observable<Boolean> finish(Lease lease) {
lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
}

public Observable<Void> deleteLeases(DateTime timeSlice) {
return rxSession.execute(queries.deleteLeases.bind(timeSlice.toDate())).flatMap(resultSet -> null);
// public Observable<Void> deleteLeases(DateTime timeSlice) {
// return rxSession.execute(queries.deleteLeases.bind(timeSlice.toDate())).flatMap(resultSet -> null);
// }

public void deleteLeases(DateTime timeSlice) {
rxSession.getSession().execute(queries.deleteLeases.bind(timeSlice.toDate()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ public void executeTasks(DateTime timeSlice) {
try {
// Execute tasks in order of task types. Once all of the tasks are executed, we delete the lease partition.
taskTypes.forEach(taskType -> executeTasks(timeSlice, taskType));
leaseService.deleteLeases(timeSlice).toBlocking().last();
// Uninterruptibles.getUninterruptibly(leaseService.deleteLeases(timeSlice));
// leaseService.deleteLeases(timeSlice).toBlocking().lastOrDefault(null);
leaseService.deleteLeases(timeSlice);
} catch (Exception e) {
logger.warn("Failed to delete lease partition for time slice " + timeSlice);
logger.warn("Failed to delete lease partition for time slice " + timeSlice, e);
}
}

Expand Down Expand Up @@ -302,10 +302,17 @@ private void executeTasks(DateTime timeSlice, TaskType taskType) {
.flatMap(this::rescheduleTask)
.map(this::deleteTaskSegment)
.flatMap(resultSet -> Observable.just(lease)))
.flatMap(lease -> leaseService.finish(lease).map(lease::setFinished))
.subscribe(
lease -> leaseService.finish(lease).doOnError(t -> logger.warn("Failed to delete " + lease, t)),
lease -> {
// TODO we eventually want to treat this as error scenario
if (!lease.isFinished()) {
logger.warn("Failed to mark " + lease + " finished");
}
},
t -> logger.warn("Task execution failed", t),
latch::countDown);
latch::countDown
);

try {
latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ public void doNotScheduleTaskHavingInvalidType() throws Exception {
@Test
public void executeTasksOfOneType() throws Exception {
DateTime timeSlice = dateTimeService.getTimeSlice(now().minusMinutes(1), standardMinutes(1));
String type = "test";
String type = "singleType-test";
int interval = 5;
int window = 15;
int segment0 = 0;
int segment1 = 1;
int segmentOffset = 0;

TaskType taskType = new TaskType().setName("test").setSegments(5).setSegmentOffsets(1);
TaskType taskType = new TaskType().setName(type).setSegments(5).setSegmentOffsets(1);

String metric1 = "metric1.5min";
String metric2 = "metric2.5min";
Expand All @@ -121,9 +121,7 @@ public void executeTasksOfOneType() throws Exception {
executedTasks.put(task1, false);
executedTasks.put(task2, false);

taskType.setFactory(() -> task -> {
executedTasks.put(task, true);
});
taskType.setFactory(() -> task -> executedTasks.put(task, true));

session.execute(queries.createTask.bind(type, timeSlice.toDate(), metric1Segment, metric1,
ImmutableSet.of("metric1"), interval, window));
Expand Down Expand Up @@ -157,8 +155,8 @@ public void executeTasksOfOneType() throws Exception {
@Test
public void executeTasksOfMultipleTypes() throws Exception {
DateTime timeSlice = dateTimeService.getTimeSlice(now().minusMinutes(1), standardMinutes(1));
String type1 = "test1";
String type2 = "test2";
String type1 = "multiType-test1";
String type2 = "multiType-test2";
int interval = 5;
int window = 15;
int segmentOffset = 0;
Expand Down Expand Up @@ -238,7 +236,6 @@ public void tryToExecuteTasksWhenAllLeasesAreAlreadyReserved() throws Exception
int segmentOffset = 0;

TaskExecutionHistory executionHistory = new TaskExecutionHistory();
// Supplier<Consumer<Task>> taskFactory = () -> task -> executionHistory.add(task.getTarget());
Supplier<Consumer<Task>> taskFactory = () -> task -> executionHistory.add(task.getTarget());

TaskType taskType1 = new TaskType().setName(type1).setSegments(5).setSegmentOffsets(1).setFactory(taskFactory);
Expand Down Expand Up @@ -293,7 +290,7 @@ public void tryToExecuteTasksWhenAllLeasesAreAlreadyReserved() throws Exception
@Test
public void executeTaskThatFails() throws Exception {
DateTime timeSlice = dateTimeService.getTimeSlice(now().minusMinutes(1), standardMinutes(1));
String type = "test";
String type = "fails-test";
TaskType taskType = new TaskType().setName(type).setSegments(1).setSegmentOffsets(1)
.setFactory(() -> task -> {
throw new RuntimeException();
Expand Down Expand Up @@ -326,7 +323,7 @@ public void executeTaskThatFails() throws Exception {
@Test
public void executeTaskThatPreviouslyFailed() throws Exception {
DateTime timeSlice = dateTimeService.getTimeSlice(now().minusMinutes(2), standardMinutes(1));
String type = "test";
String type = "previouslyFailed-test";
TaskType taskType = new TaskType().setName(type).setSegments(1).setSegmentOffsets(1);
String metric = "metric.5min";
int interval = 5;
Expand Down Expand Up @@ -366,7 +363,9 @@ private void assertTasksPartitionDeleted(String taskType, DateTime timeSlice, in
private void assertLeasePartitionDeleted(DateTime timeSlice) {
ResultSet leasesResultSet = session.execute(queries.findLeases.bind(timeSlice.toDate()));
assertTrue(leasesResultSet.isExhausted(), "Expected lease partition for time slice " + timeSlice +
" to be empty");
" to be empty but found " + StreamSupport.stream(leasesResultSet.spliterator(), false)
.map(row -> new Lease(timeSlice, row.getString(0), row.getInt(1), row.getString(2), row.getBool(3)))
.collect(toList()));
}

/**
Expand Down

0 comments on commit 18e53aa

Please sign in to comment.