Skip to content

Commit

Permalink
[Transform] Scheduler concurrency fix (#89716) (#89753)
Browse files Browse the repository at this point in the history
fix concurrency issue in transform scheduler, by combining the empty check
and the task getter

fixes #88991
  • Loading branch information
Hendrik Muhs committed Aug 31, 2022
1 parent 5270bc5 commit c770958
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 21 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/89716.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 89716
summary: Scheduler concurrency fix
area: Transform
type: bug
issues:
- 88991
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@ class TransformScheduledTaskQueue {
}

/**
* @return whether the queue is empty.
*/
public synchronized boolean isEmpty() {
return tasks.isEmpty();
}

/**
* @return the task with the *lowest* priority.
* @return the task with the *lowest* priority or null if the queue is empty.
*/
public synchronized TransformScheduledTask first() {
// gh#88991 concurrent access: the empty check must run within the synchronized context
if (tasks.isEmpty()) {
return null;
}

return tasks.first();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,14 @@ void processScheduledTasks() {
}

private boolean processScheduledTasksInternal() {
if (scheduledTasks.isEmpty()) {
TransformScheduledTask scheduledTask = scheduledTasks.first();

if (scheduledTask == null) {
// There are no scheduled tasks, hence, nothing to do
return false;
}
long currentTimeMillis = clock.millis();
TransformScheduledTask scheduledTask = scheduledTasks.first();

// Check if the task is eligible for processing
if (currentTimeMillis < scheduledTask.getNextScheduledTimeMillis()) {
// It is too early to process this task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
Expand All @@ -27,7 +26,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class TransformScheduledTaskQueueTests extends ESTestCase {

Expand All @@ -52,14 +52,14 @@ public void testEmptyQueue() {

public void testNonEmptyQueue() {
queue.add(createTask("task-1", 5));
assertThat(queue.isEmpty(), is(false));
assertThat(queue.first(), is(notNullValue()));
}

public void testAddAndRemove() {
queue.add(createTask("task-1", 5));
queue.add(createTask("task-2", 1));
queue.add(createTask("task-3", 9));
assertThat(queue.isEmpty(), is(false));
assertThat(queue.first(), is(notNullValue()));
assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2", "task-3"));
assertThat(queue.first(), is(equalTo(createTask("task-2", 1))));

Expand All @@ -83,7 +83,7 @@ public void testConcurrentAddAndRemove() throws Exception {
assertThat(taskAdded, is(true));
}
}
assertThat(queue.isEmpty(), is(false));
assertThat(queue.first(), is(notNullValue()));
assertThat(queue.getTransformIds(), hasSize(100));

{
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testRemoveNoOp() {
queue.add(createTask("task-1", 5));
queue.remove("task-non-existent");
// Verify that the remove operation had no effect
assertThat(queue.isEmpty(), is(false));
assertThat(queue.first(), is(notNullValue()));
assertThat(queue.getTransformIds(), containsInAnyOrder("task-1"));
assertThat(queue.first(), is(equalTo(createTask("task-1", 5))));
}
Expand All @@ -126,7 +126,7 @@ public void testUpdateNoOp() {
queue.add(createTask("task-1", 5));
queue.update("task-non-existent", task -> createTask(task.getTransformId(), -999));
// Verify that the update operation had no effect
assertThat(queue.isEmpty(), is(false));
assertThat(queue.first(), is(notNullValue()));
assertThat(queue.getTransformIds(), containsInAnyOrder("task-1"));
assertThat(queue.first(), is(equalTo(createTask("task-1", 5))));
}
Expand All @@ -147,15 +147,15 @@ public void testRemoveAll() {
queue.add(createTask("task-7", 0));
queue.add(createTask("task-8", 2));
queue.add(createTask("task-9", 4));
assertThat(queue.isEmpty(), is(false));
assertThat(queue.first(), is(notNullValue()));
assertThat(
queue.getTransformIds(),
containsInAnyOrder("task-1", "task-2", "task-3", "task-4", "task-5", "task-6", "task-7", "task-8", "task-9")
);
assertThat(queue.first(), is(equalTo(createTask("task-7", 0))));

List<TransformScheduledTask> tasksByPriority = new ArrayList<>();
while (queue.isEmpty() == false) {
while (queue.first() != null) {
TransformScheduledTask task = queue.first();
tasksByPriority.add(task);
queue.remove(task.getTransformId());
Expand Down Expand Up @@ -210,8 +210,7 @@ private static void failUnexpectedCall(Event event) {
}

private void assertThatQueueIsEmpty() {
assertThat(queue.isEmpty(), is(true));
assertThat(queue.first(), is(nullValue()));
assertThat(queue.getTransformIds(), is(empty()));
expectThrows(NoSuchElementException.class, () -> queue.first());
}
}

0 comments on commit c770958

Please sign in to comment.