From 4dc4db46632f378896b36d2b6451b189d2fb7236 Mon Sep 17 00:00:00 2001 From: "Qi (Quincy) Qu" Date: Mon, 3 Apr 2023 15:29:34 -0400 Subject: [PATCH 1/2] Refactor WagedRebalancer and add comments Create standalone classes for partial and global rebalance to make the class more modular and easier to manage. --- .../rebalancer/util/WagedRebalanceUtil.java | 53 +++ .../rebalancer/waged/AssignmentManager.java | 103 +++++ .../waged/GlobalRebalanceRunner.java | 216 ++++++++++ .../waged/PartialRebalanceRunner.java | 208 ++++++++++ .../rebalancer/waged/WagedRebalancer.java | 376 ++---------------- .../rebalancer/waged/TestWagedRebalancer.java | 5 - 6 files changed, 607 insertions(+), 354 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java new file mode 100644 index 0000000000..62a5fb515a --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java @@ -0,0 +1,53 @@ +package org.apache.helix.controller.rebalancer.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; +import org.apache.helix.HelixRebalanceException; +import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; +import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; +import org.apache.helix.model.ResourceAssignment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class WagedRebalanceUtil { + + private static final Logger LOG = LoggerFactory.getLogger(WagedRebalanceUtil.class); + + /** + * @param clusterModel the cluster model that contains all the cluster status for the purpose of + * rebalancing. + * @return the new optimal assignment for the resources. + */ + public static Map calculateAssignment(ClusterModel clusterModel, + RebalanceAlgorithm algorithm) throws HelixRebalanceException { + long startTime = System.currentTimeMillis(); + LOG.info("Start calculating for an assignment with algorithm {}", + algorithm.getClass().getSimpleName()); + OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel); + Map newAssignment = + optimalAssignment.getOptimalResourceAssignment(); + LOG.info("Finish calculating an assignment with algorithm {}. Took: {} ms.", + algorithm.getClass().getSimpleName(), System.currentTimeMillis() - startTime); + return newAssignment; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java new file mode 100644 index 0000000000..45834fc443 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java @@ -0,0 +1,103 @@ +package org.apache.helix.controller.rebalancer.waged; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixRebalanceException; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.metrics.model.LatencyMetric; + + +class AssignmentManager { + private final LatencyMetric _stateReadLatency; + + public AssignmentManager(LatencyMetric stateReadLatency) { + _stateReadLatency = stateReadLatency; + } + + /** + * @param assignmentMetadataStore + * @param currentStateOutput + * @param resources + * @return The current baseline assignment. If record does not exist in the + * assignmentMetadataStore, return the current state assignment. + * @throws HelixRebalanceException + */ + public Map getBaselineAssignment(AssignmentMetadataStore assignmentMetadataStore, + CurrentStateOutput currentStateOutput, Set resources) throws HelixRebalanceException { + Map currentBaseline = new HashMap<>(); + if (assignmentMetadataStore != null) { + try { + _stateReadLatency.startMeasuringLatency(); + currentBaseline = new HashMap<>(assignmentMetadataStore.getBaseline()); + _stateReadLatency.endMeasuringLatency(); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to get the current baseline assignment because of unexpected error.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } + currentBaseline.keySet().retainAll(resources); + + // For resources without baseline, fall back to current state assignments + Set missingResources = new HashSet<>(resources); + missingResources.removeAll(currentBaseline.keySet()); + currentBaseline.putAll(currentStateOutput.getAssignment(missingResources)); + + return currentBaseline; + } + + /** + * @param assignmentMetadataStore + * @param currentStateOutput + * @param resources + * @return The current best possible assignment. If record does not exist in the + * assignmentMetadataStore, return the current state assignment. + * @throws HelixRebalanceException + */ + public Map getBestPossibleAssignment( + AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput, + Set resources) throws HelixRebalanceException { + Map currentBestAssignment = new HashMap<>(); + if (assignmentMetadataStore != null) { + try { + _stateReadLatency.startMeasuringLatency(); + currentBestAssignment = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment()); + _stateReadLatency.endMeasuringLatency(); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to get the current best possible assignment because of unexpected error.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } + currentBestAssignment.keySet().retainAll(resources); + + // For resources without best possible states, fall back to current state assignments + Set missingResources = new HashSet<>(resources); + missingResources.removeAll(currentBestAssignment.keySet()); + currentBestAssignment.putAll(currentStateOutput.getAssignment(missingResources)); + + return currentBestAssignment; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java new file mode 100644 index 0000000000..6130e5c522 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java @@ -0,0 +1,216 @@ +package org.apache.helix.controller.rebalancer.waged; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixRebalanceException; +import org.apache.helix.controller.changedetector.ResourceChangeDetector; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.metrics.MetricCollector; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; +import org.apache.helix.monitoring.metrics.model.LatencyMetric; +import org.apache.helix.util.RebalanceUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Global Rebalance does the baseline recalculation when certain changes happen. + * The Global Baseline calculation does not consider any temporary status, such as participants' offline/disabled. + * Baseline is used as an anchor for {@link PartialRebalanceRunner}. Its computation takes previous baseline as an input. + * The Baseline is NOT directly propagated to the final output. It is consumed by the {link PartialRebalanceRunner} + * as an important parameter. + */ +class GlobalRebalanceRunner implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(GlobalRebalanceRunner.class); + + // When any of the following change happens, the rebalancer needs to do a global rebalance which + // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. + private static final Set GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = + ImmutableSet + .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE, + HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); + + // To calculate the baseline asynchronously + private final ExecutorService _baselineCalculateExecutor; + private final ResourceChangeDetector _changeDetector; + private final AssignmentManager _assignmentManager; + private final AssignmentMetadataStore _assignmentMetadataStore; + private final LatencyMetric _writeLatency; + private final CountMetric _baselineCalcCounter; + private final LatencyMetric _baselineCalcLatency; + private final CountMetric _rebalanceFailureCount; + + private boolean _asyncGlobalRebalanceEnabled; + + public GlobalRebalanceRunner(AssignmentManager assignmentManager, + AssignmentMetadataStore assignmentMetadataStore, + MetricCollector metricCollector, + LatencyMetric writeLatency, + CountMetric rebalanceFailureCount, + boolean isAsyncGlobalRebalanceEnabled) { + _baselineCalculateExecutor = Executors.newSingleThreadExecutor(); + _assignmentManager = assignmentManager; + _assignmentMetadataStore = assignmentMetadataStore; + _changeDetector = new ResourceChangeDetector(true); + _writeLatency = writeLatency; + _baselineCalcCounter = metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(), + CountMetric.class); + _baselineCalcLatency = metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), + LatencyMetric.class); + _rebalanceFailureCount = rebalanceFailureCount; + _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled; + } + + /** + * Global rebalance calculates for a new baseline assignment. + * The new baseline assignment will be persisted and leveraged by the partial rebalance. + * @param clusterData + * @param resourceMap + * @param currentStateOutput + * @param algorithm + * @throws HelixRebalanceException + */ + public void globalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, + final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException { + _changeDetector.updateSnapshots(clusterData); + // Get all the changed items' information. Filter for the items that have content changed. + final Map> clusterChanges = _changeDetector.getAllChanges(); + + if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) { + final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled; + // Calculate the Baseline assignment for global rebalance. + Future result = _baselineCalculateExecutor.submit(() -> { + try { + // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should + // be triggered again after baseline is finished. + // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance + doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance, + clusterChanges); + } catch (HelixRebalanceException e) { + if (_asyncGlobalRebalanceEnabled) { + _rebalanceFailureCount.increment(1L); + } + LOG.error("Failed to calculate baseline assignment!", e); + return false; + } + return true; + }); + if (waitForGlobalRebalance) { + try { + if (!result.get()) { + throw new HelixRebalanceException("Failed to calculate for the new Baseline.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE); + } + } catch (InterruptedException | ExecutionException e) { + throw new HelixRebalanceException("Failed to execute new Baseline calculation.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE, e); + } + } + } + } + + /** + * Calculate and update the Baseline assignment + * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance + * so the new Baseline could be applied to cluster. + */ + private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, + RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline, + Map> clusterChanges) throws HelixRebalanceException { + LOG.info("Start calculating the new baseline."); + _baselineCalcCounter.increment(1L); + _baselineCalcLatency.startMeasuringLatency(); + + // Build the cluster model for rebalance calculation. + // Note, for a Baseline calculation, + // 1. Ignore node status (disable/offline). + // 2. Use the previous Baseline as the only parameter about the previous assignment. + Map currentBaseline = + _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); + ClusterModel clusterModel; + try { + clusterModel = + ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, clusterData.getAllInstances(), + clusterChanges, currentBaseline); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.", + HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); + } + + Map newBaseline = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); + boolean isBaselineChanged = + _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline); + // Write the new baseline to metadata store + if (isBaselineChanged) { + try { + _writeLatency.startMeasuringLatency(); + _assignmentMetadataStore.persistBaseline(newBaseline); + _writeLatency.endMeasuringLatency(); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to persist the new baseline assignment.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } else { + LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment."); + } + _baselineCalcLatency.endMeasuringLatency(); + LOG.info("Global baseline calculation completed and has been persisted into metadata store."); + + if (isBaselineChanged && shouldTriggerMainPipeline) { + LOG.info("Schedule a new rebalance after the new baseline calculation has finished."); + RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false); + } + } + + public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) { + _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled; + } + + public ResourceChangeDetector getChangeDetector() { + return _changeDetector; + } + + public void resetChangeDetector() { + _changeDetector.resetSnapshots(); + } + + public void close() { + if (_baselineCalculateExecutor != null) { + _baselineCalculateExecutor.shutdownNow(); + } + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java new file mode 100644 index 0000000000..74982f6e39 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java @@ -0,0 +1,208 @@ +package org.apache.helix.controller.rebalancer.waged; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.helix.HelixRebalanceException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.metrics.MetricCollector; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge; +import org.apache.helix.monitoring.metrics.model.CountMetric; +import org.apache.helix.monitoring.metrics.model.LatencyMetric; +import org.apache.helix.util.RebalanceUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Compute the best possible assignment based on the Baseline and the previous Best Possible assignment. + * The coordinator compares the previous Best Possible assignment with the current cluster state so as to derive a + * minimal rebalance scope. In short, the rebalance scope only contains the following two types of partitions. + * 1. The partition's current assignment becomes invalid. + * 2. The Baseline contains some new partition assignments that do not exist in the current assignment. + */ +class PartialRebalanceRunner implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(PartialRebalanceRunner.class); + + private final ExecutorService _bestPossibleCalculateExecutor; + private final AssignmentManager _assignmentManager; + private final AssignmentMetadataStore _assignmentMetadataStore; + private final BaselineDivergenceGauge _baselineDivergenceGauge; + private final CountMetric _rebalanceFailureCount; + private final CountMetric _partialRebalanceCounter; + private final LatencyMetric _partialRebalanceLatency; + + private boolean _asyncPartialRebalanceEnabled; + private Future _asyncPartialRebalanceResult; + + public PartialRebalanceRunner(AssignmentManager assignmentManager, + AssignmentMetadataStore assignmentMetadataStore, + MetricCollector metricCollector, + CountMetric rebalanceFailureCount, + boolean isAsyncPartialRebalanceEnabled) { + _assignmentManager = assignmentManager; + _assignmentMetadataStore = assignmentMetadataStore; + _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor(); + _rebalanceFailureCount = rebalanceFailureCount; + _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled; + + _partialRebalanceCounter = metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(), + CountMetric.class); + _partialRebalanceLatency = metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge + .name(), + LatencyMetric.class); + _baselineDivergenceGauge = metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(), + BaselineDivergenceGauge.class); + } + + public void partialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, + Set activeNodes, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) + throws HelixRebalanceException { + // If partial rebalance is async and the previous result is not completed yet, + // do not start another partial rebalance. + if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null + && !_asyncPartialRebalanceResult.isDone()) { + return; + } + + _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> { + try { + doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, + currentStateOutput); + } catch (HelixRebalanceException e) { + if (_asyncPartialRebalanceEnabled) { + _rebalanceFailureCount.increment(1L); + } + LOG.error("Failed to calculate best possible assignment!", e); + return false; + } + return true; + }); + if (!_asyncPartialRebalanceEnabled) { + try { + if (!_asyncPartialRebalanceResult.get()) { + throw new HelixRebalanceException("Failed to calculate for the new best possible.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE); + } + } catch (InterruptedException | ExecutionException e) { + throw new HelixRebalanceException("Failed to execute new best possible calculation.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE, e); + } + } + } + + /** + * Calculate and update the Best Possible assignment + * If the result differ from the persisted result, persist it to memory (only if the version is not stale); + * If persisted, trigger the pipeline so that main thread logic can run again. + */ + private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, + Set activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput) + throws HelixRebalanceException { + LOG.info("Start calculating the new best possible assignment."); + _partialRebalanceCounter.increment(1L); + _partialRebalanceLatency.startMeasuringLatency(); + + int newBestPossibleAssignmentVersion = -1; + if (_assignmentMetadataStore != null) { + newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1; + } else { + LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version."); + } + + // Read the baseline from metadata store + Map currentBaseline = + _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); + + // Read the best possible assignment from metadata store + Map currentBestPossibleAssignment = + _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet()); + ClusterModel clusterModel; + try { + clusterModel = ClusterModelProvider + .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes, + currentBaseline, currentBestPossibleAssignment); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.", + HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); + } + Map newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); + + // Asynchronously report baseline divergence metric before persisting to metadata store, + // just in case if persisting fails, we still have the metric. + // To avoid changes of the new assignment and make it safe when being used to measure baseline + // divergence, use a deep copy of the new assignment. + Map newAssignmentCopy = new HashMap<>(); + for (Map.Entry entry : newAssignment.entrySet()) { + newAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord())); + } + + _baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(), + currentBaseline, newAssignmentCopy); + + boolean bestPossibleUpdateSuccessful = false; + if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) { + bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment, + newBestPossibleAssignmentVersion); + } else { + LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment."); + } + _partialRebalanceLatency.endMeasuringLatency(); + LOG.info("Finish calculating the new best possible assignment."); + + if (bestPossibleUpdateSuccessful) { + LOG.info("Schedule a new rebalance after the new best possible calculation has finished."); + RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false); + } + } + + public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) { + _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled; + } + + public boolean isAsyncPartialRebalanceEnabled() { + return _asyncPartialRebalanceEnabled; + } + + @Override + public void close() { + if (_bestPossibleCalculateExecutor != null) { + _bestPossibleCalculateExecutor.shutdownNow(); + } + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 3fffef2fbf..b43ce4b68e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -28,16 +28,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import org.apache.helix.HelixConstants; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -46,11 +40,11 @@ import org.apache.helix.controller.rebalancer.StatefulRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; +import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil; import org.apache.helix.controller.rebalancer.util.WagedValidationUtil; import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory; import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; -import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; @@ -60,10 +54,8 @@ import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.metrics.MetricCollector; import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; -import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge; import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; -import org.apache.helix.util.RebalanceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,12 +69,6 @@ public class WagedRebalancer implements StatefulRebalancer { private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class); - // When any of the following change happens, the rebalancer needs to do a global rebalance which - // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. - private static final Set GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - ImmutableSet - .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE, - HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); // To identify if the preference has been configured or not. private static final Map NOT_CONFIGURED_PREFERENCE = ImmutableMap @@ -97,37 +83,25 @@ public class WagedRebalancer implements StatefulRebalancer _mappingCalculator; private final AssignmentMetadataStore _assignmentMetadataStore; private final MetricCollector _metricCollector; private final CountMetric _rebalanceFailureCount; - private final CountMetric _baselineCalcCounter; - private final LatencyMetric _baselineCalcLatency; private final LatencyMetric _writeLatency; - private final CountMetric _partialRebalanceCounter; - private final LatencyMetric _partialRebalanceLatency; private final CountMetric _emergencyRebalanceCounter; private final LatencyMetric _emergencyRebalanceLatency; private final CountMetric _rebalanceOverwriteCounter; private final LatencyMetric _rebalanceOverwriteLatency; - private final LatencyMetric _stateReadLatency; - private final BaselineDivergenceGauge _baselineDivergenceGauge; - - private boolean _asyncGlobalRebalanceEnabled; - private boolean _asyncPartialRebalanceEnabled; - private Future _asyncPartialRebalanceResult; + private final AssignmentManager _assignmentManager; + private final PartialRebalanceRunner _partialRebalanceRunner; + private final GlobalRebalanceRunner _globalRebalanceRunner; // Note, the rebalance algorithm field is mutable so it should not be directly referred except for // the public method computeNewIdealStates. private RebalanceAlgorithm _rebalanceAlgorithm; - private Map _preference = - NOT_CONFIGURED_PREFERENCE; + private Map _preference = NOT_CONFIGURED_PREFERENCE; private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs, String clusterName) { @@ -199,20 +173,6 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, _rebalanceFailureCount = _metricCollector.getMetric( WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), CountMetric.class); - _baselineCalcCounter = _metricCollector.getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(), - CountMetric.class); - _baselineCalcLatency = _metricCollector.getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge - .name(), - LatencyMetric.class); - _partialRebalanceCounter = _metricCollector.getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(), - CountMetric.class); - _partialRebalanceLatency = _metricCollector.getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge - .name(), - LatencyMetric.class); _emergencyRebalanceCounter = _metricCollector.getMetric( WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(), CountMetric.class); _emergencyRebalanceLatency = _metricCollector.getMetric( @@ -226,29 +186,24 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, _writeLatency = _metricCollector.getMetric( WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), LatencyMetric.class); - _stateReadLatency = _metricCollector.getMetric( + _assignmentManager = new AssignmentManager(_metricCollector.getMetric( WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(), - LatencyMetric.class); - _baselineDivergenceGauge = _metricCollector.getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(), - BaselineDivergenceGauge.class); + LatencyMetric.class)); - _changeDetector = new ResourceChangeDetector(true); - - _baselineCalculateExecutor = Executors.newSingleThreadExecutor(); - _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor(); - _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled; - _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled; + _partialRebalanceRunner = new PartialRebalanceRunner(_assignmentManager, assignmentMetadataStore, metricCollector, + _rebalanceFailureCount, isAsyncPartialRebalanceEnabled); + _globalRebalanceRunner = new GlobalRebalanceRunner(_assignmentManager, assignmentMetadataStore, metricCollector, + _writeLatency, _rebalanceFailureCount, isAsyncGlobalRebalanceEnabled); } // Update the global rebalance mode to be asynchronous or synchronous public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) { - _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled; + _globalRebalanceRunner.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled); } // Update the partial rebalance mode to be asynchronous or synchronous public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) { - _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled; + _partialRebalanceRunner.setPartialRebalanceAsyncMode(isAsyncPartialRebalanceEnabled); } // Update the rebalancer preference if the new options are different from the current preference. @@ -267,18 +222,14 @@ public void reset() { if (_assignmentMetadataStore != null) { _assignmentMetadataStore.reset(); } - _changeDetector.resetSnapshots(); + _globalRebalanceRunner.resetChangeDetector(); } // TODO the rebalancer should reject any other computing request after being closed. @Override public void close() { - if (_baselineCalculateExecutor != null) { - _baselineCalculateExecutor.shutdownNow(); - } - if (_bestPossibleCalculateExecutor != null) { - _bestPossibleCalculateExecutor.shutdownNow(); - } + _partialRebalanceRunner.close(); + _globalRebalanceRunner.close(); if (_assignmentMetadataStore != null) { _assignmentMetadataStore.close(); } @@ -315,7 +266,7 @@ public Map computeNewIdealStates(ResourceControllerDataProvi // Note that don't return an assignment based on the current state if there is no previously // calculated result in this fallback logic. Map assignmentRecord = - getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), + _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), resourceMap.keySet()); newIdealStates = convertResourceAssignment(clusterData, assignmentRecord); } @@ -368,7 +319,7 @@ private Map computeBestPossibleStates( if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) && requireRebalanceOverwrite(clusterData, newBestPossibleAssignment)) { applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap, - getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm); + _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm); } // Replace the assignment if user-defined preference list is configured. // Note the user-defined list is intentionally applied to the final mapping after calculation. @@ -388,7 +339,7 @@ protected Map computeBestPossibleAssignment( RebalanceAlgorithm algorithm) throws HelixRebalanceException { // Perform global rebalance for a new baseline assignment - globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm); + _globalRebalanceRunner.globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm); // Perform emergency rebalance for a new best possible assignment Map newAssignment = emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm); @@ -434,209 +385,6 @@ protected List failureTypesToPropagate() { return FAILURE_TYPES_TO_PROPAGATE; } - /** - * Global rebalance calculates for a new baseline assignment. - * The new baseline assignment will be persisted and leveraged by the partial rebalance. - * @param clusterData - * @param resourceMap - * @param currentStateOutput - * @param algorithm - * @throws HelixRebalanceException - */ - private void globalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, - final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException { - _changeDetector.updateSnapshots(clusterData); - // Get all the changed items' information. Filter for the items that have content changed. - final Map> clusterChanges = _changeDetector.getAllChanges(); - - if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) { - final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled; - // Calculate the Baseline assignment for global rebalance. - Future result = _baselineCalculateExecutor.submit(() -> { - try { - // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should - // be triggered again after baseline is finished. - // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance - doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance, - clusterChanges); - } catch (HelixRebalanceException e) { - if (_asyncGlobalRebalanceEnabled) { - _rebalanceFailureCount.increment(1L); - } - LOG.error("Failed to calculate baseline assignment!", e); - return false; - } - return true; - }); - if (waitForGlobalRebalance) { - try { - if (!result.get()) { - throw new HelixRebalanceException("Failed to calculate for the new Baseline.", - HelixRebalanceException.Type.FAILED_TO_CALCULATE); - } - } catch (InterruptedException | ExecutionException e) { - throw new HelixRebalanceException("Failed to execute new Baseline calculation.", - HelixRebalanceException.Type.FAILED_TO_CALCULATE, e); - } - } - } - } - - /** - * Calculate and update the Baseline assignment - * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance - * so the new Baseline could be applied to cluster. - */ - private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, - RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline, - Map> clusterChanges) throws HelixRebalanceException { - LOG.info("Start calculating the new baseline."); - _baselineCalcCounter.increment(1L); - _baselineCalcLatency.startMeasuringLatency(); - - // Build the cluster model for rebalance calculation. - // Note, for a Baseline calculation, - // 1. Ignore node status (disable/offline). - // 2. Use the previous Baseline as the only parameter about the previous assignment. - Map currentBaseline = - getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); - ClusterModel clusterModel; - try { - clusterModel = - ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, clusterData.getAllInstances(), - clusterChanges, currentBaseline); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.", - HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); - } - - Map newBaseline = calculateAssignment(clusterModel, algorithm); - boolean isBaselineChanged = - _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline); - // Write the new baseline to metadata store - if (isBaselineChanged) { - try { - _writeLatency.startMeasuringLatency(); - _assignmentMetadataStore.persistBaseline(newBaseline); - _writeLatency.endMeasuringLatency(); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to persist the new baseline assignment.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); - } - } else { - LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment."); - } - _baselineCalcLatency.endMeasuringLatency(); - LOG.info("Global baseline calculation completed and has been persisted into metadata store."); - - if (isBaselineChanged && shouldTriggerMainPipeline) { - LOG.info("Schedule a new rebalance after the new baseline calculation has finished."); - RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false); - } - } - - private void partialRebalance( - ResourceControllerDataProvider clusterData, Map resourceMap, - Set activeNodes, final CurrentStateOutput currentStateOutput, - RebalanceAlgorithm algorithm) - throws HelixRebalanceException { - // If partial rebalance is async and the previous result is not completed yet, - // do not start another partial rebalance. - if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null - && !_asyncPartialRebalanceResult.isDone()) { - return; - } - - _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> { - try { - doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, - currentStateOutput); - } catch (HelixRebalanceException e) { - if (_asyncPartialRebalanceEnabled) { - _rebalanceFailureCount.increment(1L); - } - LOG.error("Failed to calculate best possible assignment!", e); - return false; - } - return true; - }); - if (!_asyncPartialRebalanceEnabled) { - try { - if (!_asyncPartialRebalanceResult.get()) { - throw new HelixRebalanceException("Failed to calculate for the new best possible.", - HelixRebalanceException.Type.FAILED_TO_CALCULATE); - } - } catch (InterruptedException | ExecutionException e) { - throw new HelixRebalanceException("Failed to execute new best possible calculation.", - HelixRebalanceException.Type.FAILED_TO_CALCULATE, e); - } - } - } - - /** - * Calculate and update the Best Possible assignment - */ - private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map resourceMap, - Set activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput) - throws HelixRebalanceException { - LOG.info("Start calculating the new best possible assignment."); - _partialRebalanceCounter.increment(1L); - _partialRebalanceLatency.startMeasuringLatency(); - - int newBestPossibleAssignmentVersion = -1; - if (_assignmentMetadataStore != null) { - newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1; - } else { - LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version."); - } - - // Read the baseline from metadata store - Map currentBaseline = - getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); - - // Read the best possible assignment from metadata store - Map currentBestPossibleAssignment = - getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, - resourceMap.keySet()); - ClusterModel clusterModel; - try { - clusterModel = ClusterModelProvider - .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes, - currentBaseline, currentBestPossibleAssignment); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.", - HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); - } - Map newAssignment = calculateAssignment(clusterModel, algorithm); - - // Asynchronously report baseline divergence metric before persisting to metadata store, - // just in case if persisting fails, we still have the metric. - // To avoid changes of the new assignment and make it safe when being used to measure baseline - // divergence, use a deep copy of the new assignment. - Map newAssignmentCopy = new HashMap<>(); - for (Map.Entry entry : newAssignment.entrySet()) { - newAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord())); - } - - _baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(), - currentBaseline, newAssignmentCopy); - - boolean bestPossibleUpdateSuccessful = false; - if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) { - bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment, - newBestPossibleAssignmentVersion); - } else { - LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment."); - } - _partialRebalanceLatency.endMeasuringLatency(); - LOG.info("Finish calculating the new best possible assignment."); - - if (bestPossibleUpdateSuccessful) { - LOG.info("Schedule a new rebalance after the new best possible calculation has finished."); - RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false); - } - } - protected Map emergencyRebalance( ResourceControllerDataProvider clusterData, Map resourceMap, Set activeNodes, final CurrentStateOutput currentStateOutput, @@ -647,7 +395,7 @@ protected Map emergencyRebalance( _emergencyRebalanceLatency.startMeasuringLatency(); Map currentBestPossibleAssignment = - getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, + _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); // Step 1: Check for permanent node down @@ -677,7 +425,7 @@ protected Map emergencyRebalance( throw new HelixRebalanceException("Failed to generate cluster model for emergency rebalance.", HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); } - newAssignment = calculateAssignment(clusterModel, algorithm); + newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); } else { newAssignment = currentBestPossibleAssignment; } @@ -687,9 +435,9 @@ protected Map emergencyRebalance( _emergencyRebalanceLatency.endMeasuringLatency(); LOG.info("Finish emergency rebalance"); - partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm); - if (!_asyncPartialRebalanceEnabled) { - newAssignment = getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, + _partialRebalanceRunner.partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm); + if (!_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) { + newAssignment = _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); persistBestPossibleAssignment(newAssignment); } @@ -697,24 +445,6 @@ protected Map emergencyRebalance( return newAssignment; } - /** - * @param clusterModel the cluster model that contains all the cluster status for the purpose of - * rebalancing. - * @return the new optimal assignment for the resources. - */ - private Map calculateAssignment(ClusterModel clusterModel, - RebalanceAlgorithm algorithm) throws HelixRebalanceException { - long startTime = System.currentTimeMillis(); - LOG.info("Start calculating for an assignment with algorithm {}", - algorithm.getClass().getSimpleName()); - OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel); - Map newAssignment = - optimalAssignment.getOptimalResourceAssignment(); - LOG.info("Finish calculating an assignment with algorithm {}. Took: {} ms.", - algorithm.getClass().getSimpleName(), System.currentTimeMillis() - startTime); - return newAssignment; - } - // Generate the preference lists from the state mapping based on state priority. private Map> getPreferenceLists(ResourceAssignment newAssignment, Map statePriorityMap) { @@ -751,39 +481,6 @@ private void validateInput(ResourceControllerDataProvider clusterData, } } - /** - * @param assignmentMetadataStore - * @param currentStateOutput - * @param resources - * @return The current baseline assignment. If record does not exist in the - * assignmentMetadataStore, return the current state assignment. - * @throws HelixRebalanceException - */ - private Map getBaselineAssignment( - AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput, - Set resources) throws HelixRebalanceException { - Map currentBaseline = new HashMap<>(); - if (assignmentMetadataStore != null) { - try { - _stateReadLatency.startMeasuringLatency(); - currentBaseline = new HashMap<>(assignmentMetadataStore.getBaseline()); - _stateReadLatency.endMeasuringLatency(); - } catch (Exception ex) { - throw new HelixRebalanceException( - "Failed to get the current baseline assignment because of unexpected error.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); - } - } - currentBaseline.keySet().retainAll(resources); - - // For resources without baseline, fall back to current state assignments - Set missingResources = new HashSet<>(resources); - missingResources.removeAll(currentBaseline.keySet()); - currentBaseline.putAll(currentStateOutput.getAssignment(missingResources)); - - return currentBaseline; - } - /** * @param assignmentMetadataStore * @param currentStateOutput @@ -795,26 +492,7 @@ private Map getBaselineAssignment( protected Map getBestPossibleAssignment( AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput, Set resources) throws HelixRebalanceException { - Map currentBestAssignment = new HashMap<>(); - if (assignmentMetadataStore != null) { - try { - _stateReadLatency.startMeasuringLatency(); - currentBestAssignment = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment()); - _stateReadLatency.endMeasuringLatency(); - } catch (Exception ex) { - throw new HelixRebalanceException( - "Failed to get the current best possible assignment because of unexpected error.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); - } - } - currentBestAssignment.keySet().retainAll(resources); - - // For resources without best possible states, fall back to current state assignments - Set missingResources = new HashSet<>(resources); - missingResources.removeAll(currentBestAssignment.keySet()); - currentBestAssignment.putAll(currentStateOutput.getAssignment(missingResources)); - - return currentBestAssignment; + return _assignmentManager.getBestPossibleAssignment(assignmentMetadataStore, currentStateOutput, resources); } private void persistBestPossibleAssignment(Map bestPossibleAssignment) @@ -917,7 +595,7 @@ protected void applyRebalanceOverwrite(Map idealStateMap, HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); } Map activeIdealStates = - convertResourceAssignment(clusterData, calculateAssignment(clusterModel, algorithm)); + convertResourceAssignment(clusterData, WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm)); for (String resourceName : idealStateMap.keySet()) { // The new calculated ideal state before overwrite IdealState newIdealState = idealStateMap.get(resourceName); @@ -968,7 +646,7 @@ protected MetricCollector getMetricCollector() { } protected ResourceChangeDetector getChangeDetector() { - return _changeDetector; + return _globalRebalanceRunner.getChangeDetector(); } @Override diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index d011c5c2e5..3344fe14cb 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -20,12 +20,9 @@ */ import java.io.IOException; -import java.sql.Array; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -35,7 +32,6 @@ import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm; import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel; import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; @@ -55,7 +51,6 @@ import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.mockito.ArgumentCaptor; -import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; From 90c365cfb1f81f8c13a4cb54da38db0de5a32a75 Mon Sep 17 00:00:00 2001 From: "Qi (Quincy) Qu" Date: Tue, 4 Apr 2023 09:54:53 -0400 Subject: [PATCH 2/2] Resolve comments --- .../helix/controller/rebalancer/waged/AssignmentManager.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java index 45834fc443..475e8aad14 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java @@ -29,6 +29,9 @@ import org.apache.helix.monitoring.metrics.model.LatencyMetric; +/** + * A manager class for fetching assignment from metadata store. + */ class AssignmentManager { private final LatencyMetric _stateReadLatency;