Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuring minion task timeout in the PinotTaskGenerator #5317

Merged
merged 1 commit into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,26 @@ public synchronized TaskState getTaskQueueState(String taskType) {
* Submit a list of child tasks with the same task type to the Minion instances with the default tag.
*
* @param pinotTaskConfigs List of child task configs to be submitted
* @param taskTimeoutMs Timeout in milliseconds for each task
* @param numConcurrentTasksPerInstance Maximum number of concurrent tasks allowed per instance
* @return Name of the submitted parent task
*/
public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, int numConcurrentTasksPerInstance) {
return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE, numConcurrentTasksPerInstance);
public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, long taskTimeoutMs,
int numConcurrentTasksPerInstance) {
return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE, taskTimeoutMs, numConcurrentTasksPerInstance);
}

/**
* Submit a list of child tasks with the same task type to the Minion instances with the given tag.
*
* @param pinotTaskConfigs List of child task configs to be submitted
* @param minionInstanceTag Tag of the Minion instances to submit the task to
* @param taskTimeoutMs Timeout in milliseconds for each task
* @param numConcurrentTasksPerInstance Maximum number of concurrent tasks allowed per instance
* @return Name of the submitted parent task
*/
public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, String minionInstanceTag,
int numConcurrentTasksPerInstance) {
long taskTimeoutMs, int numConcurrentTasksPerInstance) {
int numChildTasks = pinotTaskConfigs.size();
Preconditions.checkState(numChildTasks > 0);
Preconditions.checkState(numConcurrentTasksPerInstance > 0);
Expand All @@ -212,8 +215,8 @@ public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, St
// don't want one task failure to affect other tasks. Also, if one task fails, we will re-schedule it next time
JobConfig.Builder jobBuilder =
new JobConfig.Builder().addTaskConfigs(helixTaskConfigs).setInstanceGroupTag(minionInstanceTag)
.setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance).setIgnoreDependentJobFailure(true)
.setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
.setTimeoutPerTask(taskTimeoutMs).setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance)
.setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
_taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName, jobBuilder);

// Wait until task state is available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWi
LOGGER
.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
tasksScheduled.put(taskType, _helixTaskResourceManager
.submitTask(pinotTaskConfigs, pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
.submitTask(pinotTaskConfigs, pinotTaskGenerator.getTaskTimeoutMs(),
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.Segment;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
Expand All @@ -46,15 +45,13 @@ public ConvertToRawIndexTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
_clusterInfoProvider = clusterInfoProvider;
}

@Nonnull
@Override
public String getTaskType() {
return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
}

@Nonnull
@Override
public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs) {
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();

// Get the segments that are being converted so that we don't submit them again
Expand All @@ -80,7 +77,7 @@ public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfi
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
tableMaxNumTasks = Integer.valueOf(tableMaxNumTasksConfig);
tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
tableMaxNumTasks = Integer.MAX_VALUE;
}
Expand Down Expand Up @@ -127,13 +124,4 @@ public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfi

return pinotTaskConfigs;
}

@Override
public int getNumConcurrentTasksPerInstance() {
return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}

@Override
public void nonLeaderCleanUp() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pinot.controller.helix.core.minion.generator;

import java.util.List;
import javax.annotation.Nonnull;
import org.apache.helix.task.JobConfig;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
Expand All @@ -29,33 +28,42 @@
* The interface <code>PinotTaskGenerator</code> defines the APIs for task generators.
*/
public interface PinotTaskGenerator {
int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;

/**
* Returns the task type of the generator.
*
* @return Task type of the generator
*/
@Nonnull
String getTaskType();

/**
* Generates a list of tasks to schedule based on the given table configs.
*
* @return List of tasks to schedule
*/
@Nonnull
List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs);
List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);

/**
* Returns the maximum number of concurrent tasks allowed per instance.
* Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
*
* @return Timeout in milliseconds for each task.
*/
default long getTaskTimeoutMs() {
return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
}

/**
* Returns the maximum number of concurrent tasks allowed per instance, 1 by default.
*
* @return Maximum number of concurrent tasks allowed per instance
*/
int getNumConcurrentTasksPerInstance();
default int getNumConcurrentTasksPerInstance() {
return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}

/**
* Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
*/
void nonLeaderCleanUp();
default void nonLeaderCleanUp() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,13 @@ public TestTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
_clusterInfoProvider = clusterInfoProvider;
}

@Nonnull
@Override
public String getTaskType() {
return TASK_TYPE;
}

@Nonnull
@Override
public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs) {
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
assertEquals(tableConfigs.size(), 2);

// Generate at most 2 tasks
Expand All @@ -230,15 +228,6 @@ public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfi
}
return taskConfigs;
}

@Override
public int getNumConcurrentTasksPerInstance() {
return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}

@Override
public void nonLeaderCleanUp() {
}
}

public static class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
Expand Down