Skip to content

Commit

Permalink
YARN-5325. Stateless ARMRMProxy policies implementation. (Carlo Curin…
Browse files Browse the repository at this point in the history
…o via Subru).

(cherry picked from commit 11c5336)
  • Loading branch information
subru authored and Carlo Curino committed Aug 2, 2017
1 parent 0662996 commit 1dadd0b
Show file tree
Hide file tree
Showing 31 changed files with 1,839 additions and 342 deletions.
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -16,82 +16,87 @@
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.federation.policies.router;
package org.apache.hadoop.yarn.server.federation.policies;

import java.util.Map;

import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;

import java.util.Map;

/**
* Abstract class provides common validation of reinitialize(), for all
* policies that are "weight-based".
* Base abstract class for a weighted {@link ConfigurableFederationPolicy}.
*/
public abstract class BaseWeightedRouterPolicy
implements FederationRouterPolicy {
public abstract class AbstractConfigurableFederationPolicy
implements ConfigurableFederationPolicy {

private WeightedPolicyInfo policyInfo = null;
private FederationPolicyInitializationContext policyContext;
private boolean isDirty;

public BaseWeightedRouterPolicy() {
public AbstractConfigurableFederationPolicy() {
}

@Override
public void reinitialize(FederationPolicyInitializationContext
federationPolicyContext)
public void reinitialize(
FederationPolicyInitializationContext initializationContext)
throws FederationPolicyInitializationException {
isDirty = true;
FederationPolicyInitializationContextValidator
.validate(federationPolicyContext, this.getClass().getCanonicalName());
.validate(initializationContext, this.getClass().getCanonicalName());

// perform consistency checks
WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo
.fromByteBuffer(
federationPolicyContext.getSubClusterPolicyConfiguration()
.getParams());
WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo.fromByteBuffer(
initializationContext.getSubClusterPolicyConfiguration().getParams());

// if nothing has changed skip the rest of initialization
// and signal to childs that the reinit is free via isDirty var.
if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
isDirty = false;
return;
}

validate(newPolicyInfo);
setPolicyInfo(newPolicyInfo);
this.policyContext = federationPolicyContext;
this.policyContext = initializationContext;
}

/**
* Overridable validation step for the policy configuration.
*
* @param newPolicyInfo the configuration to test.
* @throws FederationPolicyInitializationException if the configuration is
* not valid.
*
* @throws FederationPolicyInitializationException if the configuration is not
* valid.
*/
public void validate(WeightedPolicyInfo newPolicyInfo) throws
FederationPolicyInitializationException {
public void validate(WeightedPolicyInfo newPolicyInfo)
throws FederationPolicyInitializationException {
if (newPolicyInfo == null) {
throw new FederationPolicyInitializationException("The policy to "
+ "validate should not be null.");
}
Map<SubClusterIdInfo, Float> newWeights =
newPolicyInfo.getRouterPolicyWeights();
if (newWeights == null || newWeights.size() < 1) {
throw new FederationPolicyInitializationException(
"Weight vector cannot be null/empty.");
"The policy to " + "validate should not be null.");
}
}

/**
* Returns true whether the last reinitialization requires actual changes, or
* was "free" as the weights have not changed. This is used by subclasses
* overriding reinitialize and calling super.reinitialize() to know wheter to
* quit early.
*
* @return whether more work is needed to initialize.
*/
public boolean getIsDirty() {
return isDirty;
}

/**
* Getter method for the configuration weights.
*
* @return the {@link WeightedPolicyInfo} representing the policy
* configuration.
* configuration.
*/
public WeightedPolicyInfo getPolicyInfo() {
return policyInfo;
Expand All @@ -101,15 +106,15 @@ public WeightedPolicyInfo getPolicyInfo() {
* Setter method for the configuration weights.
*
* @param policyInfo the {@link WeightedPolicyInfo} representing the policy
* configuration.
* configuration.
*/
public void setPolicyInfo(
WeightedPolicyInfo policyInfo) {
public void setPolicyInfo(WeightedPolicyInfo policyInfo) {
this.policyInfo = policyInfo;
}

/**
* Getter method for the {@link FederationPolicyInitializationContext}.
*
* @return the context for this policy.
*/
public FederationPolicyInitializationContext getPolicyContext() {
Expand All @@ -118,6 +123,7 @@ public FederationPolicyInitializationContext getPolicyContext() {

/**
* Setter method for the {@link FederationPolicyInitializationContext}.
*
* @param policyContext the context to assign to this policy.
*/
public void setPolicyContext(
Expand All @@ -130,13 +136,14 @@ public void setPolicyContext(
* FederationStateStoreFacade} and validate it not being null/empty.
*
* @return the map of ids to info for all active subclusters.
*
* @throws YarnException if we can't get the list.
*/
protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
throws YarnException {

Map<SubClusterId, SubClusterInfo> activeSubclusters = getPolicyContext()
.getFederationStateStoreFacade().getSubClusters(true);
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getPolicyContext().getFederationStateStoreFacade().getSubClusters(true);

if (activeSubclusters == null || activeSubclusters.size() < 1) {
throw new NoActiveSubclustersException(
Expand All @@ -145,6 +152,4 @@ protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
return activeSubclusters;
}



}
Expand Up @@ -31,14 +31,11 @@ public interface ConfigurableFederationPolicy {
* policies. The implementor should provide try-n-swap semantics, and retain
* state if possible.
*
* @param federationPolicyInitializationContext the new context to provide to
* implementor.
* @param policyContext the new context to provide to implementor.
*
* @throws FederationPolicyInitializationException in case the initialization
* fails.
* fails.
*/
void reinitialize(
FederationPolicyInitializationContext
federationPolicyInitializationContext)
void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException;
}
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.policies;

import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;

Expand All @@ -30,27 +31,27 @@ public class FederationPolicyInitializationContext {
private SubClusterPolicyConfiguration federationPolicyConfiguration;
private SubClusterResolver federationSubclusterResolver;
private FederationStateStoreFacade federationStateStoreFacade;
private SubClusterId homeSubcluster;

public FederationPolicyInitializationContext() {
federationPolicyConfiguration = null;
federationSubclusterResolver = null;
federationStateStoreFacade = null;
}

public FederationPolicyInitializationContext(SubClusterPolicyConfiguration
policy, SubClusterResolver resolver, FederationStateStoreFacade
storeFacade) {
public FederationPolicyInitializationContext(
SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
FederationStateStoreFacade storeFacade) {
this.federationPolicyConfiguration = policy;
this.federationSubclusterResolver = resolver;
this.federationStateStoreFacade = storeFacade;
}


/**
* Getter for the {@link SubClusterPolicyConfiguration}.
*
* @return the {@link SubClusterPolicyConfiguration} to be used for
* initialization.
* initialization.
*/
public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() {
return federationPolicyConfiguration;
Expand All @@ -59,8 +60,8 @@ public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() {
/**
* Setter for the {@link SubClusterPolicyConfiguration}.
*
* @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration}
* to be used for initialization.
* @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} to
* be used for initialization.
*/
public void setSubClusterPolicyConfiguration(
SubClusterPolicyConfiguration fedPolicyConfiguration) {
Expand All @@ -80,7 +81,7 @@ public SubClusterResolver getFederationSubclusterResolver() {
* Setter for the {@link SubClusterResolver}.
*
* @param federationSubclusterResolver the {@link SubClusterResolver} to be
* used for initialization.
* used for initialization.
*/
public void setFederationSubclusterResolver(
SubClusterResolver federationSubclusterResolver) {
Expand All @@ -105,4 +106,24 @@ public void setFederationStateStoreFacade(
FederationStateStoreFacade federationStateStoreFacade) {
this.federationStateStoreFacade = federationStateStoreFacade;
}

/**
* Returns the current home sub-cluster. Useful for default policy behaviors.
*
* @return the home sub-cluster.
*/
public SubClusterId getHomeSubcluster() {
return homeSubcluster;
}

/**
* Sets in the context the home sub-cluster. Useful for default policy
* behaviors.
*
* @param homeSubcluster value to set.
*/
public void setHomeSubcluster(SubClusterId homeSubcluster) {
this.homeSubcluster = homeSubcluster;
}

}
Expand Up @@ -25,50 +25,44 @@
public final class FederationPolicyInitializationContextValidator {

private FederationPolicyInitializationContextValidator() {
//disable constructor per checkstyle
// disable constructor per checkstyle
}

public static void validate(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
String myType) throws FederationPolicyInitializationException {
FederationPolicyInitializationContext policyContext, String myType)
throws FederationPolicyInitializationException {

if (myType == null) {
throw new FederationPolicyInitializationException("The myType parameter"
+ " should not be null.");
throw new FederationPolicyInitializationException(
"The myType parameter" + " should not be null.");
}

if (federationPolicyInitializationContext == null) {
if (policyContext == null) {
throw new FederationPolicyInitializationException(
"The FederationPolicyInitializationContext provided is null. Cannot"
+ " reinitalize "
+ "successfully.");
+ " reinitalize " + "successfully.");
}

if (federationPolicyInitializationContext.getFederationStateStoreFacade()
== null) {
if (policyContext.getFederationStateStoreFacade() == null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacade provided is null. Cannot"
+ " reinitalize successfully.");
}

if (federationPolicyInitializationContext.getFederationSubclusterResolver()
== null) {
if (policyContext.getFederationSubclusterResolver() == null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacase provided is null. Cannot"
+ " reinitalize successfully.");
}

if (federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
== null) {
if (policyContext.getSubClusterPolicyConfiguration() == null) {
throw new FederationPolicyInitializationException(
"The FederationSubclusterResolver provided is null. Cannot "
+ "reinitalize successfully.");
}

String intendedType =
federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
.getType();
policyContext.getSubClusterPolicyConfiguration().getType();

if (!myType.equals(intendedType)) {
throw new FederationPolicyInitializationException(
Expand Down

0 comments on commit 1dadd0b

Please sign in to comment.