Merge remote-tracking branch 'remotes/origin/master' into json-filters-pushdown

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
MaxGekk committed May 29, 2020
2 parents 4c37c9a + 91148f4 commit 8bfd599
Showing 133 changed files with 1,511 additions and 783 deletions.
23 changes: 5 additions & 18 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -214,7 +214,6 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
val jobPage = Option(request.getParameter(jobTag + ".page")).map(_.toInt).getOrElse(1)
val currentTime = System.currentTimeMillis()

try {
new JobPagedTable(
@@ -226,7 +225,6 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
UIUtils.prependBaseUri(request, parent.basePath),
"jobs", // subPath
killEnabled,
currentTime,
jobIdTitle
).table(jobPage)
} catch {
@@ -399,7 +397,6 @@ private[ui] class JobDataSource(
store: AppStatusStore,
jobs: Seq[v1.JobData],
basePath: String,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
@@ -410,15 +407,9 @@
// so that we can avoid creating duplicate contents while sorting the data
private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))

private var _slicedJobIds: Set[Int] = null

override def dataSize: Int = data.size

override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = {
val r = data.slice(from, to)
_slicedJobIds = r.map(_.jobData.jobId).toSet
r
}
override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = data.slice(from, to)

private def jobRow(jobData: v1.JobData): JobTableRowData = {
val duration: Option[Long] = JobDataUtil.getDuration(jobData)
@@ -479,17 +470,17 @@ private[ui] class JobPagedTable(
basePath: String,
subPath: String,
killEnabled: Boolean,
currentTime: Long,
jobIdTitle: String
) extends PagedTable[JobTableRowData] {

private val (sortColumn, desc, pageSize) = getTableParameters(request, jobTag, jobIdTitle)
private val parameterPath = basePath + s"/$subPath/?" + getParameterOtherTable(request, jobTag)
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

override def tableId: String = jobTag + "-table"

override def tableCssClass: String =
"table table-bordered table-sm table-striped " +
"table-head-clickable table-cell-width-limited"
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"

override def pageSizeFormField: String = jobTag + ".pageSize"

@@ -499,13 +490,11 @@
store,
data,
basePath,
currentTime,
pageSize,
sortColumn,
desc)

override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$jobTag.sort=$encodedSortColumn" +
Expand All @@ -514,10 +503,8 @@ private[ui] class JobPagedTable(
s"#$tableHeaderId"
}

override def goButtonFormPath: String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
override def goButtonFormPath: String =
s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc#$tableHeaderId"
}

override def headers: Seq[Node] = {
// Information for each header: title, sortable, tooltip
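
The AllJobsPage changes above (and the StagePage and StageTable changes below) share two simple cleanups: the currentTime parameter was threaded through the table classes but never used, so it is removed end to end, and the URLEncoder.encode(sortColumn, UTF_8.name()) call that pageLink and goButtonFormPath each repeated is hoisted into a single private val. A minimal sketch of the resulting shape, with hypothetical names in place of the real JobPagedTable:

import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical, trimmed-down paged-table link builder: the sort column is
// URL-encoded once, in a val, instead of once per link method.
class PagedLinks(parameterPath: String, tag: String, sortColumn: String, desc: Boolean) {
  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

  // Link for a given page number, e.g. "...&jobs.page=2&jobs.sort=Job+Id&jobs.desc=false".
  def pageLink(page: Int): String =
    s"$parameterPath&$tag.page=$page&$tag.sort=$encodedSortColumn&$tag.desc=$desc"

  // Target of the page-size "Go" form; same query parameters, no page number.
  def goButtonFormPath: String =
    s"$parameterPath&$tag.sort=$encodedSortColumn&$tag.desc=$desc"
}
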
14 changes: 3 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -212,7 +212,6 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
stageData,
UIUtils.prependBaseUri(request, parent.basePath) +
s"/stages/stage/?id=${stageId}&attempt=${stageAttemptId}",
currentTime,
pageSize = taskPageSize,
sortColumn = taskSortColumn,
desc = taskSortDesc,
@@ -452,7 +451,6 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We

private[ui] class TaskDataSource(
stage: StageData,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean,
@@ -474,8 +472,6 @@
_tasksToShow
}

def tasks: Seq[TaskData] = _tasksToShow

def executorLogs(id: String): Map[String, String] = {
executorIdToLogs.getOrElseUpdate(id,
store.asOption(store.executorSummary(id)).map(_.executorLogs).getOrElse(Map.empty))
@@ -486,14 +482,15 @@
private[ui] class TaskPagedTable(
stage: StageData,
basePath: String,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean,
store: AppStatusStore) extends PagedTable[TaskData] {

import ApiHelper._

private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

override def tableId: String = "task-table"

override def tableCssClass: String =
@@ -505,25 +502,20 @@

override val dataSource: TaskDataSource = new TaskDataSource(
stage,
currentTime,
pageSize,
sortColumn,
desc,
store)

override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
basePath +
s"&$pageNumberFormField=$page" +
s"&task.sort=$encodedSortColumn" +
s"&task.desc=$desc" +
s"&$pageSizeFormField=$pageSize"
}

override def goButtonFormPath: String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc"
}
override def goButtonFormPath: String = s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc"

def headers: Seq[Node] = {
import ApiHelper._
21 changes: 6 additions & 15 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -116,16 +116,17 @@ private[ui] class StagePagedTable(
override def tableId: String = stageTag + "-table"

override def tableCssClass: String =
"table table-bordered table-sm table-striped " +
"table-head-clickable table-cell-width-limited"
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"

override def pageSizeFormField: String = stageTag + ".pageSize"

override def pageNumberFormField: String = stageTag + ".page"

private val (sortColumn, desc, pageSize) = getTableParameters(request, stageTag, "Stage Id")

val parameterPath = UIUtils.prependBaseUri(request, basePath) + s"/$subPath/?" +
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

private val parameterPath = UIUtils.prependBaseUri(request, basePath) + s"/$subPath/?" +
getParameterOtherTable(request, stageTag)

override val dataSource = new StageDataSource(
@@ -138,7 +139,6 @@
)

override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$stageTag.sort=$encodedSortColumn" +
@@ -147,10 +147,8 @@
s"#$tableHeaderId"
}

override def goButtonFormPath: String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
override def goButtonFormPath: String =
s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc#$tableHeaderId"
}

override def headers: Seq[Node] = {
// stageHeadersAndCssClasses has three parts: header title, sortable and tooltip information.
@@ -311,15 +309,9 @@ private[ui] class StageDataSource(
// table so that we can avoid creating duplicate contents while sorting the data
private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc))

private var _slicedStageIds: Set[Int] = _

override def dataSize: Int = data.size

override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = {
val r = data.slice(from, to)
_slicedStageIds = r.map(_.stageId).toSet
r
}
override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = data.slice(from, to)

private def stageRow(stageData: v1.StageData): StageTableRowData = {
val formattedSubmissionTime = stageData.submissionTime match {
@@ -350,7 +342,6 @@ private[ui] class StageDataSource(
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""


new StageTableRowData(
stageData,
Some(stageData),
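
JobDataSource and StageDataSource get the same dead-code treatment: the _slicedJobIds and _slicedStageIds fields that sliceData populated were never read anywhere, so the override collapses to a one-liner. A sketch against a hypothetical stand-in for the real PagedDataSource base class:

// Hypothetical minimal base class mirroring the shape of PagedDataSource[T].
abstract class PagedDataSource[T](val pageSize: Int) {
  def dataSize: Int
  def sliceData(from: Int, to: Int): Seq[T]
}

class IntDataSource(data: Seq[Int]) extends PagedDataSource[Int](pageSize = 100) {
  override def dataSize: Int = data.size

  // Before: the slice was also copied into a private Set field that no
  // caller ever read. After: return the slice directly.
  override def sliceData(from: Int, to: Int): Seq[Int] = data.slice(from, to)
}
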
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -487,6 +487,7 @@ private[spark] object JsonProtocol {
("Callsite" -> rddInfo.callSite) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
("Barrier" -> rddInfo.isBarrier) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
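
JsonProtocol builds these objects with the json4s DSL, where ~ chains key/value pairs into one JSON object, so the new ("Barrier" -> rddInfo.isBarrier) pair simply joins the chain. A standalone sketch with stand-in literal values rather than real RDDInfo data:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object RddInfoJsonSketch extends App {
  // Each ("key" -> value) pair chained with ~ folds into a single JObject.
  val rddInfoJson =
    ("RDD ID" -> 42) ~
    ("Name" -> "example") ~
    ("Barrier" -> true) ~  // the newly logged isBarrier flag
    ("Number of Partitions" -> 4)

  println(compact(render(rddInfoJson)))
  // prints {"RDD ID":42,"Name":"example","Barrier":true,"Number of Partitions":4}
}
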
core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY

class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -37,10 +38,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
.setAppName("test-cluster")
.set(TEST_NO_STAGE_RETRY, true)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
}

// TODO (SPARK-31730): re-enable it
ignore("global sync by barrier() call") {
test("global sync by barrier() call") {
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -57,10 +58,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("share messages with allGather() call") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
@@ -78,10 +76,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("throw exception if we attempt to synchronize with different blocking calls") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
@@ -100,10 +95,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("successively sync with allGather and barrier") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
@@ -129,8 +121,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
assert(times2.max - times2.min <= 1000)
}

// TODO (SPARK-31730): re-enable it
ignore("support multiple barrier() call within a single task") {
test("support multiple barrier() call within a single task") {
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -285,6 +276,9 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with

test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
initLocalClusterSparkContext(2)
// It's required to reset the delay timer when a task is scheduled; otherwise all the tasks
// could get scheduled at ANY level.
sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
// set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
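
Each re-enabled test above drops its hand-built SparkConf in favor of the shared initLocalClusterSparkContext helper, which now also blocks until every executor has registered, so a barrier stage cannot launch with only part of its slots available — presumably the source of the flakiness tracked by SPARK-31730. A sketch of the helper as the hunks imply it; the default worker count of 4 matches the replaced inline configs, and TestUtils plus TEST_NO_STAGE_RETRY are Spark-internal, so this only compiles inside Spark's own tree:

import org.apache.spark.{SparkConf, SparkContext, TestUtils}
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY

trait LocalClusterSetup {
  var sc: SparkContext = _

  def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
    val conf = new SparkConf()
      .setMaster(s"local-cluster[$numWorker, 1, 1024]")
      .setAppName("test-cluster")
      .set(TEST_NO_STAGE_RETRY, true)
    sc = new SparkContext(conf)
    // Newly added: wait (up to 60 s) until all executors are up before
    // any barrier stage is submitted.
    TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
  }
}
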
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWr
import org.apache.spark.deploy.history.EventLogTestHelper._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED}
import org.apache.spark.io._
import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
import org.apache.spark.resource.ResourceProfile
@@ -100,6 +101,49 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
testStageExecutorMetricsEventLogging()
}

test("SPARK-31764: isBarrier should be logged in event log") {
val conf = new SparkConf()
conf.set(EVENT_LOG_ENABLED, true)
conf.set(EVENT_LOG_DIR, testDirPath.toString)
val sc = new SparkContext("local", "test-SPARK-31764", conf)
val appId = sc.applicationId

sc.parallelize(1 to 10)
.barrier()
.mapPartitions(_.map(elem => (elem, elem)))
.filter(elem => elem._1 % 2 == 0)
.reduceByKey(_ + _)
.collect
sc.stop()

val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem)
val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line)))
val jobStartEvents = events
.filter(event => event.isInstanceOf[SparkListenerJobStart])
.map(_.asInstanceOf[SparkListenerJobStart])

assert(jobStartEvents.size === 1)
val stageInfos = jobStartEvents.head.stageInfos
assert(stageInfos.size === 2)

val stage0 = stageInfos(0)
val rddInfosInStage0 = stage0.rddInfos
assert(rddInfosInStage0.size === 3)
val sortedRddInfosInStage0 = rddInfosInStage0.sortBy(_.scope.get.name)
assert(sortedRddInfosInStage0(0).scope.get.name === "filter")
assert(sortedRddInfosInStage0(0).isBarrier === true)
assert(sortedRddInfosInStage0(1).scope.get.name === "mapPartitions")
assert(sortedRddInfosInStage0(1).isBarrier === true)
assert(sortedRddInfosInStage0(2).scope.get.name === "parallelize")
assert(sortedRddInfosInStage0(2).isBarrier === false)

val stage1 = stageInfos(1)
val rddInfosInStage1 = stage1.rddInfos
assert(rddInfosInStage1.size === 1)
assert(rddInfosInStage1(0).scope.get.name === "reduceByKey")
assert(rddInfosInStage1(0).isBarrier === false) // reduceByKey
}

/* ----------------- *
* Actual test logic *
* ----------------- */
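
The new SPARK-31764 test exercises the round trip: JsonProtocol.sparkEventFromJson(parse(line)) turns each event-log line back into a typed SparkListenerEvent, and the assertions then check isBarrier on every RDDInfo of the job's two stages. A small sketch of that filtering step; note JsonProtocol is private[spark], so this only compiles inside Spark's own packages:

package org.apache.spark.util

import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.scheduler.SparkListenerJobStart

object EventLogReplaySketch {
  // One event-log line is one JSON document; recover the typed event and
  // keep it only if it is a job-start event.
  def jobStartOf(line: String): Option[SparkListenerJobStart] =
    JsonProtocol.sparkEventFromJson(parse(line)) match {
      case js: SparkListenerJobStart => Some(js)
      case _ => None
    }
}
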
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -98,7 +98,6 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
val taskTable = new TaskPagedTable(
stageData,
basePath = "/a/b/c",
currentTime = 0,
pageSize = 10,
sortColumn = "Index",
desc = false,
