From 0a18726fcad7b8a0fe5e77d7a2c9848b86461ccc Mon Sep 17 00:00:00 2001 From: Junkai Xue Date: Tue, 13 Sep 2016 15:28:57 -0700 Subject: [PATCH 1/3] [HELIX-635] GenericTaskAssignmentCalculator rebalance with consistent hashing 1. Implement consistent hashing mapping calculation 2. Remove the separate reassignment logic and apply it within the consistent hashing placement 3. Add tests for GenericTaskAssignmentCalculator --- .../strategy/CrushRebalanceStrategy.java | 2 +- .../crushMapping/CRUSHPlacementAlgorithm.java | 1 + .../task/GenericTaskAssignmentCalculator.java | 238 +++++++----------- .../crushMapping => util}/JenkinsHash.java | 2 +- .../TestGenericTaskAssignmentCalculator.java | 171 +++++++++++++ .../task/TestIndependentTaskRebalancer.java | 8 +- 6 files changed, 266 insertions(+), 156 deletions(-) rename helix-core/src/main/java/org/apache/helix/{controller/rebalancer/strategy/crushMapping => util}/JenkinsHash.java (98%) create mode 100644 helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java index a8fe107f0e..b91d26cbd2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java @@ -24,7 +24,7 @@ import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm; -import org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash; +import org.apache.helix.util.JenkinsHash; import org.apache.helix.controller.rebalancer.topology.Node; import org.apache.helix.controller.rebalancer.topology.Topology; import org.apache.helix.controller.stages.ClusterDataCache; diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java index 870656cf3e..b7c1c68bb3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.util.JenkinsHash; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java index ac96768afb..fbc7af336b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java @@ -19,30 +19,26 @@ * under the License. 
*/ -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; -import org.apache.helix.ZNRecord; +import org.apache.helix.HelixException; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.util.JenkinsHash; import org.apache.log4j.Logger; -import com.google.common.base.Function; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -54,9 +50,6 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator { private static final Logger LOG = Logger.getLogger(GenericTaskAssignmentCalculator.class); - /** Reassignment policy for this algorithm */ - private RetryPolicy _retryPolicy = new DefaultRetryReassigner(); - @Override public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { @@ -96,14 +89,7 @@ public Map> getTaskAssignment(CurrentStateOutput curr // Transform from partition id to fully qualified partition name List partitionNums = Lists.newArrayList(partitionSet); Collections.sort(partitionNums); - final String resourceId = prevAssignment.getResourceName(); - List partitions = - new 
ArrayList(Lists.transform(partitionNums, new Function() { - @Override - public String apply(Integer partitionNum) { - return resourceId + "_" + partitionNum; - } - })); + String resourceId = prevAssignment.getResourceName(); // Compute the current assignment Map> currentMapping = Maps.newHashMap(); @@ -122,156 +108,108 @@ public String apply(Integer partitionNum) { } // Get the assignment keyed on partition - RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states); - List allNodes = - Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache)); - Collections.sort(allNodes); - ZNRecord record = - strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache); - Map> preferenceLists = record.getListFields(); - - // Convert to an assignment keyed on participant - Map> taskAssignment = Maps.newHashMap(); - for (Map.Entry> e : preferenceLists.entrySet()) { - String partitionName = e.getKey(); - partitionName = String.valueOf(TaskUtil.getPartitionId(partitionName)); - List preferenceList = e.getValue(); - for (String participantName : preferenceList) { - if (!taskAssignment.containsKey(participantName)) { - taskAssignment.put(participantName, new TreeSet()); - } - taskAssignment.get(participantName).add(Integer.valueOf(partitionName)); - } + if (jobCfg.getTargetResource() != null) { + LOG.error( + "Target resource is not null, should call FixedTaskAssignmentCalculator, target resource : " + + jobCfg.getTargetResource()); + return new HashMap>(); } - // Finally, adjust the assignment if tasks have been failing - taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment); + List allNodes = Lists.newArrayList(instances); + ConsistentHashingPlacement placement = new ConsistentHashingPlacement(allNodes); + Map> taskAssignment = + placement.computeMapping(jobCfg, jobContext, partitionNums, resourceId); + return taskAssignment; } - /** - * Filter a list of instances based on 
targeted resource policies - * @param jobCfg the job configuration - * @param currStateOutput the current state of all instances in the cluster - * @param instances valid instances - * @param cache current snapshot of the cluster - * @return a set of instances that can be assigned to - */ - private Set getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput, - Iterable instances, ClusterDataCache cache) { - // No target resource means any instance is available - Set allInstances = Sets.newHashSet(instances); - String targetResource = jobCfg.getTargetResource(); - if (targetResource == null) { - return allInstances; - } + private class ConsistentHashingPlacement { + private JenkinsHash _hashFunction; + private ConsistentHashSelector _selector; + private int _numInstances; - // Bad ideal state means don't assign - IdealState idealState = cache.getIdealState(targetResource); - if (idealState == null) { - return Collections.emptySet(); + public ConsistentHashingPlacement(List potentialInstances) { + _hashFunction = new JenkinsHash(); + _selector = new ConsistentHashSelector(potentialInstances); + _numInstances = potentialInstances.size(); } - // Get the partitions on the target resource to use - Set partitions = idealState.getPartitionSet(); - List targetPartitions = jobCfg.getTargetPartitions(); - if (targetPartitions != null && !targetPartitions.isEmpty()) { - partitions.retainAll(targetPartitions); - } + public Map> computeMapping(JobConfig jobConfig, + JobContext jobContext, List partitions, String resourceId) { + if (_numInstances == 0) { + return new HashMap>(); + } + + Map> taskAssignment = Maps.newHashMap(); + + for (int partition : partitions) { + long hashedValue = new String(resourceId + "_" + partition).hashCode(); + int shiftTimes; + int numAttempts = jobContext.getPartitionNumAttempts(partition); + int maxAttempts = jobConfig.getMaxAttemptsPerTask(); - // Based on state matches, add eligible instances - Set eligibleInstances = 
Sets.newHashSet(); - Set targetStates = jobCfg.getTargetPartitionStates(); - for (String partition : partitions) { - Map stateMap = - currStateOutput.getCurrentStateMap(targetResource, new Partition(partition)); - Map pendingStateMap = - currStateOutput.getPendingStateMap(targetResource, new Partition(partition)); - for (Map.Entry e : stateMap.entrySet()) { - String instanceName = e.getKey(); - String state = e.getValue(); - String pending = pendingStateMap.get(instanceName); - if (pending != null) { - continue; + if (jobConfig.getMaxAttemptsPerTask() < _numInstances) { + shiftTimes = numAttempts == -1 ? 0 : numAttempts; + } else { + shiftTimes = (maxAttempts == 0) + ? 0 + : jobContext.getPartitionNumAttempts(partition) / (maxAttempts / _numInstances); + } + // Hash the value based on the shifting time. The default shift time will be 0. + for (int i = 0; i <= shiftTimes; i++) { + hashedValue = _hashFunction.hash(hashedValue); } - if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) { - eligibleInstances.add(instanceName); + String selectedInstance = select(hashedValue); + if (selectedInstance != null) { + if (!taskAssignment.containsKey(selectedInstance)) { + taskAssignment.put(selectedInstance, new TreeSet()); + } + taskAssignment.get(selectedInstance).add(partition); } } + return taskAssignment; } - allInstances.retainAll(eligibleInstances); - return allInstances; - } - public interface RetryPolicy { - /** - * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently - * assigned - * @param jobCfg the job configuration - * @param jobCtx the job context - * @param instances instances that can serve tasks - * @param origAssignment the unmodified assignment - * @return the adjusted assignment - */ - Map> reassign(JobConfig jobCfg, JobContext jobCtx, - Collection instances, Map> origAssignment); - } + private String select(long data) throws HelixException { + return _selector.get(data); + } + + private 
class ConsistentHashSelector { + private final static int DEFAULT_TOKENS_PER_INSTANCE = 1000; + private final SortedMap circle = new TreeMap(); + protected int instanceSize = 0; - private static class DefaultRetryReassigner implements RetryPolicy { - @Override - public Map> reassign(JobConfig jobCfg, JobContext jobCtx, - Collection instances, Map> origAssignment) { - // Compute an increasing integer ID for each instance - BiMap instanceMap = HashBiMap.create(instances.size()); - int instanceIndex = 0; - for (String instance : instances) { - instanceMap.put(instance, instanceIndex++); + public ConsistentHashSelector(List instances) { + for (String instance : instances) { + long tokenCount = DEFAULT_TOKENS_PER_INSTANCE; + add(instance, tokenCount); + instanceSize++; + } } - // Move partitions - Map> newAssignment = Maps.newHashMap(); - for (Map.Entry> e : origAssignment.entrySet()) { - String instance = e.getKey(); - SortedSet partitions = e.getValue(); - Integer instanceId = instanceMap.get(instance); - if (instanceId != null) { - for (int p : partitions) { - // Determine for each partition if there have been failures with the current assignment - // strategy, and if so, force a shift in assignment for that partition only - int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p); - int newInstanceId = (instanceId + shiftValue) % instances.size(); - String newInstance = instanceMap.inverse().get(newInstanceId); - if (newInstance == null) { - newInstance = instance; - } - if (!newAssignment.containsKey(newInstance)) { - newAssignment.put(newInstance, new TreeSet()); - } - newAssignment.get(newInstance).add(p); - } - } else { - // In case something goes wrong, just keep the previous assignment - newAssignment.put(instance, partitions); + public void add(String instance, long numberOfReplicas) { + for (int i = 0; i < numberOfReplicas; i++) { + circle.put(_hashFunction.hash(instance.hashCode(), i), instance); + } + } + + public void remove(String instance, 
long numberOfReplicas) { + for (int i = 0; i < numberOfReplicas; i++) { + circle.remove(_hashFunction.hash(instance.hashCode(), i)); } } - return newAssignment; - } - /** - * In case tasks fail, we may not want to schedule them in the same place. This method allows us - * to compute a shifting value so that we can systematically choose other instances to try - * @param jobCfg the job configuration - * @param jobCtx the job context - * @param instances instances that can be chosen - * @param p the partition to look up - * @return the shifting value - */ - private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx, - Collection instances, int p) { - int numAttempts = jobCtx.getPartitionNumAttempts(p); - int maxNumAttempts = jobCfg.getMaxAttemptsPerTask(); - int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1); - return numAttempts / (maxNumAttempts / numInstances); + public String get(long data) { + if (circle.isEmpty()) { + return null; + } + long hash = _hashFunction.hash(data); + if (!circle.containsKey(hash)) { + SortedMap tailMap = circle.tailMap(hash); + hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); + } + return circle.get(hash); + } } } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java similarity index 98% rename from helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java rename to helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java index 66566f8b32..3ccd1f409e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java +++ b/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java @@ -13,7 +13,7 @@ * limitations under the License. 
*/ -package org.apache.helix.controller.rebalancer.strategy.crushMapping; +package org.apache.helix.util; public class JenkinsHash { // max value to limit it to 4 bytes diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java new file mode 100644 index 0000000000..0410db20d8 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java @@ -0,0 +1,171 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.Workflow; +import org.apache.helix.tools.ClusterSetup; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testng.collections.Sets; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class TestGenericTaskAssignmentCalculator extends TaskTestBase { + private Set _invokedClasses = Sets.newHashSet(); + private Map _runCounts = Maps.newHashMap(); + private TaskConfig _taskConfig; + private Map _jobCommandMap; + + @BeforeClass + public void beforeClass() throws Exception { + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + // Setup cluster and instances + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < _numNodes; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // start dummy participants + for (int i = 0; i < _numNodes; i++) { + final String instanceName = PARTICIPANT_PREFIX + "_" + 
(_startPort + i); + + // Set task callbacks + Map taskFactoryReg = new HashMap(); + + taskFactoryReg.put("TaskOne", new TaskFactory() { + @Override public Task createNewTask(TaskCallbackContext context) { + return new TaskOne(context, instanceName); + } + }); + + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", + new TaskStateModelFactory(_participants[i], taskFactoryReg)); + _participants[i].syncStart(); + } + + // Start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // Start an admin connection + _manager = HelixManagerFactory + .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + _driver = new TaskDriver(_manager); + + Map taskConfigMap = Maps.newHashMap(); + _taskConfig = new TaskConfig("TaskOne", taskConfigMap, false); + _jobCommandMap = Maps.newHashMap(); + } + + @Test + public void testMultipleJobAssignment() throws InterruptedException { + String workflowName = TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName); + List taskConfigs = Lists.newArrayListWithCapacity(1); + taskConfigs.add(_taskConfig); + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(_jobCommandMap); + + for (int i = 0; i < 25; i++) { + workflowBuilder.addJob("JOB" + i, jobBuilder); + } + + _driver.start(workflowBuilder.build()); + _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); + + Assert.assertEquals(_runCounts.size(), 5); + } + + @Test + public void testMultipleTaskAssignment() throws InterruptedException { + String workflowName = 
TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName); + + List taskConfigs = Lists.newArrayListWithCapacity(20); + for (int i = 0; i < 50; i++) { + Map taskConfigMap = Maps.newHashMap(); + taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap, false)); + } + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setCommand("DummyCommand").setJobCommandConfigMap(_jobCommandMap) + .addTaskConfigs(taskConfigs); + workflowBuilder.addJob("JOB", jobBuilder); + _driver.start(workflowBuilder.build()); + _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); + + Assert.assertEquals(_runCounts.size(), 5); + } + + private class TaskOne extends MockTask { + private final String _instanceName; + + public TaskOne(TaskCallbackContext context, String instanceName) { + super(context); + + // Initialize the count for this instance if not already done + if (!_runCounts.containsKey(instanceName)) { + _runCounts.put(instanceName, 0); + } + _instanceName = instanceName; + } + + @Override + public TaskResult run() { + _invokedClasses.add(getClass().getName()); + _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1); + return new TaskResult(TaskResult.Status.COMPLETED, ""); + } + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index 49b4bf438b..c4d588c879 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -215,12 +215,12 @@ public void beforeMethod() { } @Test public void testReassignment() throws Exception { - final int NUM_INSTANCES = 2; + final int NUM_INSTANCES = 5; String jobName = TestHelper.getTestMethodName(); Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); List taskConfigs = 
Lists.newArrayListWithCapacity(2); - Map taskConfigMap = Maps.newHashMap( - ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + _startPort)); + Map taskConfigMap = Maps.newHashMap(ImmutableMap + .of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1))); TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); taskConfigs.add(taskConfig1); Map jobCommandMap = Maps.newHashMap(); @@ -242,7 +242,7 @@ public void beforeMethod() { // Ensure that this was tried on two different instances, the first of which exhausted the // attempts number, and the other passes on the first try - Assert.assertEquals(_runCounts.size(), NUM_INSTANCES); + Assert.assertEquals(_runCounts.size(), 2); Assert.assertTrue( _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES)); Assert.assertTrue(_runCounts.values().contains(1)); From 7bb2a9db2396a00bb9a721634a2432240679c657 Mon Sep 17 00:00:00 2001 From: Junkai Xue Date: Tue, 13 Sep 2016 16:00:08 -0700 Subject: [PATCH 2/3] Refactor TaskAssignmentCalculator API Refactoring TaskAssignmentCalculator API, since ClusterDataCache is too large and not all the contents inside are used. 
--- .../controller/stages/ClusterDataCache.java | 36 +++++++++++++------ .../FixedTargetTaskAssignmentCalculator.java | 35 +++++++----------- .../helix/task/FixedTargetTaskRebalancer.java | 4 +-- .../task/GenericTaskAssignmentCalculator.java | 6 ++-- .../helix/task/GenericTaskRebalancer.java | 4 +-- .../org/apache/helix/task/JobRebalancer.java | 11 +++--- .../helix/task/TaskAssignmentCalculator.java | 10 +++--- 7 files changed, 56 insertions(+), 50 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index dacf98dd16..c8ca941a85 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -453,21 +453,35 @@ public ClusterConstraints getConstraint(ConstraintType type) { } /** - * Return all the nodes that are enabled and tagged same as the job. - * @param allInstances List of instances to filter with instance tag - * @param instanceTag The instance group tag - * @return A new set contains instance name and that are marked enabled and have same - * tag with job. The original set will not be changed during the filtering + * Return all the live nodes that are enabled + * @return A new set contains live instance name and that are marked enabled */ - public Set getAllEnabledInstanceWithTag(final Set allInstances, - String instanceTag) { + public Set getAllEnabledLiveInstances() { + return getAllEnabledInstances(null); + } + + /** + * Return all the live nodes that are enabled and tagged same as the job. + * @param instanceTag The instance group tag, could be null, when no instance group specified + * @return A new set contains live instance name and that are marked enabled and have same + * tag with job, only if instance tag input is not null. 
+ */ + public Set getAllEnabledLiveInstancesWithTag(String instanceTag) { + return getAllEnabledInstances(instanceTag); + } + + private Set getAllEnabledInstances(String instanceTag) { Set enabledTagInstances = new HashSet(); - for (String instance : allInstances) { + for (String instance : _liveInstanceMap.keySet()) { InstanceConfig instanceConfig = _instanceConfigMap.get(instance); - if (instanceConfig != null && instanceConfig.getInstanceEnabled() && instanceConfig - .containsTag(instanceTag)) { - enabledTagInstances.add(instance); + // Check instance is enabled + if (instanceConfig != null && instanceConfig.getInstanceEnabled()) { + // Check whether it has instance group or not + // If it has instance group, check whether it belongs to that group or not + if (instanceTag == null || instanceConfig.containsTag(instanceTag)) { + enabledTagInstances.add(instance); + } } } diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java index 09db616c1b..0768b51b12 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java @@ -48,36 +48,37 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator { private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class); - @Override - public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { - return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx); + @Override public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + Map idealStateMap) { + return getAllTaskPartitions(getTgtIdealState(jobCfg, idealStateMap), jobCfg, jobCtx); } 
@Override public Map> getTaskAssignment(CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, Collection instances, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, - Set partitionSet, ClusterDataCache cache) { - IdealState tgtIs = getTgtIdealState(jobCfg, cache); + Set partitionSet, Map idealStateMap) { + IdealState tgtIs = getTgtIdealState(jobCfg, idealStateMap); if (tgtIs == null) { LOG.warn("Missing target resource for the scheduled job!"); return Collections.emptyMap(); } Set tgtStates = jobCfg.getTargetPartitionStates(); return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet, - jobContext, cache); + jobContext); } /** * Gets the ideal state of the target resource of this job * @param jobCfg job config containing target resource id - * @param cache snapshot of the cluster containing the task and target resource + * @param idealStateMap the map of resource name map to ideal state * @return target resource ideal state, or null */ - private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) { + private static IdealState getTgtIdealState(JobConfig jobCfg, + Map idealStateMap) { String tgtResourceId = jobCfg.getTargetResource(); - return cache.getIdealState(tgtResourceId); + return idealStateMap.get(tgtResourceId); } /** @@ -131,7 +132,7 @@ private static List getPartitionsForTargetPartition(String targetPartit */ private static Map> getTgtPartitionAssignment( CurrentStateOutput currStateOutput, Iterable instances, IdealState tgtIs, - Set tgtStates, Set includeSet, JobContext jobCtx, ClusterDataCache cache) { + Set tgtStates, Set includeSet, JobContext jobCtx) { Map> result = new HashMap>(); for (String instance : instances) { result.put(instance, new TreeSet()); @@ -153,18 +154,6 @@ private static Map> getTgtPartitionAssignment( continue; } - InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance); - - if 
(instanceConfig == null) { - LOG.error("Instance config not found for instance : " + instance); - continue; - } - - if (!instanceConfig.getInstanceEnabled()) { - LOG.debug("Instance has been disabled, ignore instance : " + instance); - continue; - } - String s = currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName), instance); diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java index 569fe03c44..1589c1afd4 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java @@ -43,7 +43,7 @@ @Override public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { return taskAssignmentCalculator - .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache); + .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates()); } @Override public Map> getTaskAssignment( @@ -53,6 +53,6 @@ ClusterDataCache cache) { return taskAssignmentCalculator .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext, - workflowCfg, workflowCtx, partitionSet, cache); + workflowCfg, workflowCtx, partitionSet, cache.getIdealStates()); } } diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java index fbc7af336b..58ba670df8 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java @@ -34,6 +34,7 @@ import org.apache.helix.HelixException; import org.apache.helix.controller.stages.ClusterDataCache; import 
org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.util.JenkinsHash; @@ -52,7 +53,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator { @Override public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + Map idealStateMap) { Map taskMap = jobCfg.getTaskConfigMap(); Map taskIdMap = jobCtx.getTaskIdPartitionMap(); for (TaskConfig taskCfg : taskMap.values()) { @@ -69,7 +71,7 @@ public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, public Map> getTaskAssignment(CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, Collection instances, JobConfig jobCfg, final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, - Set partitionSet, ClusterDataCache cache) { + Set partitionSet, Map idealStateMap) { // Gather input to the full auto rebalancing algorithm LinkedHashMap states = new LinkedHashMap(); states.put("ONLINE", 1); diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java index 6a005b9d0b..1720fbb4a8 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java @@ -42,7 +42,7 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer { public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { return taskAssignmentCalculator - .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache); + .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, 
workflowCtx, cache.getIdealStates()); } @Override @@ -52,6 +52,6 @@ public Map> getTaskAssignment(CurrentStateOutput curr Set partitionSet, ClusterDataCache cache) { return taskAssignmentCalculator .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext, - workflowCfg, workflowCtx, partitionSet, cache); + workflowCfg, workflowCtx, partitionSet, cache.getIdealStates()); } } diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 378ad95665..cf7f5e64a7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -131,9 +131,8 @@ public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clu // Fetch the previous resource assignment from the property store. This is required because of // HELIX-230. Set liveInstances = jobCfg.getInstanceGroupTag() == null - ? clusterData.getLiveInstances().keySet() - : clusterData.getAllEnabledInstanceWithTag(clusterData.getLiveInstances().keySet(), - jobCfg.getInstanceGroupTag()); + ? clusterData.getAllEnabledLiveInstances() + : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag()); if (liveInstances.isEmpty()) { LOG.error("No available instance found for job!"); @@ -222,8 +221,8 @@ private ResourceAssignment computeResourceMapping(String jobResource, // Process all the current assignments of tasks. TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg); - Set allPartitions = - taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache); + Set allPartitions = taskAssignmentCal + .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates()); if (allPartitions == null || allPartitions.isEmpty()) { // Empty target partitions, mark the job as FAILED. 
@@ -424,7 +423,7 @@ private ResourceAssignment computeResourceMapping(String jobResource, // Get instance->[partition, ...] mappings for the target resource. Map> tgtPartitionAssignments = taskAssignmentCal .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, - workflowConfig, workflowCtx, allPartitions, cache); + workflowConfig, workflowCtx, allPartitions, cache.getIdealStates()); for (Map.Entry> entry : taskAssignments.entrySet()) { String instance = entry.getKey(); if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java index a3ed5ab851..a6a9ed3bdf 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java @@ -2,6 +2,7 @@ import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; import org.apache.helix.model.ResourceAssignment; import java.util.Collection; @@ -17,11 +18,12 @@ public abstract class TaskAssignmentCalculator { * @param jobCtx the task context * @param workflowCfg the workflow configuration * @param workflowCtx the workflow context - * @param cache cluster snapshot + * @param idealStateMap the map from resource name to ideal state * @return set of partition numbers */ public abstract Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache); + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + Map idealStateMap); /** * Compute an assignment of tasks to instances @@ -34,12 +36,12 @@ public abstract Set getAllTaskPartitions(JobConfig jobCfg, JobContext j * @param workflowCfg the workflow configuration * @param workflowCtx the
workflow context * @param partitionSet the partitions to assign - * @param cache cluster snapshot + * @param idealStateMap the map from resource name to ideal state * @return map of instances to set of partition numbers */ public abstract Map> getTaskAssignment( CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, Collection instances, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set partitionSet, - ClusterDataCache cache); + Map idealStateMap); } From 9fc6c540bbcb4d7c71f0b7fe89e2acbc5955e859 Mon Sep 17 00:00:00 2001 From: Junkai Xue Date: Tue, 13 Sep 2016 16:01:39 -0700 Subject: [PATCH 3/3] Job Config and logic refactoring 1. Support identical task initialization with job command and number of tasks 2. Remove unused MaxForcedReassignmentsPerTask field 3. Refactor failure-handling logic. --- .../java/org/apache/helix/task/JobConfig.java | 52 ++++++++++++++----- .../org/apache/helix/task/JobRebalancer.java | 19 +------ .../org/apache/helix/task/TaskConfig.java | 50 +++++++++--------- .../org/apache/helix/task/beans/JobBean.java | 2 +- .../org/apache/helix/task/beans/TaskBean.java | 1 + .../TestGenericTaskAssignmentCalculator.java | 4 +- .../task/TestIndependentTaskRebalancer.java | 45 +++------------- .../task/TestUserContentStore.java | 6 +-- 8 files changed, 81 insertions(+), 98 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index 7a4e2d3b8d..a966f35c16 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -83,6 +83,7 @@ protected enum JobConfigProperty { * The maximum number of times the task rebalancer may attempt to execute a task.
*/ MaxAttemptsPerTask, + @Deprecated /** * The maximum number of times Helix will intentionally move a failing task */ @@ -134,6 +135,7 @@ protected enum JobConfigProperty { public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0; public static final boolean DEFAULT_DISABLE_EXTERNALVIEW = false; public static final boolean DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE = false; + public static final int DEFAULT_NUMBER_OF_TASKS = 0; private final String _workflow; private final String _targetResource; @@ -218,10 +220,6 @@ public int getMaxAttemptsPerTask() { return _maxAttemptsPerTask; } - public int getMaxForcedReassignmentsPerTask() { - return _maxForcedReassignmentsPerTask; - } - public int getFailureThreshold() { return _failureThreshold; } @@ -308,6 +306,8 @@ public static JobConfig fromHelixProperty(HelixProperty property) * A builder for {@link JobConfig}. Validates the configurations. */ public static class Builder { + private final String NUMBER_OF_TASKS = "NumberOfTasks"; + private String _workflow; private String _targetResource; private String _jobType; @@ -325,10 +325,18 @@ public static class Builder { private long _retryDelay = DEFAULT_TASK_RETRY_DELAY; private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW; private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE; + private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS; public JobConfig build() { validate(); + if (_taskConfigMap.isEmpty()) { + for (int i = 0; i < _numberOfTasks; i++) { + TaskConfig taskConfig = new TaskConfig(null, null); + _taskConfigMap.put(taskConfig.getId(), taskConfig); + } + } + return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates, _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance, _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay, @@ -376,10 +384,6 @@ public static Builder fromMap(Map cfg) { b.setMaxAttemptsPerTask( 
Integer.parseInt(cfg.get(JobConfigProperty.MaxAttemptsPerTask.name()))); } - if (cfg.containsKey(JobConfigProperty.MaxForcedReassignmentsPerTask.name())) { - b.setMaxForcedReassignmentsPerTask( - Integer.parseInt(cfg.get(JobConfigProperty.MaxForcedReassignmentsPerTask.name()))); - } if (cfg.containsKey(JobConfigProperty.FailureThreshold.name())) { b.setFailureThreshold( Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name()))); @@ -429,6 +433,11 @@ public Builder setCommand(String v) { return this; } + public Builder setNumberOfTasks(int v) { + _numberOfTasks = v; + return this; + } + public Builder setJobCommandConfigMap(Map v) { _commandConfig = v; return this; @@ -449,6 +458,8 @@ public Builder setMaxAttemptsPerTask(int v) { return this; } + // This field will be ignored by Helix + @Deprecated public Builder setMaxForcedReassignmentsPerTask(int v) { _maxForcedReassignmentsPerTask = v; return this; @@ -508,9 +519,25 @@ private void validate() { throw new IllegalArgumentException( String.format("%s cannot be an empty set", JobConfigProperty.TargetPartitionStates)); } - if (_taskConfigMap.isEmpty() && _command == null) { - throw new IllegalArgumentException( - String.format("%s cannot be null", JobConfigProperty.Command)); + if (_taskConfigMap.isEmpty()) { + // Check that the job command is not null when no task config is specified + if (_command == null) { + throw new IllegalArgumentException( + String.format("%s cannot be null", JobConfigProperty.Command)); + } + // With a job command but no task configs, either a target resource or a number of tasks must be set + if (_targetResource == null && _numberOfTasks == 0) { + throw new IllegalArgumentException("Either targetResource or numberOfTask should be set"); + } + } + // Check that either the job command is not null or every task has its own command + if (_command == null) { + for (TaskConfig taskConfig : _taskConfigMap.values()) { + if (taskConfig.getCommand() == null) { + throw new IllegalArgumentException( +
String.format("Task % command cannot be null", taskConfig.getId())); + } + } } if (_timeoutPerTask < 0) { throw new IllegalArgumentException(String @@ -547,12 +574,11 @@ public static Builder from(JobBean jobBean) { Builder b = new Builder(); b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask) - .setMaxForcedReassignmentsPerTask(jobBean.maxForcedReassignmentsPerTask) .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance) .setTimeoutPerTask(jobBean.timeoutPerPartition) .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay) .setDisableExternalView(jobBean.disableExternalView) - .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure); + .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure).setNumberOfTasks(jobBean.numberOfTasks); if (jobBean.jobCommandConfigMap != null) { b.setJobCommandConfigMap(jobBean.jobCommandConfigMap); diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index cf7f5e64a7..7676dab64c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -345,24 +345,9 @@ private ResourceAssignment computeResourceMapping(String jobResource, // maximum number of attempts or task is in ABORTED state. 
if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() || currState.equals(TaskPartitionState.TASK_ABORTED)) { - // If the user does not require this task to succeed in order for the job to succeed, - // then we don't have to fail the job right now - boolean successOptional = false; - String taskId = jobCtx.getTaskIdForPartition(pId); - if (taskId != null) { - TaskConfig taskConfig = jobCfg.getTaskConfig(taskId); - if (taskConfig != null) { - successOptional = taskConfig.isSuccessOptional(); - } - } - - // Similarly, if we have some leeway for how many tasks we can fail, then we don't have + // If we have some leeway for how many tasks we can fail, then we don't have // to fail the job immediately - if (skippedPartitions.size() < jobCfg.getFailureThreshold()) { - successOptional = true; - } - - if (!successOptional) { + if (skippedPartitions.size() >= jobCfg.getFailureThreshold()) { markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx); markAllPartitionsError(jobCtx, currState, false); addAllPartitions(allPartitions, partitionsToDropFromIs); diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java index b990f99f47..621d3714ea 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java @@ -36,6 +36,7 @@ public class TaskConfig { private enum TaskConfigProperty { TASK_ID, TASK_COMMAND, + @Deprecated TASK_SUCCESS_OPTIONAL, TASK_TARGET_PARTITION } @@ -44,18 +45,26 @@ private enum TaskConfigProperty { private final Map _configMap; + @Deprecated + public TaskConfig(String command, Map configMap, boolean successOptional, + String id, String target) { + this(command, configMap, id, target); + } + + @Deprecated + public TaskConfig(String command, Map configMap, boolean successOptional) { + this(command, configMap, null, null); + } + /** * Instantiate the task config * * @param 
command the command to invoke for the task * @param configMap configuration to be passed as part of the invocation - * @param successOptional true if this task need not pass for the job to succeed, false - * otherwise * @param id existing task ID * @param target target partition for a task */ - public TaskConfig(String command, Map configMap, boolean successOptional, - String id, String target) { + public TaskConfig(String command, Map configMap, String id, String target) { if (configMap == null) { configMap = Maps.newHashMap(); } @@ -65,8 +74,6 @@ public TaskConfig(String command, Map configMap, boolean success if (command != null) { configMap.put(TaskConfigProperty.TASK_COMMAND.name(), command); } - configMap - .put(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name(), Boolean.toString(successOptional)); configMap.put(TaskConfigProperty.TASK_ID.name(), id); if (target != null) { configMap.put(TaskConfigProperty.TASK_TARGET_PARTITION.name(), target); @@ -79,11 +86,9 @@ public TaskConfig(String command, Map configMap, boolean success * * @param command the command to invoke for the task * @param configMap configuration to be passed as part of the invocation - * @param successOptional true if this task need not pass for the job to succeed, false - * otherwise */ - public TaskConfig(String command, Map configMap, boolean successOptional) { - this(command, configMap, successOptional, null, null); + public TaskConfig(String command, Map configMap) { + this(command, configMap, null, null); } /** @@ -115,16 +120,13 @@ public String getTargetPartition() { /** * Check if this task must succeed for a job to succeed - * + * This field has been ignored by Helix * @return true if success is optional, false otherwise */ + @Deprecated public boolean isSuccessOptional() { - String successOptionalStr = _configMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name()); - if (successOptionalStr == null) { - return false; - } else { - return Boolean.parseBoolean(successOptionalStr); - } + 
// This option will not be used in rebalancer anymore, deprecate it. + return true; } /** @@ -154,7 +156,7 @@ public static class Builder { private Map _configMap; public TaskConfig build() { - return new TaskConfig(_command, _configMap, _successOptional, _taskId, _targetPartition); + return new TaskConfig(_command, _configMap, _taskId, _targetPartition); } public String getTaskId() { @@ -184,10 +186,12 @@ public Builder setTargetPartition(String targetPartition) { return this; } + @Deprecated public boolean isSuccessOptional() { return _successOptional; } + @Deprecated public Builder setSuccessOptional(boolean successOptional) { _successOptional = successOptional; return this; @@ -208,7 +212,7 @@ public Builder addConfig(String key, String value) { * @return instantiated TaskConfig */ public static TaskConfig from(String target) { - return new TaskConfig(null, null, false, null, target); + return new TaskConfig(null, null, null, target); } /** @@ -218,7 +222,7 @@ public static TaskConfig from(String target) { * @return instantiated TaskConfig */ public static TaskConfig from(TaskBean bean) { - return new TaskConfig(bean.command, bean.taskConfigMap, bean.successOptional); + return new TaskConfig(bean.command, bean.taskConfigMap); } /** @@ -232,11 +236,7 @@ public static TaskConfig from(Map rawConfigMap) { String taskId = rawConfigMap.get(TaskConfigProperty.TASK_ID.name()); String command = rawConfigMap.get(TaskConfigProperty.TASK_COMMAND.name()); String targetPartition = rawConfigMap.get(TaskConfigProperty.TASK_TARGET_PARTITION.name()); - String successOptionalStr = - rawConfigMap.get(TaskConfigProperty.TASK_SUCCESS_OPTIONAL.name()); - boolean successOptional = - (successOptionalStr != null) ? 
Boolean.valueOf(successOptionalStr) : false; - return new TaskConfig(command, rawConfigMap, successOptional, taskId, targetPartition); + return new TaskConfig(command, rawConfigMap, taskId, targetPartition); } } } diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index dd7ebab781..9a376f890d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -41,9 +41,9 @@ public class JobBean { public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK; public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK; - public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK; public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD; public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY; public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW; public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE; + public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS; } diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java index 97ecfc0faf..a61556bf69 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java @@ -29,5 +29,6 @@ public class TaskBean { public String command; public Map taskConfigMap; + @Deprecated public boolean successOptional = false; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java index 
0410db20d8..564500946a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java @@ -104,7 +104,7 @@ public void beforeClass() throws Exception { _driver = new TaskDriver(_manager); Map taskConfigMap = Maps.newHashMap(); - _taskConfig = new TaskConfig("TaskOne", taskConfigMap, false); + _taskConfig = new TaskConfig("TaskOne", taskConfigMap); _jobCommandMap = Maps.newHashMap(); } @@ -136,7 +136,7 @@ public void testMultipleTaskAssignment() throws InterruptedException { List taskConfigs = Lists.newArrayListWithCapacity(20); for (int i = 0; i < 50; i++) { Map taskConfigMap = Maps.newHashMap(); - taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap, false)); + taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap)); } JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand").setJobCommandConfigMap(_jobCommandMap) diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index c4d588c879..64b9073e53 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -136,8 +136,8 @@ public void beforeMethod() { String jobName = TestHelper.getTestMethodName(); Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); List taskConfigs = Lists.newArrayListWithCapacity(2); - TaskConfig taskConfig1 = new TaskConfig("TaskOne", null, true); - TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", null); + TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null); taskConfigs.add(taskConfig1); taskConfigs.add(taskConfig2); Map jobCommandMap = 
Maps.newHashMap(); @@ -164,8 +164,8 @@ public void beforeMethod() { Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); List taskConfigs = Lists.newArrayListWithCapacity(2); Map taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true)); - TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); - TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap); + TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null); taskConfigs.add(taskConfig1); taskConfigs.add(taskConfig2); Map jobConfigMap = Maps.newHashMap(); @@ -185,35 +185,6 @@ public void beforeMethod() { Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName())); } - @Test public void testOptionalTaskFailure() throws Exception { - // Create a job with two different tasks - String jobName = TestHelper.getTestMethodName(); - Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); - List taskConfigs = Lists.newArrayListWithCapacity(2); - Map taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true)); - TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, true); - TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false); - taskConfigs.add(taskConfig1); - taskConfigs.add(taskConfig2); - Map jobCommandMap = Maps.newHashMap(); - jobCommandMap.put("Timeout", "1000"); - - JobConfig.Builder jobBuilder = - new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs) - .setJobCommandConfigMap(jobCommandMap); - workflowBuilder.addJob(jobName, jobBuilder); - - _driver.start(workflowBuilder.build()); - - // Ensure the job completes - _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS); - _driver.pollForWorkflowState(jobName, TaskState.COMPLETED); - - // Ensure that each class was invoked - Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); - Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName())); - } - 
@Test public void testReassignment() throws Exception { final int NUM_INSTANCES = 5; String jobName = TestHelper.getTestMethodName(); @@ -221,13 +192,13 @@ public void beforeMethod() { List taskConfigs = Lists.newArrayListWithCapacity(2); Map taskConfigMap = Maps.newHashMap(ImmutableMap .of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1))); - TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap); taskConfigs.add(taskConfig1); Map jobCommandMap = Maps.newHashMap(); jobCommandMap.put("Timeout", "1000"); JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand") - .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs) + .addTaskConfigs(taskConfigs) .setJobCommandConfigMap(jobCommandMap); workflowBuilder.addJob(jobName, jobBuilder); @@ -254,7 +225,7 @@ public void testOneTimeScheduled() throws Exception { Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); List taskConfigs = Lists.newArrayListWithCapacity(1); Map taskConfigMap = Maps.newHashMap(); - TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap); taskConfigs.add(taskConfig1); Map jobCommandMap = Maps.newHashMap(); jobCommandMap.put("Timeout", "1000"); @@ -289,7 +260,7 @@ public void testDelayedRetry() throws Exception { Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); List taskConfigs = Lists.newArrayListWithCapacity(1); Map taskConfigMap = Maps.newHashMap(); - TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false); + TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap); taskConfigs.add(taskConfig1); Map jobCommandMap = Maps.newHashMap(); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java index b2b27efa9f..13cd5311ac 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java @@ -123,7 +123,7 @@ public void testWorkflowAndJobTaskUserContentStore() throws InterruptedException Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); List taskConfigs = Lists.newArrayListWithCapacity(1); Map taskConfigMap = Maps.newHashMap(); - TaskConfig taskConfig1 = new TaskConfig("ContentStoreTask", taskConfigMap, false); + TaskConfig taskConfig1 = new TaskConfig("ContentStoreTask", taskConfigMap); taskConfigs.add(taskConfig1); Map jobCommandMap = Maps.newHashMap(); jobCommandMap.put("Timeout", "1000"); @@ -148,8 +148,8 @@ public void testJobContentPutAndGetWithDependency() throws InterruptedException List taskConfigs2 = Lists.newArrayListWithCapacity(1); Map taskConfigMap1 = Maps.newHashMap(); Map taskConfigMap2 = Maps.newHashMap(); - TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap1, false); - TaskConfig taskConfig2 = new TaskConfig("TaskTwo", taskConfigMap2, false); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap1); + TaskConfig taskConfig2 = new TaskConfig("TaskTwo", taskConfigMap2); taskConfigs1.add(taskConfig1); taskConfigs2.add(taskConfig2);