@@ -40,9 +40,5 @@ public enum AttributeName {
PipelineType,
LastRebalanceFinishTimeStamp,
ControllerDataProvider,
STATEFUL_REBALANCER,
// This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
TO_BE_PURGED_WORKFLOWS,
// This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
TO_BE_PURGED_JOBS_MAP
STATEFUL_REBALANCER
}
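
The removed TO_BE_PURGED_WORKFLOWS and TO_BE_PURGED_JOBS_MAP attributes carried purge results from the main pipeline thread to the async stage, wrapped in unmodifiable collections to guard against the race conditions the adjacent comments warn about. Below is a minimal standalone sketch of that hand-off pattern, assuming a plain map in place of ClusterEvent's attribute storage; the class and method names are illustrative, not Helix API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the main-thread-to-async hand-off the removed attributes enabled:
// compute a result once, wrap it unmodifiable, and stash it on the event so
// the async consumer cannot mutate shared cache state. A plain map stands in
// for Helix's ClusterEvent; names here are illustrative.
public final class EventAttributeSketch {
  private final Map<String, Object> attributes = new HashMap<>();

  void addAttribute(String name, Object value) {
    attributes.put(name, value);
  }

  @SuppressWarnings("unchecked")
  <T> T getAttribute(String name) {
    return (T) attributes.get(name);
  }

  public static void main(String[] args) {
    EventAttributeSketch event = new EventAttributeSketch();
    Map<String, Set<String>> expiredJobs = new HashMap<>();
    expiredJobs.put("queueA", Set.of("queueA_job1"));
    // Unmodifiable wrapping keeps the async stage from mutating shared cache state.
    event.addAttribute("TO_BE_PURGED_JOBS_MAP", Collections.unmodifiableMap(expiredJobs));
    Map<String, Set<String>> readBack = event.getAttribute("TO_BE_PURGED_JOBS_MAP");
    System.out.println(readBack); // {queueA=[queueA_job1]}
  }
}
```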
@@ -1,9 +1,6 @@
package org.apache.helix.controller.stages;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.helix.HelixManager;
@@ -13,12 +10,9 @@
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -29,87 +23,34 @@ public AsyncWorkerType getAsyncWorkerType() {
}

@Override
public void process(ClusterEvent event) throws Exception {
// Use the main thread to compute which jobs need to be purged and which workflows need to be
// garbage collected. This avoids race conditions, since the cache will be modified; the async
// work happens only after this computation.
public void execute(ClusterEvent event) {
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
if (manager == null) {

if (dataProvider == null || manager == null) {
LOG.warn(
"HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
"ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}

Map<String, Set<String>> expiredJobsMap = new HashMap<>();
Set<String> workflowsToBePurged = new HashSet<>();
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
Set<WorkflowConfig> existingWorkflows =
new HashSet<>(dataProvider.getWorkflowConfigMap().values());
for (WorkflowConfig workflowConfig : existingWorkflows) {
// Clean up expired jobs if the workflow is non-terminable or is a job queue.
if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
.isJobQueue())) {
WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
long purgeInterval = workflowConfig.getJobPurgeInterval();
long currentTime = System.currentTimeMillis();
if (purgeInterval > 0
&& workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
// Find jobs that are ready to be purged
Set<String> expiredJobs =
TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
if (!expiredJobs.isEmpty()) {
expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
}
scheduleNextJobPurge(workflowConfig.getWorkflowId(), currentTime, purgeInterval,
_rebalanceScheduler, manager);
try {
TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
_rebalanceScheduler);
} catch (Exception e) {
LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
workflowConfig.getWorkflowId(), e.toString()));
}
} else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
.equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
// Find workflows that need to be purged
workflowsToBePurged.add(entry.getKey());
}
}
event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
Collections.unmodifiableMap(expiredJobsMap));
event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
Collections.unmodifiableSet(workflowsToBePurged));

super.process(event);
}

@Override
public void execute(ClusterEvent event) {
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
if (manager == null) {
LOG.warn(
"HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}

Map<String, Set<String>> expiredJobsMap =
event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
Set<String> toBePurgedWorkflows =
event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());

for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
try {
TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager,
_rebalanceScheduler);
} catch (Exception e) {
LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
}
}

TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
}

private static void scheduleNextJobPurge(String workflow, long currentTime, long purgeInterval,
RebalanceScheduler rebalanceScheduler, HelixManager manager) {
long nextPurgeTime = currentTime + purgeInterval;
long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
}
}
}
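
Rescheduling here follows a simple coalescing rule, restored as scheduleNextJobPurge above: the next purge is due at currentTime + purgeInterval, and it is scheduled only if nothing earlier is already pending for the workflow. A minimal standalone sketch of that rule, assuming a plain map in place of Helix's RebalanceScheduler; names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the purge-rescheduling rule in scheduleNextJobPurge: schedule the
// workflow's next purge at currentTime + purgeInterval unless an earlier purge
// is already pending. A plain map stands in for Helix's RebalanceScheduler.
public final class PurgeSchedulerSketch {
  private final Map<String, Long> scheduledTimes = new HashMap<>();

  // Mirrors RebalanceScheduler.getRebalanceTime(): -1 means nothing scheduled.
  long getScheduledTime(String workflow) {
    return scheduledTimes.getOrDefault(workflow, -1L);
  }

  void scheduleNextPurge(String workflow, long currentTime, long purgeInterval) {
    long nextPurgeTime = currentTime + purgeInterval;
    long currentScheduled = getScheduledTime(workflow);
    if (currentScheduled == -1 || currentScheduled > nextPurgeTime) {
      scheduledTimes.put(workflow, nextPurgeTime); // earliest pending purge wins
    }
  }

  public static void main(String[] args) {
    PurgeSchedulerSketch scheduler = new PurgeSchedulerSketch();
    long now = System.currentTimeMillis();
    scheduler.scheduleNextPurge("queueA", now, 60_000L);  // schedules now + 60s
    scheduler.scheduleNextPurge("queueA", now, 120_000L); // no-op: later than pending
    System.out.println(scheduler.getScheduledTime("queueA") == now + 60_000L); // true
  }
}
```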
173 changes: 70 additions & 103 deletions helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,60 +735,26 @@ protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
for (String job : workflowConfig.getJobDag().getAllNodes()) {
JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
if (jobConfig == null) {
LOG.error(String.format(
"Job %s exists in JobDAG but JobConfig is missing! The job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from the last purge.",
job, workflowConfig.getWorkflowId()));
// Add the job name to expiredJobs so that purge operation will be tried again on this job
expiredJobs.add(job);
continue;
}
long expiry = jobConfig.getExpiry();
if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
&& System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
expiredJobs.add(job);
}
}
}
}
return expiredJobs;
}

/**
* Based on a workflow's config or context, create a set of jobs that are either expired, which
* means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
* meaning that the job might have been deleted manually from a job queue, or is left in the
* DAG due to a failed clean-up attempt from the last purge. The difference between this function and
* getExpiredJobs() is that this function gets JobConfig and JobContext from a
* WorkflowControllerDataProvider instead of Zk.
* @param workflowControllerDataProvider
* @param workflowConfig
* @param workflowContext
* @return
*/
public static Set<String> getExpiredJobsFromCache(
WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
WorkflowContext workflowContext) {
Set<String> expiredJobs = new HashSet<>();
Map<String, TaskState> jobStates = workflowContext.getJobStates();
for (String job : workflowConfig.getJobDag().getAllNodes()) {
JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
expiredJobs.add(job);
}
}
return expiredJobs;
}

/*
* Checks if a job is expired and should be purged. This includes a special case when jobConfig
* is null. That happens when a job might have been deleted manually from a job queue, or is
* left in the DAG due to a failed clean-up attempt from the last purge.
*/
private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
TaskState jobState) {
if (jobConfig == null) {
LOG.warn(
"Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
jobName);
return true;
}
long expiry = jobConfig.getExpiry();
return jobContext != null && jobState == TaskState.COMPLETED
&& jobContext.getFinishTime() != WorkflowContext.UNFINISHED
&& System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
}

/**
* Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
* @param accessor
@@ -1011,71 +977,72 @@ public static boolean isJobStarted(String job, WorkflowContext workflowContext)
}

/**
* Clean up all jobs that are marked as expired.
* Clean up all jobs that are COMPLETED and have passed their expiry time.
* @param workflowConfig
* @param workflowContext
*/
public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
HelixManager manager, RebalanceScheduler rebalanceScheduler) {
Set<String> failedJobRemovals = new HashSet<>();
for (String job : expiredJobs) {
if (!TaskUtil
.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
failedJobRemovals.add(job);
LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
}
rebalanceScheduler.removeScheduledRebalance(job);
public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
WorkflowContext workflowContext, HelixManager manager,
RebalanceScheduler rebalanceScheduler) {
if (workflowContext == null) {
LOG.warn(String.format("Workflow %s context does not exist!", workflow));
return;
}
long purgeInterval = workflowConfig.getJobPurgeInterval();
long currentTime = System.currentTimeMillis();
final Set<String> expiredJobs = Sets.newHashSet();
if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
manager.getHelixPropertyStore(), workflowConfig, workflowContext));
if (expiredJobs.isEmpty()) {
LOG.info("No job to purge for the queue " + workflow);
} else {
LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
Set<String> failedJobRemovals = new HashSet<>();
for (String job : expiredJobs) {
if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
job)) {
failedJobRemovals.add(job);
LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
}
rebalanceScheduler.removeScheduledRebalance(job);
}

// If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
// removal will be tried again at next purge
expiredJobs.removeAll(failedJobRemovals);
// If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
// removal will be tried again at next purge
expiredJobs.removeAll(failedJobRemovals);

if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
workflow);
}
if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
true)) {
LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
+ " from the workflow " + workflow);
}

if (expiredJobs.size() > 0) {
// Update workflow context will be in main pipeline not here. Otherwise, it will cause
// concurrent write issue. It is possible that jobs got purged but there is no event to
// trigger the pipeline to clean context.
HelixDataAccessor accessor = manager.getHelixDataAccessor();
List<String> resourceConfigs =
accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
if (resourceConfigs.size() > 0) {
RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
} else {
LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
expiredJobs);
if (expiredJobs.size() > 0) {
// Update workflow context will be in main pipeline not here. Otherwise, it will cause
// concurrent write issue. It is possible that jobs got purged but there is no event to
// trigger the pipeline to clean context.
HelixDataAccessor accessor = manager.getHelixDataAccessor();
List<String> resourceConfigs =
accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
if (resourceConfigs.size() > 0) {
RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
} else {
LOG.warn(
"No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
}
}
}
}
setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
}

/**
* The function that removes IdealStates and workflow contexts of the workflows that need to be
* deleted.
* @param toBePurgedWorkflows
* @param manager
*/
public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
final HelixManager manager) {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();

for (String workflowName : toBePurgedWorkflows) {
LOG.warn(
"WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
workflowName);

// TODO: We don't need this in the future when TF is not relying on IS/EV anymore.
if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
workflowName);
continue;
}

if (!removeWorkflowContext(propertyStore, workflowName)) {
LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
}
private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
RebalanceScheduler rebalanceScheduler, HelixManager manager) {
long nextPurgeTime = currentTime + purgeInterval;
long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
}
}
}
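
The purge gate and expiry test above reduce to a small predicate: purging runs only when purgeInterval > 0 and lastJobPurgeTime + purgeInterval has passed, and an individual job is purgeable once it is COMPLETED, has a recorded finish time, and its expiry window has elapsed; a job whose JobConfig is missing is also treated as purgeable so a failed clean-up is retried. A standalone sketch of the per-job test with plain types; the UNFINISHED sentinel of -1 mirroring WorkflowContext.UNFINISHED is an assumption here.

```java
// Sketch of the per-job expiry test in getExpiredJobs, with plain types in
// place of JobConfig/JobContext. UNFINISHED = -1 is assumed to mirror
// WorkflowContext.UNFINISHED.
public final class JobExpirySketch {
  static final long UNFINISHED = -1L;

  enum TaskState { IN_PROGRESS, COMPLETED, FAILED }

  // hasConfig == false models a job left in the DAG without a JobConfig:
  // treated as expired so the next purge retries its removal.
  static boolean isExpired(boolean hasConfig, TaskState state, long finishTime, long expiry) {
    if (!hasConfig) {
      return true;
    }
    return state == TaskState.COMPLETED && finishTime != UNFINISHED
        && System.currentTimeMillis() >= finishTime + expiry;
  }

  public static void main(String[] args) {
    long anHourAgo = System.currentTimeMillis() - 3_600_000L;
    System.out.println(isExpired(true, TaskState.COMPLETED, anHourAgo, 60_000L));  // true
    System.out.println(isExpired(true, TaskState.COMPLETED, UNFINISHED, 60_000L)); // false
    System.out.println(isExpired(true, TaskState.FAILED, anHourAgo, 60_000L));     // false
    System.out.println(isExpired(false, TaskState.FAILED, UNFINISHED, 0L));        // true: retry
  }
}
```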
@@ -356,6 +356,20 @@ private void scheduleSingleJob(String jobResource, JobConfig jobConfig) {
admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
TaskConstants.STATE_MODEL_NAME);

HelixDataAccessor accessor = _manager.getHelixDataAccessor();

// Set the job configuration
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
HelixProperty resourceConfig = new HelixProperty(jobResource);
resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
if (taskConfigMap != null) {
for (TaskConfig taskConfig : taskConfigMap.values()) {
resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
}
}
accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);

// Push out new ideal state based on number of target partitions
IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
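
The block restored in scheduleSingleJob persists the job's settings as a resource config before the ideal state is pushed: job-level properties land in the record's simple fields, and each TaskConfig becomes one map field keyed by its task id. A sketch of that record shape, assuming plain maps in place of ZNRecord; the field names shown are illustrative, not the exact Helix keys.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the ZNRecord-like shape written by the restored code: simple
// fields hold job-level settings; one map field per task, keyed by the value
// TaskConfig.getId() would return. Keys shown are illustrative assumptions.
public final class JobResourceConfigSketch {
  public static void main(String[] args) {
    Map<String, String> simpleFields = new HashMap<>();
    simpleFields.put("WorkflowID", "myQueue");     // assumed key name
    simpleFields.put("JobID", "myQueue_job1");     // assumed key name

    Map<String, Map<String, String>> mapFields = new HashMap<>();
    Map<String, String> taskConfig = new HashMap<>();
    taskConfig.put("TASK_ID", "task-uuid-0");      // assumed key name
    mapFields.put("task-uuid-0", taskConfig);      // keyed by the task's id

    System.out.println("simpleFields = " + simpleFields);
    System.out.println("mapFields = " + mapFields);
  }
}
```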