
Commit 2b515c8

cleanup
tgravescs committed Mar 31, 2020
1 parent af602b6 commit 2b515c8
Showing 3 changed files with 17 additions and 11 deletions.
@@ -129,7 +129,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       envVars.put("SPARK_REUSE_WORKER", "1")
     }
     val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_PROPERTY)).map(_.toLong)
-    logInfo(s"task context pyspark memory is: $memoryMb")
     val workerMemoryMb = getWorkerMemoryMb(memoryMb)
     if (workerMemoryMb.isDefined) {
       envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString)
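The BasePythonRunner hunk above is the consuming end of this change: whatever the scheduler stores under PYSPARK_MEMORY_PROPERTY in a task's local properties is read back when the Python worker is launched and exported to it as PYSPARK_EXECUTOR_MEMORY_MB. A minimal standalone sketch of that hand-off follows; the object name, method names, and the property key string are illustrative placeholders, not the Spark constants used in the diff.

    import java.util.Properties

    // Illustrative sketch only; not part of this commit.
    object PysparkMemoryHandoffSketch {
      // Placeholder key: the real constant is the PYSPARK_MEMORY_PROPERTY referenced in the diff.
      val PysparkMemoryKey = "example.pyspark.memory"

      // Scheduler side: record the resolved pyspark memory (MiB) in the task's local properties.
      def addToProperties(pysparkMemMb: Option[Long], properties: Properties): Unit =
        pysparkMemMb.foreach(m => properties.setProperty(PysparkMemoryKey, m.toString))

      // Runner side: read the property back and expose it to the Python worker's environment.
      def exportToWorkerEnv(
          getLocalProperty: String => String,
          envVars: java.util.Map[String, String]): Unit = {
        val memoryMb = Option(getLocalProperty(PysparkMemoryKey)).map(_.toLong)
        memoryMb.foreach(m => envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", m.toString))
      }
    }

With this shape, a runner that gets no value (default profile and spark.executor.pyspark.memory unset) simply leaves the environment variable out, which matches the Option handling in the hunks.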
@@ -29,7 +29,6 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
-import org.apache.spark.util.Utils
 
 /**
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (26 changes: 17 additions & 9 deletions)
@@ -1137,6 +1137,22 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * PythonRunner needs to know what the pyspark memory setting is for the profile being run.
+   * Pass it in the local properties of the task if it's set for the stage profile.
+   */
+  private def addPysparkMemToProperties(stage: Stage, properties: Properties): Unit = {
+    val pysparkMem = if (stage.resourceProfileId == DEFAULT_RESOURCE_PROFILE_ID) {
+      logDebug("Using the default pyspark executor memory")
+      sc.conf.get(PYSPARK_EXECUTOR_MEMORY)
+    } else {
+      val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId)
+      logDebug(s"Using profile ${stage.resourceProfileId} pyspark executor memory")
+      rp.getPysparkMemory
+    }
+    pysparkMem.map(m => properties.setProperty(PYSPARK_MEMORY_PROPERTY, m.toString))
+  }
+
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
     logDebug("submitMissingTasks(" + stage + ")")
@@ -1156,15 +1172,7 @@
     // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
     // with this Stage
     val properties = jobIdToActiveJob(jobId).properties
-    val pysparkMem = if (stage.resourceProfileId == DEFAULT_RESOURCE_PROFILE_ID) {
-      logDebug("Using the default pyspark executor memory")
-      sc.conf.get(PYSPARK_EXECUTOR_MEMORY)
-    } else {
-      val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId)
-      logDebug(s"Using profile ${stage.resourceProfileId} pyspark executor memory")
-      rp.getPysparkMemory
-    }
-    pysparkMem.map(m => properties.setProperty(PYSPARK_MEMORY_PROPERTY, m.toString))
+    addPysparkMemToProperties(stage, properties)
 
     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether tasks are
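The new addPysparkMemToProperties helper only takes its else branch when the stage was built from an RDD carrying a non-default ResourceProfile. A hedged sketch of how a job might opt into that path with the stage-level scheduling builder API (ResourceProfileBuilder, ExecutorResourceRequests.pysparkMemory, and RDD.withResources, assumed here rather than shown in this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

    // Sketch: give one RDD its own profile so stages built from it resolve pyspark
    // memory from the profile instead of spark.executor.pyspark.memory.
    def runWithPysparkMemoryProfile(sc: SparkContext): Unit = {
      val execReqs = new ExecutorResourceRequests()
        .cores(2)
        .pysparkMemory("2g") // per-executor pyspark memory for this profile
      val profile = new ResourceProfileBuilder().require(execReqs).build()

      val doubled = sc.parallelize(1 to 100, numSlices = 4)
        .withResources(profile) // stages from this RDD use the profile's id, not DEFAULT_RESOURCE_PROFILE_ID
        .map(_ * 2)

      doubled.collect()
    }

Stages whose RDDs carry no profile keep DEFAULT_RESOURCE_PROFILE_ID, so they continue to pick up their pyspark memory from spark.executor.pyspark.memory as before.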
