diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index 16b652aca0..284479ce74 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -19,10 +19,12 @@ * under the License. */ +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import org.apache.helix.HelixProperty; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; @@ -33,7 +35,10 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.task.JobConfig; import org.apache.helix.task.TaskConstants; +import org.apache.helix.task.WorkflowConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,18 +60,43 @@ public void process(ClusterEvent event) throws Exception { throw new StageException("Missing attributes in event:" + event + ". Requires DataCache"); } - Map idealStates = cache.getIdealStates(); - Map resourceMap = new LinkedHashMap<>(); Map resourceToRebalance = new LinkedHashMap<>(); + Map idealStates = cache.getIdealStates(); boolean isTaskCache = cache instanceof WorkflowControllerDataProvider; + processIdealStates(cache, resourceMap, resourceToRebalance, idealStates, isTaskCache); + + // Add TaskFramework resources from workflow and job configs as Task Framework will no longer + // use IdealState + if (isTaskCache) { + WorkflowControllerDataProvider taskDataCache = + event.getAttribute(AttributeName.ControllerDataProvider.name()); + processWorkflowConfigs(taskDataCache, resourceMap, resourceToRebalance); + processJobConfigs(taskDataCache, resourceMap, resourceToRebalance, idealStates); + } + + // It's important to get partitions from CurrentState as well since the + // idealState might be removed. 
+ processCurrentStates(cache, resourceMap, resourceToRebalance, idealStates, isTaskCache); + + event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceToRebalance); + } + + /* + * Construct Resources based on IdealStates and add them to resource maps + */ + private void processIdealStates(BaseControllerDataProvider cache, + Map resourceMap, Map resourceToRebalance, + Map idealStates, boolean isTaskCache) { if (idealStates != null && idealStates.size() > 0) { for (IdealState idealState : idealStates.values()) { if (idealState == null) { continue; } + Set partitionSet = idealState.getPartitionSet(); String resourceName = idealState.getResourceName(); if (!resourceMap.containsKey(resourceName)) { @@ -74,9 +104,10 @@ public void process(ClusterEvent event) throws Exception { cache.getResourceConfig(resourceName)); resourceMap.put(resourceName, resource); - if (!idealState.isValid() && !isTaskCache - || idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache - || !idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && !isTaskCache) { + // If this is a resource pipeline and the IdealState is invalid or has a non-task + // stateModelDef, add it to resourceToRebalance + if (!isTaskCache && (!idealState.isValid() || !idealState.getStateModelDefRef() + .equals(TaskConstants.STATE_MODEL_NAME))) { resourceToRebalance.put(resourceName, resource); } resource.setStateModelDefRef(idealState.getStateModelDefRef()); @@ -97,9 +128,59 @@ public void process(ClusterEvent event) throws Exception { } } } + } - // It's important to get partitions from CurrentState as well since the - // idealState might be removed. + /* + * Construct Resources based on WorkflowConfigs and add them to the two resource maps + */ + private void processWorkflowConfigs(WorkflowControllerDataProvider taskDataCache, Map resourceMap, + Map resourceToRebalance) { + for (Map.Entry workflowConfigEntry : taskDataCache + .getWorkflowConfigMap().entrySet()) { + // The resource could have been created by IS - always overwrite with config values + String resourceName = workflowConfigEntry.getKey(); + WorkflowConfig workflowConfig = workflowConfigEntry.getValue(); + addResourceConfigToResourceMap(resourceName, workflowConfig, taskDataCache.getClusterConfig(), + resourceMap, resourceToRebalance); + addPartition(resourceName, resourceName, resourceMap); + } + } + + /* + * Construct Resources based on JobConfigs and add them to the two resource maps + */ + private void processJobConfigs(WorkflowControllerDataProvider taskDataCache, Map resourceMap, + Map resourceToRebalance, Map idealStates) { + for (Map.Entry jobConfigEntry : taskDataCache.getJobConfigMap() + .entrySet()) { + // always overwrite, because the resource could be created by IS + String resourceName = jobConfigEntry.getKey(); + JobConfig jobConfig = jobConfigEntry.getValue(); + addResourceConfigToResourceMap(resourceName, jobConfig, taskDataCache.getClusterConfig(), + resourceMap, resourceToRebalance); + int numPartitions = jobConfig.getTaskConfigMap().size(); + // If there is no task config, this is a targeted job. 
We get task counts based on target + // resource IdealState + if (numPartitions == 0 && idealStates != null) { + IdealState targetIs = idealStates.get(jobConfig.getTargetResource()); + if (targetIs == null) { + LOG.warn("Target resource " + jobConfig.getTargetResource() + " does not exist for job " + resourceName); + } else { + numPartitions = targetIs.getPartitionSet().size(); + } + } + for (int i = 0; i < numPartitions; i++) { + addPartition(resourceName + "_" + i, resourceName, resourceMap); + } + } + } + + /* + * Construct Resources based on CurrentStates and add them to resource maps + */ + private void processCurrentStates(BaseControllerDataProvider cache, + Map resourceMap, Map resourceToRebalance, + Map idealStates, boolean isTaskCache) throws StageException { Map availableInstances = cache.getLiveInstances(); if (availableInstances != null && availableInstances.size() > 0) { @@ -124,17 +205,15 @@ public void process(ClusterEvent event) throws Exception { // don't overwrite ideal state settings if (!resourceMap.containsKey(resourceName)) { - addResource(resourceName, resourceMap); - Resource resource = resourceMap.get(resourceName); + Resource resource = new Resource(resourceName); resource.setStateModelDefRef(currentState.getStateModelDefRef()); resource.setStateModelFactoryName(currentState.getStateModelFactoryName()); resource.setBucketSize(currentState.getBucketSize()); resource.setBatchMessageMode(currentState.getBatchMessageMode()); - if (resource.getStateModelDefRef() == null && !isTaskCache - || resource.getStateModelDefRef() != null && ( - resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache - || !resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) - && !isTaskCache)) { + // Rebalance the resource in this pipeline only when its state model def matches the + // pipeline type; a null state model def never equals the task state model, so such + // resources still fall to the resource pipeline + if (isTaskCache == TaskConstants.STATE_MODEL_NAME + .equals(resource.getStateModelDefRef())) { resourceToRebalance.put(resourceName, resource); } @@ -143,6 +222,7 @@ public void process(ClusterEvent event) throws Exception { resource.setResourceGroupName(idealState.getResourceGroupName()); resource.setResourceTag(idealState.getInstanceGroupTag()); } + resourceMap.put(resourceName, resource); } if (currentState.getStateModelDefRef() == null) { @@ -150,8 +230,8 @@ public void process(ClusterEvent event) throws Exception { "state model def is null."
+ "resource:" + currentState.getResourceName() + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: " + currentState.getPartitionStateMap().values()); - throw new StageException("State model def is null for resource:" - + currentState.getResourceName()); + throw new StageException( + "State model def is null for resource:" + currentState.getResourceName()); } for (String partition : resourceStateMap.keySet()) { @@ -160,18 +240,6 @@ public void process(ClusterEvent event) throws Exception { } } } - - event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); - event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceToRebalance); - } - - private void addResource(String resource, Map resourceMap) { - if (resource == null || resourceMap == null) { - return; - } - if (!resourceMap.containsKey(resource)) { - resourceMap.put(resource, new Resource(resource)); - } } private void addPartition(String partition, String resourceName, Map resourceMap) { @@ -185,4 +253,21 @@ private void addPartition(String partition, String resourceName, Map resourceMap, + Map resourceToRebalance) { + Resource resource = new Resource(resourceName, clusterConfig, resourceConfig); + resourceMap.put(resourceName, resource); + resource.setStateModelDefRef(TaskConstants.STATE_MODEL_NAME); + resource.setStateModelFactoryName(resourceConfig.getStateModelFactoryName()); + boolean batchMessageMode = resourceConfig.getBatchMessageMode(); + if (clusterConfig != null) { + batchMessageMode |= clusterConfig.getBatchMessageMode(); + } + resource.setBatchMessageMode(batchMessageMode); + resource.setResourceGroupName(resourceConfig.getResourceGroupName()); + resource.setResourceTag(resourceConfig.getInstanceGroupTag()); + resourceToRebalance.put(resourceName, resource); + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java index 5b4d580e09..9746bf9ef8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java @@ -34,26 +34,20 @@ import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; -import org.apache.helix.controller.rebalancer.Rebalancer; -import org.apache.helix.controller.rebalancer.SemiAutoRebalancer; -import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.task.AssignableInstanceManager; import org.apache.helix.task.TaskConstants; -import org.apache.helix.task.TaskRebalancer; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; import org.apache.helix.task.WorkflowDispatcher; import org.apache.helix.task.assigner.AssignableInstance; -import org.apache.helix.util.HelixUtil; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -110,117 +104,17 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map { String _workflowId; Long _rankingValue; diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index aeb0b5d3e8..cd240b5515 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -48,6 +48,7 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.monitoring.mbeans.JobMonitor; import org.apache.helix.task.assigner.AssignableInstance; +import org.apache.helix.util.RebalanceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -519,6 +520,9 @@ protected void handleJobTimeout(JobContext jobCtx, WorkflowContext workflowCtx, _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.TIMED_OUT); _rebalanceScheduler.removeScheduledRebalance(jobResource); TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); + // New pipeline trigger for workflow status update + // TODO: Enhance the pipeline and remove this because this operation is expensive + RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false); } protected void failJob(String jobName, WorkflowContext workflowContext, JobContext jobContext, @@ -535,6 +539,9 @@ protected void failJob(String jobName, WorkflowContext workflowContext, JobConte _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED); _rebalanceScheduler.removeScheduledRebalance(jobName); TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName); + // New pipeline trigger for workflow status update + // TODO: Enhance the pipeline and remove this because this operation is expensive + RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false); } // Compute real assignment from theoretical calculation with applied throttling @@ -936,6 +943,10 @@ protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConf protected void scheduleJobCleanUp(long expiry, WorkflowConfig workflowConfig, long currentTime) { + if (expiry < 0) { + // If the expiry is negative, it's an invalid cleanup. Return. + return; + } long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1 ? Long.MAX_VALUE : _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()); diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 2d1d8ecadb..9a48339b32 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -30,6 +30,7 @@ import java.util.TreeSet; import com.google.common.collect.ImmutableMap; +import org.apache.helix.util.RebalanceUtil; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; import org.apache.helix.controller.stages.CurrentStateOutput; @@ -85,7 +86,7 @@ public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName, // completed) TaskState workflowState = workflowCtx.getWorkflowState(); TaskState jobState = workflowCtx.getJobState(jobName); - // The job is already in a final state (completed/failed).
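+ // The workflow or job has already reached a terminal state, so clean up and return an empty assignment.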
+ // Do not include workflowState == TIMED_OUT here, as later logic needs to handle this case if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED || jobState == TaskState.TIMED_OUT) { @@ -94,6 +95,9 @@ public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName, workflowResource, jobName, workflowState, jobState)); finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName); TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName); + // New pipeline trigger for workflow status update + // TODO: Enhance the pipeline and remove this because this operation is expensive + RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false); _rebalanceScheduler.removeScheduledRebalance(jobName); return buildEmptyAssignment(jobName, currStateOutput); } @@ -288,6 +292,9 @@ private ResourceAssignment computeResourceMapping(String jobResource, jobCtx.getFinishTime() - jobCtx.getStartTime()); _rebalanceScheduler.removeScheduledRebalance(jobResource); TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); + // New pipeline trigger for workflow status update + // TODO: Enhance the pipeline and remove this because this operation is expensive + RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false); return buildEmptyAssignment(jobResource, currStateOutput); } @@ -298,6 +305,7 @@ private ResourceAssignment computeResourceMapping(String jobResource, handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg); finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(), jobResource); + scheduleJobCleanUp(jobCfg.getTerminalStateExpiry(), workflowConfig, currentTime); return buildEmptyAssignment(jobResource, currStateOutput); } diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 506eca9c72..d463cecdd6 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -166,9 +166,6 @@ public void start(Workflow flow) { "Failed to add workflow configuration for workflow %s. It's possible that a workflow of the same name already exists or there was a connection issue. JobConfig deletion attempted but failed for the following jobs: %s", flow.getName(), failedJobRemoval)); } - - // Finally add workflow resource. - addWorkflowResource(flow.getName()); } /** @@ -524,10 +521,6 @@ public void enqueueJobs(final String queue, final List jobs, } throw new HelixException("Failed to enqueue job"); } - - // This is to make it back-compatible with old Helix task driver.
- addWorkflowResourceIfNecessary(queue); - } /** @@ -572,40 +565,6 @@ public void cleanupQueue(String queue) { TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true); } - /** Posts new workflow resource to cluster */ - private void addWorkflowResource(String workflow) { - // Add workflow resource - _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME); - - IdealState is = buildWorkflowIdealState(workflow); - TaskUtil.createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE)); - - _admin.setResourceIdealState(_clusterName, workflow, is); - } - - /** - * Posts new workflow resource to cluster if it does not exist - */ - private void addWorkflowResourceIfNecessary(String workflow) { - IdealState is = _admin.getResourceIdealState(_clusterName, workflow); - if (is == null) { - addWorkflowResource(workflow); - } - } - - private IdealState buildWorkflowIdealState(String workflow) { - CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow); - IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1).setNumPartitions(1) - .setStateModel(TaskConstants.STATE_MODEL_NAME).disableExternalView(); - - IdealState is = IsBuilder.build(); - is.getRecord().setListField(workflow, new ArrayList<>()); - is.getRecord().setMapField(workflow, new HashMap<>()); - is.setRebalancerClassName(WorkflowRebalancer.class.getName()); - - return is; - } - /** * Add new job config to cluster by way of create */ @@ -731,6 +690,8 @@ private void removeWorkflowFromZK(String workflow) { } /** + * TODO: IdealStates are no longer used by Task Framework. This function deletes IdealStates for + * TODO: backward compatibility purposes; this behavior will be removed later. * Public synchronized method to wait for a delete operation to fully complete with timeout. * When this method returns, it means that a queue (workflow) has been completely deleted, meaning * its IdealState, WorkflowConfig, and WorkflowContext have all been deleted. diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index a5a608693f..52b81ce8ac 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -595,30 +595,36 @@ public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor accessor, } /** + * TODO: Task Framework no longer uses IdealState; this is left in for backward compatibility * Cleans up IdealState and external view associated with a job. * @param accessor * @param job * @return True if remove success, otherwise false */ + @Deprecated protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor accessor, String job) { return cleanupIdealStateExtView(accessor, job); } /** + * TODO: Task Framework no longer uses IdealState; this is left in for backward compatibility * Cleans up IdealState and external view associated with a workflow. * @param accessor * @param workflow * @return True if remove success, otherwise false */ + @Deprecated protected static boolean cleanupWorkflowIdealStateExtView(final HelixDataAccessor accessor, String workflow) { return cleanupIdealStateExtView(accessor, workflow); } /** + * TODO: Task Framework no longer uses IdealState; this is left in for backward compatibility * Cleans up IdealState and external view associated with a job/workflow resource.
*/ + @Deprecated private static boolean cleanupIdealStateExtView(final HelixDataAccessor accessor, String workflowJobResource) { boolean success = true; diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index 4c9bd18cad..68a5be52b7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -84,6 +84,7 @@ public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, TargetState targetState = workflowCfg.getTargetState(); if (targetState == TargetState.DELETE) { LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context."); + updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput); cleanupWorkflow(workflow); return; } @@ -126,6 +127,7 @@ && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) { // Step 4: Handle finished workflows if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) { LOG.info("Workflow " + workflow + " is finished."); + updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput); long expiryTime = workflowCfg.getExpiry(); // Check if this workflow has been finished past its expiry. if (workflowCtx.getFinishTime() + expiryTime <= currentTime) { @@ -149,19 +151,7 @@ && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) { } } - // Update jobs already inflight - RuntimeJobDag runtimeJobDag = _clusterDataCache.getTaskDataCache().getRuntimeJobDag(workflow); - if (runtimeJobDag != null) { - for (String inflightJob : runtimeJobDag.getInflightJobList()) { - if (System.currentTimeMillis() >= workflowCtx.getJobStartTime(inflightJob)) { - processJob(inflightJob, currentStateOutput, bestPossibleOutput, workflowCtx); - } - } - } else { - LOG.warn(String.format( - "Failed to find runtime job DAG for workflow %s, existing runtime jobs may not be processed correctly for it", - workflow)); - } + updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput); // Step 5: handle workflow that should STOP // For workflows that have already reached final states, STOP should not take into effect. @@ -187,6 +177,23 @@ && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) { _clusterDataCache.updateWorkflowContext(workflow, workflowCtx); } + private void updateInflightJobs(String workflow, WorkflowContext workflowCtx, + CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) { + // Update jobs already inflight + RuntimeJobDag runtimeJobDag = _clusterDataCache.getTaskDataCache().getRuntimeJobDag(workflow); + if (runtimeJobDag != null) { + for (String inflightJob : runtimeJobDag.getInflightJobList()) { + if (System.currentTimeMillis() >= workflowCtx.getJobStartTime(inflightJob)) { + processJob(inflightJob, currentStateOutput, bestPossibleOutput, workflowCtx); + } + } + } else { + LOG.warn(String.format( + "Failed to find runtime job DAG for workflow %s, existing runtime jobs may not be processed correctly for it", + workflow)); + } + } + public void assignWorkflow(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) { @@ -290,7 +297,6 @@ private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, // Time is not ready. Set a trigger and update the start time. 
// Check if the job is ready to be executed. if (System.currentTimeMillis() >= workflowCtx.getJobStartTime(job)) { - scheduleSingleJob(job, jobConfig); workflowCtx.setJobState(job, TaskState.NOT_STARTED); processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx); scheduledJobs++; @@ -323,61 +329,17 @@ private void processJob(String job, CurrentStateOutput currentStateOutput, } /** - * Posts new job to cluster + * Jobs that are missing their corresponding JobConfig, WorkflowConfig, or WorkflowContext need + * to be dropped */ - private void scheduleSingleJob(String jobResource, JobConfig jobConfig) { - HelixAdmin admin = _manager.getClusterManagmentTool(); - - IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource); - if (jobIS != null) { - LOG.info("Job " + jobResource + " idealstate already exists!"); - return; - } - - // Set up job resource based on partitions from target resource - - // Create the UserContentStore for the job first - TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource, - new ZNRecord(TaskUtil.USER_CONTENT_NODE)); - - int numPartitions = jobConfig.getTaskConfigMap().size(); - if (numPartitions == 0) { - IdealState targetIs = - admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource()); - if (targetIs == null) { - LOG.warn("Target resource does not exist for job " + jobResource); - // do not need to fail here, the job will be marked as failure immediately when job starts - // running. - } else { - numPartitions = targetIs.getPartitionSet().size(); - } - } - - admin.addResource(_manager.getClusterName(), jobResource, numPartitions, - TaskConstants.STATE_MODEL_NAME); - - // Push out new ideal state based on number of target partitions - IdealStateBuilder builder = new CustomModeISBuilder(jobResource); - builder.setRebalancerMode(IdealState.RebalanceMode.TASK); - builder.setNumReplica(1); - builder.setNumPartitions(numPartitions); - builder.setStateModel(TaskConstants.STATE_MODEL_NAME); - - if (jobConfig.getInstanceGroupTag() != null) { - builder.setNodeGroup(jobConfig.getInstanceGroupTag()); - } - - if (jobConfig.isDisableExternalView()) { - builder.disableExternalView(); - } - - jobIS = builder.build(); - for (int i = 0; i < numPartitions; i++) { - jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<>()); - jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<>()); + public void processJobForDrop(String resourceName, CurrentStateOutput currentStateOutput, + BestPossibleStateOutput bestPossibleStateOutput) { + JobConfig jobConfig = _clusterDataCache.getJobConfig(resourceName); + if (jobConfig == null || _clusterDataCache.getWorkflowConfig(jobConfig.getWorkflow()) == null || _clusterDataCache.getWorkflowContext(jobConfig.getWorkflow()) == null) { + ResourceAssignment emptyAssignment = buildEmptyAssignment(resourceName, currentStateOutput); + updateBestPossibleStateOutput(resourceName, emptyAssignment, bestPossibleStateOutput); } - jobIS.setRebalancerClassName(JobRebalancer.class.getName()); - admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS); } /** diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java index da9843a8be..cb181d1caa 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java +++
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java @@ -103,7 +103,6 @@ public Map assignTasks( // Assign Map assignResults = new HashMap<>(); - TaskAssignResult lastFailure = null; for (TaskConfig task : tasks) { // Dedup @@ -112,36 +111,18 @@ public Map assignTasks( continue; } - // TODO: Review this logic - // TODO: 1. It assumes that the only mode of failure is due to insufficient capacity. This - // assumption may not always be true. Verify - // TODO: 2. All TaskAssignResults will get failureReason/Description/TaskID for the first task - // that failed. This will need correction - // Every time we try to assign the task to the least-used instance, if that fails, - // we assume all subsequent tasks will fail with same reason - if (lastFailure != null) { - assignResults.put(task.getId(), - new TaskAssignResult(task, quotaType, null, false, lastFailure.getFitnessScore(), - lastFailure.getFailureReason(), lastFailure.getFailureDescription())); - continue; - } - // Try to assign the task to least used instance AssignableInstance instance = queue.poll(); TaskAssignResult result = instance.tryAssign(task, quotaType); assignResults.put(task.getId(), result); - if (!result.isSuccessful()) { - // For all failure reasons other than duplicated assignment, we can fail - // subsequent tasks - lastFailure = result; - } else { + if (result.isSuccessful()) { // If the task is successfully accepted by the instance, assign it to the instance assignableInstanceManager.assign(instance.getInstanceName(), result); - - // requeue the instance to rank again - queue.offer(instance); } + + // requeue the instance to rank again + queue.offer(instance); } logger.info("Finished assigning tasks with quota type {}", quotaType); return assignResults; diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java index eb84fe5bfa..057bea89a2 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java @@ -51,12 +51,11 @@ public void beforeClass() throws Exception { public void testQuotaConstraintSkipWorkflowAssignment() throws Exception { ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown); WorkflowControllerDataProvider cache = new WorkflowControllerDataProvider(CLUSTER_NAME); - JobConfig.Builder job = new JobConfig.Builder(); - - job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100000")); TaskDriver driver = new TaskDriver(_manager); for (int i = 0; i < 10; i++) { Workflow.Builder workflow = new Workflow.Builder("Workflow" + i); + JobConfig.Builder job = new JobConfig.Builder(); + job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100000")); job.setWorkflow("Workflow" + i); TaskConfig taskConfig = new TaskConfig(MockTask.TASK_COMMAND, new HashMap(), null, null); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java index 43fda005c9..0810e09eb0 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java @@ -40,7 +40,6 @@ import 
org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobQueue; -import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; import org.apache.helix.task.WorkflowContext; @@ -84,14 +83,6 @@ public void testPersistContextData() { // Manually trigger a cache refresh cache.refresh(new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor)); - // Create the IdealState ZNode for the jobs - _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "0", 1, - TaskConstants.STATE_MODEL_NAME); - _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "1", 1, - TaskConstants.STATE_MODEL_NAME); - _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "2", 1, - TaskConstants.STATE_MODEL_NAME); - // Create the context WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW)); wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED); @@ -181,11 +172,10 @@ protected void handleEvent(Runnable event) { TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage(); garbageCollectionStage.process(_event); - // Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths - // IdealState check - checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "0"); - checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "1"); - checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2"); + // Check that contexts have been purged for the 2 jobs in both old and new ZNode paths + checkForContextRemoval(_testWorkflow, _testJobPrefix + "0"); + checkForContextRemoval(_testWorkflow, _testJobPrefix + "1"); + checkForContextRemoval(_testWorkflow, _testJobPrefix + "2"); } @Test(dependsOnMethods = "testPartialDataPurge") @@ -217,8 +207,8 @@ protected void handleEvent(Runnable event) { TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage(); garbageCollectionStage.process(_event); - // Check that IS and contexts have been purged for the workflow - checkForIdealStateAndContextRemoval(_testWorkflow); + // Check that contexts have been purged for the workflow + checkForContextRemoval(_testWorkflow); worker.shutdown(); } @@ -231,23 +221,20 @@ private void deleteJobConfigs(String workflowName, String jobName) { _baseAccessor.remove(newPath, AccessOption.PERSISTENT); } - private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception { + private void checkForContextRemoval(String workflow, String job) throws Exception { // JobContexts in old ZNode path String oldPath = String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job); String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath(); Assert.assertTrue(TestHelper.verify( - () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT) - && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor + () -> !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor .exists(newPath, AccessOption.PERSISTENT), 120000)); } - private void checkForIdealStateAndContextRemoval(String workflow) throws Exception { - Assert.assertTrue(TestHelper.verify(() -> - !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT) - && !_baseAccessor - .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), 
AccessOption.PERSISTENT), + private void checkForContextRemoval(String workflow) throws Exception { + Assert.assertTrue(TestHelper.verify(() -> !_baseAccessor + .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT), 120000)); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java new file mode 100644 index 0000000000..8a1990416e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java @@ -0,0 +1,62 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.task.TaskConstants; +import org.apache.helix.task.TaskPartitionState; +import org.testng.Assert; +import org.testng.annotations.Test; + + +/** + * Test to make sure that current states of jobs are dropped if no JobConfig exists + */ +public class TestCurrentStateDropWithoutConfigs extends TaskTestBase { + protected HelixDataAccessor _accessor; + + @Test + public void testCurrentStateDropWithoutConfigs() throws Exception { + String jobName = TestHelper.getTestMethodName() + "_0"; + String taskName = jobName + "_0"; + + _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + LiveInstance liveInstance = _accessor + .getProperty(_accessor.keyBuilder().liveInstance(_participants[0].getInstanceName())); + CurrentState currentState = new CurrentState(jobName); + currentState.setSessionId(liveInstance.getEphemeralOwner()); + currentState.setStateModelDefRef(TaskConstants.STATE_MODEL_NAME); + currentState.setState(taskName, TaskPartitionState.RUNNING.name()); + currentState.setPreviousState(taskName, TaskPartitionState.INIT.name()); + currentState.setStartTime(taskName, System.currentTimeMillis()); + currentState.setEndTime(taskName, System.currentTimeMillis()); + _accessor.setProperty(_accessor.keyBuilder() + .currentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(), + jobName), currentState); + + Assert.assertTrue(TestHelper.verify(() -> _accessor.getProperty(_accessor.keyBuilder() + .currentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(), + jobName)) == null, TestHelper.WAIT_DURATION * 10)); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java index ed48975899..6b31b1f34c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteJobFromJobQueue.java @@ -68,8 +68,6 @@ public void testForceDeleteJobFromJobQueue() throws InterruptedException { Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job2"))); Assert .assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job2"))); - Assert.assertNotNull(_manager.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME, - TaskUtil.getNamespacedJobName(jobQueueName, "job2"))); // The following force delete for the job should go through without getting an exception _driver.deleteJob(jobQueueName, "job2", true); @@ -77,7 +75,5 @@ public void testForceDeleteJobFromJobQueue() throws InterruptedException { // Check that the job has been force-deleted (fully gone from ZK) Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job2"))); Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job2"))); - Assert.assertNull(_manager.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME, - TaskUtil.getNamespacedJobName(jobQueueName, "job2"))); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java index c435e82b64..8d11b12c2c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java @@ -63,7 +63,6 @@ public void testDeleteWorkflow() throws InterruptedException { // queue Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName)); Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName)); - Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); // Pause the Controller so that the job queue won't get deleted admin.enableCluster(CLUSTER_NAME, false); @@ -84,7 +83,6 @@ public void testDeleteWorkflow() throws InterruptedException { // Check that the deletion operation completed Assert.assertNull(_driver.getWorkflowConfig(jobQueueName)); Assert.assertNull(_driver.getWorkflowContext(jobQueueName)); - Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); } @Test @@ -100,22 +98,18 @@ public void testDeleteWorkflowForcefully() throws InterruptedException { _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job1"), TaskState.IN_PROGRESS); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job + // Check that WorkflowConfig, WorkflowContext, JobConfig, and JobContext are indeed created for this job // queue Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName)); Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName)); Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); Assert .assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); - Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); - - // Delete the idealstate of workflow - HelixDataAccessor accessor = _manager.getHelixDataAccessor(); - PropertyKey.Builder keyBuild = accessor.keyBuilder(); - 
accessor.removeProperty(keyBuild.idealStates(jobQueueName)); - Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); - // Attempt the deletion and and it should time out since idealstate does not exist anymore. + // Pause the Controller so that the job queue won't get deleted + admin.enableCluster(CLUSTER_NAME, false); + Thread.sleep(1000); + // Attempt the deletion; it should time out since the Controller is paused try { _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY); Assert.fail( @@ -123,7 +117,6 @@ public void testDeleteWorkflowForcefully() throws InterruptedException { } catch (HelixException e) { // Pass } - // delete forcefully _driver.delete(jobQueueName, true); @@ -131,7 +124,6 @@ public void testDeleteWorkflowForcefully() throws InterruptedException { Assert.assertNull(_driver.getWorkflowContext(jobQueueName)); Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); - Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); } @Test @@ -147,19 +139,16 @@ public void testDeleteHangingJobs() throws InterruptedException { _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job1"), TaskState.IN_PROGRESS); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job - // queue + // Check that WorkflowConfig and WorkflowContext are indeed created for this job queue Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName)); Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName)); Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); Assert .assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); - Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); - // Delete the idealstate, workflowconfig and context of workflow + // Delete the WorkflowConfig and WorkflowContext of the workflow HelixDataAccessor accessor = _manager.getHelixDataAccessor(); PropertyKey.Builder keyBuild = accessor.keyBuilder(); - accessor.removeProperty(keyBuild.idealStates(jobQueueName)); accessor.removeProperty(keyBuild.resourceConfig(jobQueueName)); accessor.removeProperty(keyBuild.workflowContext(jobQueueName)); @@ -169,15 +158,12 @@ public void testDeleteHangingJobs() throws InterruptedException { Assert.assertTrue(verifier.verifyByPolling()); // Sometimes it's a ZK write fail - delete one more time to lower test failure rate - if (admin.getResourceIdealState(CLUSTER_NAME, jobQueueName) != null - || _driver.getWorkflowConfig(jobQueueName) != null + if (_driver.getWorkflowConfig(jobQueueName) != null || _driver.getWorkflowContext(jobQueueName) != null) { - accessor.removeProperty(keyBuild.idealStates(jobQueueName)); accessor.removeProperty(keyBuild.resourceConfig(jobQueueName)); accessor.removeProperty(keyBuild.workflowContext(jobQueueName)); } - Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); Assert.assertNull(_driver.getWorkflowConfig(jobQueueName)); Assert.assertNull(_driver.getWorkflowContext(jobQueueName)); @@ -202,7 +188,5 @@ public void testDeleteHangingJobs() throws InterruptedException { Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); - Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, -
TaskUtil.getNamespacedJobName(jobQueueName, "job1"))); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java index ae724f08cc..2ac9f519b5 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java @@ -102,8 +102,8 @@ public void beforeClass() throws Exception { @Test public void testDeleteCompletedWorkflowForcefully() throws Exception { - // Create a simple workflow and wait for its completion. Then delete the IdealState, - // WorkflowContext and WorkflowConfig using ForceDelete. + // Create a simple workflow and wait for its completion. Then delete the WorkflowContext and + // WorkflowConfig using ForceDelete. String workflowName = TestHelper.getTestMethodName(); Workflow.Builder builder = createCustomWorkflow(workflowName, SHORT_EXECUTION_TIME, "0"); _driver.start(builder.build()); @@ -111,24 +111,20 @@ public void testDeleteCompletedWorkflowForcefully() throws Exception { // Wait until workflow is created and completed. _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this - // workflow + // Check that WorkflowConfig and WorkflowContext are indeed created for this workflow Assert.assertNotNull(_driver.getWorkflowConfig(workflowName)); Assert.assertNotNull(_driver.getWorkflowContext(workflowName)); - Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName)); // Stop the Controller _controller.syncStop(); _driver.delete(workflowName, true); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed deleted for this - // workflow. + // Check that WorkflowConfig and WorkflowContext are indeed deleted for this workflow. boolean isWorkflowDeleted = TestHelper.verify(() -> { WorkflowConfig wfcfg = _driver.getWorkflowConfig(workflowName); WorkflowContext wfctx = _driver.getWorkflowContext(workflowName); - IdealState is = _admin.getResourceIdealState(CLUSTER_NAME, workflowName); - return (wfcfg == null && wfctx == null && is == null); + return (wfcfg == null && wfctx == null); }, 60 * 1000); Assert.assertTrue(isWorkflowDeleted); } @@ -136,7 +132,7 @@ public void testDeleteCompletedWorkflowForcefully() throws Exception { @Test(dependsOnMethods = "testDeleteCompletedWorkflowForcefully") public void testDeleteRunningWorkflowForcefully() throws Exception { // Create a simple workflow and wait until it reaches the Running state. Then delete the - // IdealState, WorkflowContext and WorkflowConfig using ForceDelete. + // WorkflowContext and WorkflowConfig using ForceDelete. 
// Start the Controller String controllerName = CONTROLLER_PREFIX + "_0"; @@ -154,24 +150,20 @@ public void testDeleteRunningWorkflowForcefully() throws Exception { _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, "JOB0"), TaskState.IN_PROGRESS); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this - // workflow + // Check that WorkflowConfig and WorkflowContext are indeed created for this workflow Assert.assertNotNull(_driver.getWorkflowConfig(workflowName)); Assert.assertNotNull(_driver.getWorkflowContext(workflowName)); - Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName)); // Stop the Controller _controller.syncStop(); _driver.delete(workflowName, true); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed deleted for this - // workflow + // Check that WorkflowConfig and WorkflowContext are indeed deleted for this workflow. boolean isWorkflowDeleted = TestHelper.verify(() -> { WorkflowConfig wfcfg = _driver.getWorkflowConfig(workflowName); WorkflowContext wfctx = _driver.getWorkflowContext(workflowName); - IdealState is = _admin.getResourceIdealState(CLUSTER_NAME, workflowName); - return (wfcfg == null && wfctx == null && is == null); + return (wfcfg == null && wfctx == null); }, 60 * 1000); Assert.assertTrue(isWorkflowDeleted); } @@ -179,7 +171,7 @@ public void testDeleteRunningWorkflowForcefully() throws Exception { @Test(dependsOnMethods = "testDeleteRunningWorkflowForcefully") public void testDeleteStoppedWorkflowForcefully() throws Exception { // Create a simple workflow. Stop the workflow and wait until it's fully stopped. Then delete - // the IdealState, WorkflowContext and WorkflowConfig using ForceDelete. + // the WorkflowContext and WorkflowConfig using ForceDelete. // Start the Controller String controllerName = CONTROLLER_PREFIX + "_0"; @@ -210,24 +202,20 @@ public void testDeleteStoppedWorkflowForcefully() throws Exception { }, 60 * 1000); Assert.assertTrue(areJobsStopped); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this - // workflow. + // Check that WorkflowConfig and WorkflowContext are indeed created for this workflow. Assert.assertNotNull(_driver.getWorkflowConfig(workflowName)); Assert.assertNotNull(_driver.getWorkflowContext(workflowName)); - Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName)); // Stop the Controller _controller.syncStop(); _driver.delete(workflowName, true); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed deleted for this - // workflow. + // Check that WorkflowConfig and WorkflowContext are indeed deleted for this workflow. boolean isWorkflowDeleted = TestHelper.verify(() -> { WorkflowConfig wfcfg = _driver.getWorkflowConfig(workflowName); WorkflowContext wfctx = _driver.getWorkflowContext(workflowName); - IdealState is = _admin.getResourceIdealState(CLUSTER_NAME, workflowName); - return (wfcfg == null && wfctx == null && is == null); + return (wfcfg == null && wfctx == null); }, 60 * 1000); Assert.assertTrue(isWorkflowDeleted); } @@ -276,20 +264,17 @@ public void testDeleteStoppingStuckWorkflowForcefully() throws Exception { Assert.assertNotNull(_driver.getWorkflowConfig(workflowName)); Assert.assertNotNull(_driver.getWorkflowContext(workflowName)); - Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName)); // Stop the Controller. 
_controller.syncStop(); _driver.delete(workflowName, true); - // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed deleted for this - // workflow. + // Check that WorkflowConfig and WorkflowContext are indeed deleted for this workflow. boolean isWorkflowDeleted = TestHelper.verify(() -> { WorkflowConfig wfcfg = _driver.getWorkflowConfig(workflowName); WorkflowContext wfctx = _driver.getWorkflowContext(workflowName); - IdealState is = _admin.getResourceIdealState(CLUSTER_NAME, workflowName); - return (wfcfg == null && wfctx == null && is == null); + return (wfcfg == null && wfctx == null); }, 60 * 1000); Assert.assertTrue(isWorkflowDeleted); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java deleted file mode 100644 index f4fe3e7ff8..0000000000 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java +++ /dev/null @@ -1,99 +0,0 @@ -package org.apache.helix.integration.task; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import com.google.common.collect.Sets; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.PropertyKey; -import org.apache.helix.TestHelper; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.model.MasterSlaveSMD; -import org.apache.helix.task.JobConfig; -import org.apache.helix.task.JobQueue; -import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.TaskState; -import org.apache.helix.task.TaskUtil; -import org.apache.helix.task.WorkflowConfig; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; -import com.google.common.collect.ImmutableMap; - -/** - * Test to make sure that a job which is running will be able to continue its progress even when its - * IdealState gets deleted. 
- */ -public class TestJobQueueDeleteIdealState extends TaskTestBase { - private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB; - protected HelixDataAccessor _accessor; - - @BeforeClass - public void beforeClass() throws Exception { - _numPartitions = 1; - _numNodes = 3; - super.beforeClass(); - } - - @Test - public void testJobQueueDeleteIdealState() throws Exception { - String jobQueueName = TestHelper.getTestMethodName(); - - _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); - - JobConfig.Builder jobBuilder0 = - new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE) - .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name())) - .setCommand(MockTask.TASK_COMMAND).setExpiry(5000L) - .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000")); - - JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName); - jobQueue.enqueueJob("JOB0", jobBuilder0); - jobQueue.enqueueJob("JOB1", jobBuilder0); - - WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(jobQueue.getWorkflowConfig()); - cfgBuilder.setJobPurgeInterval(1000L); - jobQueue.setWorkflowConfig(cfgBuilder.build()); - - _driver.start(jobQueue.build()); - - _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "JOB0"), - TaskState.COMPLETED); - - // Wait until JOB1 goes to IN_PROGRESS - _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "JOB1"), - TaskState.IN_PROGRESS); - - // Remove IdealState of JOB1 - PropertyKey isKey = - _accessor.keyBuilder().idealStates(TaskUtil.getNamespacedJobName(jobQueueName, "JOB1")); - if (_accessor.getPropertyStat(isKey) != null) { - _accessor.removeProperty(isKey); - } - - // Make sure IdealState has been successfully deleted - Assert.assertNull(_accessor.getPropertyStat(isKey)); - - // JOB1 should reach completed state even without IS - _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "JOB1"), - TaskState.COMPLETED); - } -} diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java index 25b1571982..1ab747a7f8 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java @@ -109,9 +109,8 @@ public void testWhenAllowOverlapJobAssignment() throws Exception { for (int i = 0; i < PARALLEL_COUNT; i++) { List taskConfigs = new ArrayList(); for (int j = 0; j < TASK_COUNT; j++) { - taskConfigs.add( - new TaskConfig.Builder().setTaskId("task_" + j).setCommand(MockTask.TASK_COMMAND) - .build()); + taskConfigs.add(new TaskConfig.Builder().setTaskId("job_" + (i + 1) + "_task_" + j) + .setCommand(MockTask.TASK_COMMAND).build()); } jobConfigBuilders.add(new JobConfig.Builder().addTaskConfigs(taskConfigs)); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java index 31010cf321..8172ea9391 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java @@ -20,18 +20,15 @@ */ import org.apache.helix.AccessOption; -import org.apache.helix.HelixAdmin; 
 import org.apache.helix.TestHelper;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;

 import com.google.common.collect.ImmutableMap;
@@ -41,68 +38,6 @@
  * Test workflow context will be deleted if workflow config has been removed.
  */
 public class TestWorkflowContextWithoutConfig extends TaskTestBase {
-  private HelixAdmin _admin;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    super.beforeClass();
-    _admin = _gSetupTool.getClusterManagementTool();
-  }
-
-  @Test
-  public void testWorkflowContextWithoutConfig() throws Exception {
-    String workflowName1 = TestHelper.getTestMethodName() + "_1";
-    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName1);
-    _driver.start(builder1.build());
-
-    // Wait until workflow is created and IN_PROGRESS state.
-    _driver.pollForWorkflowState(workflowName1, TaskState.IN_PROGRESS);
-
-    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
-    // workflow
-    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName1));
-    Assert.assertNotNull(_driver.getWorkflowContext(workflowName1));
-    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName1));
-
-    String idealStatePath = "/" + CLUSTER_NAME + "/IDEALSTATES/" + workflowName1;
-    ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(idealStatePath,
-        null, AccessOption.PERSISTENT);
-    Assert.assertNotNull(record);
-
-    // Wait until workflow is completed.
-    _driver.pollForWorkflowState(workflowName1, TaskState.COMPLETED);
-
-    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
-    // expired.
-    boolean workflowExpired = TestHelper.verify(() -> {
-      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName1);
-      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName1);
-      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName1);
-      return (wCtx == null && wCfg == null && idealState == null);
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(workflowExpired);
-
-    // Write idealState to ZooKeeper
-    _manager.getHelixDataAccessor().getBaseDataAccessor().set(idealStatePath, record,
-        AccessOption.PERSISTENT);
-
-    // Create and start a new workflow just to make sure pipeline runs several times and context
-    // will not be created for workflow1 again
-    String workflowName2 = TestHelper.getTestMethodName() + "_2";
-    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
-    _driver.start(builder2.build());
-    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
-
-    // Verify that context is not created after IdealState is written back to ZK.
-    boolean workflowContextNotCreated = TestHelper.verify(() -> {
-      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName1);
-      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName1);
-      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName1);
-      return (wCtx == null && wCfg == null && idealState != null);
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(workflowContextNotCreated);
-  }
-
  @Test
  public void testWorkflowContextGarbageCollection() throws Exception {
    String workflowName = TestHelper.getTestMethodName();
@@ -112,11 +47,9 @@ public void testWorkflowContextGarbageCollection() throws Exception {
    // Wait until workflow is created and IN_PROGRESS state.
    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);

-    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
-    // workflow
+    // Check that WorkflowConfig and WorkflowContext are indeed created for this workflow
    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
    Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
-    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));

    String workflowContextPath =
        "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
@@ -128,13 +61,11 @@ public void testWorkflowContextGarbageCollection() throws Exception {

    // Wait until workflow is completed.
    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);

-    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
-    // expired.
+    // Verify that WorkflowConfig and WorkflowContext are removed after the workflow has expired.
    boolean workflowExpired = TestHelper.verify(() -> {
      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
-      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
-      return (wCtx == null && wCfg == null && idealState == null);
+      return (wCtx == null && wCfg == null);
    }, TestHelper.WAIT_DURATION);
    Assert.assertTrue(workflowExpired);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
index ff63d63f0e..5fe869795c 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
@@ -75,6 +75,10 @@ public void testGetLastScheduledTaskExecInfo() throws Exception {

    // Stop and delete the queue
    _driver.stop(queueName);
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext workflowContext = _driver.getWorkflowContext(queueName);
+      return workflowContext != null && workflowContext.getWorkflowState() == TaskState.STOPPED;
+    }, TestHelper.WAIT_DURATION));
    _driver.deleteAndWaitForCompletion(queueName, DELETE_DELAY);

    // Start the new queue with new task configuration.
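Reviewer note (not part of the patch): since Task Framework workflows and jobs are no longer backed by IdealStates, "fully deleted" now means both WorkflowConfig and WorkflowContext are gone, and a delete should be gated on the queue actually reaching STOPPED first. A minimal sketch of that stop-then-delete pattern follows; it assumes only the Helix test APIs already used in the hunks above (TaskDriver, TestHelper, TaskState), while the class and method names are hypothetical.

import org.apache.helix.TestHelper;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowContext;

// Hypothetical helper, for illustration only: stop a job queue, wait for the
// controller to record STOPPED, delete it, then confirm that both config and
// context are gone -- the post-IdealState definition of "deleted".
public final class WorkflowCleanupSketch {
  private WorkflowCleanupSketch() {
  }

  public static void stopDeleteAndVerify(TaskDriver driver, String queueName) throws Exception {
    driver.stop(queueName);
    // Deleting while the queue is still transitioning can race with the pipeline,
    // so gate the delete on the context actually reaching STOPPED.
    boolean stopped = TestHelper.verify(() -> {
      WorkflowContext ctx = driver.getWorkflowContext(queueName);
      return ctx != null && ctx.getWorkflowState() == TaskState.STOPPED;
    }, TestHelper.WAIT_DURATION);
    if (!stopped) {
      throw new IllegalStateException("Queue never reached STOPPED: " + queueName);
    }
    driver.delete(queueName);
    // Cleanup is now observable only through config + context; no IdealState check.
    boolean cleaned = TestHelper.verify(
        () -> driver.getWorkflowConfig(queueName) == null
            && driver.getWorkflowContext(queueName) == null,
        TestHelper.WAIT_DURATION);
    if (!cleaned) {
      throw new IllegalStateException("Workflow metadata not cleaned up: " + queueName);
    }
  }
}

Polling the context state instead of sleeping for a fixed delay keeps the check deterministic even when the controller is slow to process the stop, which is exactly why TestGetLastScheduledTaskExecInfo gained the verify step above.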