Skip to content

Commit

Permalink
YARN-5457. Refactor DistributedScheduling framework to pull out commo…
Browse files Browse the repository at this point in the history
…n functionality. (asuresh)
  • Loading branch information
xslogic committed Aug 9, 2016
1 parent 8f9b618 commit 82c9e06
Show file tree
Hide file tree
Showing 18 changed files with 860 additions and 562 deletions.
Expand Up @@ -91,6 +91,8 @@ public void doTest(int numMappers, int numReducers, int numNodes,
Configuration conf = new Configuration(); Configuration conf = new Configuration();
// Start the mini-MR and mini-DFS clusters // Start the mini-MR and mini-DFS clusters
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.
OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
dfsCluster = new MiniDFSCluster.Builder(conf) dfsCluster = new MiniDFSCluster.Builder(conf)
Expand Down
Expand Up @@ -306,55 +306,60 @@ public static boolean isAclEnabled(Configuration conf) {
YARN_PREFIX + "distributed-scheduling.enabled"; YARN_PREFIX + "distributed-scheduling.enabled";
public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false; public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;


/** Minimum memory (in MB) used for allocating a container through distributed /** Setting that controls whether opportunistic container allocation
* scheduling. */ * is enabled or not. */
public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB = public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED =
YARN_PREFIX + "distributed-scheduling.min-container-memory-mb"; YARN_PREFIX + "opportunistic-container-allocation.enabled";
public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512; public static final boolean

OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false;
/** Minimum virtual CPU cores used for allocating a container through
* distributed scheduling. */ /** Minimum memory (in MB) used for allocating an opportunistic container. */
public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES = public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB =
YARN_PREFIX + "distributed-scheduling.min-container-vcores"; YARN_PREFIX + "opportunistic-containers.min-memory-mb";
public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1; public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512;


/** Maximum memory (in MB) used for allocating a container through distributed /** Minimum virtual CPU cores used for allocating an opportunistic container.
* scheduling. */ * */
public static final String DIST_SCHEDULING_MAX_MEMORY_MB = public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES =
YARN_PREFIX + "distributed-scheduling.max-container-memory-mb"; YARN_PREFIX + "opportunistic-containers.min-vcores";
public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048; public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1;


/** Maximum virtual CPU cores used for allocating a container through /** Maximum memory (in MB) used for allocating an opportunistic container. */
* distributed scheduling. */ public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB =
public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES = YARN_PREFIX + "opportunistic-containers.max-memory-mb";
YARN_PREFIX + "distributed-scheduling.max-container-vcores"; public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048;
public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;

/** Maximum virtual CPU cores used for allocating an opportunistic container.
/** Incremental memory (in MB) used for allocating a container through * */
* distributed scheduling. */ public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES =
public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB = YARN_PREFIX + "opportunistic-containers.max-vcores";
YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb"; public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4;
public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
/** Incremental memory (in MB) used for allocating an opportunistic container.
* */
public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB =
YARN_PREFIX + "opportunistic-containers.incr-memory-mb";
public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT =
512; 512;


/** Incremental virtual CPU cores used for allocating a container through /** Incremental virtual CPU cores used for allocating an opportunistic
* distributed scheduling. */ * container. */
public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES = public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES =
YARN_PREFIX + "distributed-scheduling.incr-vcores"; YARN_PREFIX + "opportunistic-containers.incr-vcores";
public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1; public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1;


/** Container token expiry for container allocated via distributed /** Container token expiry for opportunistic containers. */
* scheduling. */ public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS =
public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms";
YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms"; public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT =
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
600000; 600000;


/** Number of nodes to be used by the LocalScheduler of a NodeManager for /** Number of nodes to be used by the Opportunistic Container allocator for
* dispatching containers during distributed scheduling. */ * dispatching containers during container allocation. */
public static final String DIST_SCHEDULING_NODES_NUMBER_USED = public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =
YARN_PREFIX + "distributed-scheduling.nodes-used"; YARN_PREFIX + "opportunistic-container-allocation.nodes-used";
public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10; public static final int OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT =
10;


/** Frequency for computing least loaded NMs. */ /** Frequency for computing least loaded NMs. */
public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS = public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
Expand Down Expand Up @@ -2829,6 +2834,18 @@ public static String getClusterId(Configuration conf) {
return clusterId; return clusterId;
} }


public static boolean isDistSchedulingEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
}

