Squash of adding initial autotuning to the stage skew analyzer: now we try to suggest a different number of partitions. This is the same general code block we could also use to 'right size' the executor memory allocations.
holdenk committed May 9, 2024
1 parent adb51a9 commit 771e29e
Showing 6 changed files with 202 additions and 61 deletions.
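For context on how these suggestions are meant to be consumed: the new analyzeAndSuggest hook below returns a (report, suggestions) pair, where the suggestions are plain Spark config key/value strings. A minimal sketch of feeding them back into a SparkConf for the next run follows; the applySuggestions helper is hypothetical and not part of this commit.

import org.apache.spark.SparkConf

// Hypothetical helper: fold the suggested settings from analyzeAndSuggest
// into a SparkConf before resubmitting the job.
def applySuggestions(conf: SparkConf, suggestions: Map[String, String]): SparkConf =
  suggestions.foldLeft(conf) { case (c, (key, value)) => c.set(key, value) }

// e.g. applySuggestions(new SparkConf(), Map("spark.sql.shuffle.partitions" -> "400"))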
3 changes: 2 additions & 1 deletion src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -223,8 +223,9 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
if (!stageMap.get(stageSubmitted.stageInfo.stageId).isDefined) {
val isSql = stageSubmitted.stageInfo.details.contains("org.apache.spark.sql")
val stageTimeSpan = new StageTimeSpan(stageSubmitted.stageInfo.stageId,
stageSubmitted.stageInfo.numTasks)
stageSubmitted.stageInfo.numTasks, isSql)
stageTimeSpan.setParentStageIDs(stageSubmitted.stageInfo.parentIds)
if (stageSubmitted.stageInfo.submissionTime.isDefined) {
stageTimeSpan.setStartTime(stageSubmitted.stageInfo.submissionTime.get)
@@ -81,7 +81,6 @@ class QuboleNotebookListener(sparkConf: SparkConf) extends QuboleJobListener(spa
* @param applicationEnd
*/
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
stageMap.map(x => x._2).foreach(x => x.tempTaskTimes.clear())
appInfo.endTime = applicationEnd.time
}

@@ -34,6 +34,17 @@ class StageSkewAnalyzer extends AppAnalyzer {
out.toString()
}

override def analyzeAndSuggest(appContext: AppContext, startTime: Long, endTime: Long):
(String, Map[String, String]) = {
// Only make suggestions on a "full" window
if (appContext.appInfo.startTime == startTime &&
appContext.appInfo.endTime == endTime) {
(analyze(appContext, startTime, endTime), computeSuggestions(appContext))
} else {
(analyze(appContext, startTime, endTime), Map.empty[String, String])
}
}


def bytesToString(size: Long): String = {
val TB = 1L << 40
@@ -55,7 +66,70 @@ class StageSkewAnalyzer extends AppAnalyzer {
"%.1f %s".formatLocal(Locale.US, value, unit)
}

def computeSuggestions(ac: AppContext): Map[String, String] = {
val conf = ac.initialSparkProperties.getOrElse {
Map.empty[String, String]
}
val suggested = new mutable.HashMap[String, String]()
val shufflePartitions = conf.getOrElse("spark.sql.shuffle.partitions", "200").toInt
val aqeCoalesce = conf.getOrElse("spark.sql.adaptive.coalescePartitions.enabled", "true")
// In theory if the compute time for stages is generally "large" & skew is low
// we can suggest increasing the parallelism, especially if it's below max execs.
// For now we want to filter for stages which are
// SQL shuffle reads (where the shuffle.partitions config is the one we change)
val newScaleFactor = ac.stageMap.values.map (
v => {
val numTasks = v.taskExecutionTimes.length
val minTime = v.taskExecutionTimes.min
val maxTime = v.taskExecutionTimes.max
// If there is less than a 10% difference between min and max (i.e. limited skew) and it's "slow",
// consider increasing the number of partitions.
// We also check that numTasks is ~= shuffle partitions
// otherwise it's probably being configured through AQE target size / coalesce.
if (maxTime - minTime < 0.1 * maxTime &&
v.sqlTask &&
Math.abs(numTasks - shufflePartitions) < 10) {
// Scale up the partition count based on how long the shortest task runs (in minutes), capped at 4x.
Math.max(Math.min((minTime / 60000), 4.0), 1.0)
} else {
1.0
}
}
).max
if (newScaleFactor > 1.0) {
suggested += (("spark.sql.shuffle.partitions",
(newScaleFactor * shufflePartitions).toInt.toString))
}

// Future: We might want to suggest increasing both max execs & parallelism if they're equal.

// We also may want to suggest turning off coalesce in AQE if we see "slow" stages
// with small data.
// TODO: Verify the AQE ran on that particular stage, right now we assume if we've got
// a shuffle read and num tasks is less than the default shuffle partitions then
// it's AQE's doing.
val problematicCoalesces = ac.stageMap.values.find(
v => {
val minTaskLength = v.taskExecutionTimes.min
val numTasks = v.taskExecutionTimes.length
if (v.sqlTask && v.shuffleRead && numTasks < shufflePartitions &&
minTaskLength > 60000) {
true
} else {
false
}
}
)
if (problematicCoalesces.isDefined) {
suggested += (("spark.sql.adaptive.coalescePartitions.enabled", "false"))
}
suggested.toMap
}

def computePerStageEfficiencyStatistics(ac: AppContext, out: mutable.StringBuilder): Unit = {
val conf = ac.initialSparkProperties.getOrElse {
out.println("WARNING: No config found using empty config.")
}


val totalTasks = ac.stageMap.map(x => x._2.taskExecutionTimes.length).sum
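To make the partition heuristic in computeSuggestions concrete, here is an illustrative calculation with assumed inputs that mirror the new test suite below (the numbers are illustration, not output from this commit): with the default spark.sql.shuffle.partitions of 200 and a low-skew SQL stage whose shortest task runs about two minutes, the scale factor is 2, so the analyzer suggests 400 partitions. The second heuristic is independent: any SQL stage with a shuffle read, fewer tasks than spark.sql.shuffle.partitions, and tasks longer than a minute triggers the suggestion to disable AQE partition coalescing.

// Illustrative restatement of the scale-factor math, with assumed inputs.
val shufflePartitions = 200                // spark.sql.shuffle.partitions
val minTaskMillis = 120000                 // shortest task in the stage: ~2 minutes
val scaleFactor = Math.max(Math.min(minTaskMillis / 60000.0, 4.0), 1.0)  // 2.0
val suggestedPartitions = (scaleFactor * shufflePartitions).toInt        // 400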
76 changes: 18 additions & 58 deletions src/main/scala/com/qubole/sparklens/timespan/StageTimeSpan.scala
@@ -29,9 +29,12 @@ import scala.collection.mutable
This keeps track of data per stage
*/

class StageTimeSpan(val stageID: Int, numberOfTasks: Long) extends TimeSpan {
class StageTimeSpan(val stageID: Int, numberOfTasks: Long, val sqlTask: Boolean) extends TimeSpan {
var stageMetrics = new AggregateMetrics()
var tempTaskTimes = new mutable.ListBuffer[( Long, Long, Long)]
// We actually want to keep the start times so we can suggest increasing
// max execs if we see very different start times, since that can
// indicate a stall.
var taskTimes = new mutable.ListBuffer[( Long, Long, Long)]
var minTaskLaunchTime = Long.MaxValue
var maxTaskFinishTime = 0L
var parentStageIDs:Seq[Int] = null
@@ -40,6 +43,9 @@ class StageTimeSpan(val stageID: Int, numberOfTasks: Long) extends TimeSpan {
var taskExecutionTimes = Array.emptyIntArray
var taskPeakMemoryUsage = Array.emptyLongArray

// Is there any shuffle read happening
var shuffleRead = false

def updateAggregateTaskMetrics (taskMetrics: TaskMetrics, taskInfo: TaskInfo): Unit = {
stageMetrics.update(taskMetrics, taskInfo)
}
@@ -49,14 +55,18 @@ class StageTimeSpan(val stageID: Int, numberOfTasks: Long) extends TimeSpan {
}

def updateTasks(taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = {
if (taskInfo != null && taskMetrics != null) {
tempTaskTimes += ((taskInfo.taskId, taskMetrics.executorRunTime, taskMetrics.peakExecutionMemory))
// Ignore speculative tasks
if (taskInfo != null && taskMetrics != null && !taskInfo.speculative) {
taskTimes += ((taskInfo.taskId, taskMetrics.executorRunTime, taskMetrics.peakExecutionMemory))
if (taskInfo.launchTime < minTaskLaunchTime) {
minTaskLaunchTime = taskInfo.launchTime
}
if (taskInfo.finishTime > maxTaskFinishTime) {
maxTaskFinishTime = taskInfo.finishTime
}
if (taskMetrics.shuffleReadMetrics != null) {
shuffleRead = true
}
}
}

@@ -65,33 +75,18 @@ class StageTimeSpan(val stageID: Int, numberOfTasks: Long) extends TimeSpan {
setStartTime(minTaskLaunchTime)
setEndTime(maxTaskFinishTime)

taskExecutionTimes = new Array[Int](tempTaskTimes.size)
taskExecutionTimes = new Array[Int](taskTimes.size)

var currentIndex = 0
tempTaskTimes.sortWith(( left, right) => left._1 < right._1)
taskTimes.sortWith(( left, right) => left._1 < right._1)
.foreach( x => {
taskExecutionTimes( currentIndex) = x._2.toInt
currentIndex += 1
})

val countPeakMemoryUsage = {
if (tempTaskTimes.size > 64) {
64
}else {
tempTaskTimes.size
}
}

taskPeakMemoryUsage = tempTaskTimes
taskPeakMemoryUsage = taskTimes
.map( x => x._3)
.sortWith( (a, b) => a > b)
.take(countPeakMemoryUsage).toArray

/*
Clean the tempTaskTimes. We don't want to keep all these objects hanging around for
a long time
*/
tempTaskTimes.clear()
.sortWith( (a, b) => a > b).toArray
}

override def getMap(): Map[String, _ <: Any] = {
@@ -109,38 +104,3 @@ class StageTimeSpan(val stageID: Int, numberOfTasks: Long) extends TimeSpan {
) ++ super.getStartEndTime()
}
}

object StageTimeSpan {

def getTimeSpan(json: Map[String, JValue]): mutable.HashMap[Int, StageTimeSpan] = {
implicit val formats = DefaultFormats

val map = new mutable.HashMap[Int, StageTimeSpan]

json.keys.map(key => {
val value = json.get(key).get
val timeSpan = new StageTimeSpan(
(value \ "stageID").extract[Int],
(value \ "numberOfTasks").extract[Long]
)
timeSpan.stageMetrics = AggregateMetrics.getAggregateMetrics((value \ "stageMetrics")
.extract[JValue])
timeSpan.minTaskLaunchTime = (value \ "minTaskLaunchTime").extract[Long]
timeSpan.maxTaskFinishTime = (value \ "maxTaskFinishTime").extract[Long]


timeSpan.parentStageIDs = Json4sWrapper.parse((value \ "parentStageIDs").extract[String]).extract[List[Int]]
timeSpan.taskExecutionTimes = Json4sWrapper.parse((value \ "taskExecutionTimes").extract[String])
.extract[List[Int]].toArray

timeSpan.taskPeakMemoryUsage = Json4sWrapper.parse((value \ "taskPeakMemoryUsage").extract[String])
.extract[List[Long]].toArray

timeSpan.addStartEnd(value)

map.put(key.toInt, timeSpan)
})
map
}

}
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qubole.sparklens.analyzer

import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo}
import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan}
import com.qubole.sparklens.helper.JobOverlapHelper

import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.SparkConf

import scala.collection.mutable

class StageSkewAnalyzerSuite extends AnyFunSuite {

val startTime = 0
val endTime = 60000000000L
// Make a non-SQL task
val stage1 = new StageTimeSpan(1, 1, false)
// SQL tasks
// stage 2 should impact the target number of partitions
val stage2 = new StageTimeSpan(2, 2, true)
// stage 3 should have us turn off AQE if present
val stage3 = new StageTimeSpan(3, 1, true)
stage1.shuffleRead = true
stage1.taskExecutionTimes = Array[Int](6000000)
stage2.shuffleRead = true
stage2.taskExecutionTimes = 0.to(200).map(x => 60000 * 2).toArray
stage3.shuffleRead = true
// One really long task
stage3.taskExecutionTimes = Array[Int](60000000)

def createDummyAppContext(stageTimeSpans: mutable.HashMap[Int, StageTimeSpan]): AppContext = {

val jobMap = new mutable.HashMap[Long, JobTimeSpan]

val jobSQLExecIDMap = new mutable.HashMap[Long, Long]

val execStartTimes = new mutable.HashMap[String, ExecutorTimeSpan]()

val appInfo = new ApplicationInfo()
appInfo.startTime = startTime
appInfo.endTime = endTime


val conf = new SparkConf()

new AppContext(appInfo,
new AggregateMetrics(),
mutable.HashMap[String, HostTimeSpan](),
mutable.HashMap[String, ExecutorTimeSpan](),
jobMap,
jobSQLExecIDMap,
stageTimeSpans,
mutable.HashMap[Int, Long](),
Some(conf.getAll.toMap))
}

test("Change number of partitions when not skewed but long") {
val stageTimeSpans = new mutable.HashMap[Int, StageTimeSpan]
stageTimeSpans(1) = stage1
stageTimeSpans(2) = stage2
val ac = createDummyAppContext(stageTimeSpans)
val sska = new StageSkewAnalyzer()
val suggestions = sska.computeSuggestions(ac)
assert(suggestions.get("spark.sql.adaptive.coalescePartitions.enabled") == None,
"Leave AQE on if we 'just' have long partitions")
assert(suggestions.get("spark.sql.shuffle.partitions") == Some("400"),
"We aim for lots of partitions")
}

test("Turn off AQE if stage 3 is present") {
val stageTimeSpans = new mutable.HashMap[Int, StageTimeSpan]
stageTimeSpans(1) = stage1
stageTimeSpans(3) = stage3
val ac = createDummyAppContext(stageTimeSpans)
val sska = new StageSkewAnalyzer()
val suggestions = sska.computeSuggestions(ac)
assert(suggestions.get("spark.sql.adaptive.coalescePartitions.enabled") == Some("false"),
"Turn of AQE when bad coalesce occurs")
}

test("stage 1 should do nothing") {
val stageTimeSpans = new mutable.HashMap[Int, StageTimeSpan]
stageTimeSpans(1) = stage1
val ac = createDummyAppContext(stageTimeSpans)
val sska = new StageSkewAnalyzer()
val suggestions = sska.computeSuggestions(ac)
assert(suggestions == Map.empty[String, String], "Don't suggest on non-SQL stages")
}
}
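As a usage note, the new suite should be runnable on its own with sbt's testOnly task, assuming the project's standard ScalaTest setup:

sbt "testOnly com.qubole.sparklens.analyzer.StageSkewAnalyzerSuite"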
@@ -28,7 +28,7 @@ class PQParallelStageSchedulerSuite extends AnyFunSuite {


def createStageTimeSpan(stageID: Int, taskCount: Int, minTaskLaunchTime: Long, maxTaskFinishTime: Long, parentStages: Seq[Int]): StageTimeSpan = {
val stageTimeSpan = new StageTimeSpan(stageID, taskCount)
val stageTimeSpan = new StageTimeSpan(stageID, taskCount, false)
stageTimeSpan.minTaskLaunchTime = minTaskLaunchTime
stageTimeSpan.maxTaskFinishTime = maxTaskFinishTime

