New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split cluster state update tasks into roles #14899

Merged
merged 9 commits into from Nov 30, 2015

Conversation

Projects
None yet
4 participants
@jasontedor
Member

jasontedor commented Nov 20, 2015

This commit splits cluster state update tasks into roles. Those roles
are:

  • task info
  • task configuration
  • task executor
  • task listener

All tasks that have the same executor will be executed in batches. This
removes the need for local batching as was previously in
MetaDataMappingService.

Additionally, this commit reintroduces batching on mapping update calls.

Relates #13627

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterService.java Outdated
@@ -101,12 +100,16 @@
void add(@Nullable TimeValue timeout, TimeoutClusterStateListener listener);
/**
* Submits a task that will update the cluster state.
* Submits a task that will update the cluster state, using the given config. result will communicated
* to the given listener

This comment has been minimized.

@bleskes

bleskes Nov 25, 2015

Member

can we add note about the executor semantics?

This comment has been minimized.

@jasontedor

jasontedor Nov 25, 2015

Member

Added docs in cab59785023f6ba31a955421bd5f3a66f7799a96.

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterService.java Outdated
/**
* Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}).
* Submits a task that will update the cluster state;

This comment has been minimized.

@bleskes

bleskes Nov 25, 2015

Member

same here - can we add a note about the batching? i.e., not batched

This comment has been minimized.

@jasontedor

jasontedor Nov 25, 2015

Member

Added docs in cab59785023f6ba31a955421bd5f3a66f7799a96.

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskConfig.java Outdated
@Nullable
TimeValue timeout();
Priority priority();

This comment has been minimized.

@bleskes

bleskes Nov 25, 2015

Member

Java Docs?

This comment has been minimized.

@jasontedor

jasontedor Nov 25, 2015

Member

Added docs in cab59785023f6ba31a955421bd5f3a66f7799a96.

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java Outdated
return true;
}
class Result {

This comment has been minimized.

@bleskes

bleskes Nov 25, 2015

Member

we should document the relation ship between the failure list and the task list.

This comment has been minimized.

@jasontedor

jasontedor Nov 25, 2015

Member

@bleskes I agree on documentation, but I'm also pondering making it more explicit by having it be a Map<T, Throwable>. What do you think? I'm thinking something like this?

diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
index 861b924..7a63c79 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
@@ -18,15 +18,16 @@
  */
 package org.elasticsearch.cluster;