public static boolean isOpportunisticContainerAllocationEnabled(
Configuration conf) {
return conf.getBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT);
}

// helper methods for timeline service configuration // helper methods for timeline service configuration
/** /**
* Returns whether the timeline service is enabled via configuration. * Returns whether the timeline service is enabled via configuration.
Expand Down
Expand Up @@ -84,7 +84,7 @@
* specifying OPPORTUNISTIC containers in its resource requests, * specifying OPPORTUNISTIC containers in its resource requests,
* the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
* on the NM and the DistributedSchedulingProtocol used by the framework to talk * on the NM and the DistributedSchedulingProtocol used by the framework to talk
* to the DistributedSchedulingAMService running on the RM. * to the OpportunisticContainerAllocatorAMService running on the RM.
*/ */
public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {


Expand All @@ -105,6 +105,8 @@ public void doBefore() throws Exception {


conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.
OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
cluster.init(conf); cluster.init(conf);
Expand Down
Expand Up @@ -2761,72 +2761,76 @@


<property> <property>
<description> <description>
Minimum memory (in MB) used for allocating a container through distributed Setting that controls whether opportunistic container allocation
scheduling. is enabled.
</description> </description>
<name>yarn.distributed-scheduling.min-container-memory-mb</name> <name>yarn.opportunistic-container-allocation.enabled</name>
<value>false</value>
</property>

<property>
<description>
Minimum memory (in MB) used for allocating an opportunistic container.
</description>
<name>yarn.opportunistic-containers.min-memory-mb</name>
<value>512</value> <value>512</value>
</property> </property>


<property> <property>
<description> <description>
Minimum virtual CPU cores used for allocating a container through Minimum virtual CPU cores used for allocating an opportunistic container.
distributed scheduling.
</description> </description>
<name>yarn.distributed-scheduling.min-container-vcores</name> <name>yarn.opportunistic-containers.min-vcores</name>
<value>1</value> <value>1</value>
</property> </property>


<property> <property>
<description> <description>
Maximum memory (in MB) used for allocating a container through distributed Maximum memory (in MB) used for allocating an opportunistic container.
scheduling.
</description> </description>
<name>yarn.distributed-scheduling.max-container-memory-mb</name> <name>yarn.opportunistic-containers.max-memory-mb</name>
<value>2048</value> <value>2048</value>
</property> </property>


<property> <property>
<description> <description>
Maximum virtual CPU cores used for allocating a container through Maximum virtual CPU cores used for allocating an opportunistic container.
distributed scheduling.
</description> </description>
<name>yarn.distributed-scheduling.max-container-vcores</name> <name>yarn.opportunistic-containers.max-vcores</name>
<value>4</value> <value>4</value>
</property> </property>


<property> <property>
<description> <description>
Incremental memory (in MB) used for allocating a container through Incremental memory (in MB) used for allocating an opportunistic container.
distributed scheduling.
</description> </description>
<name>yarn.distributed-scheduling.incr-container-memory-mb</name> <name>yarn.opportunistic-containers.incr-memory-mb</name>
<value>512</value> <value>512</value>
</property> </property>


<property> <property>
<description> <description>
Incremental virtual CPU cores used for allocating a container through Incremental virtual CPU cores used for allocating an opportunistic
distributed scheduling. container.
</description> </description>
<name>yarn.distributed-scheduling.incr-vcores</name> <name>yarn.opportunistic-containers.incr-vcores</name>
<value>1</value> <value>1</value>
</property> </property>


<property> <property>
<description> <description>
Container token expiry for container allocated via distributed scheduling. Container token expiry for opportunistic containers.
</description> </description>
<name>yarn.distributed-scheduling.container-token-expiry-ms</name> <name>yarn.opportunistic-containers.container-token-expiry-ms</name>
<value>600000</value> <value>600000</value>
</property> </property>


<property> <property>
<description> <description>
Number of nodes to be used by the LocalScheduler of a NodeManager for Number of nodes to be used by the Opportunistic Container Allocator for
dispatching containers during distributed scheduling. dispatching containers during container allocation.
</description> </description>
<name>yarn.distributed-scheduling.nodes-used</name> <name>yarn.opportunistic-container-allocation.nodes-used</name>
<value>10</value> <value>10</value>
</property> </property>


Expand Down

0 comments on commit 82c9e06

Please sign in to comment.