Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Refine the WAGED rebalancer related interfaces for integration" #437

Merged
merged 1 commit into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@
*/

import com.google.common.collect.Sets;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixProperty;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;

/**
* ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
Expand All @@ -39,7 +37,6 @@
* WARNING: the methods of this class are not thread-safe.
*/
public class ResourceChangeDetector implements ChangeDetector {
private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName());

private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
Expand Down Expand Up @@ -111,13 +108,10 @@ private void clearCachedComputation() {
return snapshot.getResourceConfigMap();
case LIVE_INSTANCE:
return snapshot.getLiveInstances();
case CONFIG:
return Collections.emptyMap();
default:
LOG.warn(
"ResourceChangeDetector cannot compute the names of changes for the given ChangeType: {}",
changeType);
return Collections.emptyMap();
throw new HelixException(String.format(
"ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
changeType));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.apache.helix.controller.rebalancer;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.HelixManager;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;

import java.util.Map;

public interface GlobalRebalancer<T extends BaseControllerDataProvider> {
enum RebalanceFailureType {
INVALID_CLUSTER_STATUS,
INVALID_REBALANCER_STATUS,
FAILED_TO_CALCULATE,
UNKNOWN_FAILURE
}

class RebalanceFailureReason {
private final static String DEFAULT_REASON_MESSAGE = "No detail";
private final RebalanceFailureType _type;
private final String _reason;

public RebalanceFailureReason(RebalanceFailureType type) {
this(type, DEFAULT_REASON_MESSAGE);
}

public RebalanceFailureReason(RebalanceFailureType type, String reason) {
_type = type;
_reason = reason;
}

public RebalanceFailureType get_type() {
return _type;
}

public String get_reason() {
return _reason;
}
}

void init(HelixManager manager);

Map<String, IdealState> computeNewIdealState(final CurrentStateOutput currentStateOutput,
T clusterData, Map<String, Resource> resourceMap);

RebalanceFailureReason getFailureReason();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.apache.helix.controller.rebalancer.waged;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* A placeholder before we have the Cluster Data Detector implemented.
*
* @param <T> The cache class that can be handled by the detector.
*/
public class ClusterDataDetector<T extends BaseControllerDataProvider> {
/**
* All the cluster change type that may trigger a WAGED rebalancer re-calculation.
*/
public enum ChangeType {
BaselineAssignmentChange,
InstanceConfigChange,
ClusterConfigChange,
ResourceConfigChange,
ResourceIdealStatesChange,
InstanceStateChange,
OtherChange
}

private Map<ChangeType, Set<String>> _currentChanges =
Collections.singletonMap(ChangeType.ClusterConfigChange, Collections.emptySet());

public void updateClusterStatus(T cache) {
}

/**
* Returns all change types detected during the ClusterDetection stage.
*/
public Set<ChangeType> getChangeTypes() {
return _currentChanges.keySet();
}

/**
* Returns a set of the names of components that changed based on the given change type.
*/
public Set<String> getChangesBasedOnType(ChangeType changeType) {
return _currentChanges.get(changeType);
}

/**
* Return a map of the change details <type, change details>.
*/
public Map<ChangeType, Set<String>> getAllChanges() {
return _currentChanges;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.apache.helix.controller.rebalancer.waged;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.model.ResourceAssignment;

import java.util.Map;
import java.util.Set;

/**
* A placeholder before we have the implementation.
*
* The data provider generates the Cluster Model based on the controller's data cache.
*/
public class ClusterDataProvider {

/**
* @param dataProvider The controller's data cache.
* @param activeInstances The logical active instances that will be used in the calculation. Note
* This list can be different from the real active node list according to
* the rebalancer logic.
* @param clusterChanges All the cluster changes that happened after the previous rebalance.
* @param baselineAssignment The persisted Baseline assignment.
* @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
* previous rebalance.
* @return The cluster model as the input for the upcoming rebalance.
*/
protected static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
Set<String> activeInstances, Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baselineAssignment,
Map<String, ResourceAssignment> bestPossibleAssignment) {
// TODO finish the implementation.
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
* under the License.
*/

import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintsRebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.GlobalRebalancer;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;
Expand All @@ -39,57 +36,23 @@
* A placeholder before we have the implementation.
* Weight-Aware Globally-Even Distribute Rebalancer.
*
* @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
* Design Document
* </a>
* @see <a href="Design Document">https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer</a>
*/
public class WagedRebalancer {
public class WagedRebalancer implements GlobalRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);

// --------- The following fields are placeholders and need replacement. -----------//
// TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//
@Override
public void init(HelixManager manager) { }

// The cluster change detector is a stateful object. Make it static to avoid unnecessary
// reinitialization.
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
new ThreadLocal<>();
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;

private ResourceChangeDetector getChangeDetector() {
if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
}
return CHANGE_DETECTOR_THREAD_LOCAL.get();
}

public WagedRebalancer(HelixManager helixManager) {
// TODO init the metadata store according to their requirement when integrate, or change to final static method if possible.
_assignmentMetadataStore = new AssignmentMetadataStore();
// TODO init the algorithm according to the requirement when integrate.
_rebalanceAlgorithm = new ConstraintsRebalanceAlgorithm();

// Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment
// output.
// This calculator will translate the best possible assignment into an applicable state mapping
// based on the current states.
// TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer
_mappingCalculator = new DelayedAutoRebalancer();
@Override
public Map<String, IdealState> computeNewIdealState(CurrentStateOutput currentStateOutput,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
throws HelixException {
return new HashMap<>();
}

/**
* Compute the new IdealStates for all the resources input. The IdealStates include both the new
* partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
* @param clusterData The Cluster status data provider.
* @param resourceMap A map containing all the rebalancing resources.
* @param currentStateOutput The present Current State of the cluster.
* @return A map containing the computed new IdealStates.
*/
public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
return new HashMap<>();
@Override
public RebalanceFailureReason getFailureReason() {
return new RebalanceFailureReason(RebalanceFailureType.UNKNOWN_FAILURE);
}
}
Loading