From 667b1887f524fb499e472026f746bde4b5ceacca Mon Sep 17 00:00:00 2001 From: narendly Date: Mon, 26 Mar 2018 14:49:43 -0700 Subject: [PATCH] [HELIX-687] Add synchronized delete for workflows This commit adds the method deleteAndWaitForCompletion that deletes and returns only after the delete operation has completed. The pre-existing delete does not guarantee that the operation is complete; however, deleteAndWaitForCompletion does. --- .../org/apache/helix/task/TaskDriver.java | 60 +++++++++++++++-- .../integration/task/TestDeleteWorkflow.java | 67 +++++++++++++++++++ 2 files changed, 120 insertions(+), 7 deletions(-) create mode 100644 helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 44aa9309b5..957333c712 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -257,12 +257,12 @@ public void deleteJob(final String queue, final String job) { deleteJobFromQueue(queue, job); } - /** - * delete a job from a scheduled (non-recurrent) queue. - * - * @param queue - * @param job - */ + /** + * delete a job from a scheduled (non-recurrent) queue. + * + * @param queue + * @param job + */ private void deleteJobFromQueue(final String queue, final String job) { WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue); @@ -570,6 +570,52 @@ public void delete(String workflow) { } } + /** + * Public synchronized method to wait for a delete operation to fully complete with timeout. + * When this method returns, it means that a queue (workflow) has been completely deleted, meaning + * its IdealState, WorkflowConfig, and WorkflowContext have all been deleted. + * + * @param workflow workflow/jobqueue name + * @param timeout duration to give to delete operation to completion + */ + public void deleteAndWaitForCompletion(String workflow, long timeout) throws InterruptedException { + delete(workflow); + long endTime = System.currentTimeMillis() + timeout; + + // For checking whether delete completed + BaseDataAccessor baseDataAccessor = _accessor.getBaseDataAccessor(); + PropertyKey.Builder keyBuilder = _accessor.keyBuilder(); + + String idealStatePath = keyBuilder.idealStates(workflow).getPath(); + String workflowConfigPath = keyBuilder.resourceConfig(workflow).getPath(); + String workflowContextPath = keyBuilder.workflowContext(workflow).getPath(); + + while (System.currentTimeMillis() <= endTime) { + if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT) + || baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT) + || baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) { + Thread.sleep(1000); + } else { + return; + } + } + + // Deletion failed: check which step of deletion failed to complete and create an error message + StringBuilder failed = new StringBuilder(); + if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)) { + failed.append("IdealState "); + } + if (baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)) { + failed.append("WorkflowConfig "); + } + if (baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) { + failed.append("WorkflowContext "); + } + throw new HelixException(String + .format("Failed to delete the workflow/queue %s within %d milliseconds. " + + "The following components still remain: %s", workflow, timeout, failed.toString())); + } + /** * Helper function to change target state for a given workflow */ @@ -810,4 +856,4 @@ private void validateZKNodeLimitation(int newConfigNodeCount) { "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS."); } } -} +} \ No newline at end of file diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java new file mode 100644 index 0000000000..91b7f32659 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java @@ -0,0 +1,67 @@ +package org.apache.helix.integration.task; + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixException; +import org.apache.helix.TestHelper; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskUtil; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestDeleteWorkflow extends TaskTestBase { + private static final int DELETE_DELAY = 3000; + + private HelixAdmin admin; + + @BeforeClass + public void beforeClass() throws Exception { + _numParitions = 1; + admin = _gSetupTool.getClusterManagementTool(); + super.beforeClass(); + } + + @Test + public void testDeleteWorkflow() throws InterruptedException { + String jobQueueName = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG) + .setMaxAttemptsPerTask(1) + .setWorkflow(jobQueueName) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000")); + + JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName); + jobQueue.enqueueJob("job1", jobBuilder); + _driver.start(jobQueue.build()); + _driver.pollForJobState(jobQueueName, + TaskUtil.getNamespacedJobName(jobQueueName, "job1"), TaskState.IN_PROGRESS); + + // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job queue + Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName)); + Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName)); + Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); + + // Pause the Controller so that the job queue won't get deleted + admin.enableCluster(CLUSTER_NAME, false); + + // Attempt the deletion and time out + try { + _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY); + Assert.fail("Delete must time out and throw a HelixException with the Controller paused, but did not!"); + } catch (HelixException e) { + // Pass + } + + // Resume the Controller and call delete again + admin.enableCluster(CLUSTER_NAME, true); + _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY); + + // Check that the deletion operation completed + Assert.assertNull(_driver.getWorkflowConfig(jobQueueName)); + Assert.assertNull(_driver.getWorkflowContext(jobQueueName)); + Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName)); + } +} \ No newline at end of file