@@ -20,7 +20,7 @@ package org.apache.spark.resource
import org.apache.spark.annotation.{Evolving, Since}

/**
- * An Executor resource request. This is used in conjunction with the ResourceProfile to
+ * An Executor resource request. This is used in conjunction with the [[ResourceProfile]] to
* programmatically specify the resources needed for an RDD that will be applied at the
* stage level.
*
@@ -39,7 +39,7 @@ import org.apache.spark.annotation.{Evolving, Since}
*
* See the configuration and cluster specific docs for more details.
*
- * Use ExecutorResourceRequests class as a convenience API.
+ * Use [[ExecutorResourceRequests]] class as a convenience API.
*
* @param resourceName Name of the resource
* @param amount Amount requesting
@@ -37,12 +37,19 @@ class ExecutorResourceRequests() extends Serializable {

private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()

+ /**
+  * Returns all the resource requests for the executor.
+  */
def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap

+ /**
+  * (Java-specific) Returns all the resource requests for the executor.
+  */
def requestsJMap: JMap[String, ExecutorResourceRequest] = requests.asJava

/**
* Specify heap memory. The value specified will be converted to MiB.
+  * This is a convenient API to add [[ExecutorResourceRequest]] for "memory" resource.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
* Default unit is MiB if not specified.
@@ -57,6 +64,7 @@
/**
* Specify off heap memory. The value specified will be converted to MiB.
* This value only takes effect when MEMORY_OFFHEAP_ENABLED is true.
+  * This is a convenient API to add [[ExecutorResourceRequest]] for "offHeap" resource.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
* Default unit is MiB if not specified.
@@ -70,6 +78,7 @@

/**
* Specify overhead memory. The value specified will be converted to MiB.
+  * This is a convenient API to add [[ExecutorResourceRequest]] for "memoryOverhead" resource.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
* Default unit is MiB if not specified.
@@ -83,6 +92,7 @@

/**
* Specify pyspark memory. The value specified will be converted to MiB.
+  * This is a convenient API to add [[ExecutorResourceRequest]] for "pyspark.memory" resource.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
* Default unit is MiB if not specified.
@@ -96,6 +106,7 @@

/**
* Specify number of cores per Executor.
+  * This is a convenient API to add [[ExecutorResourceRequest]] for "cores" resource.
*
* @param amount Number of cores to allocate per Executor.
*/
@@ -111,6 +122,7 @@
* like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource
* that the cluster manager doesn't support the result is undefined, it may error or may just
* be ignored.
+  * This is a convenient API to add [[ExecutorResourceRequest]] for custom resources.
*
* @param resourceName Name of the resource.
* @param amount amount of that resource per executor to use.
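The doc additions above all point at the same pattern: ExecutorResourceRequests is a convenience layer that registers one ExecutorResourceRequest per resource name and returns itself for chaining. A minimal sketch of that usage, assuming a Spark 3.1+ build where these classes are public (the discoveryScript and vendor arguments of resource() are optional):

    import org.apache.spark.resource.ExecutorResourceRequests

    // Each setter records an ExecutorResourceRequest under its resource name
    // ("memory", "memoryOverhead", "cores", ...) and returns this for chaining.
    val ereq = new ExecutorResourceRequests()
      .memory("4g")          // heap memory; string amounts are converted to MiB
      .memoryOverhead("1g")
      .cores(4)
      .resource("gpu", 2, "/opt/getGpusResources.sh", "nvidia.com") // custom resource

    // requests exposes the accumulated map; requestsJMap is the Java view.
    assert(ereq.requests("memory").amount == 4096) // "4g" stored as MiB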
@@ -25,7 +25,7 @@ import org.apache.spark.SparkException
* Trait used to help executor/worker allocate resources.
* Please note that this is intended to be used in a single thread.
*/
- trait ResourceAllocator {
+ private[spark] trait ResourceAllocator {
Review thread on the change above:

Member (author): This is probably mistakenly exposed in 3.0.

Contributor: Would this impact folks working on schedulers outside of org.apache.spark?

Member (author): I think it wouldn't, since there's no way to plug in a custom ResourceAllocator in Spark yet.

protected def resourceName: String
protected def resourceAddresses: Seq[String]
@@ -35,7 +35,13 @@ import org.apache.spark.util.Utils
* Resource profile to associate with an RDD. A ResourceProfile allows the user to
* specify executor and task requirements for an RDD that will get applied during a
* stage. This allows the user to change the resource requirements between stages.
- * This is meant to be immutable so user can't change it after building.
+ * This is meant to be immutable so user can't change it after building. Users
+ * should use [[ResourceProfileBuilder]] to build it.
+ *
+ * @param executorResources Resource requests for executors. Mapped from the resource
+ *   name (e.g., cores, memory, CPU) to its specific request.
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *   name (e.g., cores, memory, CPU) to its specific request.
*/
@Evolving
@Since("3.1.0")
@@ -53,6 +59,9 @@ class ResourceProfile(
private var _maxTasksPerExecutor: Option[Int] = None
private var _coresLimitKnown: Boolean = false

+ /**
+  * A unique id of this ResourceProfile
+  */
def id: Int = _id

/**
@@ -242,17 +251,39 @@

object ResourceProfile extends Logging {
// task resources
+ /**
+  * built-in task resource: cpus
+  */
val CPUS = "cpus"
// Executor resources
// Make sure add new executor resource in below allSupportedExecutorResources
+ /**
+  * built-in executor resource: cores
+  */
val CORES = "cores"
+ /**
+  * built-in executor resource: memory
+  */
val MEMORY = "memory"
+ /**
+  * built-in executor resource: offHeap
+  */
val OFFHEAP_MEM = "offHeap"
+ /**
+  * built-in executor resource: memoryOverhead
+  */
val OVERHEAD_MEM = "memoryOverhead"
+ /**
+  * built-in executor resource: pyspark.memory
+  */
val PYSPARK_MEM = "pyspark.memory"

- // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
- val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
+ /**
+  * Return all supported Spark built-in executor resources, custom resources like GPUs/FPGAs
+  * are excluded.
+  */
+ def allSupportedExecutorResources: Array[String] =
+   Array(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)

val UNKNOWN_RESOURCE_PROFILE_ID = -1
val DEFAULT_RESOURCE_PROFILE_ID = 0
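The new scaladoc above names the keys these request maps use, and the allSupportedExecutorResources change swaps a publicly exposed Seq val for a def returning a fresh Array on each call, so callers no longer share a single collection. A small sketch of reading those constants, assuming the API shape shown in this diff:

    import org.apache.spark.resource.ResourceProfile

    // The built-in executor resource names; custom ones like GPUs/FPGAs are excluded.
    ResourceProfile.allSupportedExecutorResources.foreach(println)
    // cores, memory, memoryOverhead, pyspark.memory, offHeap

    // A custom name such as "gpu" is deliberately not in the list.
    val isBuiltIn = ResourceProfile.allSupportedExecutorResources.contains("gpu") // false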
@@ -26,17 +26,19 @@ import org.apache.spark.annotation.{Evolving, Since}


/**
- * Resource profile builder to build a Resource profile to associate with an RDD.
- * A ResourceProfile allows the user to specify executor and task requirements for an RDD
- * that will get applied during a stage. This allows the user to change the resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource requirements
+ * for an RDD that will get applied during a stage. This allows the user to change the resource
* requirements between stages.
*
*/
@Evolving
@Since("3.1.0")
class ResourceProfileBuilder() {

+ // Task resource requests specified by users, mapped from resource name to the request.
private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+ // Executor resource requests specified by users, mapped from resource name to the request.
private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()

def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
@@ -54,11 +56,21 @@
_executorResources.asScala.asJava
}

+ /**
+  * Add executor resource requests
+  * @param requests The detailed executor resource requests, see [[ExecutorResourceRequests]]
+  * @return This ResourceProfileBuilder
+  */
def require(requests: ExecutorResourceRequests): this.type = {
_executorResources.putAll(requests.requests.asJava)
this
}

+ /**
+  * Add task resource requests
+  * @param requests The detailed task resource requests, see [[TaskResourceRequest]]
+  * @return This ResourceProfileBuilder
+  */
def require(requests: TaskResourceRequests): this.type = {
_taskResources.putAll(requests.requests.asJava)
this
@@ -80,7 +92,7 @@
s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}"
}

- def build: ResourceProfile = {
+ def build(): ResourceProfile = {
new ResourceProfile(executorResources, taskResources)
}
}
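Taken together, the builder docs describe a simple protocol: each require() merges a request set into the profile under construction, and build() (now written with parentheses, since it constructs a new ResourceProfile on every call) freezes the result. A sketch mirroring the test updates further below:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    val ereq = new ExecutorResourceRequests().cores(4).memory("2g")
    val treq = new TaskResourceRequests().cpus(2)

    // require() returns the builder, so executor and task requests chain.
    val rp = new ResourceProfileBuilder().require(ereq).require(treq).build()

    assert(rp.executorResources("cores").amount == 4)
    assert(rp.taskResources("cpus").amount == 2)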
@@ -20,11 +20,17 @@ package org.apache.spark.resource
import org.apache.spark.annotation.{Evolving, Since}

/**
- * A task resource request. This is used in conjunction with the ResourceProfile to
+ * A task resource request. This is used in conjunction with the [[ResourceProfile]] to
* programmatically specify the resources needed for an RDD that will be applied at the
* stage level.
*
- * Use TaskResourceRequests class as a convenience API.
+ * Use [[TaskResourceRequests]] class as a convenience API.
+ *
+ * @param resourceName Resource name
+ * @param amount Amount requesting as a Double to support fractional resource requests.
+ *   Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ *   lets you configure X number of tasks to run on a single resource,
+ *   ie amount equals 0.5 translates into 2 tasks per resource address.
*/
@Evolving
@Since("3.1.0")
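The new @param amount doc carries the one non-obvious rule: fractional amounts let several tasks share a single resource address, and only values that translate into a whole number of tasks per address are accepted. A short illustration of that arithmetic (a sketch, not exhaustive validation):

    import org.apache.spark.resource.TaskResourceRequests

    // amount = 0.5: each task needs half an address, so 2 tasks (1 / 0.5)
    // can run per GPU address.
    val shared = new TaskResourceRequests().resource("gpu", 0.5)

    // Whole numbers mean each task needs that many full addresses.
    val exclusive = new TaskResourceRequests().resource("gpu", 2.0)

    // A value like 0.7 is invalid: it is above 0.5 but not a whole number,
    // so it yields no integral tasks-per-address count.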
@@ -36,12 +36,19 @@ class TaskResourceRequests() extends Serializable {

private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()

+ /**
+  * Returns all the resource requests for the task.
+  */
def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap

+ /**
+  * (Java-specific) Returns all the resource requests for the task.
+  */
def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava

/**
* Specify number of cpus per Task.
+  * This is a convenient API to add [[TaskResourceRequest]] for cpus.
*
* @param amount Number of cpus to allocate per Task.
*/
@@ -52,7 +59,8 @@
}

/**
- * Amount of a particular custom resource(GPU, FPGA, etc) to use.
+ * Amount of a particular custom resource(GPU, FPGA, etc) to use.
+ * This is a convenient API to add [[TaskResourceRequest]] for custom resources.
*
* @param resourceName Name of the resource.
* @param amount Amount requesting as a Double to support fractional resource requests.
@@ -66,6 +74,9 @@
this
}

+ /**
+  * Add a certain [[TaskResourceRequest]] to the request set.
+  */
def addRequest(treq: TaskResourceRequest): this.type = {
_taskResources.put(treq.resourceName, treq)
this
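addRequest() above is the general entry point for a pre-built TaskResourceRequest; cpus() and resource() are sugar over the same map. A sketch of the equivalence, assuming TaskResourceRequest's constructor is public as its @param docs in this diff suggest:

    import org.apache.spark.resource.{TaskResourceRequest, TaskResourceRequests}

    // The convenience form...
    val a = new TaskResourceRequests().cpus(2)

    // ...registers the same entry as handing over the request directly.
    val b = new TaskResourceRequests().addRequest(new TaskResourceRequest("cpus", 2))

    assert(a.requests("cpus").amount == b.requests("cpus").amount)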
@@ -290,7 +290,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
ereq.cores(4).memory("2g").memoryOverhead("1g").pysparkMemory("3g")
treq.cpus(2)
rpb.require(ereq).require(treq)
- val rp = rpb.build
+ val rp = rpb.build()
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
val executor = step.configurePod(SparkPod.initialPod())

@@ -307,7 +307,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
ereq.cores(2).resource("gpu", 2, "/path/getGpusResources.sh", "nvidia.com")
treq.cpus(1)
rpb.require(ereq).require(treq)
- val rp = rpb.build
+ val rp = rpb.build()
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
val executor = step.configurePod(SparkPod.initialPod())

@@ -280,7 +280,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
ereq.cores(4).memory("2g")
treq.cpus(2)
rpb.require(ereq).require(treq)
- val rp = rpb.build
+ val rp = rpb.build()

// Target 1 executor for default profile, 2 for other profile,
// make sure it's requested, even with an empty initial snapshot.
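The test changes in the last two files only adapt call sites to the new build() spelling. For context, a profile built this way is what the stage-level scheduling API consumes; a hedged end-to-end sketch, assuming a SparkContext sc on a cluster manager that supports stage-level scheduling and a hypothetical scoreOnGpu function:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    val rp = new ResourceProfileBuilder()
      .require(new ExecutorResourceRequests().cores(4).memory("6g").resource("gpu", 1))
      .require(new TaskResourceRequests().resource("gpu", 0.25)) // 4 tasks per GPU
      .build()

    // Stages computing this RDD request executors matching rp.
    val scored = sc.parallelize(1 to 1000000, 8)
      .withResources(rp)
      .map(scoreOnGpu) // hypothetical GPU-backed scoring function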