Skip to content

Commit

Permalink
Allow configuring minion task timeout in the PinotTaskGenerator (#5317)
Browse files Browse the repository at this point in the history
The default task timeout is 1 hour, which might not be enough for certain expensive tasks.
This change makes the task timeout configurable for each task type.
  • Loading branch information
Jackie-Jiang committed Apr 30, 2020
1 parent 4c6050f commit 6a1565c
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,26 @@ public synchronized TaskState getTaskQueueState(String taskType) {
* Submit a list of child tasks with same task type to the Minion instances with the default tag.
*
* @param pinotTaskConfigs List of child task configs to be submitted
* @param taskTimeoutMs Timeout in milliseconds for each task
* @param numConcurrentTasksPerInstance Maximum number of concurrent tasks allowed per instance
* @return Name of the submitted parent task
*/
public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, int numConcurrentTasksPerInstance) {
return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE, numConcurrentTasksPerInstance);
public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, long taskTimeoutMs,
int numConcurrentTasksPerInstance) {
// Convenience overload: delegate to the tagged variant, passing Helix.UNTAGGED_MINION_INSTANCE as the Minion
// instance tag and forwarding the timeout and concurrency limit unchanged.
return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE, taskTimeoutMs, numConcurrentTasksPerInstance);
}

/**
* Submit a list of child tasks with same task type to the Minion instances with the given tag.
*
* @param pinotTaskConfigs List of child task configs to be submitted
* @param minionInstanceTag Tag of the Minion instances to submit the task to
* @param taskTimeoutMs Timeout in milliseconds for each task
* @param numConcurrentTasksPerInstance Maximum number of concurrent tasks allowed per instance
* @return Name of the submitted parent task
*/
public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, String minionInstanceTag,
int numConcurrentTasksPerInstance) {
long taskTimeoutMs, int numConcurrentTasksPerInstance) {
int numChildTasks = pinotTaskConfigs.size();
Preconditions.checkState(numChildTasks > 0);
Preconditions.checkState(numConcurrentTasksPerInstance > 0);
Expand All @@ -212,8 +215,8 @@ public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, St
// don't want one task failure affects other tasks. Also, if one task failed, next time we will re-schedule it
JobConfig.Builder jobBuilder =
new JobConfig.Builder().addTaskConfigs(helixTaskConfigs).setInstanceGroupTag(minionInstanceTag)
.setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance).setIgnoreDependentJobFailure(true)
.setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
.setTimeoutPerTask(taskTimeoutMs).setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance)
.setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
_taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName, jobBuilder);

// Wait until task state is available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWi
LOGGER
.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
tasksScheduled.put(taskType, _helixTaskResourceManager
.submitTask(pinotTaskConfigs, pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
.submitTask(pinotTaskConfigs, pinotTaskGenerator.getTaskTimeoutMs(),
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.Segment;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
Expand All @@ -46,15 +45,13 @@ public ConvertToRawIndexTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
_clusterInfoProvider = clusterInfoProvider;
}

@Nonnull
@Override
public String getTaskType() {
// The task type of this generator is the constant declared under MinionConstants.ConvertToRawIndexTask.
return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
}

@Nonnull
@Override
public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs) {
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();

// Get the segments that are being converted so that we don't submit them again
Expand All @@ -80,7 +77,7 @@ public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfi
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
tableMaxNumTasks = Integer.valueOf(tableMaxNumTasksConfig);
tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
tableMaxNumTasks = Integer.MAX_VALUE;
}
Expand Down Expand Up @@ -127,13 +124,4 @@ public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfi

return pinotTaskConfigs;
}

@Override
public int getNumConcurrentTasksPerInstance() {
return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}

@Override
public void nonLeaderCleanUp() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pinot.controller.helix.core.minion.generator;

import java.util.List;
import javax.annotation.Nonnull;
import org.apache.helix.task.JobConfig;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
Expand All @@ -29,33 +28,42 @@
* The interface <code>PinotTaskGenerator</code> defines the APIs for task generators.
*/
public interface PinotTaskGenerator {
int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;

/**
* Returns the task type of the generator.
*
* @return Task type of the generator
*/
@Nonnull
String getTaskType();

/**
* Generates a list of tasks to schedule based on the given table configs.
*
* @return List of tasks to schedule
*/
@Nonnull
List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs);
List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);

/**
* Returns the maximum number of concurrent tasks allowed per instance.
* Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
*
* @return Timeout in milliseconds for each task.
*/
default long getTaskTimeoutMs() {
// Fall back to the per-task timeout constant from Helix's JobConfig; generators with expensive tasks can
// override this method to return a larger value.
return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
}

/**
* Returns the maximum number of concurrent tasks allowed per instance, 1 by default.
*
* @return Maximum number of concurrent tasks allowed per instance
*/
int getNumConcurrentTasksPerInstance();
default int getNumConcurrentTasksPerInstance() {
// Fall back to the concurrency constant from Helix's JobConfig; generators can override this method to allow
// a different number of concurrent tasks per instance.
return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}

/**
* Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
*/
void nonLeaderCleanUp();
default void nonLeaderCleanUp() {
// Intentionally a no-op by default: generators that have nothing to clean up need not override this method.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,13 @@ public TestTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
_clusterInfoProvider = clusterInfoProvider;
}

@Nonnull
@Override
public String getTaskType() {
return TASK_TYPE;
}

@Nonnull
@Override
public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs) {
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
assertEquals(tableConfigs.size(), 2);

// Generate at most 2 tasks
Expand All @@ -230,15 +228,6 @@ public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfi
}
return taskConfigs;
}

@Override
public int getNumConcurrentTasksPerInstance() {
return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}

@Override
public void nonLeaderCleanUp() {
}
}

public static class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
Expand Down

0 comments on commit 6a1565c

Please sign in to comment.