[SPARK-29641][PYTHON][CORE] Stage Level Sched: Add python api's and tests

### What changes were proposed in this pull request?

As part of the stage level scheduling feature, this adds the Python APIs to set resource profiles.
It also adds the functionality to properly apply the pyspark memory configuration when it is specified in the ResourceProfile. The pyspark memory configuration is passed in the task local properties, which was an easy way to get it to the PythonRunner that needs it; I modeled this on how barrier task scheduling passes the addresses. As part of this I also added the JavaRDD APIs because they are needed by Python.
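For illustration, a minimal usage sketch of the new user-facing API. This assumes a cluster manager that supports stage level scheduling with dynamic allocation enabled (e.g. YARN); the class and method names are taken from the pyspark.resource module and RDD changes in this diff, but the snippet itself is illustrative and not part of the patch, so treat it as a sketch rather than exact API documentation:

from pyspark import SparkContext
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

sc = SparkContext()

# Ask for executors with 4 cores, 6g of heap and 2g of pyspark memory for this stage.
ereqs = ExecutorResourceRequests().cores(4).memory("6g").pysparkMemory("2g")
treqs = TaskResourceRequests().cpus(1)
rp = ResourceProfileBuilder().require(ereqs).require(treqs).build  # build is exposed as a property on the Python builder

rdd = sc.parallelize(range(100), 4).withResources(rp)
print(rdd.getResourceProfile())  # the profile set above, or None if none was set

When the profile specifies pyspark memory, it is passed to the PythonRunner via the task local properties and split evenly across the executor's Python workers, as shown in the PythonRunner changes below.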

### Why are the changes needed?

Python API for this feature.

### Does this PR introduce any user-facing change?

Yes. It adds the Java and Python APIs for users to specify a ResourceProfile to use stage level scheduling.

### How was this patch tested?

Unit tests, and manually tested on YARN. Tests were also run to verify that it errors properly in standalone and local modes, where it's not yet supported.
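For reference, the new Python suite can be run locally with the standard dev tooling (a hedged example; the test name matches the pyspark-resource module registered in dev/sparktestsupport/modules.py below):

python/run-tests --testnames 'pyspark.resource.tests.test_resources'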

Closes #28085 from tgravescs/SPARK-29641-pr-base.

Lead-authored-by: Thomas Graves <tgraves@nvidia.com>
Co-authored-by: Thomas Graves <tgraves@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2 people authored and HyukjinKwon committed Apr 23, 2020
1 parent c619990 commit 95aec09
Showing 22 changed files with 791 additions and 27 deletions.
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -27,6 +27,7 @@ import org.apache.spark._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

@@ -49,6 +50,20 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
*/
def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))

/**
* Specify a ResourceProfile to use when calculating this RDD. This is only supported on
* certain cluster managers and currently requires dynamic allocation to be enabled.
* It will result in new executors with the resources specified being acquired to
* calculate the RDD.
*/
def withResources(rp: ResourceProfile): JavaRDD[T] = wrapRDD(rdd.withResources(rp))

/**
* Get the ResourceProfile specified with this RDD or null if it wasn't specified.
* @return the user specified ResourceProfile or null if none was specified
*/
def getResourceProfile(): ResourceProfile = rdd.getResourceProfile()

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
* This method blocks until all blocks are deleted.
25 changes: 19 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -35,6 +35,7 @@ import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES}
import org.apache.spark.internal.config.Python._
import org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util._

@@ -85,9 +86,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private val conf = SparkEnv.get.conf
protected val bufferSize: Int = conf.get(BUFFER_SIZE)
private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
// each python worker gets an equal part of the allocation. the worker pool will grow to the
// number of concurrent tasks, which is determined by the number of cores in this executor.
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES))

// All the Python functions should have the same exec, version and envvars.
protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars
@@ -106,26 +104,41 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
// Authentication helper used when serving method calls via socket from Python side.
private lazy val authHelper = new SocketAuthHelper(conf)

// each python worker gets an equal part of the allocation. the worker pool will grow to the
// number of concurrent tasks, which is determined by the number of cores in this executor.
private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] = {
mem.map(_ / cores)
}

def compute(
inputIterator: Iterator[IN],
partitionIndex: Int,
context: TaskContext): Iterator[OUT] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get

// Get the executor cores and pyspark memory, they are passed via the local properties when
// the user specified them in a ResourceProfile.
val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
// if OMP_NUM_THREADS is not explicitly set, override it with the number of cores
if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
// SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
// this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
// see https://github.com/numpy/numpy/issues/10455
conf.getOption("spark.executor.cores").foreach(envVars.put("OMP_NUM_THREADS", _))
execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
}
envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
if (reuseWorker) {
envVars.put("SPARK_REUSE_WORKER", "1")
}
if (memoryMb.isDefined) {
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString)
// SPARK-30299 this could be wrong with standalone mode when executor
// cores might not be correct because it defaults to all cores on the box.
val execCores = execCoresProp.map(_.toInt).getOrElse(conf.get(EXECUTOR_CORES))
val workerMemoryMb = getWorkerMemoryMb(memoryMb, execCores)
if (workerMemoryMb.isDefined) {
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString)
}
envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
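To make the worker memory split concrete, here is a small Python sketch of the same arithmetic as getWorkerMemoryMb above (illustrative values only, not part of the patch): each Python worker gets an equal share because the worker pool grows to one worker per executor core.

def worker_memory_mb(pyspark_mem_mb, executor_cores):
    # Mirrors getWorkerMemoryMb: None means no pyspark memory limit was configured.
    return None if pyspark_mem_mb is None else pyspark_mem_mb // executor_cores

assert worker_memory_mb(4096, 4) == 1024  # e.g. 4g of pyspark memory on a 4-core executor
assert worker_memory_mb(None, 4) is None  # PYSPARK_EXECUTOR_MEMORY_MB is then not set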
core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -17,6 +17,7 @@

package org.apache.spark.resource

import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
@@ -38,6 +39,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {

def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap

def requestsJMap: JMap[String, ExecutorResourceRequest] = requests.asJava

/**
* Specify heap memory. The value specified will be converted to MiB.
*
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -29,7 +29,6 @@ import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
import org.apache.spark.util.Utils

/**
* Resource profile to associate with an RDD. A ResourceProfile allows the user to
@@ -76,6 +75,10 @@ class ResourceProfile(
taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
}

private[spark] def getPySparkMemory: Option[Long] = {
executorResources.get(ResourceProfile.PYSPARK_MEM).map(_.amount.toLong)
}

/*
* This function takes into account fractional amounts for the task resource requirement.
* Spark only supports fractional amounts < 1 to basically allow for multiple tasks
@@ -330,4 +333,7 @@ object ResourceProfile extends Logging {
private[spark] def getTaskCpusOrDefaultForProfile(rp: ResourceProfile, conf: SparkConf): Int = {
rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
}

private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory"
private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores"
}
core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -17,6 +17,7 @@

package org.apache.spark.resource

import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
@@ -37,6 +38,8 @@ private[spark] class TaskResourceRequests() extends Serializable {

def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap

def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava

/**
* Specify number of cpus per Task.
*
23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,6 +39,7 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -1175,6 +1176,27 @@ private[spark] class DAGScheduler(
}
}

/**
* `PythonRunner` needs to know what the pyspark memory and cores settings are for the profile
* being run. Pass them in the local properties of the task if it's set for the stage profile.
*/
private def addPySparkConfigsToProperties(stage: Stage, properties: Properties): Unit = {
val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId)
val pysparkMem = rp.getPySparkMemory
// use the getOption on EXECUTOR_CORES.key instead of using the EXECUTOR_CORES config reader
// because the default for this config isn't correct for standalone mode. Here we want
// to know if it was explicitly set or not. The default profile always has it set to either
// what user specified or default so special case it here.
val execCores = if (rp.id == DEFAULT_RESOURCE_PROFILE_ID) {
sc.conf.getOption(config.EXECUTOR_CORES.key)
} else {
val profCores = rp.getExecutorCores.map(_.toString)
if (profCores.isEmpty) sc.conf.getOption(config.EXECUTOR_CORES.key) else profCores
}
pysparkMem.map(mem => properties.setProperty(PYSPARK_MEMORY_LOCAL_PROPERTY, mem.toString))
execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
@@ -1194,6 +1216,7 @@
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties
addPySparkConfigsToProperties(stage, properties)

runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
14 changes: 14 additions & 0 deletions core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -73,6 +73,10 @@
import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;
import org.apache.spark.rdd.RDD;
import org.apache.spark.resource.ExecutorResourceRequests;
import org.apache.spark.resource.ResourceProfile;
import org.apache.spark.resource.ResourceProfileBuilder;
import org.apache.spark.resource.TaskResourceRequests;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;
@@ -897,6 +901,16 @@ public void persist() {
assertEquals(1, rdd.first().intValue());
}

@Test
public void withResources() {
ExecutorResourceRequests ereqs = new ExecutorResourceRequests().cores(4);
TaskResourceRequests treqs = new TaskResourceRequests().cpus(1);
ResourceProfile rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build();
JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
in1.withResources(rp1);
assertEquals(rp1, in1.getResourceProfile());
}

@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
15 changes: 15 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -398,6 +398,21 @@ def __hash__(self):
)


pyspark_resource = Module(
name="pyspark-resource",
dependencies=[
pyspark_core
],
source_file_regexes=[
"python/pyspark/resource"
],
python_test_goals=[
# unittests
"pyspark.resource.tests.test_resources",
]
)


pyspark_streaming = Module(
name="pyspark-streaming",
dependencies=[
1 change: 1 addition & 0 deletions python/pyspark/java_gateway.py
@@ -152,6 +152,7 @@ def killChild():
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.ml.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.resource.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
66 changes: 48 additions & 18 deletions python/pyspark/rdd.py
@@ -47,11 +47,14 @@
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resource.executorrequests import ExecutorResourceRequests
from pyspark.resource.resourceprofile import ResourceProfile
from pyspark.resource.taskrequests import TaskResourceRequests
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, ExternalMerger, \
get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync
from pyspark.util import fail_on_stopiteration
from pyspark.util import fail_on_stopiteration, _parse_memory


__all__ = ["RDD"]
@@ -125,22 +128,6 @@ def __new__(cls, mean, confidence, low, high):
return obj


def _parse_memory(s):
"""
Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
return the value in MiB
>>> _parse_memory("256m")
256
>>> _parse_memory("2g")
2048
"""
units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
if s[-1].lower() not in units:
raise ValueError("invalid format: " + s)
return int(float(s[:-1]) * units[s[-1].lower()])


def _create_local_socket(sock_info):
"""
Create a local socket that can be used to load deserialized data from the JVM
@@ -256,6 +243,7 @@ def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSeri
self._jrdd = jrdd
self.is_cached = False
self.is_checkpointed = False
self.has_resource_profile = False
self.ctx = ctx
self._jrdd_deserializer = jrdd_deserializer
self._id = jrdd.id()
@@ -2483,6 +2471,47 @@ def _is_barrier(self):
"""
return self._jrdd.rdd().isBarrier()

def withResources(self, profile):
"""
.. note:: Experimental
Specify a :class:`pyspark.resource.ResourceProfile` to use when calculating this RDD.
This is only supported on certain cluster managers and currently requires dynamic
allocation to be enabled. It will result in new executors with the resources specified
being acquired to calculate the RDD.
.. versionadded:: 3.1.0
"""
self.has_resource_profile = True
if profile._java_resource_profile is not None:
jrp = profile._java_resource_profile
else:
builder = self.ctx._jvm.org.apache.spark.resource.ResourceProfileBuilder()
ereqs = ExecutorResourceRequests(self.ctx._jvm, profile._executor_resource_requests)
treqs = TaskResourceRequests(self.ctx._jvm, profile._task_resource_requests)
builder.require(ereqs._java_executor_resource_requests)
builder.require(treqs._java_task_resource_requests)
jrp = builder.build()

self._jrdd.withResources(jrp)
return self

def getResourceProfile(self):
"""
.. note:: Experimental
Get the :class:`pyspark.resource.ResourceProfile` specified with this RDD or None
if it wasn't specified.
:return: the user specified ResourceProfile or None if none were specified
.. versionadded:: 3.1.0
"""
rp = self._jrdd.getResourceProfile()
if rp is not None:
return ResourceProfile(_java_resource_profile=rp)
else:
return None


def _prepare_for_python_RDD(sc, command):
# the serialized command will be compressed by broadcast
@@ -2587,6 +2616,7 @@ def pipeline_func(split, iterator):
self._prev_jrdd = prev._prev_jrdd # maintain the pipeline
self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
self.is_cached = False
self.has_resource_profile = False
self.is_checkpointed = False
self.ctx = prev.ctx
self.prev = prev
@@ -2629,7 +2659,7 @@ def id(self):
return self._id

def _is_pipelinable(self):
return not (self.is_cached or self.is_checkpointed)
return not (self.is_cached or self.is_checkpointed or self.has_resource_profile)

def _is_barrier(self):
return self.is_barrier
29 changes: 29 additions & 0 deletions python/pyspark/resource/__init__.py
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
APIs to let users manipulate resource requirements.
"""
from pyspark.resource.executorrequests import ExecutorResourceRequest, ExecutorResourceRequests
from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
from pyspark.resource.resourceprofilebuilder import ResourceProfileBuilder
from pyspark.resource.resourceprofile import ResourceProfile

__all__ = [
"TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest",
"ExecutorResourceRequests", "ResourceProfile", "ResourceProfileBuilder",
]
(Diffs for the remaining changed files are not shown.)