-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 public interface ClusterStateTaskExecutor<T> {
     /**
      * Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state
      * should be changed.
      */
-    Result execute(ClusterState currentState, List<T> tasks) throws Exception;
+    Result<T> execute(ClusterState currentState, List<T> tasks) throws Exception;

     /**
      * indicates whether this task should only run if current node is master
@@ -35,19 +36,19 @@ public interface ClusterStateTaskExecutor<T> {
         return true;
     }

-    class Result {
+    class Result<T> {
         final public ClusterState resultingState;
-        final public List<Throwable> failures;
+        final public Map<T, Throwable> failures;

-        public Result(ClusterState resultingState, int numberOfTasks) {
+        @SuppressWarnings("unchecked")
+        public Result(ClusterState resultingState) {
             this.resultingState = resultingState;
-            failures = Arrays.asList(new Throwable[numberOfTasks]);
+            failures = (Map<T, Throwable>)Collections.EMPTY_MAP;
         }

-        public Result(ClusterState resultingState, List<Throwable> failures) {
+        public Result(ClusterState resultingState, Map<T, Throwable> failures) {
             this.resultingState = resultingState;
             this.failures = failures;
         }
     }
-
 }
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java
index 17c4635..9fe41b7 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java
@@ -41,9 +41,9 @@ abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig,
     }

     @Override
-    final public Result execute(ClusterState currentState, List<Void> tasks) throws Exception {
+    final public Result<Void> execute(ClusterState currentState, List<Void> tasks) throws Exception {
         ClusterState result = execute(currentState);
-        return new Result(result, tasks.size());
+        return new Result<>(result);
     }

     /**
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java
index 44e3853..ec1e9a8 100644
--- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java
@@ -81,9 +81,9 @@ public class MetaDataMappingService extends AbstractComponent {

     class RefreshTaskExecutor implements ClusterStateTaskExecutor<RefreshTask> {
         @Override
-        public Result execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
+        public Result<RefreshTask> execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
             ClusterState newClusterState = executeRefresh(currentState, tasks);
-            return new Result(newClusterState, tasks.size());
+            return new Result<>(newClusterState);
         }
     }

@@ -221,9 +221,9 @@ public class MetaDataMappingService extends AbstractComponent {

     class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
         @Override
-        public Result execute(ClusterState currentState, List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
+        public Result<PutMappingClusterStateUpdateRequest> execute(ClusterState currentState, List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
             List<String> indicesToClose = new ArrayList<>();
-            ArrayList<Throwable> failures = new ArrayList<>(tasks.size());
+            Map<PutMappingClusterStateUpdateRequest, Throwable> failures = new HashMap<>();
             try {
                 // precreate incoming indices;
                 for (PutMappingClusterStateUpdateRequest request : tasks) {
@@ -250,13 +250,12 @@ public class MetaDataMappingService extends AbstractComponent {
                 for (PutMappingClusterStateUpdateRequest request : tasks) {
                     try {
                         currentState = applyRequest(currentState, request);
-                        failures.add(null);
                     } catch (Throwable t) {
-                        failures.add(t);
+                        failures.put(request, t);
                     }
                 }

-                return new Result(currentState, failures);
+                return new Result<>(currentState, failures);
             } finally {
                 for (String index : indicesToClose) {
                     indicesService.removeIndex(index, "created for mapping processing");
diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
index c97c55d..e32ec30 100644
--- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
@@ -395,7 +395,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
             toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
             return;
         }
-        ClusterStateTaskExecutor.Result result;
+        ClusterStateTaskExecutor.Result<T> result;
         long startTimeNS = System.nanoTime();
         try {
             List<T> inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
@@ -410,20 +410,20 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
                 logger.trace(sb.toString(), e);
             }
             warnAboutSlowTaskIfNeeded(executionTime, source);
-            result = new ClusterStateTaskExecutor.Result(previousClusterState, Collections.nCopies(toExecute.size(), e));
+            Map<T, Throwable> failures =
+                    toExecute.stream().collect(Collectors.toMap(updateTask -> updateTask.task, updateTask -> e));
+            result = new ClusterStateTaskExecutor.Result<>(previousClusterState, failures);
         }
-        assert result.failures.size() == toExecute.size();

         ClusterState newClusterState = result.resultingState;
         final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
         // fail all tasks that have failed and extract those that are waiting for results
-        for (int i = 0; i < toExecute.size(); i++) {
-            final UpdateTask<T> task = toExecute.get(i);
-            final Throwable failure = result.failures.get(i);
+        for (UpdateTask<T> updateTask : toExecute) {
+            final Throwable failure = result.failures.get(updateTask.task);
             if (failure == null) {
-                proccessedListeners.add(task);
+                proccessedListeners.add(updateTask);
             } else {
-                task.listener.onFailure(task.source, failure);
+                updateTask.listener.onFailure(updateTask.source, failure);
             }
         }

diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java
index 60e7fb2..7de963f 100644
--- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java
@@ -755,10 +755,10 @@ public class ClusterServiceIT extends ESIntegTestCase {
             private AtomicInteger counter = new AtomicInteger();

             @Override
-            public Result execute(ClusterState currentState, List<Task> tasks) throws Exception {
+            public Result<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
                 tasks.forEach(task -> task.execute());
                 counter.addAndGet(tasks.size());
-                return new Result(currentState, tasks.size());
+                return new Result<>(currentState);
             }

             @Override
diff --git a/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java b/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java
index 3845a71..5225228 100644
--- a/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java
+++ b/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java
@@ -191,12 +191,12 @@ public class TestClusterService implements ClusterService {
             logger.debug("failed [{}], no longer master", source);
             return;
         }
-        ClusterStateTaskExecutor.Result result;
+        ClusterStateTaskExecutor.Result<T> result;
         ClusterState previousClusterState = state;
         try {
             result = executor.execute(previousClusterState, Arrays.asList(task));
         } catch (Exception e) {
-            result = new ClusterStateTaskExecutor.Result(previousClusterState, Arrays.asList(e));
+            result = new ClusterStateTaskExecutor.Result<>(previousClusterState, Collections.singletonMap(task, e));
         }
         if (result.failures.get(0) != null) {
             listener.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]",
@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java Outdated
}
if (config.timeout() != null) {
updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), new Runnable() {

This comment has been minimized.

@bleskes

bleskes Nov 25, 2015

Member

since it seem we are going all gang-ho on java8 , rename the runnable to lambdas? :)

This comment has been minimized.

@jasontedor

jasontedor Nov 25, 2015

Member

Done in a7a07c77c50220dd69b60bc232cd799f33490fe3. :)

@ywelsch

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java Outdated
* called when the task was rejected because the local node is no longer master
*/
default void onNoLongerMaster(String source) {
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));

