Fix nullability computation in union output #89

Merged (31 commits, Sep 23, 2015)
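
The catalyst change that gives this PR its title does not appear in the diff excerpt below, so here is a hedged sketch of the idea (plan and attribute names are assumed from Spark 1.5's SetOperation/Union operators and from the "Removed 'final' from SetOperation#output" commit in the list below; nothing here is copied from the patch): a union column must be nullable whenever it is nullable in either child, so the output attributes have to be recomputed rather than inherited from the left child alone.

```scala
// Sketch only; assumes Spark 1.5-era catalyst APIs. The real fix is understood
// to live in SetOperation#output, per the commit list below.
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def unionOutput(left: LogicalPlan, right: LogicalPlan): Seq[Attribute] =
  left.output.zip(right.output).map { case (l, r) =>
    // A union column can produce a null whenever either side can.
    l.withNullability(l.nullable || r.nullable)
  }
```
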
Changes from all commits (31 commits)
6152099
[SPARK-10676] [DOCS] Add documentation for SASL encryption options.
Sep 21, 2015
7ab4d17
[SPARK-10495] [SQL] Read date values in JSON data stored by Spark 1.5.0.
yhuai Sep 22, 2015
ed74d30
[DOC] [PYSPARK] [MLLIB] Added newlines to docstrings to fix parameter…
noel-smith Sep 22, 2015
86f9a35
[SPARK-10495] [SQL] [BRANCH-1.5] Fix build.
yhuai Sep 22, 2015
bb8e481
[SPARK-10711] [SPARKR] Do not assume spark.submit.deployMode is alway…
falaki Sep 22, 2015
f83b6e6
[SPARK-10716] [BUILD] spark-1.5.0-bin-hadoop2.6.tgz file doesn't unco…
srowen Sep 22, 2015
d0e6e53
[SPARK-10649] [STREAMING] Prevent inheriting job group and irrelevant…
tdas Sep 22, 2015
03215e3
[SPARK-8567] [SQL] Increase the timeout of o.a.s.sql.hive.HiveSparkSu…
yhuai Sep 22, 2015
a2b0fee
[SQL] [MINOR] map -> foreach.
rxin Sep 22, 2015
646155e
[SPARK-10695] [DOCUMENTATION] [MESOS] Fixing incorrect value informati…
SleepyThread Sep 22, 2015
c3112a9
[SPARK-10593] [SQL] fix resolve output of Generate
Sep 22, 2015
54334d3
[SPARK-10740] [SQL] handle nondeterministic expressions correctly for…
cloud-fan Sep 22, 2015
d83dcc9
[SPARK-10672] [SQL] Do not fail when we cannot save the metadata of a…
yhuai Sep 22, 2015
6b1e5c2
[SPARK-10737] [SQL] When using UnsafeRows, SortMergeJoin may return w…
yhuai Sep 22, 2015
3339916
[SPARK-10714] [SPARK-8632] [SPARK-10685] [SQL] Refactor Python UDF ha…
rxin Sep 22, 2015
7dce786
Fix nullability computation in union output
mbautin Sep 18, 2015
583cdb8
bumped maven-release-plugin
markhamstra Sep 21, 2015
6b3a590
Removed 'final' from SetOperation#output
markhamstra Sep 21, 2015
5466b67
Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5
markhamstra Sep 22, 2015
5ffd084
[SPARK-10640] History server fails to parse TaskCommitDenied
Sep 22, 2015
118ebd4
Revert "[SPARK-10640] History server fails to parse TaskCommitDenied"
Sep 23, 2015
26187ab
[SPARK-10640] History server fails to parse TaskCommitDenied
Sep 22, 2015
73d0621
[SPARK-10310] [SQL] Fixes script transformation field/line delimiters
zhichao-li Sep 23, 2015
7f07cc6
[SPARK-10663] Removed unnecessary invocation of DataFrame.toDF method.
hagenhaus Sep 23, 2015
8a23ef5
[SPARK-10652] [SPARK-10742] [STREAMING] Set meaningful job descriptio…
tdas Sep 23, 2015
6a616d0
[SPARK-10224] [STREAMING] Fix the issue that blockIntervalTimer won't…
zsxwing Sep 23, 2015
4174b94
[SPARK-10769] [STREAMING] [TESTS] Fix o.a.s.streaming.CheckpointSuite…
zsxwing Sep 23, 2015
6c6cadb
[SPARK-9710] [TEST] Fix RPackageUtilsSuite when R is not available.
Aug 10, 2015
64cc62c
[SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort Sh…
JoshRosen Sep 23, 2015
27a333f
[SPARK-5260] [CORE] don't submit stage until its dependencies map out…
suyanNone Sep 21, 2015
86443ea
Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5
markhamstra Sep 23, 2015
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -17,13 +17,17 @@

package org.apache.spark

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

// ==============================================================================================
// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
// ==============================================================================================

/**
* :: DeveloperApi ::
* Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
54 changes: 43 additions & 11 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,6 +24,7 @@ import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JM
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
import scala.util.control.NonFatal

import com.google.common.base.Charsets.UTF_8
import org.apache.hadoop.conf.Configuration
@@ -38,7 +39,6 @@ import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{SerializableConfiguration, Utils}

import scala.util.control.NonFatal

private[spark] class PythonRDD(
@transient parent: RDD[_],
@@ -61,11 +61,39 @@ private[spark] class PythonRDD(
if (preservePartitoning) firstParent.partitioner else None
}

val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)

override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val runner = new PythonRunner(
command, envVars, pythonIncludes, pythonExec, pythonVer, broadcastVars, accumulator,
bufferSize, reuse_worker)
runner.compute(firstParent.iterator(split, context), split.index, context)
}
}


/**
* A helper class to run Python UDFs in Spark.
*/
private[spark] class PythonRunner(
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]],
bufferSize: Int,
reuse_worker: Boolean)
extends Logging {

def compute(
inputIterator: Iterator[_],
partitionIndex: Int,
context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
if (reuse_worker) {
envVars += ("SPARK_REUSE_WORKER" -> "1")
@@ -75,7 +103,7 @@ private[spark] class PythonRDD(
@volatile var released = false

// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)
val writerThread = new WriterThread(env, worker, inputIterator, partitionIndex, context)

context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
@@ -183,13 +211,16 @@ private[spark] class PythonRDD(
new InterruptibleIterator(context, stdoutIterator)
}

val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)

/**
* The thread responsible for writing the data from the PythonRDD's parent iterator to the
* Python process.
*/
class WriterThread(env: SparkEnv, worker: Socket, split: Partition, context: TaskContext)
class WriterThread(
env: SparkEnv,
worker: Socket,
inputIterator: Iterator[_],
partitionIndex: Int,
context: TaskContext)
extends Thread(s"stdout writer for $pythonExec") {

@volatile private var _exception: Exception = null
@@ -211,11 +242,11 @@ private[spark] class PythonRDD(
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
// Partition index
dataOut.writeInt(split.index)
dataOut.writeInt(partitionIndex)
// Python version of driver
PythonRDD.writeUTF(pythonVer, dataOut)
// sparkFilesDir
PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
PythonRDD.writeUTF(SparkFiles.getRootDirectory(), dataOut)
// Python includes (*.zip and *.egg files)
dataOut.writeInt(pythonIncludes.length)
for (include <- pythonIncludes) {
@@ -246,7 +277,7 @@ private[spark] class PythonRDD(
dataOut.writeInt(command.length)
dataOut.write(command)
// Data values
PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut)
PythonRDD.writeIteratorToStream(inputIterator, dataOut)
dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
dataOut.writeInt(SpecialLengths.END_OF_STREAM)
dataOut.flush()
@@ -327,7 +358,8 @@ private[spark] object PythonRDD extends Logging {

// remember the broadcasts sent to each worker
private val workerBroadcasts = new mutable.WeakHashMap[Socket, mutable.Set[Long]]()
private def getWorkerBroadcasts(worker: Socket) = {

def getWorkerBroadcasts(worker: Socket): mutable.Set[Long] = {
synchronized {
workerBroadcasts.getOrElseUpdate(worker, new mutable.HashSet[Long]())
}
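
For context on the refactoring above: pulling PythonRunner out of PythonRDD lets any Scala-side iterator be pushed through a Python worker, not only an RDD's parent partition. A hedged sketch of a call site follows: the constructor arguments mirror the signature in the diff, everything else (the wrapper object, buffer size, worker reuse flag) is illustrative, and because PythonRunner is private[spark], such code would have to live under the org.apache.spark package.

```scala
package org.apache.spark.example // illustrative location inside the spark namespace

import java.util.{List => JList, Map => JMap}

import org.apache.spark.{Accumulator, TaskContext}
import org.apache.spark.api.python.{PythonBroadcast, PythonRunner}
import org.apache.spark.broadcast.Broadcast

object PythonRunnerSketch {
  // The caller supplies the same state a PythonRDD would have held; this must run
  // inside a task so that TaskContext.get() is non-null.
  def runThroughPython(
      command: Array[Byte],
      envVars: JMap[String, String],
      pythonIncludes: JList[String],
      pythonExec: String,
      pythonVer: String,
      broadcastVars: JList[Broadcast[PythonBroadcast]],
      accumulator: Accumulator[JList[Array[Byte]]],
      input: Iterator[Array[Byte]],
      partitionIndex: Int): Iterator[Array[Byte]] = {
    val runner = new PythonRunner(
      command, envVars, pythonIncludes, pythonExec, pythonVer,
      broadcastVars, accumulator, bufferSize = 65536, reuse_worker = true)
    // The input iterator is now an explicit argument instead of firstParent.iterator(...).
    runner.compute(input, partitionIndex, TaskContext.get())
  }
}
```
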
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -45,7 +45,7 @@ private[spark] object RUtils {
(sys.props("spark.master"), sys.props("spark.submit.deployMode"))
} else {
val sparkConf = SparkEnv.get.conf
(sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
(sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode", "client"))
}

val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
@@ -67,7 +67,11 @@

/** Check if R is installed before running tests that use R commands. */
def isRInstalled: Boolean = {
val builder = new ProcessBuilder(Seq("R", "--version"))
builder.start().waitFor() == 0
try {
val builder = new ProcessBuilder(Seq("R", "--version"))
builder.start().waitFor() == 0
} catch {
case e: Exception => false
}
}
}
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -794,7 +794,7 @@ class DAGScheduler(
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingTasks.clear()
stage.pendingPartitions.clear()

// First figure out the indexes of partition ids to compute.
val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
@@ -910,8 +910,8 @@

if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1002,7 +1002,7 @@
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
@@ -1048,7 +1048,7 @@
shuffleStage.addOutputLoc(smt.partitionId, status)
}

if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
Expand Down Expand Up @@ -1101,7 +1101,7 @@ class DAGScheduler(

case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
stage.pendingTasks += task
stage.pendingPartitions += task.partitionId

case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -60,7 +60,7 @@ private[spark] abstract class Stage(
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]

var pendingTasks = new HashSet[Task[_]]
val pendingPartitions = new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
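
One hedged reading of why the pendingTasks to pendingPartitions switch in the two files above matters (inferred from the diff, not from the patch discussion): Task objects are compared by identity, so a completion arriving from a different attempt of the stage never removes the entry the scheduler is currently tracking, while partition ids collapse every attempt for the same partition onto a single key. A self-contained toy illustration:

```scala
// Toy model only: FakeTask stands in for o.a.s.scheduler.Task, which does not
// override equals/hashCode, so two attempts for the same partition are distinct keys.
object PendingPartitionsSketch extends App {
  import scala.collection.mutable.HashSet

  final class FakeTask(val partitionId: Int)
  val oldAttempt = new FakeTask(0)
  val newAttempt = new FakeTask(0)

  val pendingTasks = HashSet[FakeTask](newAttempt)
  pendingTasks -= oldAttempt                // completion from the older attempt: no effect
  assert(pendingTasks.nonEmpty)             // the stage still looks unfinished

  val pendingPartitions = HashSet[Int](newAttempt.partitionId)
  pendingPartitions -= oldAttempt.partitionId
  assert(pendingPartitions.isEmpty)         // the stage can now be marked as finished
}
```
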
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -487,8 +487,8 @@
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
taskName, taskId, host, taskLocality, serializedTask.limit))
logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
s"$taskLocality, ${serializedTask.limit} bytes)")

sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
62 changes: 60 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -18,9 +18,11 @@
package org.apache.spark.ui

import java.text.SimpleDateFormat
import java.util.{Locale, Date}
import java.util.{Date, Locale}

import scala.xml.{Node, Text, Unparsed}
import scala.util.control.NonFatal
import scala.xml._
import scala.xml.transform.{RewriteRule, RuleTransformer}

import org.apache.spark.Logging
import org.apache.spark.ui.scope.RDDOperationGraph
@@ -395,4 +397,60 @@ private[spark] object UIUtils extends Logging {
</script>
}

/**
* Returns HTML rendering of a job or stage description. It will try to parse the string as HTML
* and make sure that it only contains anchors with root-relative links. Otherwise,
* the whole string will be rendered as simple escaped text.
*
* Note: In terms of security, only anchor tags with root-relative links are supported. So any
* attempt to embed links outside the Spark UI, or other tags like <script>, will cause the whole
* description to be treated as plain text.
*/
def makeDescription(desc: String, basePathUri: String): NodeSeq = {
import scala.language.postfixOps

// If the description can be parsed as HTML and has only relative links, then render
// as HTML, otherwise render as escaped string
try {
// Try to load the description as unescaped HTML
val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")

// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span")
val illegalNodes = xml \\ "_" filterNot { case node: Node =>
allowedNodeLabels.contains(node.label)
}
if (illegalNodes.nonEmpty) {
throw new IllegalArgumentException(
"Only HTML anchors allowed in job descriptions\n" +
illegalNodes.map { n => s"${n.label} in $n"}.mkString("\n\t"))
}

// Verify that all links are relative links starting with "/"
val allLinks =
xml \\ "a" flatMap { _.attributes } filter { _.key == "href" } map { _.value.toString }
if (allLinks.exists { ! _.startsWith ("/") }) {
throw new IllegalArgumentException(
"Links in job descriptions must be root-relative:\n" + allLinks.mkString("\n\t"))
}

// Prepend the relative links with basePathUri
val rule = new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
case e: Elem if e \ "@href" nonEmpty =>
val relativePath = e.attribute("href").get.toString
val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
e % Attribute(null, "href", fullUri, Null)
case _ => n
}
}
}
new RuleTransformer(rule).transform(xml)
} catch {
case NonFatal(e) =>
logWarning(s"Invalid job description: $desc ", e)
<span class="description-input">{desc}</span>
}
}
}
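
A hedged sketch of how the helper above behaves at its two call sites further down (the inputs and base path are made up, the expected outputs are read off the logic above rather than captured from a running UI, and UIUtils is private[spark], so this is REPL-style illustration rather than public API usage):

```scala
import org.apache.spark.ui.UIUtils

val base = "/proxy/app-20150923-0001"

// Anchor with a root-relative link: rendered as HTML, href prefixed with the base path.
UIUtils.makeDescription("""<a href="/history">run history</a>""", base)
// roughly: <span class="description-input"><a href="/proxy/app-20150923-0001/history">run history</a></span>

// Disallowed markup (external links, <script>, unparsable HTML) falls back to escaped text.
UIUtils.makeDescription("""<script>alert(1)</script>""", base)
// roughly: <span class="description-input">&lt;script&gt;alert(1)&lt;/script&gt;</span>
```
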
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -17,15 +17,15 @@

package org.apache.spark.ui.jobs

import scala.collection.mutable.{HashMap, ListBuffer}
import scala.xml.{Node, NodeSeq, Unparsed, Utility}

import java.util.Date
import javax.servlet.http.HttpServletRequest

import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.xml._

import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}

/** Page showing list of all ongoing and recently finished jobs */
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
@@ -224,14 +224,16 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
val jobDescription = UIUtils.makeDescription(lastStageDescription, parent.basePath)

val detailUrl =
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
<tr id={"job-" + job.jobId}>
<td sorttable_customkey={job.jobId.toString}>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
<span class="description-input" title={lastStageDescription}>{lastStageDescription}</span>
{jobDescription}
<a href={detailUrl} class="name-link">{lastStageName}</a>
</td>
<td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,11 +17,10 @@

package org.apache.spark.ui.jobs

import scala.xml.Node
import scala.xml.Text

import java.util.Date

import scala.xml.{Node, Text}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.scheduler.StageInfo
@@ -116,7 +115,7 @@ private[ui] class StageTableBase(
stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
desc <- stageData.description
} yield {
<span class="description-input" title={desc}>{desc}</span>
UIUtils.makeDescription(desc, basePathUri)
}
<div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
}
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -364,6 +364,10 @@
("Metrics" -> metrics)
case ExecutorLostFailure(executorId) =>
("Executor ID" -> executorId)
case taskCommitDenied: TaskCommitDenied =>
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
("Attempt Number" -> taskCommitDenied.attemptNumber)
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -769,6 +773,7 @@ private[spark] object JsonProtocol {
val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
val taskKilled = Utils.getFormattedClassName(TaskKilled)
val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
val unknownReason = Utils.getFormattedClassName(UnknownReason)

@@ -793,6 +798,14 @@
ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `taskCommitDenied` =>
// Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
// de/serialization logic was not added until 1.5.1. To provide backward compatibility
// for reading those logs, we need to provide default values for all the fields.
val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
TaskCommitDenied(jobId, partitionId, attemptNo)
case `executorLostFailure` =>
val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
ExecutorLostFailure(executorId.getOrElse("Unknown"))
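
To make the backward-compatibility note above concrete, a hedged sketch of how an old event-log record now parses (the JSON fragment is an assumed pre-1.5.1 shape with only the reason name written, and json4s is driven directly here instead of going through JsonProtocol):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// Assumed shape of a 1.3.0 to 1.5.0 log entry: no job/partition/attempt fields yet.
val legacy = parse("""{"Reason": "TaskCommitDenied"}""")

// The getOrElse(-1) defaults above turn the missing fields into placeholders
// instead of a parse failure, so the record becomes TaskCommitDenied(-1, -1, -1).
val jobId       = (legacy \ "Job ID").extractOpt[Int].getOrElse(-1)         // -1
val partitionId = (legacy \ "Partition ID").extractOpt[Int].getOrElse(-1)   // -1
val attemptNo   = (legacy \ "Attempt Number").extractOpt[Int].getOrElse(-1) // -1
```
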