From 49ceac0e9449940546e37628780e5098ac4e8678 Mon Sep 17 00:00:00 2001 From: Lei Xia Date: Thu, 19 Mar 2015 13:54:05 -0700 Subject: [PATCH] [HELIX-578] NPE while deleting a job from a recurrent job queue. --- .../org/apache/helix/task/TaskDriver.java | 98 ++++++++++++--- .../task/TestTaskRebalancerStopResume.java | 115 +++++++++++++++++- .../helix/integration/task/TestUtil.java | 13 ++ 3 files changed, 206 insertions(+), 20 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 87d81a186e..b8ca141300 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -281,17 +281,59 @@ public ZNRecord update(ZNRecord currentData) { /** Delete a job from an existing named queue, the queue has to be stopped prior to this call */ public void deleteJob(final String queueName, final String jobName) { - HelixProperty workflowConfig = _accessor.getProperty(_accessor.keyBuilder().resourceConfig(queueName)); - if (workflowConfig == null) { + WorkflowConfig workflowCfg = + TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName); + + if (workflowCfg == null) { throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!"); } - boolean isTerminable = - workflowConfig.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true); - if (isTerminable) { + if (workflowCfg.isTerminable()) { throw new IllegalArgumentException(queueName + " is not a queue!"); } + boolean isRecurringWorkflow = + (workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring()); + + if (isRecurringWorkflow) { + WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName); + + String lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow(); + + // delete the current scheduled one + deleteJobFromScheduledQueue(lastScheduledQueue, jobName); + + // Remove the job from the original queue template's DAG + removeJobFromDag(queueName, jobName); + + // delete the ideal state and resource config for the template job + final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); + _admin.dropResource(_clusterName, namespacedJobName); + + // Delete the job template from property store + String jobPropertyPath = + Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName); + _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT); + } else { + deleteJobFromScheduledQueue(queueName, jobName); + } + } + + + /** delete a job from a scheduled (non-recurrent) queue.*/ + private void deleteJobFromScheduledQueue(final String queueName, final String jobName) { + WorkflowConfig workflowCfg = + TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName); + + if (workflowCfg == null) { + throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!"); + } + WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName); + if (wCtx != null && wCtx.getWorkflowState() == null) { + throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!"); + } + String workflowState = (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name(); @@ -300,7 +342,26 @@ public void deleteJob(final String queueName, final String jobName) { } // Remove the job from the queue in the DAG + removeJobFromDag(queueName, jobName); + + // delete the ideal state and resource config for the job + final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); + _admin.dropResource(_clusterName, namespacedJobName); + + // update queue's property to remove job from JOB_STATES if it is already started. + removeJobStateFromQueue(queueName, jobName); + + // Delete the job from property store + String jobPropertyPath = + Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName); + _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT); + } + + /** Remove the job name from the DAG from the queue configuration */ + private void removeJobFromDag(final String queueName, final String jobName) { final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); + DataUpdater updater = new DataUpdater() { @Override public ZNRecord update(ZNRecord currentData) { @@ -338,7 +399,7 @@ public ZNRecord update(ZNRecord currentData) { currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson()); } catch (Exception e) { throw new IllegalStateException( - "Could not remove job " + jobName + " from queue " + queueName, e); + "Could not remove job " + jobName + " from DAG of queue " + queueName, e); } return currentData; } @@ -347,17 +408,20 @@ public ZNRecord update(ZNRecord currentData) { String path = _accessor.keyBuilder().resourceConfig(queueName).getPath(); boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT); if (!status) { - throw new IllegalArgumentException("Could not enqueue job"); + throw new IllegalArgumentException( + "Could not remove job " + jobName + " from DAG of queue " + queueName); } + } - // delete the ideal state and resource config for the job - _admin.dropResource(_clusterName, namespacedJobName); - - // update queue's property to remove job from JOB_STATES if it is already started. + /** update queue's property to remove job from JOB_STATES if it is already started. + */ + private void removeJobStateFromQueue(final String queueName, final String jobName) { + final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); String queuePropertyPath = Joiner.on("/") .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE); - updater = new DataUpdater() { + + DataUpdater updater = new DataUpdater() { @Override public ZNRecord update(ZNRecord currentData) { if (currentData != null) { @@ -369,13 +433,9 @@ public ZNRecord update(ZNRecord currentData) { return currentData; } }; - _propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT); - - // Delete the job from property store - String jobPropertyPath = - Joiner.on("/") - .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName); - _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT); + if (!_propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT)) { + LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName); + } } /** Adds a new job to the end an existing named queue */ diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java index 500f029316..9f723631a0 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java @@ -20,7 +20,9 @@ */ import java.util.ArrayList; +import java.util.Calendar; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +53,7 @@ import org.apache.helix.task.TaskUtil; import org.apache.helix.task.Workflow; import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.log4j.Logger; @@ -254,6 +257,7 @@ public void stopAndResumeNamedQueue() throws Exception { verifyJobNotInQueue(queueName, namespacedJob2); } + @Test public void stopDeleteAndResumeNamedQueue() throws Exception { String queueName = TestHelper.getTestMethodName(); @@ -319,9 +323,19 @@ public void stopDeleteAndResumeNamedQueue() throws Exception { LOG.info("Resuming job-queue: " + queueName); _driver.resume(queueName); - // Ensure the jobs left are successful completed in the correct order currentJobNames.remove(deletedJob1); currentJobNames.remove(deletedJob2); + + // add job 3 back + JobConfig.Builder job = + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")); + LOG.info("Enqueuing job: " + deletedJob2); + _driver.enqueueJob(queueName, deletedJob2, job); + currentJobNames.add(deletedJob2); + + // Ensure the jobs left are successful completed in the correct order long preJobFinish = 0; for (int i = 0; i < currentJobNames.size(); i++) { String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i)); @@ -346,6 +360,105 @@ public void stopDeleteAndResumeNamedQueue() throws Exception { } } + private JobQueue buildRecurrentJobQueue(String jobQueueName) + { + Map cfgMap = new HashMap(); + cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(50000)); + cfgMap.put(WorkflowConfig.START_TIME, WorkflowConfig.DEFAULT_DATE_FORMAT.format( + Calendar.getInstance().getTime())); + cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60)); + cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS"); + return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build(); + } + + @Test + public void stopDeleteAndResumeRecurrentNamedQueue() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue queue = buildRecurrentJobQueue(queueName); + _driver.createQueue(queue); + + // Create and Enqueue jobs + List currentJobNames = new ArrayList(); + Map commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500)); + for (int i = 0; i <= 4; i++) { + String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; + + JobConfig.Builder job = + new JobConfig.Builder().setCommand("Reindex") + .setJobCommandConfigMap(commandConfig) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(targetPartition)); + String jobName = targetPartition.toLowerCase() + "Job" + i; + LOG.info("Enqueuing job: " + jobName); + _driver.enqueueJob(queueName, jobName, job); + currentJobNames.add(i, jobName); + } + + WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); + String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); + + // ensure job 1 is started before deleting it + String deletedJob1 = currentJobNames.get(0); + String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS); + + // stop the queue + LOG.info("Pausing job-queue: " + scheduledQueue); + _driver.stop(queueName); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED); + TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED); + + // delete the in-progress job (job 1) and verify it being deleted + _driver.deleteJob(queueName, deletedJob1); + verifyJobDeleted(queueName, namedSpaceDeletedJob1); + verifyJobDeleted(scheduledQueue, namedSpaceDeletedJob1); + + LOG.info("Resuming job-queue: " + queueName); + _driver.resume(queueName); + + // ensure job 2 is started + TestUtil.pollForJobState(_manager, scheduledQueue, + String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS); + + // stop the queue + LOG.info("Pausing job-queue: " + queueName); + _driver.stop(queueName); + TestUtil.pollForJobState(_manager, scheduledQueue, + String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED); + TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED); + + // Ensure job 3 is not started before deleting it + String deletedJob2 = currentJobNames.get(2); + String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2); + TestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2); + + // delete not-started job (job 3) and verify it being deleted + _driver.deleteJob(queueName, deletedJob2); + verifyJobDeleted(queueName, namedSpaceDeletedJob2); + verifyJobDeleted(scheduledQueue, namedSpaceDeletedJob2); + + LOG.info("Resuming job-queue: " + queueName); + _driver.resume(queueName); + + // Ensure the jobs left are successful completed in the correct order + currentJobNames.remove(deletedJob1); + currentJobNames.remove(deletedJob2); + long preJobFinish = 0; + for (int i = 0; i < currentJobNames.size(); i++) { + String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i)); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED); + + JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName); + long jobStart = jobContext.getStartTime(); + Assert.assertTrue(jobStart >= preJobFinish); + preJobFinish = jobContext.getFinishTime(); + } + // verify the job is not there for the next recurrence of queue schedule + } + private void verifyJobDeleted(String queueName, String jobName) throws Exception { HelixDataAccessor accessor = _manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java index 413b98aed1..27e827a61f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java @@ -81,4 +81,17 @@ public boolean verify() throws Exception { }, _default_timeout); Assert.assertTrue(succeed); } + + public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource) + throws InterruptedException { + // Wait for completion. + long st = System.currentTimeMillis(); + WorkflowContext ctx; + do { + Thread.sleep(100); + ctx = TaskUtil.getWorkflowContext(manager, workflowResource); + } while (ctx == null && System.currentTimeMillis() < st + _default_timeout); + Assert.assertNotNull(ctx); + return ctx; + } }