Merge branch 'master' into SPARK-32511
fqaiser94 committed Aug 4, 2020
2 parents 19587e8 + 0660a05 commit 7342514
Showing 111 changed files with 4,391 additions and 761 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,7 +83,7 @@ class SparkContext(config: SparkConf) extends Logging {
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()

if (!config.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
if (!config.get(EXECUTOR_ALLOW_SPARK_CONTEXT)) {
// In order to prevent SparkContext from being created in executors.
SparkContext.assertOnDriver()
}
@@ -275,7 +275,8 @@ private[spark] object ConfigEntry {

val UNDEFINED = "<undefined>"

private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
private[spark] val knownConfigs =
new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()

def registerEntry(entry: ConfigEntry[_]): Unit = {
val existing = knownConfigs.putIfAbsent(entry.key, entry)
@@ -1909,8 +1909,8 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val ALLOW_SPARK_CONTEXT_IN_EXECUTORS =
ConfigBuilder("spark.driver.allowSparkContextInExecutors")
private[spark] val EXECUTOR_ALLOW_SPARK_CONTEXT =
ConfigBuilder("spark.executor.allowSparkContext")
.doc("If set to true, SparkContext can be created in executors.")
.version("3.0.1")
.booleanConf
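
For orientation only (not part of this commit): a minimal sketch of how the renamed flag might be set from application code, assuming a placeholder master and app name.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical usage of the renamed key. The old key was
// spark.driver.allowSparkContextInExecutors; the new one is
// spark.executor.allowSparkContext and it is disabled by default.
val conf = new SparkConf()
  .setMaster("local[*]")                             // placeholder master
  .setAppName("allow-spark-context-demo")            // placeholder app name
  .set("spark.executor.allowSparkContext", "true")

val sc = new SparkContext(conf)
```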
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CPUS_PER_TASK
import org.apache.spark.internal.config.Status._
@@ -868,13 +868,17 @@ private[spark] class AppStatusListener(
// check if there is a new peak value for any of the executor level memory metrics
// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
// for the live UI.
event.executorUpdates.foreach { case (_, peakUpdates) =>
event.executorUpdates.foreach { case (key, peakUpdates) =>
liveExecutors.get(event.execId).foreach { exec =>
if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(peakUpdates)) {
maybeUpdate(exec, now)
update(exec, now)
}
}

// Update stage level peak executor metrics.
updateStageLevelPeakExecutorMetrics(key._1, key._2, event.execId, peakUpdates, now)
}

// Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
// here to ensure the staleness of Spark UI doesn't last more than
// `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
@@ -885,17 +889,38 @@
}
}

override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
override def onStageExecutorMetrics(event: SparkListenerStageExecutorMetrics): Unit = {
val now = System.nanoTime()

// check if there is a new peak value for any of the executor level memory metrics,
// while reading from the log. SparkListenerStageExecutorMetrics are only processed
// when reading logs.
liveExecutors.get(executorMetrics.execId).orElse(
deadExecutors.get(executorMetrics.execId)).foreach { exec =>
if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
update(exec, now)
}
liveExecutors.get(event.execId).orElse(
deadExecutors.get(event.execId)).foreach { exec =>
if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(event.executorMetrics)) {
update(exec, now)
}
}

// Update stage level peak executor metrics.
updateStageLevelPeakExecutorMetrics(
event.stageId, event.stageAttemptId, event.execId, event.executorMetrics, now)
}

private def updateStageLevelPeakExecutorMetrics(
stageId: Int,
stageAttemptId: Int,
executorId: String,
executorMetrics: ExecutorMetrics,
now: Long): Unit = {
Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
if (stage.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics)) {
update(stage, now)
}
val esummary = stage.executorSummary(executorId)
if (esummary.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics)) {
update(esummary, now)
}
}
}

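As context for the compareAndUpdatePeakValues calls above, here is a simplified, hypothetical sketch of the peak-tracking pattern; Spark's real ExecutorMetrics tracks a fixed set of memory metrics, while this stand-in just keeps a running maximum per slot.

```scala
// Hypothetical stand-in: remember the maximum seen for each metric slot and report
// whether the latest sample raised any of them, so callers persist state only when
// something actually changed.
class PeakMetrics(slots: Int) {
  private val peaks = Array.fill(slots)(Long.MinValue)

  def compareAndUpdatePeakValues(sample: Array[Long]): Boolean = {
    var updated = false
    var i = 0
    while (i < slots) {
      if (sample(i) > peaks(i)) {
        peaks(i) = sample(i)
        updated = true
      }
      i += 1
    }
    updated
  }
}

// Only samples that raise a peak trigger an update.
val peak = new PeakMetrics(2)
peak.compareAndUpdatePeakValues(Array(100L, 200L)) // true: first sample sets both peaks
peak.compareAndUpdatePeakValues(Array(90L, 250L))  // true: slot 1 rose to 250
peak.compareAndUpdatePeakValues(Array(80L, 240L))  // false: nothing exceeded its peak
```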
@@ -506,7 +506,8 @@ private[spark] class AppStatusStore(
tasks = Some(tasks),
executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
killedTasksSummary = stage.killedTasksSummary,
resourceProfileId = stage.resourceProfileId)
resourceProfileId = stage.resourceProfileId,
peakExecutorMetrics = stage.peakExecutorMetrics)
}

def rdd(rddId: Int): v1.RDDStorageInfo = {
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -365,6 +365,8 @@ private class LiveExecutorStageSummary(

var metrics = createMetrics(default = 0L)

val peakExecutorMetrics = new ExecutorMetrics()

override protected def doUpdate(): Any = {
val info = new v1.ExecutorStageSummary(
taskTime,
@@ -381,7 +383,8 @@
metrics.shuffleWriteMetrics.recordsWritten,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
isBlacklisted)
isBlacklisted,
Some(peakExecutorMetrics).filter(_.isSet))
new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
}

@@ -420,6 +423,8 @@ private class LiveStage extends LiveEntity {

var blackListedExecutors = new HashSet[String]()

val peakExecutorMetrics = new ExecutorMetrics()

// Used for cleanup of tasks after they reach the configured limit. Not written to the store.
@volatile var cleaning = false
var savedTasks = new AtomicInteger(0)
@@ -484,7 +489,8 @@
tasks = None,
executorSummary = None,
killedTasksSummary = killedSummary,
resourceProfileId = info.resourceProfileId)
resourceProfileId = info.resourceProfileId,
Some(peakExecutorMetrics).filter(_.isSet))
}

override protected def doUpdate(): Any = {
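The Some(peakExecutorMetrics).filter(_.isSet) expressions above publish the metrics only when at least one peak was actually recorded; an unset ExecutorMetrics collapses to None in the v1 API object. A minimal illustration with a made-up Metrics stand-in:

```scala
// Made-up stand-in, only to show the Some(x).filter(p) idiom used above.
case class Metrics(values: Seq[Long]) {
  def isSet: Boolean = values.exists(_ > 0)
}

val untouched = Metrics(Seq(0L, 0L))
val recorded  = Metrics(Seq(512L, 1024L))

assert(Some(untouched).filter(_.isSet).isEmpty)            // stays None
assert(Some(recorded).filter(_.isSet).contains(recorded))  // kept as Some(...)
```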
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -82,7 +82,10 @@ class ExecutorStageSummary private[spark](
val shuffleWriteRecords : Long,
val memoryBytesSpilled : Long,
val diskBytesSpilled : Long,
val isBlacklistedForStage: Boolean)
val isBlacklistedForStage: Boolean,
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics])

class ExecutorSummary private[spark](
val id: String,
@@ -259,7 +262,10 @@ class StageData private[spark](
val tasks: Option[Map[Long, TaskData]],
val executorSummary: Option[Map[String, ExecutorStageSummary]],
val killedTasksSummary: Map[String, Int],
val resourceProfileId: Int)
val resourceProfileId: Int,
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakExecutorMetrics: Option[ExecutorMetrics])

class TaskData private[spark](
val taskId: Long,
29 changes: 23 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -17,7 +17,7 @@

package org.apache.spark.ui

import java.net.{URI, URL}
import java.net.{URI, URL, URLDecoder}
import java.util.EnumSet
import javax.servlet.DispatcherType
import javax.servlet.http._
@@ -377,8 +377,7 @@ private[spark] object JettyUtils extends Logging {
if (baseRequest.isSecure) {
return
}
val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort,
baseRequest.getRequestURI, baseRequest.getQueryString)
val httpsURI = createRedirectURI(scheme, securePort, baseRequest)
response.setContentLength(0)
response.sendRedirect(response.encodeRedirectURL(httpsURI))
baseRequest.setHandled(true)
@@ -440,16 +439,34 @@
handler.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
}

private def decodeURL(url: String, encoding: String): String = {
if (url == null) {
null
} else {
URLDecoder.decode(url, encoding)
}
}

// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(
scheme: String, server: String, port: Int, path: String, query: String) = {
private def createRedirectURI(scheme: String, port: Int, request: Request): String = {
val server = request.getServerName
val redirectServer = if (server.contains(":") && !server.startsWith("[")) {
s"[${server}]"
} else {
server
}
val authority = s"$redirectServer:$port"
new URI(scheme, authority, path, query, null).toString
val queryEncoding = if (request.getQueryEncoding != null) {
request.getQueryEncoding
} else {
// By default decoding the URI as "UTF-8" should be enough for SparkUI
"UTF-8"
}
// The request URL can be raw or encoded here. To avoid the request URL being
// encoded twice, let's decode it here.
val requestURI = decodeURL(request.getRequestURI, queryEncoding)
val queryString = decodeURL(request.getQueryString, queryEncoding)
new URI(scheme, authority, requestURI, queryString, null).toString
}

def toVirtualHosts(connectors: String*): Array[String] = connectors.map("@" + _).toArray
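As a side note on the createRedirectURI change above, a small hypothetical demonstration of the double-encoding problem the new decode step avoids; the host and path below are made up.

```scala
import java.net.{URI, URLDecoder}

// The multi-argument URI constructor percent-encodes the path it is given, so feeding
// it an already-encoded request URI encodes the '%' a second time.
val rawPath = "/history/app%201"   // as received by Jetty, already percent-encoded

val doubleEncoded = new URI("https", "example.com:8443", rawPath, null, null).toString
// "https://example.com:8443/history/app%25201"  -- '%' re-encoded, wrong target

val decodedFirst = URLDecoder.decode(rawPath, "UTF-8")
val redirect = new URI("https", "example.com:8443", decodedFirst, null, null).toString
// "https://example.com:8443/history/app%201"    -- equivalent to the original request
```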
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -255,7 +255,8 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
tasks = None,
executorSummary = None,
killedTasksSummary = Map(),
ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID,
peakExecutorMetrics = None)
}
}

@@ -1,4 +1,19 @@
[ {
"id" : "app-20200706201101-0003",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-07-07T03:11:00.235GMT",
"endTime" : "2020-07-07T03:17:04.231GMT",
"lastUpdated" : "",
"duration" : 363996,
"sparkUser" : "terryk",
"completed" : true,
"appSparkVersion" : "3.1.0-SNAPSHOT",
"endTimeEpoch" : 1594091824231,
"startTimeEpoch" : 1594091460235,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
@@ -1,4 +1,19 @@
[ {
"id" : "app-20200706201101-0003",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-07-07T03:11:00.235GMT",
"endTime" : "2020-07-07T03:17:04.231GMT",
"lastUpdated" : "",
"duration" : 363996,
"sparkUser" : "terryk",
"completed" : true,
"appSparkVersion" : "3.1.0-SNAPSHOT",
"endTimeEpoch" : 1594091824231,
"startTimeEpoch" : 1594091460235,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
@@ -1,4 +1,19 @@
[ {
"id" : "app-20200706201101-0003",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-07-07T03:11:00.235GMT",
"endTime" : "2020-07-07T03:17:04.231GMT",
"lastUpdated" : "",
"duration" : 363996,
"sparkUser" : "terryk",
"completed" : true,
"appSparkVersion" : "3.1.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1594091824231,
"startTimeEpoch" : 1594091460235
} ]
}, {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
@@ -28,19 +43,4 @@
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1562101355974
} ]
}, {
"id" : "application_1553914137147_0018",
"name" : "LargeBlocks",
"attempts" : [ {
"startTime" : "2019-04-08T20:39:44.286GMT",
"endTime" : "2019-04-08T20:40:46.454GMT",
"lastUpdated" : "",
"duration" : 62168,
"sparkUser" : "systest",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"startTimeEpoch" : 1554755984286,
"endTimeEpoch" : 1554756046454,
"lastUpdatedEpoch" : 0
} ]
} ]
@@ -1,4 +1,19 @@
[ {
"id" : "app-20200706201101-0003",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-07-07T03:11:00.235GMT",
"endTime" : "2020-07-07T03:17:04.231GMT",
"lastUpdated" : "",
"duration" : 363996,
"sparkUser" : "terryk",
"completed" : true,
"appSparkVersion" : "3.1.0-SNAPSHOT",
"endTimeEpoch" : 1594091824231,
"startTimeEpoch" : 1594091460235,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
@@ -14,22 +29,20 @@
"lastUpdatedEpoch" : 0
} ]
}, {
"id": "application_1555004656427_0144",
"name": "Spark shell",
"attempts": [
{
"startTime": "2019-07-02T21:02:17.180GMT",
"endTime": "2019-07-02T21:02:35.974GMT",
"lastUpdated": "",
"duration": 18794,
"sparkUser": "tgraves",
"completed": true,
"appSparkVersion": "3.0.0-SNAPSHOT",
"startTimeEpoch": 1562101337180,
"lastUpdatedEpoch": 0,
"endTimeEpoch": 1562101355974
}
]
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2019-07-02T21:02:17.180GMT",
"endTime" : "2019-07-02T21:02:35.974GMT",
"lastUpdated" : "",
"duration" : 18794,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"endTimeEpoch" : 1562101355974,
"startTimeEpoch" : 1562101337180,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1553914137147_0018",
"name" : "LargeBlocks",
@@ -1,4 +1,19 @@
[ {
"id" : "app-20200706201101-0003",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-07-07T03:11:00.235GMT",
"endTime" : "2020-07-07T03:17:04.231GMT",
"lastUpdated" : "",
"duration" : 363996,
"sparkUser" : "terryk",
"completed" : true,
"appSparkVersion" : "3.1.0-SNAPSHOT",
"endTimeEpoch" : 1594091824231,
"startTimeEpoch" : 1594091460235,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {