Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task Framework IdealState Removal #1326

Merged
merged 8 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public void scheduleRebalance(HelixManager manager, String resource, long startT
long delay = startTime - System.currentTimeMillis();
if (delay < 0) {
LOG.debug(String.format("Delay time is %s, will not be scheduled", delay));
return;
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
}
LOG.info("Schedule rebalance for resource : {} at time: {} delay: {}", resource, startTime,
delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public enum AttributeName {
RESOURCES,
RESOURCES_TO_REBALANCE,
TASK_RESOURCES_TO_DROP,
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
BEST_POSSIBLE_STATE,
CURRENT_STATE,
CUSTOMIZED_STATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.WorkflowConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,6 +62,7 @@ public void process(ClusterEvent event) throws Exception {

Map<String, Resource> resourceMap = new LinkedHashMap<>();
Map<String, Resource> resourceToRebalance = new LinkedHashMap<>();
Map<String, Resource> taskResourcesToDrop = new LinkedHashMap<>();
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved

boolean isTaskCache = cache instanceof WorkflowControllerDataProvider;

Expand All @@ -74,9 +78,8 @@ public void process(ClusterEvent event) throws Exception {
cache.getResourceConfig(resourceName));
resourceMap.put(resourceName, resource);

if (!idealState.isValid() && !isTaskCache
|| idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
|| !idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && !isTaskCache) {
if (!isTaskCache && (!idealState.isValid() || !idealState.getStateModelDefRef()
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
.equals(TaskConstants.STATE_MODEL_NAME))) {
resourceToRebalance.put(resourceName, resource);
}
resource.setStateModelDefRef(idealState.getStateModelDefRef());
Expand All @@ -98,6 +101,43 @@ public void process(ClusterEvent event) throws Exception {
}
}
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved

// Add TaskFramework resources from workflow and job configs as Task Framework will no longer
// use IdealState
if (isTaskCache) {
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
WorkflowControllerDataProvider taskDataCache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
for (Map.Entry<String, WorkflowConfig> workflowConfigEntry : taskDataCache
.getWorkflowConfigMap().entrySet()) {
// always overwrite, because the resource could be created by IS
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
String resourceName = workflowConfigEntry.getKey();
WorkflowConfig workflowConfig = workflowConfigEntry.getValue();
addResourceConfigToResourceMap(resourceName, workflowConfig, cache.getClusterConfig(),
resourceMap, resourceToRebalance);
addPartition(resourceName, resourceName, resourceMap);
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
}

for (Map.Entry<String, JobConfig> jobConfigEntry : taskDataCache.getJobConfigMap()
.entrySet()) {
// always overwrite, because the resource could be created by IS
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
String resourceName = jobConfigEntry.getKey();
JobConfig jobConfig = jobConfigEntry.getValue();
addResourceConfigToResourceMap(resourceName, jobConfig, cache.getClusterConfig(),
resourceMap, resourceToRebalance);
int numPartitions = jobConfig.getTaskConfigMap().size();
if (numPartitions == 0 && idealStates != null) {
IdealState targetIs = idealStates.get(jobConfig.getTargetResource());
if (targetIs == null) {
LOG.warn("Target resource does not exist for job " + resourceName);
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
} else {
numPartitions = targetIs.getPartitionSet().size();
}
}
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < numPartitions; i++) {
addPartition(resourceName + "_" + i, resourceName, resourceMap);
}
}
}
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved

// It's important to get partitions from CurrentState as well since the
// idealState might be removed.
Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -130,11 +170,15 @@ public void process(ClusterEvent event) throws Exception {
resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
resource.setBucketSize(currentState.getBucketSize());
resource.setBatchMessageMode(currentState.getBatchMessageMode());
if (resource.getStateModelDefRef() == null && !isTaskCache
|| resource.getStateModelDefRef() != null && (
resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
|| !resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)
&& !isTaskCache)) {
if (!isTaskCache && (resource.getStateModelDefRef() == null
|| !TaskConstants.STATE_MODEL_NAME.equals(resource.getStateModelDefRef()))) {
resourceToRebalance.put(resourceName, resource);
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
}

if (isTaskCache && TaskConstants.STATE_MODEL_NAME
.equals(resource.getStateModelDefRef())) {
// If a task current state exists without configs, it needs to be cleaned up
taskResourcesToDrop.put(resourceName, resource);
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
resourceToRebalance.put(resourceName, resource);
}

Expand Down Expand Up @@ -163,6 +207,9 @@ public void process(ClusterEvent event) throws Exception {

event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceToRebalance);
if (isTaskCache) {
event.addAttribute(AttributeName.TASK_RESOURCES_TO_DROP.name(), taskResourcesToDrop);
}
}

private void addResource(String resource, Map<String, Resource> resourceMap) {
Expand All @@ -185,4 +232,21 @@ private void addPartition(String partition, String resourceName, Map<String, Res
resource.addPartition(partition);

}

/**
 * Builds a task-framework {@code Resource} from a workflow/job {@code ResourceConfig}
 * and registers it in both {@code resourceMap} and {@code resourceToRebalance}.
 * Used because Task Framework resources are no longer backed by IdealState.
 *
 * @param resourceName        name of the workflow or job resource
 * @param resourceConfig      the WorkflowConfig/JobConfig (both extend ResourceConfig)
 * @param clusterConfig       cluster-level config; may be null — only read for batch message mode
 * @param resourceMap         map of all resources; this resource is added (overwriting any entry)
 * @param resourceToRebalance map of resources the task pipeline should rebalance; also added here
 */
private void addResourceConfigToResourceMap(String resourceName, ResourceConfig resourceConfig,
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
ClusterConfig clusterConfig, Map<String, Resource> resourceMap,
Map<String, Resource> resourceToRebalance) {
Resource resource = new Resource(resourceName, clusterConfig, resourceConfig);
resourceMap.put(resourceName, resource);
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
// Task Framework resources always use the Task state model.
resource.setStateModelDefRef(TaskConstants.STATE_MODEL_NAME);
resource.setStateModelFactoryName(resourceConfig.getStateModelFactoryName());
// Batch message mode is on if enabled at either the resource or the cluster level.
boolean batchMessageMode = resourceConfig.getBatchMessageMode();
if (clusterConfig != null) {
batchMessageMode |= clusterConfig.getBatchMessageMode();
}
resource.setBatchMessageMode(batchMessageMode);
resource.setResourceGroupName(resourceConfig.getResourceGroupName());
resource.setResourceTag(resourceConfig.getInstanceGroupTag());
resourceToRebalance.put(resourceName, resource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,20 @@
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.task.AssignableInstanceManager;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskRebalancer;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.task.WorkflowDispatcher;
import org.apache.helix.task.assigner.AssignableInstance;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -110,117 +104,17 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource
restOfResources.remove(jobName);
}

// Current rest of resources including: only current state left over ones
// Original resource map contains workflows + jobs + other invalid resources
// After removing workflows + jobs, only leftover ones will go over old rebalance pipeline.
for (Resource resource : restOfResources.values()) {
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) {
failureResources.add(resource.getResourceName());
LogUtil.logWarn(logger, _eventId,
"Failed to calculate best possible states for " + resource.getResourceName());
}
Map<String, Resource> taskResourcesToDrop =
event.getAttribute(AttributeName.TASK_RESOURCES_TO_DROP.name());
for (String resourceName : taskResourcesToDrop.keySet()) {
ResourceAssignment emptyAssignment =
_workflowDispatcher.buildEmptyAssignment(resourceName, currentStateOutput);
_workflowDispatcher.updateBestPossibleStateOutput(resourceName, emptyAssignment, output);
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
}

return output;
}


/**
 * Computes the best-possible partition-state assignment for a single resource in the task
 * pipeline and records it in {@code output}.
 *
 * @param event              the cluster event; supplies the HelixManager and status monitor
 * @param cache              task-pipeline data provider; supplies the IdealState, if any
 * @param currentStateOutput current states, used by the mapping calculator
 * @param resource           the resource to compute an assignment for
 * @param output             receives the computed partition-state assignment on success
 * @return true if the assignment was computed and recorded; false if the resource does not
 *         belong to the task pipeline or any exception occurred during computation
 */
private boolean computeResourceBestPossibleState(ClusterEvent event, WorkflowControllerDataProvider cache,
CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) {
// for each ideal state
// read the state model def
// for each resource
// get the preference list
// for each instanceName check if its alive then assign a state
String resourceName = resource.getResourceName();
LogUtil.logDebug(logger, _eventId, "Processing resource:" + resourceName);
// Ideal state may be gone. In that case we need to get the state model name
// from the current state
IdealState idealState = cache.getIdealState(resourceName);
if (idealState == null) {
// if ideal state is deleted, use an empty one
LogUtil.logInfo(logger, _eventId, "resource:" + resourceName + " does not exist anymore");
idealState = new IdealState(resourceName);
idealState.setStateModelDefRef(resource.getStateModelDefRef());
}
// Skip resources that do not belong to the task pipeline (non-Task state model).
if (!idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
LogUtil.logWarn(logger, _eventId, String
.format("Resource %s should not be processed by %s pipeline", resourceName,
cache.getPipelineName()));
return false;
}
// Instantiate the custom rebalancer class named in the ideal state, if any.
// A failed instantiation is logged and leaves rebalancer null (SEMI_AUTO fallback below).
Rebalancer rebalancer = null;
String rebalancerClassName = idealState.getRebalancerClassName();
if (rebalancerClassName != null) {
if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId,
"resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
}
try {
rebalancer = Rebalancer.class
.cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
} catch (Exception e) {
LogUtil.logError(logger, _eventId,
"Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
}
}
// The rebalancer must also act as a MappingCalculator; otherwise fall back to SEMI_AUTO.
MappingCalculator mappingCalculator = null;
if (rebalancer != null) {
try {
mappingCalculator = MappingCalculator.class.cast(rebalancer);
} catch (ClassCastException e) {
LogUtil.logWarn(logger, _eventId,
"Rebalancer does not have a mapping calculator, defaulting to SEMI_AUTO, resource: "
+ resourceName);
}
} else {
// Create dummy rebalancer for dropping existing current states
rebalancer = new SemiAutoRebalancer();
mappingCalculator = new SemiAutoRebalancer();
}
// Task rebalancers additionally report metrics through the cluster status monitor.
if (rebalancer instanceof TaskRebalancer) {
TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer);
taskRebalancer.setClusterStatusMonitor(
(ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()));
}
ResourceAssignment partitionStateAssignment = null;
try {
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
rebalancer.init(manager);
partitionStateAssignment = mappingCalculator
.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
_workflowDispatcher.updateBestPossibleStateOutput(resource.getResourceName(), partitionStateAssignment, output);
// Check if calculation is done successfully
return true;
} catch (Exception e) {
LogUtil
.logError(logger, _eventId, "Error computing assignment for resource " + resourceName + ". Skipping.", e);
// TODO : remove this part after debugging NPE
StringBuilder sb = new StringBuilder();
sb.append(String
.format("HelixManager is null : %s\n", event.getAttribute("helixmanager") == null));
sb.append(String.format("Rebalancer is null : %s\n", rebalancer == null));
sb.append(String.format("Calculated idealState is null : %s\n", idealState == null));
sb.append(String.format("MappingCaculator is null : %s\n", mappingCalculator == null));
sb.append(
String.format("PartitionAssignment is null : %s\n", partitionStateAssignment == null));
sb.append(String.format("Output is null : %s\n", output == null));
LogUtil.logError(logger, _eventId, sb.toString());
}
// Exception or rebalancer is not found
return false;
}

class WorkflowObject implements Comparable<WorkflowObject> {
String _workflowId;
Long _rankingValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.monitoring.mbeans.JobMonitor;
import org.apache.helix.task.assigner.AssignableInstance;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -518,7 +519,8 @@ protected void handleJobTimeout(JobContext jobCtx, WorkflowContext workflowCtx,
}
_clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.TIMED_OUT);
_rebalanceScheduler.removeScheduledRebalance(jobResource);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
// New pipeline trigger for workflow status update
RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(),0L,false);
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
}

protected void failJob(String jobName, WorkflowContext workflowContext, JobContext jobContext,
Expand All @@ -534,7 +536,8 @@ protected void failJob(String jobName, WorkflowContext workflowContext, JobConte
}
_clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED);
_rebalanceScheduler.removeScheduledRebalance(jobName);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
// New pipeline trigger for workflow status update
RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(),0L,false);
}

