Skip to content

Commit

Permalink
Make all ClusterState Task Submits Specify an Executor (#82524)
Browse files Browse the repository at this point in the history
Make all cluster state task submits specify an executor to
make it clear that a task is unbatched when it is.
  • Loading branch information
original-brownbear committed Jan 14, 2022
1 parent 75579f7 commit f19635b
Show file tree
Hide file tree
Showing 120 changed files with 469 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -59,7 +60,7 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
future.onResponse(null);
}
});
}, ClusterStateTaskExecutor.unbatched());
future.actionGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -102,7 +103,7 @@ public TimeValue masterNodeTimeout() {
public ClusterState execute(ClusterState currentState) throws Exception {
return transformationFn.apply(currentState);
}
});
}, ClusterStateTaskExecutor.unbatched());

future.actionGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,12 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (keepSubmittingTasks.get()) {
clusterService.submitStateUpdateTask("looping task", this);
clusterService.submitStateUpdateTask("looping task", this, ClusterStateTaskExecutor.unbatched());
} else {
completionFuture.onResponse(null);
}
}
});
}, ClusterStateTaskExecutor.unbatched());

try {
createIndex("index");
Expand Down Expand Up @@ -392,12 +392,12 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (keepSubmittingTasks.get()) {
clusterService.submitStateUpdateTask("looping task", this);
clusterService.submitStateUpdateTask("looping task", this, ClusterStateTaskExecutor.unbatched());
} else {
completionFuture.onResponse(null);
}
}
});
}, ClusterStateTaskExecutor.unbatched());

try {
final ClusterHealthResponse clusterHealthResponse = client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public void onFailure(String source, Exception e) {
failure.set(e);
latch.countDown();
}
});
}, ClusterStateTaskExecutor.unbatched());

logger.debug("--> waiting for cluster state to be processed/rejected");
latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ public void onFailure(String source, Exception e) {
throw new AssertionError(e);
}

});
}, ClusterStateTaskExecutor.unbatched());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -108,7 +109,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {}
});
}, ClusterStateTaskExecutor.unbatched());
ensureGreen(index);
// remove the extra node
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
Expand All @@ -123,7 +124,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {

@Override
public void onFailure(String source, Exception e) {}
});
}, ClusterStateTaskExecutor.unbatched());
}

private <Req extends ActionRequest, Res extends ActionResponse> ActionFuture<Res> executeAndCancelCommittedPublication(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -84,7 +85,8 @@ public void onFailure(String source, Exception e) {
onFailure.set(true);
latch.countDown();
}
}
},
ClusterStateTaskExecutor.unbatched()
);

ensureGreen();
Expand Down Expand Up @@ -140,7 +142,8 @@ public void onFailure(String source, Exception e) {
onFailure.set(true);
latch.countDown();
}
}
},
ClusterStateTaskExecutor.unbatched()
);

ensureGreen();
Expand Down Expand Up @@ -199,7 +202,8 @@ public void onFailure(String source, Exception e) {
onFailure.set(true);
latch.countDown();
}
}
},
ClusterStateTaskExecutor.unbatched()
);

ensureGreen();
Expand Down Expand Up @@ -258,7 +262,8 @@ public void onFailure(String source, Exception e) {
onFailure.set(true);
latch.countDown();
}
}
},
ClusterStateTaskExecutor.unbatched()
);

ensureGreen();
Expand Down Expand Up @@ -296,7 +301,7 @@ public void onFailure(String source, Exception e) {
invoked1.countDown();
fail();
}
});
}, ClusterStateTaskExecutor.unbatched());
invoked1.await();
final CountDownLatch invoked2 = new CountDownLatch(9);
for (int i = 2; i <= 10; i++) {
Expand All @@ -315,7 +320,7 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
invoked2.countDown();
}
});
}, ClusterStateTaskExecutor.unbatched());
}

// there might be other tasks in this node, make sure to only take the ones we add into account in this test
Expand Down Expand Up @@ -366,7 +371,7 @@ public void onFailure(String source, Exception e) {
invoked3.countDown();
fail();
}
});
}, ClusterStateTaskExecutor.unbatched());
invoked3.await();

for (int i = 2; i <= 5; i++) {
Expand All @@ -380,7 +385,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Exception e) {
fail();
}
});
}, ClusterStateTaskExecutor.unbatched());
}
Thread.sleep(100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.FollowersChecker;
Expand Down Expand Up @@ -244,7 +245,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("failure [{}]", source), e);
}
});
}, ClusterStateTaskExecutor.unbatched());

// Save the new elected master node
final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -158,7 +159,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
public void onFailure(String source, Exception e) {
throw new AssertionError("unexpected", e);
}
});
}, ClusterStateTaskExecutor.unbatched());

masterBlockedLatch.await();
final IndexRequestBuilder indexRequestBuilder = client().prepareIndex("index")
Expand Down Expand Up @@ -195,7 +196,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
public void onFailure(String source, Exception e) {
throw new AssertionError("unexpected", e);
}
});
}, ClusterStateTaskExecutor.unbatched());

masterBlockedLatch.await();
final IndexRequestBuilder indexRequestBuilder = client().prepareIndex("index").setId("2").setSource("field2", "value2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateObserver.Listener;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -173,7 +174,7 @@ public void onTimeout(TimeValue timeout) {
observer.waitForNextChange(clusterStateListener, allNodesRemoved);
}
}
});
}, ClusterStateTaskExecutor.unbatched());
}

private static Set<VotingConfigExclusion> resolveVotingConfigExclusionsAndCheckMaximum(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateObserver.Listener;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -152,7 +153,8 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(ActionResponse.Empty.INSTANCE);
}
}
},
ClusterStateTaskExecutor.unbatched()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalClusterUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
Expand Down Expand Up @@ -108,34 +109,36 @@ private void waitForEventsAndExecuteHealth(
) {
assert request.waitForEvents() != null;
if (request.local()) {
clusterService.submitStateUpdateTask(
"cluster_health (wait_for_events [" + request.waitForEvents() + "])",
new LocalClusterUpdateTask(request.waitForEvents()) {
@Override
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
return unchanged();
}
var updateTask = new LocalClusterUpdateTask(request.waitForEvents()) {
@Override
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
return unchanged();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
request.timeout(newTimeout);
executeHealth(
request,
clusterService.state(),
listener,
waitCount,
observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis)
);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
request.timeout(newTimeout);
executeHealth(
request,
clusterService.state(),
listener,
waitCount,
observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis)
);
}

@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
listener.onFailure(e);
}
@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
listener.onFailure(e);
}
};
clusterService.submitStateUpdateTask(
"cluster_health (wait_for_events [" + request.waitForEvents() + "])",
updateTask,
updateTask
);
} else {
final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()));
Expand Down Expand Up @@ -186,7 +189,8 @@ public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
}
}
},
ClusterStateTaskExecutor.unbatched()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
Expand Down Expand Up @@ -120,7 +121,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
public void onFailure(String source, Exception e) {
logger.warn("Failed to remove repository cleanup task [{}] from cluster state", repositoryCleanupInProgress);
}
}
},
ClusterStateTaskExecutor.unbatched()
);
}
});
Expand Down Expand Up @@ -299,10 +301,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
listener.onFailure(failure);
}
}
}
},
ClusterStateTaskExecutor.unbatched()
);
}
}
},
ClusterStateTaskExecutor.unbatched()
);
}, listener::onFailure);
}
Expand Down

0 comments on commit f19635b

Please sign in to comment.