Implement AssignmentMetadataStore #453

Merged: 6 commits, Sep 9, 2019
Changes from 4 commits
AssignmentMetadataStore.java
@@ -19,6 +19,16 @@
* under the License.
*/

import java.io.IOException;
import java.util.Arrays;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ResourceAssignment;

import java.util.HashMap;
@@ -28,24 +38,116 @@
* A placeholder before we have the real assignment metadata store.
*/
public class AssignmentMetadataStore {
private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
private static final String ASSIGNMENT_METADATA_KEY = "ASSIGNMENT_METADATA";
private static final String BASELINE_TEMPLATE = "/%s/%s/BASELINE";
private static final String BEST_POSSIBLE_TEMPLATE = "/%s/%s/BEST_POSSIBLE";
private static final String BASELINE_KEY = "BASELINE";
private static final String BEST_POSSIBLE_KEY = "BEST_POSSIBLE";
private static final ZkSerializer SERIALIZER = new ZNRecordJacksonSerializer();

private BucketDataAccessor _dataAccessor;
private String _baselinePath;
private String _bestPossiblePath;
private Map<String, ResourceAssignment> _globalBaseline;
private Map<String, ResourceAssignment> _bestPossibleAssignment;

public AssignmentMetadataStore(HelixManager helixManager) {
_dataAccessor = new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString());
_baselinePath =
String.format(BASELINE_TEMPLATE, helixManager.getClusterName(), ASSIGNMENT_METADATA_KEY);
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, helixManager.getClusterName(),
ASSIGNMENT_METADATA_KEY);
}

public Map<String, ResourceAssignment> getBaseline() {
return _persistGlobalBaseline;
// Return the in-memory baseline. If it is null, read it from ZK to minimize ZK reads.
if (_globalBaseline == null) {
try {
HelixProperty baseline =
_dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
_globalBaseline = splitAssignments(baseline);
} catch (HelixException e) {
// Metadata does not exist, so return an empty map
_globalBaseline = new HashMap<>();
}
}
return _globalBaseline;
}

public Map<String, ResourceAssignment> getBestPossibleAssignment() {
// Return the in-memory best possible assignment. If it is null, read it from ZK to minimize ZK reads.
if (_bestPossibleAssignment == null) {
try {
HelixProperty bestPossible =
_dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
_bestPossibleAssignment = splitAssignments(bestPossible);
} catch (HelixException e) {
// Metadata does not exist, so return an empty map
_bestPossibleAssignment = new HashMap<>();
}
}
return _bestPossibleAssignment;
}

public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
// TODO clean up invalid items
_persistGlobalBaseline = globalBaseline;
// Update the in-memory reference
_globalBaseline = globalBaseline;

// TODO: Make the write async?
// Persist to ZK
HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
try {
_dataAccessor.compressedBucketWrite(_baselinePath, combinedAssignments);
} catch (IOException e) {
// TODO: Improve failure handling
throw new HelixException("Failed to persist baseline!", e);
}
}

public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _persistBestPossibleAssignment;
public void persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
// Update the in-memory reference
_bestPossibleAssignment.putAll(bestPossibleAssignment);

// TODO: Make the write async?
// Persist to ZK
HelixProperty combinedAssignments =
combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment);
try {
_dataAccessor.compressedBucketWrite(_bestPossiblePath, combinedAssignments);
} catch (IOException e) {
// TODO: Improve failure handling
throw new HelixException("Failed to persist baseline!", e);
}
}

/**
* Produces one HelixProperty that contains all assignment data.
* @param name the key under which the combined assignments are stored (e.g. BASELINE or BEST_POSSIBLE)
* @param assignmentMap a map of resource name to ResourceAssignment to combine
* @return a HelixProperty whose ZNRecord holds one serialized assignment per simple field
*/
private HelixProperty combineAssignments(String name,
Map<String, ResourceAssignment> assignmentMap) {
HelixProperty property = new HelixProperty(name);
// Add each resource's assignment as a simple field in one ZNRecord.
// Store the serialized bytes as a plain String so splitAssignments() can round-trip them via
// getBytes(); Arrays.toString() would not deserialize back into a ZNRecord.
assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
new String(SERIALIZER.serialize(assignment.getRecord()))));
return property;
}

public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
// TODO clean up invalid items
_persistBestPossibleAssignment.putAll(bestPossibleAssignment);
/**
* Returns a Map of (ResourceName, ResourceAssignment) pairs.
* @param property the HelixProperty whose simple fields each hold one serialized ResourceAssignment
* @return a map of resource name to ResourceAssignment
*/
private Map<String, ResourceAssignment> splitAssignments(HelixProperty property) {
Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
// Convert each resource's assignment String back into a ResourceAssignment object and put it in the map
property.getRecord().getSimpleFields()
.forEach((resource, assignment) -> assignmentMap.put(resource,
new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignment.getBytes()))));
return assignmentMap;
}
}
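For context, here is a minimal usage sketch (not part of this change) of how a caller might exercise the new store. It assumes it is placed in the same package as the class above; the ZK address, cluster name, instance name, and resource name are made-up examples.

import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.model.ResourceAssignment;

public class AssignmentMetadataStoreExample {
  public static void main(String[] args) {
    // Hypothetical controller manager; the store's constructor only needs getClusterName() and
    // getMetadataStoreConnectionString() from it.
    HelixManager manager = HelixManagerFactory.getZKHelixManager("TestCluster",
        "exampleController", InstanceType.CONTROLLER, "localhost:2181");
    AssignmentMetadataStore store = new AssignmentMetadataStore(manager);

    // The first read goes to ZK at /TestCluster/ASSIGNMENT_METADATA/BASELINE (per
    // BASELINE_TEMPLATE), or falls back to an empty map if the znode does not exist;
    // subsequent reads are served from the cached in-memory map.
    Map<String, ResourceAssignment> baseline = store.getBaseline();

    // Persisting updates the in-memory reference and writes a single bucketized HelixProperty
    // whose simple fields each hold one serialized ResourceAssignment.
    baseline.put("TestDB", new ResourceAssignment("TestDB"));
    store.persistBaseline(baseline);
  }
}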
WagedRebalancer.java
@@ -51,19 +51,19 @@

/**
* Weight-Aware Globally-Even Distribute Rebalancer.
*
* @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
* Design Document
* </a>
* @see <a
* href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
* Design Document
* </a>
*/
public class WagedRebalancer {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);

// When any of the following changes happens, the rebalancer needs to do a global rebalance, which
// consists of 1) recalculating the baseline and 2) a partial rebalance based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
Collections.unmodifiableSet(new HashSet<>(Arrays
.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
Collections
.unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG,
HelixConstants.ChangeType.INSTANCE_CONFIG)));
// The cluster change detector is a stateful object.
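To make the comment above concrete, here is a hedged sketch of how this set could gate the two-step global rebalance (baseline recalculation via refreshBaseline, then partialRebalance). The helper name and the shape of the change map are assumptions, since the dispatch logic sits outside the visible hunks.

  // Sketch only: returns true when any detected change type requires recomputing the baseline
  // before the partial rebalance runs. clusterChanges is assumed to map a change type to the
  // names of the changed items.
  private boolean requiresGlobalRebalance(
      Map<HelixConstants.ChangeType, Set<String>> clusterChanges) {
    return clusterChanges.keySet().stream()
        .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains);
  }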
@@ -73,16 +73,17 @@ public class WagedRebalancer {
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;

// --------- The following fields are placeholders and need replacement. -----------//
// TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
// TODO Shall we make the metadata store a static threadlocal object as well to avoid
// reinitialization?
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//

public WagedRebalancer(HelixManager helixManager) {
this(
// TODO init the metadata store according to their requirement when integrate,
// or change to final static method if possible.
new AssignmentMetadataStore(),
// or change to final static method if possible.
new AssignmentMetadataStore(helixManager),
// TODO parse the cluster setting
ConstraintBasedAlgorithmFactory.getInstance(),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
@@ -103,14 +104,14 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());

}
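For orientation, a hedged sketch of how the controller pipeline is expected to drive this class; clusterData, resourceMap, and currentStateOutput stand for the pipeline-provided inputs described in the javadoc that follows, and their construction is not shown here.

    // Sketch only: controller-side invocation, assuming the pipeline supplies the inputs.
    WagedRebalancer rebalancer = new WagedRebalancer(helixManager);
    Map<String, IdealState> newIdealStates =
        rebalancer.computeNewIdealStates(clusterData, resourceMap, currentStateOutput);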

/**
* Compute the new IdealStates for all the input resources. The IdealStates include both the new
* partition assignment (in the listFields) and the new replica state mapping (in the mapFields).
*
* @param clusterData The Cluster status data provider.
* @param resourceMap A map containing all the rebalancing resources.
* @param clusterData The Cluster status data provider.
* @param resourceMap A map containing all the rebalancing resources.
* @param currentStateOutput The present Current States of the resources.
* @return A map of the new IdealStates with the resource name as key.
*/
@@ -124,8 +125,8 @@ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider
IdealState is = clusterData.getIdealState(resourceEntry.getKey());
return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
&& getClass().getName().equals(is.getRebalancerClassName());
}).collect(Collectors
.toMap(resourceEntry -> resourceEntry.getKey(), resourceEntry -> resourceEntry.getValue()));
}).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
resourceEntry -> resourceEntry.getValue()));

if (resourceMap.isEmpty()) {
LOG.warn("There is no valid resource to be rebalanced by {}",
@@ -140,13 +141,13 @@ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider
Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap);

// Construct the new best possible states according to the current state and target assignment.
// Note that the new ideal state might be an intermediate state between the current state and the target assignment.
// Note that the new ideal state might be an intermediate state between the current state and
// the target assignment.
for (IdealState is : newIdealStates.values()) {
String resourceName = is.getResourceName();
// Adjust the states according to the current state.
ResourceAssignment finalAssignment = _mappingCalculator
.computeBestPossiblePartitionState(clusterData, is, resourceMap.get(resourceName),
currentStateOutput);
ResourceAssignment finalAssignment = _mappingCalculator.computeBestPossiblePartitionState(
clusterData, is, resourceMap.get(resourceName), currentStateOutput);

// Clean up the state mapping fields. Use the final assignment that is calculated by the
// mapping calculator to replace them.
@@ -195,10 +196,10 @@ private Map<String, IdealState> computeBestPossibleStates(
IdealState newIdeaState;
try {
IdealState currentIdealState = clusterData.getIdealState(resourceName);
Map<String, Integer> statePriorityMap =
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
.getStatePriorityMap();
// Create a new IdealState instance contains the new calculated assignment in the preference list.
Map<String, Integer> statePriorityMap = clusterData
.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
// Create a new IdealState instance that contains the newly calculated assignment in the
// preference list.
newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState,
newAssignment.get(resourceName), statePriorityMap);
} catch (Exception ex) {
@@ -227,9 +228,8 @@ private void refreshBaseline(ResourceControllerDataProvider clusterData,
throw new HelixRebalanceException("Failed to get the current baseline assignment.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
}
Map<String, ResourceAssignment> baseline =
calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(),
Collections.emptyMap(), currentBaseline);
Map<String, ResourceAssignment> baseline = calculateAssignment(clusterData, clusterChanges,
resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline);
try {
_assignmentMetadataStore.persistBaseline(baseline);
} catch (Exception ex) {
@@ -254,9 +254,8 @@ private Map<String, ResourceAssignment> partialRebalance(
throw new HelixRebalanceException("Failed to get the persisted assignment records.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
}
Map<String, ResourceAssignment> newAssignment =
calculateAssignment(clusterData, clusterChanges, resourceMap, activeInstances, baseline,
prevBestPossibleAssignment);
Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
resourceMap, activeInstances, baseline, prevBestPossibleAssignment);
try {
// TODO Test to confirm if persisting the final assignment (with final partition states)
// would be a better option.
@@ -271,13 +270,13 @@ private Map<String, ResourceAssignment> partialRebalance(

/**
* Generate the cluster model based on the input and calculate the optimal assignment.
*
* @param clusterData the cluster data cache.
* @param clusterChanges the detected cluster changes.
* @param resourceMap the rebalancing resources.
* @param activeNodes the alive and enabled nodes.
* @param baseline the baseline assignment for the algorithm as a reference.
* @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a reference.
* @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a
* reference.
* @return the new optimal assignment for the resources.
*/
private Map<String, ResourceAssignment> calculateAssignment(
@@ -289,9 +288,8 @@ private Map<String, ResourceAssignment> calculateAssignment(
LOG.info("Start calculating for an assignment");
ClusterModel clusterModel;
try {
clusterModel = ClusterModelProvider
.generateClusterModel(clusterData, resourceMap, activeNodes, clusterChanges, baseline,
prevBestPossibleAssignment);
clusterModel = ClusterModelProvider.generateClusterModel(clusterData, resourceMap,
activeNodes, clusterChanges, baseline, prevBestPossibleAssignment);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
ZNRecordJacksonSerializer.java
@@ -27,8 +27,8 @@
import org.codehaus.jackson.map.ObjectMapper;

/**
* ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using MessagePack's
* serializer. Note that this serializer doesn't check for the size of the resulting binary.
* ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using Jackson. Note that
* this serializer doesn't check for the size of the resulting binary.
*/
public class ZNRecordJacksonSerializer implements ZkSerializer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
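A short round-trip sketch (not from the diff) of what this serializer does; the record name and field values are made up.

import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;

public class ZNRecordJacksonSerializerExample {
  public static void main(String[] args) {
    ZkSerializer serializer = new ZNRecordJacksonSerializer();

    // Serialize a small record into JSON bytes with Jackson.
    ZNRecord record = new ZNRecord("TestDB");
    record.setSimpleField("sampleKey", "sampleValue");
    byte[] bytes = serializer.serialize(record);

    // Deserialize it back. As the Javadoc above notes, no size check is performed, which is
    // presumably why the ZkBucketDataAccessor used earlier compresses and bucketizes its writes.
    ZNRecord roundTripped = (ZNRecord) serializer.deserialize(bytes);
    System.out.println(roundTripped.getSimpleField("sampleKey"));
  }
}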
MockAssignmentMetadataStore.java
@@ -19,6 +19,7 @@
* under the License.
*/

import org.apache.helix.HelixManager;
import org.apache.helix.model.ResourceAssignment;

import java.util.HashMap;
@@ -32,6 +33,11 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();

public MockAssignmentMetadataStore() {
// In-memory mock component, so pass null for HelixManager since it's not needed
super(null);
}

public Map<String, ResourceAssignment> getBaseline() {
return _persistGlobalBaseline;
}
@@ -44,7 +50,8 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _persistBestPossibleAssignment;
}

public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
public void persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
_persistBestPossibleAssignment = bestPossibleAssignment;
}

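Finally, a hedged sketch of how this mock is meant to be wired into a WagedRebalancer test through the protected constructor shown earlier. One caveat in the current revision: super(null) reaches helixManager.getMetadataStoreConnectionString() in the parent constructor and would throw a NullPointerException, so the sketch assumes either a null-tolerant parent constructor or a non-null HelixManager supplied by the test.

    // Sketch only (same package as WagedRebalancer, so the protected constructor is visible):
    // exercise the rebalancer without ZK by backing it with the in-memory mock.
    AssignmentMetadataStore store = new MockAssignmentMetadataStore();
    RebalanceAlgorithm algorithm = ConstraintBasedAlgorithmFactory.getInstance();
    WagedRebalancer rebalancer = new WagedRebalancer(store, algorithm);
    // computeNewIdealStates(...) now reads and persists baselines through the mock's HashMaps.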