This comment has been minimized.

@ywelsch

ywelsch Nov 25, 2015

Contributor

should be NotMasterException? (see method that was removed from class ClusterStateUpdateTask)

This comment has been minimized.

@jasontedor

jasontedor Nov 25, 2015

Member

Agree. I pushed ffe83934c3014528bada23481bec0fb41482f7af.

@ywelsch

View changes

core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java Outdated
synchronized (updateTasksPerExecutor) {
List<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
if (pending != null) {
for (Iterator<UpdateTask> iter = pending.iterator(); iter.hasNext(); ) {

This comment has been minimized.

@ywelsch

ywelsch Nov 25, 2015

Contributor

nit: why the iterator? good old for (UpdateTask task : pending)

This comment has been minimized.

@jasontedor

jasontedor Nov 25, 2015

Member

@ywelsch Thanks. Addressed in 2b36991a228273329fa83993e0eff646fe2adf69.

@ywelsch

View changes

core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java Outdated
for (UpdateTask<T> updateTask : toExecute) {
final Throwable failure = result.failures.get(updateTask.task);
if (failure == null) {
proccessedListeners.add(updateTask);

This comment has been minimized.

@ywelsch

ywelsch Nov 25, 2015

Contributor

nit: proccessed -> processed

@ywelsch

This comment has been minimized.

Contributor

ywelsch commented Nov 25, 2015

Oops, I got confused with the Github interface (commenting on commit instead of PR). I like the change of associating failures with their originating task using a map instead of position 👍.

@jasontedor

This comment has been minimized.

Member

jasontedor commented Nov 25, 2015

@ywelsch I liked your suggestion a lot so I pushed 66af98ea5b0a925e9c3dc55465c8aceb91813b6f.

@clintongormley clintongormley added :Cluster and removed :Internal labels Nov 28, 2015

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskConfig.java Outdated
*/
public interface ClusterStateTaskConfig {
/**

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

left over?

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

Fixed in ff01dc904a13870bec4d11e7c9260940cbc6e6c4.

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java Outdated
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task
*/
class Result<T> {

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

Maybe call this BatchResult and the other one TaskResult? looking for brevity..

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

Renamed in d1fbf76159b239a360c02b1ec983b2c56e6ab88e.

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java Outdated
default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
;

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

left over..

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

Fixed in ff01dc904a13870bec4d11e7c9260940cbc6e6c4.

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java Outdated
/**
* A task that can update the cluster state.
*/
abstract public class ClusterStateUpdateTask {
abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<Object>, ClusterStateTaskListener {

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

I kind of like the usage of Void as a parameter here. Didn't it work out?

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

@bleskes Because of the change in e4c6bd4a12ef824a2944e85b1d2b0f56adf9addf to more explicitly associate tasks with their execution results, tasks that pass through ClusterService#submitStateUpdateTask(String, ClusterStateUpdateTask) and previously delegated to ClusterService#submitStateUpdateTask(String, Object, config, executor, listener) by passing null for the task now need to pass something not null. As Void is uninstantiable, I changed it to Object. However, I'm thinking that c19c30b7a8e4315f61900b8854cf7dbba9c62d2f might be better. What do you think?

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

awesome. yes.

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java Outdated
public final ClusterStateUpdateTask updateTask;
assert result.executionResults != null;

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

can we also assert the size is equal to toExecute.size?

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

Added in d1fbf76159b239a360c02b1ec983b2c56e6ab88e.

@bleskes

View changes

core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java Outdated
assert result.executionResults.containsKey(updateTask.task) : "missing " + updateTask.task.toString();
final ClusterStateTaskExecutor.ClusterStateTaskExecutionResult executionResult =
result.executionResults.get(updateTask.task);
executionResult.handle(() -> proccessedListeners.add(updateTask), ex -> updateTask.listener.onFailure(updateTask.source, ex));

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

a question of taste, but this feels a bit too fancy for it's own good. Clearer to read with if (result.getFailuire() != null) .. not a big deal.

return false;
}
}
int numberOfThreads = randomIntBetween(2, 256);

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

I think we need to reduce this... we don't need so many. 10 is enough imo

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

Yeah; leftover from when I was using a poor man's approach to test the impact of contention on the intrinsic lock onInternalClusterService.updateTasksPerExecutor. Fixed in 7172e972e4d821048c4278117a8425f357fccc4c.

This comment has been minimized.

@jasontedor

jasontedor Dec 4, 2015

Member

This was lost on squash but I've integrated into master now in bbef8ac.

counts.merge(executor, 1, (previous, one) -> previous + one);
}
CountDownLatch startingGun = new CountDownLatch(1 + numberOfThreads);

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

use a barrier?

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

A CyclicBarrier? I'm not seeing the difference, other than I think CyclicBarrier is a little more complicated than a CountDownLatch?

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

CyclingBarrier yeah - it actually guarantees the threads will start together. Now the main thread can release waiting threads before the rest are started, but not a biggy.

}
// wait until all the cluster state updates have been processed
latch.await();

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

can we cal it something less generic than latch?

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

Fixed in 7172e972e4d821048c4278117a8425f357fccc4c.

This comment has been minimized.

@jasontedor

jasontedor Dec 4, 2015

Member

Lost on squash but integrated into master now in bbef8ac.

clusterService.submitStateUpdateTask(
Thread.currentThread().getName(),
new Task(),
ClusterStateTaskConfig.build(Priority.NORMAL),

This comment has been minimized.

@bleskes

bleskes Nov 30, 2015

Member

randomize?

This comment has been minimized.

@jasontedor

jasontedor Nov 30, 2015

Member

Fixed in 7172e972e4d821048c4278117a8425f357fccc4c.

This comment has been minimized.

@jasontedor

jasontedor Dec 4, 2015

Member

This was lost on squash but has been integrated into master now in bbef8ac.

@bleskes

This comment has been minimized.

Member

bleskes commented Nov 30, 2015

Good job. LGTM. Left some minor comments. no need for another review.

bleskes and others added some commits Oct 23, 2015

@jasontedor jasontedor removed the review label Nov 30, 2015

jasontedor added a commit that referenced this pull request Nov 30, 2015

Merge pull request #14899 from jasontedor/cluster-state-batch
Split cluster state update tasks into roles

@jasontedor jasontedor merged commit c4a2298 into elastic:master Nov 30, 2015

1 check passed

CLA Commit author is a member of Elasticsearch
Details

@jasontedor jasontedor deleted the jasontedor:cluster-state-batch branch Nov 30, 2015

jasontedor added a commit that referenced this pull request Dec 4, 2015

Cleanup ClusterServiceIT#testClusterStateBatchedUpdates
This commit addresses some issues that arose during the review of #14899
but were lost during squash while integrating into master.
 - the number of test threads is dropped to at most eight
 - a local variable is renamed for clarity
 - task priorities are randomized
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment