Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task Framework IdealState Removal #1326

Merged
merged 8 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
* under the License.
*/

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import org.apache.helix.HelixProperty;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
Expand All @@ -33,7 +35,10 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.WorkflowConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,28 +60,54 @@ public void process(ClusterEvent event) throws Exception {
throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
}

Map<String, IdealState> idealStates = cache.getIdealStates();

Map<String, Resource> resourceMap = new LinkedHashMap<>();
Map<String, Resource> resourceToRebalance = new LinkedHashMap<>();

Map<String, IdealState> idealStates = cache.getIdealStates();
boolean isTaskCache = cache instanceof WorkflowControllerDataProvider;

processIdealStates(cache, resourceMap, resourceToRebalance, idealStates, isTaskCache);

// Add TaskFramework resources from workflow and job configs as Task Framework will no longer
// use IdealState
if (isTaskCache) {
WorkflowControllerDataProvider taskDataCache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
processWorkflowConfigs(taskDataCache, resourceMap, resourceToRebalance);
processJobConfigs(taskDataCache, resourceMap, resourceToRebalance, idealStates);
}

// It's important to get partitions from CurrentState as well since the
// idealState might be removed.
processCurrentStates(cache, resourceMap, resourceToRebalance, idealStates, isTaskCache);

event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceToRebalance);
}

/*
* Construct Resources based on IdealStates and add them to resource maps
*/
private void processIdealStates(BaseControllerDataProvider cache,
Map<String, Resource> resourceMap, Map<String, Resource> resourceToRebalance,
Map<String, IdealState> idealStates, boolean isTaskCache) {
if (idealStates != null && idealStates.size() > 0) {
for (IdealState idealState : idealStates.values()) {
if (idealState == null) {
continue;
}

Set<String> partitionSet = idealState.getPartitionSet();
String resourceName = idealState.getResourceName();
if (!resourceMap.containsKey(resourceName)) {
Resource resource = new Resource(resourceName, cache.getClusterConfig(),
cache.getResourceConfig(resourceName));
resourceMap.put(resourceName, resource);

if (!idealState.isValid() && !isTaskCache
|| idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
|| !idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && !isTaskCache) {
// If this is a resource pipeline and the IdealState is invalid or has a non-task
// stateModelDef, add it to resourceToRebalance
if (!isTaskCache && (!idealState.isValid() || !idealState.getStateModelDefRef()
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
.equals(TaskConstants.STATE_MODEL_NAME))) {
resourceToRebalance.put(resourceName, resource);
}
resource.setStateModelDefRef(idealState.getStateModelDefRef());
Expand All @@ -97,9 +128,59 @@ public void process(ClusterEvent event) throws Exception {
}
}
}
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
}

// It's important to get partitions from CurrentState as well since the
// idealState might be removed.
/*
* Construct Resources based on WorkflowConfigs and add them to the two resource maps
*/
private void processWorkflowConfigs(WorkflowControllerDataProvider taskDataCache, Map<String, Resource> resourceMap,
Map<String, Resource> resourceToRebalance) {
for (Map.Entry<String, WorkflowConfig> workflowConfigEntry : taskDataCache
.getWorkflowConfigMap().entrySet()) {
// The resource could have been created by IS - always overwrite with config values
String resourceName = workflowConfigEntry.getKey();
WorkflowConfig workflowConfig = workflowConfigEntry.getValue();
addResourceConfigToResourceMap(resourceName, workflowConfig, taskDataCache.getClusterConfig(),
resourceMap, resourceToRebalance);
addPartition(resourceName, resourceName, resourceMap);
}
}

/*
* Construct Resources based on JobConfigs and add them to the two resource maps
*/
private void processJobConfigs(WorkflowControllerDataProvider taskDataCache, Map<String, Resource> resourceMap,
Map<String, Resource> resourceToRebalance, Map<String, IdealState> idealStates) {
for (Map.Entry<String, JobConfig> jobConfigEntry : taskDataCache.getJobConfigMap()
.entrySet()) {
// always overwrite, because the resource could be created by IS
String resourceName = jobConfigEntry.getKey();
JobConfig jobConfig = jobConfigEntry.getValue();
addResourceConfigToResourceMap(resourceName, jobConfig, taskDataCache.getClusterConfig(),
resourceMap, resourceToRebalance);
int numPartitions = jobConfig.getTaskConfigMap().size();
// If there is no task config, this is a targeted job. We get task counts based on target
// resource IdealState
if (numPartitions == 0 && idealStates != null) {
IdealState targetIs = idealStates.get(jobConfig.getTargetResource());
if (targetIs == null) {
LOG.warn("Target resource " + jobConfig.getTargetResource() + " does not exist for job " + resourceName);
} else {
numPartitions = targetIs.getPartitionSet().size();
}
}
for (int i = 0; i < numPartitions; i++) {
addPartition(resourceName + "_" + i, resourceName, resourceMap);
}
}
}

/*
* Construct Resources based on CurrentStates and add them to resource maps
*/
private void processCurrentStates(BaseControllerDataProvider cache,
Map<String, Resource> resourceMap, Map<String, Resource> resourceToRebalance,
Map<String, IdealState> idealStates, boolean isTaskCache) throws StageException {
Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved

if (availableInstances != null && availableInstances.size() > 0) {
Expand All @@ -124,17 +205,15 @@ public void process(ClusterEvent event) throws Exception {

// don't overwrite ideal state settings
if (!resourceMap.containsKey(resourceName)) {
addResource(resourceName, resourceMap);
Resource resource = resourceMap.get(resourceName);
Resource resource = new Resource(resourceName);
resource.setStateModelDefRef(currentState.getStateModelDefRef());
resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
resource.setBucketSize(currentState.getBucketSize());
resource.setBatchMessageMode(currentState.getBatchMessageMode());
if (resource.getStateModelDefRef() == null && !isTaskCache
|| resource.getStateModelDefRef() != null && (
resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
|| !resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)
&& !isTaskCache)) {
// if state model def is null, it's added during resource pipeline; if it's not null,
// it's added when it matches the pipeline type
if (isTaskCache == TaskConstants.STATE_MODEL_NAME
.equals(resource.getStateModelDefRef())) {
resourceToRebalance.put(resourceName, resource);
}

Expand All @@ -143,15 +222,16 @@ public void process(ClusterEvent event) throws Exception {
resource.setResourceGroupName(idealState.getResourceGroupName());
resource.setResourceTag(idealState.getInstanceGroupTag());
}
resourceMap.put(resourceName, resource);
}

if (currentState.getStateModelDefRef() == null) {
LogUtil.logError(LOG, _eventId,
"state model def is null." + "resource:" + currentState.getResourceName()
+ ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
+ currentState.getPartitionStateMap().values());
throw new StageException("State model def is null for resource:"
+ currentState.getResourceName());
throw new StageException(
"State model def is null for resource:" + currentState.getResourceName());
}

for (String partition : resourceStateMap.keySet()) {
Expand All @@ -160,18 +240,6 @@ public void process(ClusterEvent event) throws Exception {
}
}
}

event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceToRebalance);
}

private void addResource(String resource, Map<String, Resource> resourceMap) {
if (resource == null || resourceMap == null) {
return;
}
if (!resourceMap.containsKey(resource)) {
resourceMap.put(resource, new Resource(resource));
}
}

private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap) {
Expand All @@ -185,4 +253,21 @@ private void addPartition(String partition, String resourceName, Map<String, Res
resource.addPartition(partition);

}

private void addResourceConfigToResourceMap(String resourceName, ResourceConfig resourceConfig,
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
ClusterConfig clusterConfig, Map<String, Resource> resourceMap,
Map<String, Resource> resourceToRebalance) {
Resource resource = new Resource(resourceName, clusterConfig, resourceConfig);
resourceMap.put(resourceName, resource);
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
resource.setStateModelDefRef(TaskConstants.STATE_MODEL_NAME);
resource.setStateModelFactoryName(resourceConfig.getStateModelFactoryName());
boolean batchMessageMode = resourceConfig.getBatchMessageMode();
if (clusterConfig != null) {
batchMessageMode |= clusterConfig.getBatchMessageMode();
}
resource.setBatchMessageMode(batchMessageMode);
resource.setResourceGroupName(resourceConfig.getResourceGroupName());
resource.setResourceTag(resourceConfig.getInstanceGroupTag());
resourceToRebalance.put(resourceName, resource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,20 @@
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.task.AssignableInstanceManager;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskRebalancer;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.task.WorkflowDispatcher;
import org.apache.helix.task.assigner.AssignableInstance;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -110,117 +104,17 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource
restOfResources.remove(jobName);
}

// Current rest of resources including: only current state left over ones
// Original resource map contains workflows + jobs + other invalid resources
// After removing workflows + jobs, only leftover ones will go over old rebalance pipeline.
for (Resource resource : restOfResources.values()) {
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) {
failureResources.add(resource.getResourceName());
LogUtil.logWarn(logger, _eventId,
"Failed to calculate best possible states for " + resource.getResourceName());
}
// Jobs that exist in current states but are missing corresponding JobConfigs or WorkflowConfigs
// or WorkflowContexts need to be cleaned up. Note that restOfResources can only be jobs,
// because workflow resources are created based on Configs only - workflows don't have
// CurrentStates
for (String resourceName : restOfResources.keySet()) {
_workflowDispatcher.processJobForDrop(resourceName, currentStateOutput, output);
}

return output;
}


private boolean computeResourceBestPossibleState(ClusterEvent event, WorkflowControllerDataProvider cache,
CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) {
// for each ideal state
// read the state model def
// for each resource
// get the preference list
// for each instanceName check if its alive then assign a state

String resourceName = resource.getResourceName();
LogUtil.logDebug(logger, _eventId, "Processing resource:" + resourceName);
// Ideal state may be gone. In that case we need to get the state model name
// from the current state
IdealState idealState = cache.getIdealState(resourceName);
if (idealState == null) {
// if ideal state is deleted, use an empty one
LogUtil.logInfo(logger, _eventId, "resource:" + resourceName + " does not exist anymore");
idealState = new IdealState(resourceName);
idealState.setStateModelDefRef(resource.getStateModelDefRef());
}

// Skip the resources are not belonging to task pipeline
if (!idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
LogUtil.logWarn(logger, _eventId, String
.format("Resource %s should not be processed by %s pipeline", resourceName,
cache.getPipelineName()));
return false;
}

Rebalancer rebalancer = null;
String rebalancerClassName = idealState.getRebalancerClassName();
if (rebalancerClassName != null) {
if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId,
"resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
}
try {
rebalancer = Rebalancer.class
.cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
} catch (Exception e) {
LogUtil.logError(logger, _eventId,
"Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
}
}

MappingCalculator mappingCalculator = null;
if (rebalancer != null) {
try {
mappingCalculator = MappingCalculator.class.cast(rebalancer);
} catch (ClassCastException e) {
LogUtil.logWarn(logger, _eventId,
"Rebalancer does not have a mapping calculator, defaulting to SEMI_AUTO, resource: "
+ resourceName);
}
} else {
// Create dummy rebalancer for dropping existing current states
rebalancer = new SemiAutoRebalancer();
mappingCalculator = new SemiAutoRebalancer();
}

if (rebalancer instanceof TaskRebalancer) {
TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer);
taskRebalancer.setClusterStatusMonitor(
(ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()));
}
ResourceAssignment partitionStateAssignment = null;
try {
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
rebalancer.init(manager);
partitionStateAssignment = mappingCalculator
.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
_workflowDispatcher.updateBestPossibleStateOutput(resource.getResourceName(), partitionStateAssignment, output);

// Check if calculation is done successfully
return true;
} catch (Exception e) {
LogUtil
.logError(logger, _eventId, "Error computing assignment for resource " + resourceName + ". Skipping.", e);
// TODO : remove this part after debugging NPE
StringBuilder sb = new StringBuilder();

sb.append(String
.format("HelixManager is null : %s\n", event.getAttribute("helixmanager") == null));
sb.append(String.format("Rebalancer is null : %s\n", rebalancer == null));
sb.append(String.format("Calculated idealState is null : %s\n", idealState == null));
sb.append(String.format("MappingCaculator is null : %s\n", mappingCalculator == null));
sb.append(
String.format("PartitionAssignment is null : %s\n", partitionStateAssignment == null));
sb.append(String.format("Output is null : %s\n", output == null));

LogUtil.logError(logger, _eventId, sb.toString());
}

// Exception or rebalancer is not found
return false;
}

class WorkflowObject implements Comparable<WorkflowObject> {
String _workflowId;
Long _rankingValue;
Expand Down