Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -80,12 +81,15 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.jersey.server.ManagedAsync;
Expand Down Expand Up @@ -646,29 +650,26 @@ public Map<String, String> scheduleTasks(
Map<String, String> response = new HashMap<>();
List<String> generationErrors = new ArrayList<>();
List<String> schedulingErrors = new ArrayList<>();
TaskSchedulingContext context = new TaskSchedulingContext()
.setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
.setMinionInstanceTag(minionInstanceTag)
.setLeader(false);
if (taskType != null) {
// Schedule task for the given task type
PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null
? _pinotTaskManager.scheduleTaskForTable(taskType, DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
response.put(taskType, StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
generationErrors.addAll(taskInfos.getGenerationErrors());
schedulingErrors.addAll(taskInfos.getSchedulingErrors());
context.setTasksToSchedule(Collections.singleton(taskType));
}
if (tableName != null) {
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName, headers)));
} else {
// Schedule tasks for all task types
Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos = tableName != null
? _pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
context.setDatabasesToSchedule(Collections.singleton(database));
}
Map<String, TaskSchedulingInfo> allTaskInfos = _pinotTaskManager.scheduleTasks(context);
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pinot.controller.helix.core.minion;

import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
Expand Down Expand Up @@ -64,8 +66,12 @@ public void execute(JobExecutionContext jobExecutionContext)
ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
return;
}
TaskSchedulingContext context = new TaskSchedulingContext()
.setTablesToSchedule(Collections.singleton(table))
.setTasksToSchedule(Collections.singleton(taskType))
.setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name());
long jobStartTime = System.currentTimeMillis();
pinotTaskManager.scheduleTaskForTable(taskType, table, null);
pinotTaskManager.scheduleTasks(context);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
jobExecutionContext.getNextFireTime());
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table, taskType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,11 @@ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowCont
if (jobFinishTimeMs > 0) {
taskDebugInfo.setFinishTime(DateTimeUtils.epochToDefaultDateFormat(jobFinishTimeMs));
}
String triggeredBy = jobConfig.getTaskConfigMap().values().stream().findFirst()
.map(TaskConfig::getConfigMap)
.map(taskConfigs -> taskConfigs.get(PinotTaskManager.TRIGGERED_BY))
.orElse("");
taskDebugInfo.setTriggeredBy(triggeredBy);
Set<Integer> partitionSet = jobContext.getPartitionSet();
TaskCount subtaskCount = new TaskCount();
for (int partition : partitionSet) {
Expand All @@ -890,6 +895,7 @@ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowCont
String taskIdForPartition = jobContext.getTaskIdForPartition(partition);
subtaskDebugInfo.setTaskId(taskIdForPartition);
subtaskDebugInfo.setState(partitionState);
subtaskDebugInfo.setTriggeredBy(triggeredBy);
long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition);
if (subtaskStartTimeMs > 0) {
subtaskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(subtaskStartTimeMs));
Expand Down Expand Up @@ -987,7 +993,8 @@ public Map<String, Map<String, Long>> getTaskMetadataLastUpdateTimeMs() {
return MinionTaskMetadataUtils.getAllTaskMetadataLastUpdateTimeMs(propertyStore);
}

@JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "finishTime", "subtaskInfos"})
@JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "finishTime", "triggeredBy",
"subtaskInfos"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class TaskDebugInfo {
// Time at which the task (which may have multiple subtasks) got created.
Expand All @@ -998,6 +1005,7 @@ public static class TaskDebugInfo {
private String _finishTime;
private TaskState _taskState;
private TaskCount _subtaskCount;
private String _triggeredBy;
private List<SubtaskDebugInfo> _subtaskInfos;

public TaskDebugInfo() {
Expand Down Expand Up @@ -1046,6 +1054,15 @@ public TaskState getTaskState() {
return _taskState;
}

public String getTriggeredBy() {
return _triggeredBy;
}

public TaskDebugInfo setTriggeredBy(String triggeredBy) {
_triggeredBy = triggeredBy;
return this;
}

public TaskCount getSubtaskCount() {
return _subtaskCount;
}
Expand All @@ -1055,7 +1072,7 @@ public List<SubtaskDebugInfo> getSubtaskInfos() {
}
}

@JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", "participant", "info", "taskConfig"})
@JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", "participant", "info", "triggeredBy", "taskConfig"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class SubtaskDebugInfo {
private String _taskId;
Expand All @@ -1064,6 +1081,7 @@ public static class SubtaskDebugInfo {
private String _finishTime;
private String _participant;
private String _info;
private String _triggeredBy;
private PinotTaskConfig _taskConfig;

public SubtaskDebugInfo() {
Expand Down Expand Up @@ -1121,6 +1139,15 @@ public String getInfo() {
return _info;
}

public String getTriggeredBy() {
return _triggeredBy;
}

public SubtaskDebugInfo setTriggeredBy(String triggeredBy) {
_triggeredBy = triggeredBy;
return this;
}

public PinotTaskConfig getTaskConfig() {
return _taskConfig;
}
Expand Down
Loading
Loading