Skip to content

Commit

Permalink
[7.17][Transform] Prevent stopping of transforms due to threadpool li…
Browse files Browse the repository at this point in the history
…mitation (#83539)

remove the indexer threadpool and use the generic threadpool instead(The indexer threadpool was only used on start)

fixes #81796
backport #81912
  • Loading branch information
Hendrik Muhs committed Feb 7, 2022
1 parent b3b99ee commit a57f929
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -147,7 +145,6 @@
public class Transform extends Plugin implements SystemIndexPlugin, PersistentTaskPlugin {

public static final String NAME = "transform";
public static final String TASK_THREAD_POOL_NAME = "transform_indexing";

private static final Logger logger = LogManager.getLogger(Transform.class);

Expand Down Expand Up @@ -289,17 +286,6 @@ public List<RestHandler> getRestHandlers(
);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
if (transportClientMode) {
return emptyList();
}

FixedExecutorBuilder indexing = new FixedExecutorBuilder(settingsToUse, TASK_THREAD_POOL_NAME, 4, 4, "transform.task_thread_pool");

return Collections.singletonList(indexing);
}

@Override
public Collection<Object> createComponents(
Client client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse sea

@Override
public synchronized boolean maybeTriggerAsyncJob(long now) {
// threadpool: trigger_engine_scheduler if triggered from the scheduler, generic if called from the task on start
if (context.getTaskState() == TransformTaskState.FAILED) {
logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public TransformPersistentTasksExecutor(
Settings settings,
IndexNameExpressionResolver resolver
) {
super(TransformField.TASK_NAME, Transform.TASK_THREAD_POOL_NAME);
super(TransformField.TASK_NAME, ThreadPool.Names.GENERIC);
this.client = client;
this.transformServices = transformServices;
this.threadPool = threadPool;
Expand All @@ -100,6 +100,13 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState
) {
/* Note:
*
* This method is executed on the _master_ node. The master and transform node might be on a different version.
* Therefore certain checks must happen on the corresponding node, e.g. the existence of the internal index.
*
* Operations on the transform node happen in {@link #nodeOperation()}
*/
if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) {
return new PersistentTasksCustomMetadata.Assignment(
null,
Expand Down Expand Up @@ -177,6 +184,12 @@ static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat

@Override
protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTaskParams params, PersistentTaskState state) {
/* Note:
*
* This method is executed on the _transform_ node. The master and transform node might be on a different version.
* Operations on master happen in {@link #getAssignment()}
*/

final String transformId = params.getId();
final TransformTask buildTask = (TransformTask) task;
// NOTE: TransformPersistentTasksExecutor#createTask pulls in the stored task state from the ClusterState when the object
Expand Down Expand Up @@ -205,6 +218,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa

// <6> load next checkpoint
ActionListener<TransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap(nextCheckpoint -> {
// threadpool: system_read

if (nextCheckpoint.isEmpty()) {
// extra safety: reset position and progress if next checkpoint is empty
Expand All @@ -228,8 +242,9 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa

// <5> load last checkpoint
ActionListener<TransformCheckpoint> getTransformLastCheckpointListener = ActionListener.wrap(lastCheckpoint -> {
indexerBuilder.setLastCheckpoint(lastCheckpoint);
// threadpool: system_read

indexerBuilder.setLastCheckpoint(lastCheckpoint);
logger.trace("[{}] Loaded last checkpoint [{}], looking for next checkpoint", transformId, lastCheckpoint.getCheckpoint());
transformServices.getConfigManager()
.getTransformCheckpoint(transformId, lastCheckpoint.getCheckpoint() + 1, getTransformNextCheckpointListener);
Expand All @@ -244,6 +259,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
// Schedule execution regardless
ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> transformStatsActionListener = ActionListener.wrap(
stateAndStatsAndSeqNoPrimaryTermAndIndex -> {
// threadpool: system_read

TransformStoredDoc stateAndStats = stateAndStatsAndSeqNoPrimaryTermAndIndex.v1();
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = stateAndStatsAndSeqNoPrimaryTermAndIndex.v2();
// Since we have not set the value for this yet, it SHOULD be null
Expand Down Expand Up @@ -289,6 +306,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa

// <3> Validate the transform, assigning it to the indexer, and get the previous stats (if they exist)
ActionListener<TransformConfig> getTransformConfigListener = ActionListener.wrap(config -> {
// threadpool: system_read
ValidationException validationException = config.validate(null);
if (validationException == null) {
indexerBuilder.setTransformConfig(config);
Expand Down Expand Up @@ -371,9 +389,12 @@ private void startTask(
Long previousCheckpoint,
ActionListener<StartTransformAction.Response> listener
) {
buildTask.initializeIndexer(indexerBuilder);
// TransformTask#start will fail if the task state is FAILED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
// switch the threadpool to generic, because the caller is on the system_read threadpool
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
buildTask.initializeIndexer(indexerBuilder);
// TransformTask#start will fail if the task state is FAILED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
});
}

private void setNumFailureRetries(int numFailureRetries) {
Expand Down

0 comments on commit a57f929

Please sign in to comment.