From 87a099fbe4aafcbdabb2c3fc9d1f91f34adfa245 Mon Sep 17 00:00:00 2001 From: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com> Date: Wed, 28 Aug 2019 22:47:58 -0700 Subject: [PATCH] Revert "Refine the WAGED rebalancer related interfaces for integration (#431)" (#437) This reverts commit 08a2015c617ddd3c93525afc572081a7836f9476. --- .../apache/helix/HelixRebalanceException.java | 43 -------- .../ResourceChangeDetector.java | 20 ++-- .../rebalancer/GlobalRebalancer.java | 67 +++++++++++++ .../rebalancer/waged/ClusterDataDetector.java | 73 ++++++++++++++ .../rebalancer/waged/ClusterDataProvider.java | 54 ++++++++++ .../rebalancer/waged/WagedRebalancer.java | 65 +++--------- .../waged/model/ClusterModelProvider.java | 25 +++-- .../stages/BestPossibleStateCalcStage.java | 98 +++++-------------- .../waged/model/TestClusterModelProvider.java | 6 +- 9 files changed, 255 insertions(+), 196 deletions(-) delete mode 100644 helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java deleted file mode 100644 index c01b173e87..0000000000 --- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.apache.helix; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * Exception thrown by Helix due to rebalance failures. - */ -public class HelixRebalanceException extends Exception { - enum RebalanceFailureType { - INVALID_CLUSTER_STATUS, - INVALID_REBALANCER_STATUS, - FAILED_TO_CALCULATE, - UNKNOWN_FAILURE - } - - private final RebalanceFailureType _type; - - public HelixRebalanceException(String message, RebalanceFailureType type, Throwable cause) { - super(String.format("%s. 
Failure Type: %s", message, type.name()), cause); - _type = type; - } - - public RebalanceFailureType getFailureType() { - return _type; - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java index 611f4b2bc9..d65e609258 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java +++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java @@ -20,17 +20,15 @@ */ import com.google.common.collect.Sets; -import org.apache.helix.HelixConstants; -import org.apache.helix.HelixProperty; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; /** * ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from @@ -39,7 +37,6 @@ * WARNING: the methods of this class are not thread-safe. 
*/ public class ResourceChangeDetector implements ChangeDetector { - private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName()); private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run @@ -111,13 +108,10 @@ private void clearCachedComputation() { return snapshot.getResourceConfigMap(); case LIVE_INSTANCE: return snapshot.getLiveInstances(); - case CONFIG: - return Collections.emptyMap(); default: - LOG.warn( - "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: {}", - changeType); - return Collections.emptyMap(); + throw new HelixException(String.format( + "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s", + changeType)); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java new file mode 100644 index 0000000000..a3b9b3259e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java @@ -0,0 +1,67 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixManager; +import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Resource; + +import java.util.Map; + +public interface GlobalRebalancer { + enum RebalanceFailureType { + INVALID_CLUSTER_STATUS, + INVALID_REBALANCER_STATUS, + FAILED_TO_CALCULATE, + UNKNOWN_FAILURE + } + + class RebalanceFailureReason { + private final static String DEFAULT_REASON_MESSAGE = "No detail"; + private final RebalanceFailureType _type; + private final String _reason; + + public RebalanceFailureReason(RebalanceFailureType type) { + this(type, DEFAULT_REASON_MESSAGE); + } + + public RebalanceFailureReason(RebalanceFailureType type, String reason) { + _type = type; + _reason = reason; + } + + public RebalanceFailureType get_type() { + return _type; + } + + public String get_reason() { + return _reason; + } + } + + void init(HelixManager manager); + + Map computeNewIdealState(final CurrentStateOutput currentStateOutput, + T clusterData, Map resourceMap); + + RebalanceFailureReason getFailureReason(); +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java new file mode 100644 index 0000000000..0423edf4d3 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java @@ -0,0 +1,73 @@ +package org.apache.helix.controller.rebalancer.waged; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * A placeholder before we have the Cluster Data Detector implemented. + * + * @param <T> The cache class that can be handled by the detector. + */ +public class ClusterDataDetector { + /** + * All the cluster change types that may trigger a WAGED rebalancer re-calculation. + */ + public enum ChangeType { + BaselineAssignmentChange, + InstanceConfigChange, + ClusterConfigChange, + ResourceConfigChange, + ResourceIdealStatesChange, + InstanceStateChange, + OtherChange + } + + private Map> _currentChanges = + Collections.singletonMap(ChangeType.ClusterConfigChange, Collections.emptySet()); + + public void updateClusterStatus(T cache) { + } + + /** + * Returns all change types detected during the ClusterDetection stage. + */ + public Set getChangeTypes() { + return _currentChanges.keySet(); + } + + /** + * Returns a set of the names of components that changed based on the given change type. + */ + public Set getChangesBasedOnType(ChangeType changeType) { + return _currentChanges.get(changeType); + } + + /** + * Return a map of the change details (change type -> names of the changed components).
+ */ + public Map> getAllChanges() { + return _currentChanges; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java new file mode 100644 index 0000000000..387666c6db --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java @@ -0,0 +1,54 @@ +package org.apache.helix.controller.rebalancer.waged; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; +import org.apache.helix.model.ResourceAssignment; + +import java.util.Map; +import java.util.Set; + +/** + * A placeholder before we have the implementation. + * + * The data provider generates the Cluster Model based on the controller's data cache. + */ +public class ClusterDataProvider { + + /** + * @param dataProvider The controller's data cache. + * @param activeInstances The logical active instances that will be used in the calculation. 
Note + * This list can be different from the real active node list according to + * the rebalancer logic. + * @param clusterChanges All the cluster changes that happened after the previous rebalance. + * @param baselineAssignment The persisted Baseline assignment. + * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the + * previous rebalance. + * @return The cluster model as the input for the upcoming rebalance. + */ + protected static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider, + Set activeInstances, Map> clusterChanges, + Map baselineAssignment, + Map bestPossibleAssignment) { + // TODO finish the implementation. + return null; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index fd740e6533..aa3cfeeacc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -19,13 +19,10 @@ * under the License. */ +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; -import org.apache.helix.HelixRebalanceException; -import org.apache.helix.controller.changedetector.ResourceChangeDetector; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; -import org.apache.helix.controller.rebalancer.internal.MappingCalculator; -import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintsRebalanceAlgorithm; +import org.apache.helix.controller.rebalancer.GlobalRebalancer; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; import org.apache.helix.model.Resource; @@ -39,57 +36,23 @@ * A placeholder before we have the implementation. 
* Weight-Aware Globally-Even Distribute Rebalancer. * - * @see - * Design Document - * + * @see https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer */ -public class WagedRebalancer { +public class WagedRebalancer implements GlobalRebalancer { private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class); - // --------- The following fields are placeholders and need replacement. -----------// - // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization? - private final AssignmentMetadataStore _assignmentMetadataStore; - private final RebalanceAlgorithm _rebalanceAlgorithm; - // ------------------------------------------------------------------------------------// + @Override + public void init(HelixManager manager) { } - // The cluster change detector is a stateful object. Make it static to avoid unnecessary - // reinitialization. - private static final ThreadLocal CHANGE_DETECTOR_THREAD_LOCAL = - new ThreadLocal<>(); - private final MappingCalculator _mappingCalculator; - - private ResourceChangeDetector getChangeDetector() { - if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) { - CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector()); - } - return CHANGE_DETECTOR_THREAD_LOCAL.get(); - } - - public WagedRebalancer(HelixManager helixManager) { - // TODO init the metadata store according to their requirement when integrate, or change to final static method if possible. - _assignmentMetadataStore = new AssignmentMetadataStore(); - // TODO init the algorithm according to the requirement when integrate. - _rebalanceAlgorithm = new ConstraintsRebalanceAlgorithm(); - - // Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment - // output. - // This calculator will translate the best possible assignment into an applicable state mapping - // based on the current states. 
- // TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer - _mappingCalculator = new DelayedAutoRebalancer(); + @Override + public Map computeNewIdealState(CurrentStateOutput currentStateOutput, + ResourceControllerDataProvider clusterData, Map resourceMap) + throws HelixException { + return new HashMap<>(); } - /** - * Compute the new IdealStates for all the resources input. The IdealStates include both the new - * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields). - * @param clusterData The Cluster status data provider. - * @param resourceMap A map containing all the rebalancing resources. - * @param currentStateOutput The present Current State of the cluster. - * @return A map containing the computed new IdealStates. - */ - public Map computeNewIdealStates(ResourceControllerDataProvider clusterData, - Map resourceMap, final CurrentStateOutput currentStateOutput) - throws HelixRebalanceException { - return new HashMap<>(); + @Override + public RebalanceFailureReason getFailureReason() { + return new RebalanceFailureReason(RebalanceFailureType.UNKNOWN_FAILURE); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index c4f7d02e9c..9de023b0f7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -19,9 +19,9 @@ * under the License. 
*/ -import org.apache.helix.HelixConstants; import org.apache.helix.HelixException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; @@ -59,7 +59,7 @@ public class ClusterModelProvider { */ public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider, Map resourceMap, Set activeInstances, - Map> clusterChanges, + Map> clusterChanges, Map baselineAssignment, Map bestPossibleAssignment) { // Generate replica objects for all the resource partitions. @@ -108,13 +108,14 @@ public static ClusterModel generateClusterModel(ResourceControllerDataProvider d */ private static Set findToBeAssignedReplicas( Map> replicaMap, - Map> clusterChanges, Set activeInstances, + Map> clusterChanges, Set activeInstances, Map bestPossibleAssignment, Map> allocatedReplicas) { Set toBeAssignedReplicas = new HashSet<>(); - if (clusterChanges.containsKey(HelixConstants.ChangeType.CONFIG) - || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) { - // If the cluster topology has been modified, need to reassign all replicas + if (clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange) + || clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange) + || clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange)) { + // If the cluster topology or baseline assignment has been modified, need to reassign all replicas toBeAssignedReplicas .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet())); } else { @@ -123,13 +124,11 @@ private static Set findToBeAssignedReplicas( Set replicas = replicaMap.get(resourceName); // 1. if the resource config/idealstate is changed, need to reassign. // 2. 
if the resource does appear in the best possible assignment, need to reassign. - if (clusterChanges - .getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet()) - .contains(resourceName) - || clusterChanges - .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet()) - .contains(resourceName) - || !bestPossibleAssignment.containsKey(resourceName)) { + if (clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange, + Collections.emptySet()).contains(resourceName) || clusterChanges + .getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange, + Collections.emptySet()).contains(resourceName) || !bestPossibleAssignment + .containsKey(resourceName)) { toBeAssignedReplicas.addAll(replicas); continue; // go to check next resource } else { diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index ba4da88c4f..49a72e01a8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -20,7 +20,6 @@ */ import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -28,7 +27,6 @@ import org.apache.helix.HelixException; import org.apache.helix.HelixManager; -import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; @@ -39,7 +37,6 @@ import org.apache.helix.controller.rebalancer.Rebalancer; import org.apache.helix.controller.rebalancer.SemiAutoRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; -import 
org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.MaintenanceSignal; @@ -59,19 +56,18 @@ * IdealState,StateModel,LiveInstance */ public class BestPossibleStateCalcStage extends AbstractBaseStage { - private static final Logger logger = - LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName()); @Override public void process(ClusterEvent event) throws Exception { _eventId = event.getEventId(); - CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); + CurrentStateOutput currentStateOutput = + event.getAttribute(AttributeName.CURRENT_STATE.name()); final Map resourceMap = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); final ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); - ResourceControllerDataProvider cache = - event.getAttribute(AttributeName.ControllerDataProvider.name()); + ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); if (currentStateOutput == null || resourceMap == null || cache == null) { throw new StageException( @@ -94,7 +90,8 @@ public Object call() { resourceMap, stateModelDefMap); } } catch (Exception e) { - LogUtil.logError(logger, _eventId, "Could not update cluster status metrics!", e); + LogUtil + .logError(logger, _eventId, "Could not update cluster status metrics!", e); } return null; } @@ -103,8 +100,7 @@ public Object call() { private BestPossibleStateOutput compute(ClusterEvent event, Map resourceMap, CurrentStateOutput currentStateOutput) { - ResourceControllerDataProvider cache = - event.getAttribute(AttributeName.ControllerDataProvider.name()); + ResourceControllerDataProvider cache = 
event.getAttribute(AttributeName.ControllerDataProvider.name()); BestPossibleStateOutput output = new BestPossibleStateOutput(); HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name()); @@ -116,50 +112,19 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map newIdealStates = new HashMap<>(); - WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager); - try { - newIdealStates - .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput)); - } catch (HelixRebalanceException ex) { - // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result. - // Since it calculates for all the eligible resources globally, a partial result is invalid. - // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring. - LogUtil.logError(logger, _eventId, String - .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s", - wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex); - } - final List failureResources = new ArrayList<>(); Iterator itr = resourceMap.values().iterator(); while (itr.hasNext()) { Resource resource = itr.next(); boolean result = false; - IdealState is = newIdealStates.get(resource.getResourceName()); - if (is != null) { - // 2. Check if the WAGED rebalancer has calculated for this resource or not. - result = checkBestPossibleStateCalculation(is); - if (result) { - // The WAGED rebalancer calculates a valid result, record in the output - updateBestPossibleStateOutput(output, resource, is); - } - } else { - // 3. The WAGED rebalancer skips calculating the resource assignment, fallback to use a - // legacy resource rebalancer if applicable. - // If this calculation fails, the resource will be reported in the failureResources list. 
- try { - result = - computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource, - output); - } catch (HelixException ex) { - LogUtil.logError(logger, _eventId, - "Exception when calculating best possible states for " + resource.getResourceName(), - ex); - } + try { + result = + computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output); + } catch (HelixException ex) { + LogUtil.logError(logger, _eventId, + "Exception when calculating best possible states for " + resource.getResourceName(), + ex); + } if (!result) { failureResources.add(resource.getResourceName()); @@ -220,9 +185,8 @@ private boolean validateOfflineInstancesLimit(final ResourceControllerDataProvid if (manager != null) { if (manager.getHelixDataAccessor() .getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) { - manager.getClusterManagmentTool() - .autoEnableMaintenanceMode(manager.getClusterName(), true, errMsg, - MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED); + manager.getClusterManagmentTool().autoEnableMaintenanceMode(manager.getClusterName(), + true, errMsg, MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED); LogUtil.logWarn(logger, _eventId, errMsg); } } else { @@ -235,19 +199,8 @@ private boolean validateOfflineInstancesLimit(final ResourceControllerDataProvid return true; } - private void updateBestPossibleStateOutput(BestPossibleStateOutput output, Resource resource, - IdealState computedIdealState) { - output.setPreferenceLists(resource.getResourceName(), computedIdealState.getPreferenceLists()); - for (Partition partition : resource.getPartitions()) { - Map newStateMap = - computedIdealState.getInstanceStateMap(partition.getPartitionName()); - output.setState(resource.getResourceName(), partition, newStateMap); - } - } - - private boolean computeSingleResourceBestPossibleState(ClusterEvent event, - ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput, - 
Resource resource, BestPossibleStateOutput output) { + private boolean computeResourceBestPossibleState(ClusterEvent event, ResourceControllerDataProvider cache, + CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) { // for each ideal state // read the state model def // for each resource @@ -276,13 +229,12 @@ private boolean computeSingleResourceBestPossibleState(ClusterEvent event, Rebalancer rebalancer = getRebalancer(idealState, resourceName, cache.isMaintenanceModeEnabled()); - MappingCalculator mappingCalculator = - getMappingCalculator(rebalancer, resourceName); + MappingCalculator mappingCalculator = getMappingCalculator(rebalancer, resourceName); if (rebalancer == null || mappingCalculator == null) { - LogUtil.logError(logger, _eventId, "Error computing assignment for resource " + resourceName - + ". no rebalancer found. rebalancer: " + rebalancer + " mappingCalculator: " - + mappingCalculator); + LogUtil.logError(logger, _eventId, + "Error computing assignment for resource " + resourceName + ". no rebalancer found. 
rebalancer: " + rebalancer + + " mappingCalculator: " + mappingCalculator); } if (rebalancer != null && mappingCalculator != null) { @@ -347,8 +299,8 @@ private boolean checkBestPossibleStateCalculation(IdealState idealState) { } } - private Rebalancer getRebalancer(IdealState idealState, - String resourceName, boolean isMaintenanceModeEnabled) { + private Rebalancer getRebalancer(IdealState idealState, String resourceName, + boolean isMaintenanceModeEnabled) { Rebalancer customizedRebalancer = null; String rebalancerClassName = idealState.getRebalancerClassName(); if (rebalancerClassName != null) { diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java index 1221b6f28b..f92a66ccaf 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java @@ -19,8 +19,8 @@ * under the License. */ -import org.apache.helix.HelixConstants; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.model.CurrentState; import org.apache.helix.model.IdealState; @@ -177,7 +177,7 @@ public void testGenerateClusterModel() throws IOException { // 5. 
test with best possible assignment but cluster topology is changed clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream() .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))), - _instances, Collections.singletonMap(HelixConstants.ChangeType.CONFIG, + _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ClusterConfigChange, Collections.emptySet()), Collections.emptyMap(), bestPossibleAssignment); // There should be no existing assignment since the topology change invalidates all existing assignment Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream() @@ -194,7 +194,7 @@ public void testGenerateClusterModel() throws IOException { String changedResourceName = _resourceNames.get(0); clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream() .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))), - _instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG, + _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ResourceConfigChange, Collections.singleton(changedResourceName)), Collections.emptyMap(), bestPossibleAssignment); // There should be no existing assignment for all the resource except for resource2.