Skip to content

Commit

Permalink
Revert "Refine the WAGED rebalancer related interfaces for integration (
Browse files Browse the repository at this point in the history
apache#431)" (apache#437)

This reverts commit 08a2015.
  • Loading branch information
jiajunwang authored and i3wangyi committed Sep 3, 2019
1 parent 502800d commit e1c622b
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 202 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@
*/

import com.google.common.collect.Sets;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixProperty;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;

/**
* ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
Expand All @@ -39,7 +37,6 @@
* WARNING: the methods of this class are not thread-safe.
*/
public class ResourceChangeDetector implements ChangeDetector {
private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName());

private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
Expand Down Expand Up @@ -111,13 +108,10 @@ private void clearCachedComputation() {
return snapshot.getResourceConfigMap();
case LIVE_INSTANCE:
return snapshot.getLiveInstances();
case CONFIG:
return Collections.emptyMap();
default:
LOG.warn(
"ResourceChangeDetector cannot compute the names of changes for the given ChangeType: {}",
changeType);
return Collections.emptyMap();
throw new HelixException(String.format(
"ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
changeType));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.apache.helix.controller.rebalancer;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.HelixManager;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;

import java.util.Map;

public interface GlobalRebalancer<T extends BaseControllerDataProvider> {
enum RebalanceFailureType {
INVALID_CLUSTER_STATUS,
INVALID_REBALANCER_STATUS,
FAILED_TO_CALCULATE,
UNKNOWN_FAILURE
}

class RebalanceFailureReason {
private final static String DEFAULT_REASON_MESSAGE = "No detail";
private final RebalanceFailureType _type;
private final String _reason;

public RebalanceFailureReason(RebalanceFailureType type) {
this(type, DEFAULT_REASON_MESSAGE);
}

public RebalanceFailureReason(RebalanceFailureType type, String reason) {
_type = type;
_reason = reason;
}

public RebalanceFailureType get_type() {
return _type;
}

public String get_reason() {
return _reason;
}
}

void init(HelixManager manager);

Map<String, IdealState> computeNewIdealState(final CurrentStateOutput currentStateOutput,
T clusterData, Map<String, Resource> resourceMap);

RebalanceFailureReason getFailureReason();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.apache.helix.controller.rebalancer.waged;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* A placeholder before we have the Cluster Data Detector implemented.
*
* @param <T> The cache class that can be handled by the detector.
*/
public class ClusterDataDetector<T extends BaseControllerDataProvider> {
/**
* All the cluster change type that may trigger a WAGED rebalancer re-calculation.
*/
public enum ChangeType {
BaselineAssignmentChange,
InstanceConfigChange,
ClusterConfigChange,
ResourceConfigChange,
ResourceIdealStatesChange,
InstanceStateChange,
OtherChange
}

private Map<ChangeType, Set<String>> _currentChanges =
Collections.singletonMap(ChangeType.ClusterConfigChange, Collections.emptySet());

public void updateClusterStatus(T cache) {
}

/**
* Returns all change types detected during the ClusterDetection stage.
*/
public Set<ChangeType> getChangeTypes() {
return _currentChanges.keySet();
}

/**
* Returns a set of the names of components that changed based on the given change type.
*/
public Set<String> getChangesBasedOnType(ChangeType changeType) {
return _currentChanges.get(changeType);
}

/**
* Return a map of the change details <type, change details>.
*/
public Map<ChangeType, Set<String>> getAllChanges() {
return _currentChanges;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.apache.helix.controller.rebalancer.waged;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.model.ResourceAssignment;

import java.util.Map;
import java.util.Set;

/**
* A placeholder before we have the implementation.
*
* The data provider generates the Cluster Model based on the controller's data cache.
*/
public class ClusterDataProvider {

/**
* @param dataProvider The controller's data cache.
* @param activeInstances The logical active instances that will be used in the calculation. Note
* This list can be different from the real active node list according to
* the rebalancer logic.
* @param clusterChanges All the cluster changes that happened after the previous rebalance.
* @param baselineAssignment The persisted Baseline assignment.
* @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
* previous rebalance.
* @return The cluster model as the input for the upcoming rebalance.
*/
protected static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
Set<String> activeInstances, Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baselineAssignment,
Map<String, ResourceAssignment> bestPossibleAssignment) {
// TODO finish the implementation.
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
* under the License.
*/

import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintsRebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.GlobalRebalancer;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;
Expand All @@ -39,57 +36,23 @@
* A placeholder before we have the implementation.
* Weight-Aware Globally-Even Distribute Rebalancer.
*
* @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
* Design Document
* </a>
* @see <a href="Design Document">https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer</a>
*/
public class WagedRebalancer {
public class WagedRebalancer implements GlobalRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);

// --------- The following fields are placeholders and need replacement. -----------//
// TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//
@Override
public void init(HelixManager manager) { }

// The cluster change detector is a stateful object. Make it static to avoid unnecessary
// reinitialization.
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
new ThreadLocal<>();
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;

private ResourceChangeDetector getChangeDetector() {
if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
}
return CHANGE_DETECTOR_THREAD_LOCAL.get();
}

public WagedRebalancer(HelixManager helixManager) {
// TODO init the metadata store according to their requirement when integrate, or change to final static method if possible.
_assignmentMetadataStore = new AssignmentMetadataStore();
// TODO init the algorithm according to the requirement when integrate.
_rebalanceAlgorithm = new ConstraintsRebalanceAlgorithm();

// Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment
// output.
// This calculator will translate the best possible assignment into an applicable state mapping
// based on the current states.
// TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer
_mappingCalculator = new DelayedAutoRebalancer();
@Override
public Map<String, IdealState> computeNewIdealState(CurrentStateOutput currentStateOutput,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
throws HelixException {
return new HashMap<>();
}

/**
* Compute the new IdealStates for all the resources input. The IdealStates include both the new
* partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
* @param clusterData The Cluster status data provider.
* @param resourceMap A map containing all the rebalancing resources.
* @param currentStateOutput The present Current State of the cluster.
* @return A map containing the computed new IdealStates.
*/
public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
return new HashMap<>();
@Override
public RebalanceFailureReason getFailureReason() {
return new RebalanceFailureReason(RebalanceFailureType.UNKNOWN_FAILURE);
}
}

0 comments on commit e1c622b

Please sign in to comment.