Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.apache.helix.api.rebalancer.constraint;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.List;
import java.util.Map;

import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.Partition;
import org.apache.helix.model.StateModelDefinition;

/**
 * A generic interface to find and recover partitions that have abnormal current states.
 */
public interface AbnormalStateResolver {
  /**
   * A placeholder which will be used when the resolver is not specified.
   * This dummy implementation is not functional: it treats every current state as valid and
   * refuses to compute any recovery assignment.
   */
  AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
    @Override
    public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
        final String resourceName, final Partition partition,
        final StateModelDefinition stateModelDef) {
      // By default, all current states are valid.
      return true;
    }

    @Override
    public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
        final String resourceName, final Partition partition,
        final StateModelDefinition stateModelDef, final List<String> preferenceList) {
      throw new UnsupportedOperationException("This resolver won't recover abnormal states.");
    }
  };

  /**
   * Check if the current states of the specified partition are valid.
   * @param currentStateOutput the current states of all the resources
   * @param resourceName the name of the resource to check
   * @param partition the partition to check
   * @param stateModelDef the state model definition of the resource
   * @return true if the current states of the specified partition are valid.
   */
  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
      final String resourceName, final Partition partition,
      final StateModelDefinition stateModelDef);

  /**
   * Compute a transient partition state assignment to fix the abnormal states.
   * @param currentStateOutput the current states of all the resources
   * @param resourceName the name of the resource to recover
   * @param partition the partition to recover
   * @param stateModelDef the state model definition of the resource
   * @param preferenceList the preference list of the partition
   * @return the transient partition state assignment which removes the abnormal states.
   */
  Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
      final String resourceName, final Partition partition,
      final StateModelDefinition stateModelDef, final List<String> preferenceList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@

import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
import org.apache.helix.common.caches.AbstractDataCache;
import org.apache.helix.common.caches.CurrentStateCache;
import org.apache.helix.common.caches.InstanceMessagesCache;
Expand All @@ -53,6 +55,7 @@
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,6 +106,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private Map<String, Map<String, String>> _idealStateRuleMap;
private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
private Set<String> _disabledInstanceSet = new HashSet<>();
private final Map<String, AbnormalStateResolver> _abnormalStateResolverMap = new HashMap<>();