// Compute real assignment from theoretical calculation with applied throttling
Expand Down Expand Up @@ -936,6 +939,10 @@ protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConf

protected void scheduleJobCleanUp(long expiry, WorkflowConfig workflowConfig,
long currentTime) {
if (expiry < 0) {
// If the expiry is negative, it's an invalid clean up. Return.
return;
}
long currentScheduledTime =
_rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1 ? Long.MAX_VALUE
: _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId());
Expand Down Expand Up @@ -1148,7 +1155,7 @@ protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
return true;
}

protected ResourceAssignment buildEmptyAssignment(String name,
public ResourceAssignment buildEmptyAssignment(String name,
CurrentStateOutput currStateOutput) {
ResourceAssignment assignment = new ResourceAssignment(name);
Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.TreeSet;

import com.google.common.collect.ImmutableMap;
import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
Expand Down Expand Up @@ -85,15 +86,16 @@ public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
// completed)
TaskState workflowState = workflowCtx.getWorkflowState();
TaskState jobState = workflowCtx.getJobState(jobName);
// The job is already in a final state (completed/failed).
// Do not include workflowState == TIMED_OUT here, as later logic needs to handle this case
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
|| jobState == TaskState.FAILED || jobState == TaskState.COMPLETED
|| jobState == TaskState.TIMED_OUT) {
LOG.info(String.format(
"Workflow %s or job %s is already in final state, workflow state (%s), job state (%s), clean up job IS.",
workflowResource, jobName, workflowState, jobState));
finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
// New pipeline trigger for workflow status update
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(),0L,false);
_rebalanceScheduler.removeScheduledRebalance(jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
Expand Down Expand Up @@ -287,7 +289,8 @@ private ResourceAssignment computeResourceMapping(String jobResource,
_clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
jobCtx.getFinishTime() - jobCtx.getStartTime());
_rebalanceScheduler.removeScheduledRebalance(jobResource);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
// New pipeline trigger for workflow status update
RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(),0L,false);
return buildEmptyAssignment(jobResource, currStateOutput);
}

Expand All @@ -298,6 +301,7 @@ private ResourceAssignment computeResourceMapping(String jobResource,
handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(),
jobResource);
scheduleJobCleanUp(jobCfg.getTerminalStateExpiry(), workflowConfig, currentTime);
NealSun96 marked this conversation as resolved.
Show resolved Hide resolved
return buildEmptyAssignment(jobResource, currStateOutput);
}

Expand Down