diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 6f0182255e5fe..e4140f659d979 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -27,6 +27,7 @@ import org.apache.spark._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.rdd.RDD +import org.apache.spark.resource.ResourceProfile import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -49,6 +50,20 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) */ def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel)) + /** + * Specify a ResourceProfile to use when calculating this RDD. This is only supported on + * certain cluster managers and currently requires dynamic allocation to be enabled. + * It will result in new executors with the resources specified being acquired to + * calculate the RDD. + */ + def withResources(rp: ResourceProfile): JavaRDD[T] = wrapRDD(rdd.withResources(rp)) + + /** + * Get the ResourceProfile specified with this RDD or None if it wasn't specified. + * @return the user specified ResourceProfile or null if none was specified + */ + def getResourceProfile(): ResourceProfile = rdd.getResourceProfile() + /** * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. * This method blocks until all blocks are deleted. diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 06c9446c7534e..b3d8780190fc2 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -35,6 +35,7 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES} import org.apache.spark.internal.config.Python._ +import org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY} import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -85,9 +86,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val conf = SparkEnv.get.conf protected val bufferSize: Int = conf.get(BUFFER_SIZE) private val reuseWorker = conf.get(PYTHON_WORKER_REUSE) - // each python worker gets an equal part of the allocation. the worker pool will grow to the - // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES)) // All the Python functions should have the same exec, version and envvars. protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars @@ -106,26 +104,41 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( // Authentication helper used when serving method calls via socket from Python side. private lazy val authHelper = new SocketAuthHelper(conf) + // each python worker gets an equal part of the allocation. the worker pool will grow to the + // number of concurrent tasks, which is determined by the number of cores in this executor. 
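+ // For example (illustrative values, not from this change): with spark.executor.pyspark.memory=4g
+ // and 4 executor cores, each of the up to 4 concurrent Python workers would be capped at
+ // 4096 MiB / 4 = 1024 MiB.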
+ private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] = { + mem.map(_ / cores) + } + def compute( inputIterator: Iterator[IN], partitionIndex: Int, context: TaskContext): Iterator[OUT] = { val startTime = System.currentTimeMillis val env = SparkEnv.get + + // Get the executor cores and pyspark memory, they are passed via the local properties when + // the user specified them in a ResourceProfile. + val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) + val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") // if OMP_NUM_THREADS is not explicitly set, override it with the number of cores if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool // see https://github.com/numpy/numpy/issues/10455 - conf.getOption("spark.executor.cores").foreach(envVars.put("OMP_NUM_THREADS", _)) + execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _)) } envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") } - if (memoryMb.isDefined) { - envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString) + // SPARK-30299 this could be wrong with standalone mode when executor + // cores might not be correct because it defaults to all cores on the box. + val execCores = execCoresProp.map(_.toInt).getOrElse(conf.get(EXECUTOR_CORES)) + val workerMemoryMb = getWorkerMemoryMb(memoryMb, execCores) + if (workerMemoryMb.isDefined) { + envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString) } envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap) diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala index d4c29f9a70c44..4ee1a07564042 100644 --- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala +++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala @@ -17,6 +17,7 @@ package org.apache.spark.resource +import java.util.{Map => JMap} import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -38,6 +39,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable { def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap + def requestsJMap: JMap[String, ExecutorResourceRequest] = requests.asJava + /** * Specify heap memory. The value specified will be converted to MiB. * diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 96c456ed51746..e072f2d27f60d 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -29,7 +29,6 @@ import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY -import org.apache.spark.util.Utils /** * Resource profile to associate with an RDD. 
A ResourceProfile allows the user to @@ -76,6 +75,10 @@ class ResourceProfile( taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt) } + private[spark] def getPySparkMemory: Option[Long] = { + executorResources.get(ResourceProfile.PYSPARK_MEM).map(_.amount.toLong) + } + /* * This function takes into account fractional amounts for the task resource requirement. * Spark only supports fractional amounts < 1 to basically allow for multiple tasks @@ -325,4 +328,7 @@ object ResourceProfile extends Logging { private[spark] def getTaskCpusOrDefaultForProfile(rp: ResourceProfile, conf: SparkConf): Int = { rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK)) } + + private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory" + private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores" } diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala index 9a5114fc1d0a4..09f4e02eee9e0 100644 --- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala +++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala @@ -17,6 +17,7 @@ package org.apache.spark.resource +import java.util.{Map => JMap} import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -37,6 +38,8 @@ private[spark] class TaskResourceRequests() extends Serializable { def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap + def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava + /** * Specify number of cpus per Task. * diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 079cf110a7ce9..bede0c561993e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -39,6 +39,7 @@ import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.{RDD, RDDCheckpointData} import org.apache.spark.resource.ResourceProfile +import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY} import org.apache.spark.rpc.RpcTimeout import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat @@ -1135,6 +1136,27 @@ private[spark] class DAGScheduler( } } + /** + * `PythonRunner` needs to know what the pyspark memory and cores settings are for the profile + * being run. Pass them in the local properties of the task if it's set for the stage profile. + */ + private def addPySparkConfigsToProperties(stage: Stage, properties: Properties): Unit = { + val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId) + val pysparkMem = rp.getPySparkMemory + // use the getOption on EXECUTOR_CORES.key instead of using the EXECUTOR_CORES config reader + // because the default for this config isn't correct for standalone mode. Here we want + // to know if it was explicitly set or not. The default profile always has it set to either + // what user specified or default so special case it here. 
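+ // In short, the intended precedence is: for the default profile, use spark.executor.cores only
+ // if the user explicitly set it; for a custom profile, use the profile's executor cores and fall
+ // back to an explicitly set spark.executor.cores only when the profile doesn't specify cores.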
+ val execCores = if (rp.id == DEFAULT_RESOURCE_PROFILE_ID) { + sc.conf.getOption(config.EXECUTOR_CORES.key) + } else { + val profCores = rp.getExecutorCores.map(_.toString) + if (profCores.isEmpty) sc.conf.getOption(config.EXECUTOR_CORES.key) else profCores + } + pysparkMem.map(mem => properties.setProperty(PYSPARK_MEMORY_LOCAL_PROPERTY, mem.toString)) + execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores)) + } + /** Called when stage's parents are available and we can now do its task. */ private def submitMissingTasks(stage: Stage, jobId: Int): Unit = { logDebug("submitMissingTasks(" + stage + ")") @@ -1154,6 +1176,7 @@ private[spark] class DAGScheduler( // Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties = jobIdToActiveJob(jobId).properties + addPySparkConfigsToProperties(stage, properties) runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java index a8252e03b5c15..82b11de29d350 100644 --- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java @@ -73,6 +73,10 @@ import org.apache.spark.partial.BoundedDouble; import org.apache.spark.partial.PartialResult; import org.apache.spark.rdd.RDD; +import org.apache.spark.resource.ExecutorResourceRequests; +import org.apache.spark.resource.ResourceProfile; +import org.apache.spark.resource.ResourceProfileBuilder; +import org.apache.spark.resource.TaskResourceRequests; import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.storage.StorageLevel; import org.apache.spark.util.LongAccumulator; @@ -897,6 +901,16 @@ public void persist() { assertEquals(1, rdd.first().intValue()); } + @Test + public void withResources() { + ExecutorResourceRequests ereqs = new ExecutorResourceRequests().cores(4); + TaskResourceRequests treqs = new TaskResourceRequests().cpus(1); + ResourceProfile rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build(); + JavaRDD in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); + in1.withResources(rp1); + assertEquals(rp1, in1.getResourceProfile()); + } + @Test public void iterator() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 391e4bbe1b1f0..85e6a1e9fadac 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -398,6 +398,21 @@ def __hash__(self): ) +pyspark_resource = Module( + name="pyspark-resource", + dependencies=[ + pyspark_core + ], + source_file_regexes=[ + "python/pyspark/resource" + ], + python_test_goals=[ + # unittests + "pyspark.resource.tests.test_resources", + ] +) + + pyspark_streaming = Module( name="pyspark-streaming", dependencies=[ diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 316a5b4d01273..0daf09b17a82a 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -152,6 +152,7 @@ def killChild(): java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.ml.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") + java_import(gateway.jvm, "org.apache.spark.resource.*") # TODO(davies): move into sql java_import(gateway.jvm, "org.apache.spark.sql.*") java_import(gateway.jvm, 
"org.apache.spark.sql.api.python.*") diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 52ab86c0d88ee..fbf645d10ee86 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -47,11 +47,14 @@ from pyspark.statcounter import StatCounter from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler from pyspark.storagelevel import StorageLevel +from pyspark.resource.executorrequests import ExecutorResourceRequests +from pyspark.resource.resourceprofile import ResourceProfile +from pyspark.resource.taskrequests import TaskResourceRequests from pyspark.resultiterable import ResultIterable from pyspark.shuffle import Aggregator, ExternalMerger, \ get_used_memory, ExternalSorter, ExternalGroupBy from pyspark.traceback_utils import SCCallSiteSync -from pyspark.util import fail_on_stopiteration +from pyspark.util import fail_on_stopiteration, _parse_memory __all__ = ["RDD"] @@ -125,22 +128,6 @@ def __new__(cls, mean, confidence, low, high): return obj -def _parse_memory(s): - """ - Parse a memory string in the format supported by Java (e.g. 1g, 200m) and - return the value in MiB - - >>> _parse_memory("256m") - 256 - >>> _parse_memory("2g") - 2048 - """ - units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024} - if s[-1].lower() not in units: - raise ValueError("invalid format: " + s) - return int(float(s[:-1]) * units[s[-1].lower()]) - - def _create_local_socket(sock_info): """ Create a local socket that can be used to load deserialized data from the JVM @@ -256,6 +243,7 @@ def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSeri self._jrdd = jrdd self.is_cached = False self.is_checkpointed = False + self.has_resource_profile = False self.ctx = ctx self._jrdd_deserializer = jrdd_deserializer self._id = jrdd.id() @@ -2483,6 +2471,47 @@ def _is_barrier(self): """ return self._jrdd.rdd().isBarrier() + def withResources(self, profile): + """ + .. note:: Experimental + + Specify a :class:`pyspark.resource.ResourceProfile` to use when calculating this RDD. + This is only supported on certain cluster managers and currently requires dynamic + allocation to be enabled. It will result in new executors with the resources specified + being acquired to calculate the RDD. + + .. versionadded:: 3.1.0 + """ + self.has_resource_profile = True + if profile._java_resource_profile is not None: + jrp = profile._java_resource_profile + else: + builder = self.ctx._jvm.org.apache.spark.resource.ResourceProfileBuilder() + ereqs = ExecutorResourceRequests(self.ctx._jvm, profile._executor_resource_requests) + treqs = TaskResourceRequests(self.ctx._jvm, profile._task_resource_requests) + builder.require(ereqs._java_executor_resource_requests) + builder.require(treqs._java_task_resource_requests) + jrp = builder.build() + + self._jrdd.withResources(jrp) + return self + + def getResourceProfile(self): + """ + .. note:: Experimental + + Get the :class:`pyspark.resource.ResourceProfile` specified with this RDD or None + if it wasn't specified. + :return: the user specified ResourceProfile or None if none were specified + + .. 
versionadded:: 3.1.0 + """ + rp = self._jrdd.getResourceProfile() + if rp is not None: + return ResourceProfile(_java_resource_profile=rp) + else: + return None + def _prepare_for_python_RDD(sc, command): # the serialized command will be compressed by broadcast @@ -2587,6 +2616,7 @@ def pipeline_func(split, iterator): self._prev_jrdd = prev._prev_jrdd # maintain the pipeline self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer self.is_cached = False + self.has_resource_profile = False self.is_checkpointed = False self.ctx = prev.ctx self.prev = prev @@ -2629,7 +2659,7 @@ def id(self): return self._id def _is_pipelinable(self): - return not (self.is_cached or self.is_checkpointed) + return not (self.is_cached or self.is_checkpointed or self.has_resource_profile) def _is_barrier(self): return self.is_barrier diff --git a/python/pyspark/resource/__init__.py b/python/pyspark/resource/__init__.py new file mode 100644 index 0000000000000..89070ec4adc7e --- /dev/null +++ b/python/pyspark/resource/__init__.py @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +APIs to let users manipulate resource requirements. +""" +from pyspark.resource.executorrequests import ExecutorResourceRequest, ExecutorResourceRequests +from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests +from pyspark.resource.resourceprofilebuilder import ResourceProfileBuilder +from pyspark.resource.resourceprofile import ResourceProfile + +__all__ = [ + "TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest", + "ExecutorResourceRequests", "ResourceProfile", "ResourceProfileBuilder", +] diff --git a/python/pyspark/resource/executorrequests.py b/python/pyspark/resource/executorrequests.py new file mode 100644 index 0000000000000..91a195c94b6e5 --- /dev/null +++ b/python/pyspark/resource/executorrequests.py @@ -0,0 +1,169 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from pyspark.resource.taskrequests import TaskResourceRequest +from pyspark.util import _parse_memory + + +class ExecutorResourceRequest(object): + """ + .. note:: Evolving + + An Executor resource request. This is used in conjunction with the ResourceProfile to + programmatically specify the resources needed for an RDD that will be applied at the + stage level. + + This is used to specify what the resource requirements are for an Executor and how + Spark can find out specific details about those resources. Not all the parameters are + required for every resource type. Resources like GPUs are supported and have the same limitations + as using the global spark configs spark.executor.resource.gpu.*. The amount, discoveryScript, + and vendor parameters for resources are all the same parameters a user would specify through the + configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}. + + For instance, suppose a user wants to allocate an Executor with GPU resources on YARN. The user has + to specify the resource name (gpu) and the amount or number of GPUs per Executor. + The discovery script would be specified so that when the Executor starts up it can + discover what GPU addresses are available for it to use, because YARN doesn't tell + Spark that. The vendor parameter would not be used, because it is specific to Kubernetes. + + See the configuration and cluster-specific docs for more details. + + Use the `pyspark.ExecutorResourceRequests` class as a convenience API. + + :param resourceName: Name of the resource + :param amount: Amount requesting + :param discoveryScript: Optional script used to discover the resources. This is required on some + cluster managers that don't tell Spark the addresses of the resources + allocated. The script runs at Executor startup to discover the addresses + of the resources available. + :param vendor: Vendor, required for some cluster managers + + .. versionadded:: 3.1.0 + """ + def __init__(self, resourceName, amount, discoveryScript="", vendor=""): + self._name = resourceName + self._amount = amount + self._discovery_script = discoveryScript + self._vendor = vendor + + @property + def resourceName(self): + return self._name + + @property + def amount(self): + return self._amount + + @property + def discoveryScript(self): + return self._discovery_script + + @property + def vendor(self): + return self._vendor + + +class ExecutorResourceRequests(object): + + """ + .. note:: Evolving + + A set of Executor resource requests. This is used in conjunction with the + :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the + resources needed for an RDD that will be applied at the stage level. + + ..
versionadded:: 3.1.0 + """ + _CORES = "cores" + _MEMORY = "memory" + _OVERHEAD_MEM = "memoryOverhead" + _PYSPARK_MEM = "pyspark.memory" + + def __init__(self, _jvm=None, _requests=None): + from pyspark import SparkContext + _jvm = _jvm or SparkContext._jvm + if _jvm is not None: + self._java_executor_resource_requests = \ + _jvm.org.apache.spark.resource.ExecutorResourceRequests() + if _requests is not None: + for k, v in _requests.items(): + if k == self._MEMORY: + self._java_executor_resource_requests.memory(str(v.amount)) + elif k == self._OVERHEAD_MEM: + self._java_executor_resource_requests.memoryOverhead(str(v.amount)) + elif k == self._PYSPARK_MEM: + self._java_executor_resource_requests.pysparkMemory(str(v.amount)) + elif k == self._CORES: + self._java_executor_resource_requests.cores(v.amount) + else: + self._java_executor_resource_requests.resource(v.resourceName, v.amount, + v.discoveryScript, v.vendor) + else: + self._java_executor_resource_requests = None + self._executor_resources = {} + + def memory(self, amount): + if self._java_executor_resource_requests is not None: + self._java_executor_resource_requests.memory(amount) + else: + self._executor_resources[self._MEMORY] = ExecutorResourceRequest(self._MEMORY, + _parse_memory(amount)) + return self + + def memoryOverhead(self, amount): + if self._java_executor_resource_requests is not None: + self._java_executor_resource_requests.memoryOverhead(amount) + else: + self._executor_resources[self._OVERHEAD_MEM] = \ + ExecutorResourceRequest(self._OVERHEAD_MEM, _parse_memory(amount)) + return self + + def pysparkMemory(self, amount): + if self._java_executor_resource_requests is not None: + self._java_executor_resource_requests.pysparkMemory(amount) + else: + self._executor_resources[self._PYSPARK_MEM] = \ + ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount)) + return self + + def cores(self, amount): + if self._java_executor_resource_requests is not None: + self._java_executor_resource_requests.cores(amount) + else: + self._executor_resources[self._CORES] = ExecutorResourceRequest(self._CORES, amount) + return self + + def resource(self, resourceName, amount, discoveryScript="", vendor=""): + if self._java_executor_resource_requests is not None: + self._java_executor_resource_requests.resource(resourceName, amount, discoveryScript, + vendor) + else: + self._executor_resources[resourceName] = \ + ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor) + return self + + @property + def requests(self): + if self._java_executor_resource_requests is not None: + result = {} + execRes = self._java_executor_resource_requests.requestsJMap() + for k, v in execRes.items(): + result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(), + v.discoveryScript(), v.vendor()) + return result + else: + return self._executor_resources diff --git a/python/pyspark/resource/resourceprofile.py b/python/pyspark/resource/resourceprofile.py new file mode 100644 index 0000000000000..59e9ccb4b6ea0 --- /dev/null +++ b/python/pyspark/resource/resourceprofile.py @@ -0,0 +1,72 @@ + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.resource.taskrequests import TaskResourceRequest +from pyspark.resource.executorrequests import ExecutorResourceRequest + + +class ResourceProfile(object): + + """ + .. note:: Evolving + + Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile` + allows the user to specify executor and task requirements for an RDD that will get + applied during a stage. This allows the user to change the resource requirements between + stages. This is meant to be immutable so user doesn't change it after building. + + .. versionadded:: 3.1.0 + """ + + def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}): + if _java_resource_profile is not None: + self._java_resource_profile = _java_resource_profile + else: + self._java_resource_profile = None + self._executor_resource_requests = _exec_req + self._task_resource_requests = _task_req + + @property + def id(self): + if self._java_resource_profile is not None: + return self._java_resource_profile.id() + else: + raise RuntimeError("SparkContext must be created to get the id, get the id " + "after adding the ResourceProfile to an RDD") + + @property + def taskResources(self): + if self._java_resource_profile is not None: + taskRes = self._java_resource_profile.taskResourcesJMap() + result = {} + for k, v in taskRes.items(): + result[k] = TaskResourceRequest(v.resourceName(), v.amount()) + return result + else: + return self._task_resource_requests + + @property + def executorResources(self): + if self._java_resource_profile is not None: + execRes = self._java_resource_profile.executorResourcesJMap() + result = {} + for k, v in execRes.items(): + result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(), + v.discoveryScript(), v.vendor()) + return result + else: + return self._executor_resource_requests diff --git a/python/pyspark/resource/resourceprofilebuilder.py b/python/pyspark/resource/resourceprofilebuilder.py new file mode 100644 index 0000000000000..67654289d500f --- /dev/null +++ b/python/pyspark/resource/resourceprofilebuilder.py @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from pyspark.resource.executorrequests import ExecutorResourceRequest,\ + ExecutorResourceRequests +from pyspark.resource.resourceprofile import ResourceProfile +from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests + + +class ResourceProfileBuilder(object): + + """ + .. note:: Evolving + + Resource profile Builder to build a resource profile to associate with an RDD. + A ResourceProfile allows the user to specify executor and task requirements for + an RDD that will get applied during a stage. This allows the user to change the + resource requirements between stages. + + .. versionadded:: 3.1.0 + """ + + def __init__(self): + from pyspark.context import SparkContext + _jvm = SparkContext._jvm + if _jvm is not None: + self._jvm = _jvm + self._java_resource_profile_builder = \ + _jvm.org.apache.spark.resource.ResourceProfileBuilder() + else: + self._jvm = None + self._java_resource_profile_builder = None + self._executor_resource_requests = {} + self._task_resource_requests = {} + + def require(self, resourceRequest): + if isinstance(resourceRequest, TaskResourceRequests): + if self._java_resource_profile_builder is not None: + if resourceRequest._java_task_resource_requests is not None: + self._java_resource_profile_builder.require( + resourceRequest._java_task_resource_requests) + else: + taskReqs = TaskResourceRequests(self._jvm, resourceRequest.requests) + self._java_resource_profile_builder.require( + taskReqs._java_task_resource_requests) + else: + self._task_resource_requests.update(resourceRequest.requests) + else: + if self._java_resource_profile_builder is not None: + if resourceRequest._java_executor_resource_requests is not None: + self._java_resource_profile_builder.require( + resourceRequest._java_executor_resource_requests) + else: + execReqs = ExecutorResourceRequests(self._jvm, resourceRequest.requests) + self._java_resource_profile_builder.require( + execReqs._java_executor_resource_requests) + else: + self._executor_resource_requests.update(resourceRequest.requests) + return self + + def clearExecutorResourceRequests(self): + if self._java_resource_profile_builder is not None: + self._java_resource_profile_builder.clearExecutorResourceRequests() + else: + self._executor_resource_requests = {} + + def clearTaskResourceRequests(self): + if self._java_resource_profile_builder is not None: + self._java_resource_profile_builder.clearTaskResourceRequests() + else: + self._task_resource_requests = {} + + @property + def taskResources(self): + if self._java_resource_profile_builder is not None: + taskRes = self._java_resource_profile_builder.taskResourcesJMap() + result = {} + for k, v in taskRes.items(): + result[k] = TaskResourceRequest(v.resourceName(), v.amount()) + return result + else: + return self._task_resource_requests + + @property + def executorResources(self): + if self._java_resource_profile_builder is not None: + result = {} + execRes = self._java_resource_profile_builder.executorResourcesJMap() + for k, v in execRes.items(): + result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(), + v.discoveryScript(), v.vendor()) + return result + else: + return self._executor_resource_requests + + @property + def build(self): + if self._java_resource_profile_builder is not None: + jresourceProfile = self._java_resource_profile_builder.build() + return ResourceProfile(_java_resource_profile=jresourceProfile) + else: + return ResourceProfile(_exec_req=self._executor_resource_requests, + _task_req=self._task_resource_requests) diff 
--git a/python/pyspark/resource/taskrequests.py b/python/pyspark/resource/taskrequests.py new file mode 100644 index 0000000000000..e8dca98d14b61 --- /dev/null +++ b/python/pyspark/resource/taskrequests.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +class TaskResourceRequest(object): + """ + .. note:: Evolving + + A task resource request. This is used in conjunction with the + :class:`pyspark.resource.ResourceProfile` to programmatically specify the resources + needed for an RDD that will be applied at the stage level. The amount is specified + as a Double to allow for saying you want more than 1 task per resource. Valid values + are less than or equal to 0.5 or whole numbers. + Use the :class:`pyspark.resource.TaskResourceRequests` class as a convenience API. + + :param resourceName: Name of the resource + :param amount: Amount requesting as a Double to support fractional resource requests. + Valid values are less than or equal to 0.5 or whole numbers. + + .. versionadded:: 3.1.0 + """ + def __init__(self, resourceName, amount): + self._name = resourceName + self._amount = float(amount) + + @property + def resourceName(self): + return self._name + + @property + def amount(self): + return self._amount + + +class TaskResourceRequests(object): + + """ + .. note:: Evolving + + A set of task resource requests. This is used in conjunction with the + :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources + needed for an RDD that will be applied at the stage level. + + ..
versionadded:: 3.1.0 + """ + + _CPUS = "cpus" + + def __init__(self, _jvm=None, _requests=None): + from pyspark import SparkContext + _jvm = _jvm or SparkContext._jvm + if _jvm is not None: + self._java_task_resource_requests = \ + SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests() + if _requests is not None: + for k, v in _requests.items(): + if k == self._CPUS: + self._java_task_resource_requests.cpus(int(v.amount)) + else: + self._java_task_resource_requests.resource(v.resourceName, v.amount) + else: + self._java_task_resource_requests = None + self._task_resources = {} + + def cpus(self, amount): + if self._java_task_resource_requests is not None: + self._java_task_resource_requests.cpus(amount) + else: + self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount) + return self + + def resource(self, resourceName, amount): + if self._java_task_resource_requests is not None: + self._java_task_resource_requests.resource(resourceName, float(amount)) + else: + self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount) + return self + + @property + def requests(self): + if self._java_task_resource_requests is not None: + result = {} + taskRes = self._java_task_resource_requests.requestsJMap() + for k, v in taskRes.items(): + result[k] = TaskResourceRequest(v.resourceName(), v.amount()) + return result + else: + return self._task_resources diff --git a/python/pyspark/resource/tests/__init__.py b/python/pyspark/resource/tests/__init__.py new file mode 100644 index 0000000000000..cce3acad34a49 --- /dev/null +++ b/python/pyspark/resource/tests/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/pyspark/resource/tests/test_resources.py b/python/pyspark/resource/tests/test_resources.py new file mode 100644 index 0000000000000..9eb5a35e58df5 --- /dev/null +++ b/python/pyspark/resource/tests/test_resources.py @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import random +import unittest + +from pyspark.resource import ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder,\ + TaskResourceRequests + + +class ResourceProfileTests(unittest.TestCase): + def test_profile_before_sc(self): + rpb = ResourceProfileBuilder() + ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g") + ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com") + treqs = TaskResourceRequests().cpus(2).resource("gpu", 2) + + def assert_request_contents(exec_reqs, task_reqs): + self.assertEqual(len(exec_reqs), 5) + self.assertEqual(exec_reqs["cores"].amount, 2) + self.assertEqual(exec_reqs["memory"].amount, 6144) + self.assertEqual(exec_reqs["memoryOverhead"].amount, 1024) + self.assertEqual(exec_reqs["pyspark.memory"].amount, 2048) + self.assertEqual(exec_reqs["gpu"].amount, 2) + self.assertEqual(exec_reqs["gpu"].discoveryScript, "testGpus") + self.assertEqual(exec_reqs["gpu"].resourceName, "gpu") + self.assertEqual(exec_reqs["gpu"].vendor, "nvidia.com") + self.assertEqual(len(task_reqs), 2) + self.assertEqual(task_reqs["cpus"].amount, 2.0) + self.assertEqual(task_reqs["gpu"].amount, 2.0) + + assert_request_contents(ereqs.requests, treqs.requests) + rp = rpb.require(ereqs).require(treqs).build + assert_request_contents(rp.executorResources, rp.taskResources) + from pyspark import SparkContext, SparkConf + sc = SparkContext(conf=SparkConf()) + rdd = sc.parallelize(range(10)).withResources(rp) + return_rp = rdd.getResourceProfile() + assert_request_contents(return_rp.executorResources, return_rp.taskResources) + # intermix objects created before SparkContext init and after + rpb2 = ResourceProfileBuilder() + # use reqs created before SparkContext with Builder after + rpb2.require(ereqs) + rpb2.require(treqs) + rp2 = rpb2.build + self.assertTrue(rp2.id > 0) + rdd2 = sc.parallelize(range(10)).withResources(rp2) + return_rp2 = rdd2.getResourceProfile() + assert_request_contents(return_rp2.executorResources, return_rp2.taskResources) + ereqs2 = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g") + ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com") + treqs2 = TaskResourceRequests().cpus(2).resource("gpu", 2) + # use reqs created after SparkContext with Builder before + rpb.require(ereqs2) + rpb.require(treqs2) + rp3 = rpb.build + assert_request_contents(rp3.executorResources, rp3.taskResources) + sc.stop() + + +if __name__ == "__main__": + from pyspark.resource.tests.test_resources import * + + try: + import xmlrunner + testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py index 15cc48ae2153d..31c5a7510a165 100644 --- a/python/pyspark/tests/test_rdd.py +++ b/python/pyspark/tests/test_rdd.py @@ -26,6 +26,8 @@ from py4j.protocol import Py4JJavaError from pyspark import shuffle, RDD +from pyspark.resource import ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder,\ + TaskResourceRequests from pyspark.serializers import CloudPickleSerializer, BatchedSerializer, PickleSerializer,\ MarshalSerializer, UTF8Deserializer, NoOpSerializer from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME, QuietTest @@ -783,6 +785,34 @@ def fail_last(x): for i in range(4): self.assertEqual(i, next(it)) + def test_resourceprofile(self): + rp_builder = ResourceProfileBuilder() + ereqs = 
ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g") + ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com") + treqs = TaskResourceRequests().cpus(2).resource("gpu", 2) + + def assert_request_contents(exec_reqs, task_reqs): + self.assertEqual(len(exec_reqs), 5) + self.assertEqual(exec_reqs["cores"].amount, 2) + self.assertEqual(exec_reqs["memory"].amount, 6144) + self.assertEqual(exec_reqs["memoryOverhead"].amount, 1024) + self.assertEqual(exec_reqs["pyspark.memory"].amount, 2048) + self.assertEqual(exec_reqs["gpu"].amount, 2) + self.assertEqual(exec_reqs["gpu"].discoveryScript, "testGpus") + self.assertEqual(exec_reqs["gpu"].resourceName, "gpu") + self.assertEqual(exec_reqs["gpu"].vendor, "nvidia.com") + self.assertEqual(len(task_reqs), 2) + self.assertEqual(task_reqs["cpus"].amount, 2.0) + self.assertEqual(task_reqs["gpu"].amount, 2.0) + + assert_request_contents(ereqs.requests, treqs.requests) + rp = rp_builder.require(ereqs).require(treqs).build + assert_request_contents(rp.executorResources, rp.taskResources) + rdd = self.sc.parallelize(range(10)).withResources(rp) + return_rp = rdd.getResourceProfile() + assert_request_contents(return_rp.executorResources, return_rp.taskResources) + rddWithoutRp = self.sc.parallelize(range(10)) + self.assertEqual(rddWithoutRp.getResourceProfile(), None) if __name__ == "__main__": import unittest diff --git a/python/pyspark/tests/test_worker.py b/python/pyspark/tests/test_worker.py index 9d7deb23da604..dba9298ee161a 100644 --- a/python/pyspark/tests/test_worker.py +++ b/python/pyspark/tests/test_worker.py @@ -29,6 +29,7 @@ from py4j.protocol import Py4JJavaError +from pyspark import SparkConf, SparkContext from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest if sys.version_info[0] >= 3: @@ -180,10 +181,14 @@ def test_reuse_worker_of_parallelize_xrange(self): not has_resource_module, "Memory limit feature in Python worker is dependent on " "Python's 'resource' module; however, not found.") -class WorkerMemoryTest(PySparkTestCase): +class WorkerMemoryTest(unittest.TestCase): + + def setUp(self): + class_name = self.__class__.__name__ + conf = SparkConf().set("spark.executor.pyspark.memory", "2g") + self.sc = SparkContext('local[4]', class_name, conf=conf) def test_memory_limit(self): - self.sc._conf.set("spark.executor.pyspark.memory", "2g") rdd = self.sc.parallelize(xrange(1), 1) def getrlimit(): @@ -197,6 +202,8 @@ def getrlimit(): self.assertEqual(soft_limit, 2 * 1024 * 1024 * 1024) self.assertEqual(hard_limit, 2 * 1024 * 1024 * 1024) + def tearDown(self): + self.sc.stop() if __name__ == "__main__": import unittest diff --git a/python/pyspark/util.py b/python/pyspark/util.py index 93137560de25e..cc614a2c0eaed 100644 --- a/python/pyspark/util.py +++ b/python/pyspark/util.py @@ -168,6 +168,21 @@ def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version): }) +def _parse_memory(s): + """ + Parse a memory string in the format supported by Java (e.g. 
1g, 200m) and + return the value in MiB + + >>> _parse_memory("256m") + 256 + >>> _parse_memory("2g") + 2048 + """ + units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024} + if s[-1].lower() not in units: + raise ValueError("invalid format: " + s) + return int(float(s[:-1]) * units[s[-1].lower()]) + if __name__ == "__main__": import doctest (failure_count, test_count) = doctest.testmod() diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 7d62bf1783931..32867f914bbd5 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -19,6 +19,7 @@ Worker that receives input from Piped RDD. """ from __future__ import print_function +from __future__ import absolute_import import os import sys import time diff --git a/python/setup.py b/python/setup.py index 622e6077dc36e..e823eb9a80fc7 100755 --- a/python/setup.py +++ b/python/setup.py @@ -183,6 +183,7 @@ def _supports_symlinks(): 'pyspark.python.lib', 'pyspark.data', 'pyspark.licenses', + 'pyspark.resource', 'pyspark.examples.src.main.python'], include_package_data=True, package_dir={
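
Usage sketch: a minimal end-to-end example (not taken verbatim from this patch) of how the new PySpark stage-level resource API above is meant to be used, mirroring the builder pattern in the tests added here. The discovery-script path, resource amounts, and app name are illustrative placeholders, and actually running it assumes a cluster manager that supports stage-level scheduling with dynamic allocation enabled.

    from pyspark import SparkContext
    from pyspark.resource import (ExecutorResourceRequests, ResourceProfileBuilder,
                                  TaskResourceRequests)

    # Assumes master/deploy settings are supplied via spark-submit or SparkConf.
    sc = SparkContext(appName="resource-profile-example")

    # Executors for this stage: 2 cores, 6g heap, 1 GPU found by a (hypothetical) discovery script.
    ereqs = (ExecutorResourceRequests().cores(2).memory("6g")
             .resource("gpu", 1, "/opt/spark/getGpus.sh", "nvidia.com"))
    # Each task needs 1 CPU and 1 GPU.
    treqs = TaskResourceRequests().cpus(1).resource("gpu", 1)

    # `build` is a property on the Python builder, so no parentheses.
    rp = ResourceProfileBuilder().require(ereqs).require(treqs).build

    # The stage computing this RDD runs on executors acquired to match `rp`.
    rdd = sc.parallelize(range(100)).withResources(rp)
    print(rdd.getResourceProfile().executorResources)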