public BaseControllerDataProvider() {
this(AbstractDataCache.UNKNOWN_CLUSTER, AbstractDataCache.UNKNOWN_PIPELINE);
Expand Down Expand Up @@ -225,6 +229,7 @@ private void refreshClusterConfig(final HelixDataAccessor accessor,
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CLUSTER_CONFIG).getAndSet(false)) {
_clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
refreshedType.add(HelixConstants.ChangeType.CLUSTER_CONFIG);
refreshAbnormalStateResolverMap(_clusterConfig);
} else {
LogUtil.logInfo(logger, getClusterEventId(), String.format(
"No ClusterConfig change for cluster %s, pipeline %s", _clusterName, getPipelineName()));
Expand Down Expand Up @@ -372,6 +377,7 @@ public ClusterConfig getClusterConfig() {

/**
 * Replace the cached ClusterConfig and refresh the abnormal state resolver cache so it
 * matches the new configuration.
 * @param clusterConfig the new ClusterConfig to cache
 */
public void setClusterConfig(ClusterConfig clusterConfig) {
  _clusterConfig = clusterConfig;
  refreshAbnormalStateResolverMap(clusterConfig);
}

@Override
Expand Down Expand Up @@ -723,6 +729,43 @@ public void setAsyncTasksThreadPool(ExecutorService asyncTasksThreadPool) {
_asyncTasksThreadPool = asyncTasksThreadPool;
}


/**
 * Look up the abnormal state resolver configured for the given state model.
 * @param stateModel the state model definition name
 * @return the configured resolver, or the no-op DUMMY_STATE_RESOLVER when none is configured.
 */
public AbnormalStateResolver getAbnormalStateResolver(String stateModel) {
  AbnormalStateResolver resolver = _abnormalStateResolverMap.get(stateModel);
  return resolver == null ? AbnormalStateResolver.DUMMY_STATE_RESOLVER : resolver;
}

/**
 * Reload the cached abnormal state resolver instances according to the ClusterConfig.
 * Resolvers whose state model is no longer configured are evicted; a resolver is only
 * re-instantiated when the configured class name differs from the cached instance's class.
 * @param clusterConfig the ClusterConfig that carries the resolver class configuration;
 *                      if null, the refresh is skipped.
 * @throws HelixException if a configured resolver class cannot be loaded or instantiated.
 */
private void refreshAbnormalStateResolverMap(ClusterConfig clusterConfig) {
  if (clusterConfig == null) {
    logger.debug("Skip refreshing abnormal state resolvers because the ClusterConfig is missing");
    return;
  }
  Map<String, String> resolverMap = clusterConfig.getAbnormalStateResolverMap();
  logger.info("Start loading the abnormal state resolvers with configuration {}", resolverMap);
  // Remove any resolver configuration that does not exist anymore.
  _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
  // Reload the resolver classes into cache based on the configuration.
  for (Map.Entry<String, String> entry : resolverMap.entrySet()) {
    String stateModel = entry.getKey();
    String resolverClassName = entry.getValue();
    if (resolverClassName == null || resolverClassName.isEmpty()) {
      // skip the empty definition.
      continue;
    }
    if (!resolverClassName.equals(getAbnormalStateResolver(stateModel).getClass().getName())) {
      try {
        // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(),
        // which also sneaky-throws checked exceptions from the constructor.
        AbnormalStateResolver resolver = AbnormalStateResolver.class
            .cast(HelixUtil.loadClass(getClass(), resolverClassName).getDeclaredConstructor()
                .newInstance());
        _abnormalStateResolverMap.put(stateModel, resolver);
      } catch (Exception e) {
        // Chain the cause so the underlying classloading/instantiation failure is diagnosable.
        throw new HelixException(String
            .format("Failed to instantiate the abnormal state resolver %s for state model %s",
                resolverClassName, stateModel), e);
      }
    } // else, nothing to update since the same resolver class has been loaded.
  }
  logger.info("Finish loading the abnormal state resolvers {}", _abnormalStateResolverMap);
}

/**
 * @return true if the cluster is currently in maintenance mode.
 */
public boolean isMaintenanceModeEnabled() {
return _isMaintenanceModeEnabled;
}
Expand Down Expand Up @@ -760,4 +803,4 @@ protected PropertyCache<LiveInstance> getLiveInstanceCache() {
/**
 * Renders the cache content (built by genCacheContentStringBuilder) as a string,
 * primarily for logging/debugging.
 */
public String toString() {
return genCacheContentStringBuilder().toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
Expand Down Expand Up @@ -102,7 +104,8 @@ public ResourceAssignment computeBestPossiblePartitionState(
Map<String, String> bestStateForPartition =
computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef,
preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
cache.getClusterConfig(), partition);
cache.getClusterConfig(), partition,
cache.getAbnormalStateResolver(stateModelDefName));
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
Expand Down Expand Up @@ -179,44 +182,97 @@ protected RebalanceStrategy<T> getRebalanceStrategy(
return rebalanceStrategy;
}

/**
* Compute best state for partition in AUTO ideal state mode.
* @param liveInstances
* @param stateModelDef
* @param preferenceList
* @param currentStateOutput instance->state for each partition
* @param disabledInstancesForPartition
* @param idealState
* @param clusterConfig
* @param partition
* @param resolver
* @return
*/
protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
StateModelDefinition stateModelDef, List<String> preferenceList,
CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
IdealState idealState, ClusterConfig clusterConfig, Partition partition,
AbnormalStateResolver resolver) {
Optional<Map<String, String>> optionalOverwrittenStates =
computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
idealState, partition, resolver);
if (optionalOverwrittenStates.isPresent()) {
return optionalOverwrittenStates.get();
}

Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition);
Map<String, String> currentStateMap = new HashMap<>(
currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
return computeBestPossibleMap(preferenceList, stateModelDef, currentStateMap, liveInstances,
disabledInstancesForPartition);
}

if (currentStateMap == null) {
currentStateMap = Collections.emptyMap();
}
/**
 * Compute whether an overwritten assignment is necessary for the partition, in case the
 * proposed assignment is not valid or empty.
 * @param stateModelDef the state model definition of the resource
 * @param preferenceList the preference list of the partition; null means dropped
 * @param currentStateOutput instance->state for each partition
 * @param idealState the resource IdealState
 * @param partition the partition to check
 * @param resolver the abnormal state resolver for the resource's state model
 * @return an Optional containing the assignment map if an overwrite is necessary,
 *         otherwise Optional.empty().
 */
protected Optional<Map<String, String>> computeStatesOverwriteForPartition(
    final StateModelDefinition stateModelDef, final List<String> preferenceList,
    final CurrentStateOutput currentStateOutput, IdealState idealState, final Partition partition,
    final AbnormalStateResolver resolver) {
  String resourceName = idealState.getResourceName();
  Map<String, String> currentStates =
      currentStateOutput.getCurrentStateMap(resourceName, partition);

  // (1) If the partition is removed from IS or the IS is deleted,
  // transit to DROPPED no matter whether the instance is disabled or not.
  if (preferenceList == null) {
    return Optional.of(computeBestPossibleMapForDroppedResource(currentStates));
  }

  // (2) If the resource is disabled altogether, transit to initial-state (e.g. OFFLINE)
  // unless the replica is in ERROR.
  if (!idealState.isEnabled()) {
    return Optional.of(computeBestPossibleMapForDisabledResource(currentStates, stateModelDef));
  }

  // (3) If the current states are valid, no overwrite is needed.
  if (resolver.isCurrentStatesValid(currentStateOutput, resourceName, partition, stateModelDef)) {
    return Optional.empty();
  }

  // Otherwise let the resolver propose a recovery assignment. It must keep the partition
  // placement (the set of hosting instances) unchanged.
  Map<String, String> recoveryAssignment = resolver.computeRecoveryAssignment(currentStateOutput,
      resourceName, partition, stateModelDef, preferenceList);
  if (recoveryAssignment == null
      || !recoveryAssignment.keySet().equals(currentStates.keySet())) {
    throw new HelixException(String.format(
        "Invalid recovery assignment %s since it changed the current partition placement %s",
        recoveryAssignment, currentStates));
  }
  return Optional.of(recoveryAssignment);
}

protected Map<String, String> computeBestPossibleMapForDroppedResource(Map<String, String> currentStateMap) {
Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
protected Map<String, String> computeBestPossibleMapForDroppedResource(
final Map<String, String> currentStateMap) {
Map<String, String> bestPossibleStateMap = new HashMap<>();
for (String instance : currentStateMap.keySet()) {
bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.toString());
}
return bestPossibleStateMap;
}

protected Map<String, String> computeBestPossibleMapForDisabledResource(Map<String, String> currentStateMap
, StateModelDefinition stateModelDef) {
Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
protected Map<String, String> computeBestPossibleMapForDisabledResource(
final Map<String, String> currentStateMap, StateModelDefinition stateModelDef) {
Map<String, String> bestPossibleStateMap = new HashMap<>();
for (String instance : currentStateMap.keySet()) {
if (!HelixDefinedState.ERROR.name().equals(currentStateMap.get(instance))) {
bestPossibleStateMap.put(instance, stateModelDef.getInitialState());
Expand Down Expand Up @@ -267,7 +323,6 @@ public static int getStateCount(String state, StateModelDefinition stateModelDef
*/
protected Map<String, String> computeBestPossibleMap(List<String> preferenceList, StateModelDefinition stateModelDef,
Map<String, String> currentStateMap, Set<String> liveInstances, Set<String> disabledInstancesForPartition) {

Map<String, String> bestPossibleStateMap = new HashMap<>();

// (1) Instances that have current state but not in preference list, drop, no matter it's disabled or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
Expand Down Expand Up @@ -263,7 +265,7 @@ public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDa
Map<String, String> bestStateForPartition =
computeBestPossibleStateForPartition(liveNodes, stateModelDef, preferenceList,
currentStateOutput, disabledInstancesForPartition, idealState, clusterConfig,
partition);
partition, cache.getAbnormalStateResolver(stateModelDefName));

partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
Expand All @@ -276,39 +278,20 @@ public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDa
return partitionMapping;
}

/**
* compute best state for resource in AUTO ideal state mode
* @param liveInstances
* @param stateModelDef
* @param preferenceList
* @param currentStateOutput
* : instance->state for each partition
* @param disabledInstancesForPartition
* @param idealState
* @param clusterConfig
* @param partition
* @return
*/
@Override
protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
StateModelDefinition stateModelDef, List<String> preferenceList,
CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
IdealState idealState, ClusterConfig clusterConfig, Partition partition) {

IdealState idealState, ClusterConfig clusterConfig, Partition partition,
AbnormalStateResolver resolver) {
Optional<Map<String, String>> optionalOverwrittenStates =
computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
idealState, partition, resolver);
if (optionalOverwrittenStates.isPresent()) {
return optionalOverwrittenStates.get();
}
Map<String, String> currentStateMap = new HashMap<>(
currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));

// (1) If the partition is removed from IS or the IS is deleted.
// Transit to DROPPED no matter the instance is disabled or not.
if (preferenceList == null) {
return computeBestPossibleMapForDroppedResource(currentStateMap);
}

// (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR.
if (!idealState.isEnabled()) {
return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef);
}

// Instances not in preference list but still have active replica, retain to avoid zero replica during movement
List<String> currentInstances = new ArrayList<>(currentStateMap.keySet());
Collections.sort(currentInstances);
Expand All @@ -332,7 +315,6 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l
}
}


// Sort the instancesToMove by their current partition state.
// Reason: because the states are assigned to instances in the order appeared in preferenceList, if we have
// [node1:Slave, node2:Master], we want to keep it that way, instead of assigning Master to node1.
Expand Down
Loading