Skip to content

Commit

Permalink
Extend tests for master service batching (#83300)
Browse files Browse the repository at this point in the history
Adds a test to demonstrate that multiple submissions of tasks are
batched together when they share an executor.

Co-authored-by: Ievgen Degtiarenko <ievgen.degtiarenko@gmail.com>
  • Loading branch information
DaveCTurner and idegtiarenko committed Jan 31, 2022
1 parent 3d0655f commit c2162a1
Showing 1 changed file with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class MasterServiceTests extends ESTestCase {

Expand Down Expand Up @@ -463,6 +465,118 @@ public void onFailure(Exception e) {
}
}

public void testMultipleSubmissionBatching() throws Exception {

class Task implements ClusterStateTaskListener {
@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
}

final int executorCount = between(1, 5);
final var executionCountDown = new CountDownLatch(executorCount);

class Executor implements ClusterStateTaskExecutor<Task> {

final AtomicBoolean executed = new AtomicBoolean();

int expectedTaskCount;

public void addExpectedTaskCount(int taskCount) {
expectedTaskCount += taskCount;
}

@Override
public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
assertTrue("Should execute all tasks at once", executed.compareAndSet(false, true));
assertThat("Should execute all tasks at once", tasks.size(), equalTo(expectedTaskCount));
executionCountDown.countDown();
return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
}
}

final var executors = new Executor[executorCount];
for (int i = 0; i < executors.length; i++) {
executors[i] = new Executor();
}

try (var masterService = createMasterService(true)) {

final var executionBarrier = new CyclicBarrier(2);

masterService.submitStateUpdateTask(
"block",
new Task(),
ClusterStateTaskConfig.build(Priority.NORMAL),
(currentState, tasks) -> {
executionBarrier.await(10, TimeUnit.SECONDS); // notify test thread that the master service is blocked
executionBarrier.await(10, TimeUnit.SECONDS); // wait for test thread to release us
return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
}
);

executionBarrier.await(10, TimeUnit.SECONDS); // wait for the master service to be blocked

final var submissionLatch = new CountDownLatch(1);

final var submitThreads = new Thread[between(1, 10)];
for (int i = 0; i < submitThreads.length; i++) {
final var executor = randomFrom(executors);
final var tasks = randomList(1, 10, Task::new);
executor.addExpectedTaskCount(tasks.size());
submitThreads[i] = new Thread(() -> {
try {
assertTrue(submissionLatch.await(10, TimeUnit.SECONDS));
masterService.submitStateUpdateTasks(
Thread.currentThread().getName(),
tasks,
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor
);
} catch (InterruptedException e) {
throw new AssertionError(e);
}

}, "submit-thread-" + i);
}

for (var executor : executors) {
if (executor.expectedTaskCount == 0) {
executionCountDown.countDown();
}
}

for (var submitThread : submitThreads) {
submitThread.start();
}

submissionLatch.countDown();

for (var submitThread : submitThreads) {
submitThread.join();
}

for (var executor : executors) {
assertFalse(executor.executed.get());
}

assertThat(masterService.numberOfPendingTasks(), equalTo(submitThreads.length + 1));
final var sources = masterService.pendingTasks().stream().map(t -> t.getSource().string()).collect(Collectors.toSet());
assertThat(sources, hasSize(submitThreads.length + 1));
assertTrue(sources.contains("block"));
for (int i = 0; i < submitThreads.length; i++) {
assertTrue("submit-thread-" + i, sources.contains("submit-thread-" + i));
}

executionBarrier.await(10, TimeUnit.SECONDS); // release block on master service
assertTrue(executionCountDown.await(10, TimeUnit.SECONDS));
for (var executor : executors) {
assertTrue(executor.executed.get() != (executor.expectedTaskCount == 0));
}
}
}

public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException {

AtomicInteger executedTasks = new AtomicInteger();
Expand Down

0 comments on commit c2162a1

Please sign in to comment.