
Commit d256b45
Fixed unit test failures. One more to go.
rxin committed Jul 30, 2014
1 parent cae0af3 commit d256b45
Showing 3 changed files with 12 additions and 12 deletions.
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -56,6 +56,11 @@ private[spark] class ShuffleMapTask(
     this(stageId, rdd.broadcasted, dep, rdd.partitions(partitionId), locs)
   }
 
+  /** A constructor used only in test suites. This does not require passing in an RDD. */
+  def this(partitionId: Int) {
+    this(0, null, null, new Partition { override def index = 0 }, null)
+  }
+
   @transient private val preferredLocs: Seq[TaskLocation] = {
     if (locs == null) Nil else locs.toSet.toSeq
   }
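For context, a minimal sketch (not part of this commit) of how a test might call the new one-argument constructor. It must live under the org.apache.spark package because ShuffleMapTask is private[spark]; the object and file name are hypothetical.

// Hypothetical example; placed under org.apache.spark.scheduler so the
// private[spark] ShuffleMapTask and Utils are accessible.
package org.apache.spark.scheduler

import org.apache.spark.util.Utils

object ShuffleMapTaskConstructorExample {
  def main(args: Array[String]): Unit = {
    // No RDD, dependency, or preferred locations are needed: stageId is fixed
    // at 0 and the synthetic partition reports index 0.
    val task = new ShuffleMapTask(0)
    assert(task.stageId == 0)

    // JobProgressListenerSuite below derives the displayed task type the same way.
    println(Utils.getFormattedClassName(task))
  }
}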
8 changes: 1 addition & 7 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -155,19 +155,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
       override def getPartitions: Array[Partition] = Array(onlySplit)
       override val getDependencies = List[Dependency[_]]()
       override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
-        if (shouldFail) {
-          throw new Exception("injected failure")
-        } else {
-          Array(1, 2, 3, 4).iterator
-        }
+        throw new Exception("injected failure")
       }
     }.cache()
     val thrown = intercept[Exception]{
       rdd.collect()
     }
     assert(thrown.getMessage.contains("injected failure"))
-    shouldFail = false
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
   }
 
   test("empty RDD") {
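As a side note, a small self-contained illustration (not from this commit) of the ScalaTest pattern the simplified test above relies on: intercept[T] runs the block, expects an exception of type T, and returns it so its message can be inspected. The suite and test names here are made up.

import org.scalatest.FunSuite

// Stand-alone demonstration of the intercept + getMessage check.
class InterceptExampleSuite extends FunSuite {
  test("intercept returns the thrown exception") {
    val thrown = intercept[RuntimeException] {
      throw new RuntimeException("injected failure")
    }
    assert(thrown.getMessage.contains("injected failure"))
  }
}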
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
 class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
+
   test("test LRU eviction of stages") {
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)
@@ -66,7 +67,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics)
     var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
-    var task = new ShuffleMapTask(0, null, null, 0, null)
+    var task = new ShuffleMapTask(0)
     val taskType = Utils.getFormattedClassName(task)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
@@ -76,22 +77,22 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     taskInfo =
       new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
     taskInfo.finishTime = 1
-    task = new ShuffleMapTask(0, null, null, 0, null)
+    task = new ShuffleMapTask(0)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToData.size === 1)
 
     // finish this task, should get updated duration
     taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
-    task = new ShuffleMapTask(0, null, null, 0, null)
+    task = new ShuffleMapTask(0)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
       .shuffleRead === 2000)
 
     // finish this task, should get updated duration
     taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
-    task = new ShuffleMapTask(0, null, null, 0, null)
+    task = new ShuffleMapTask(0)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
       .shuffleRead === 1000)
@@ -103,7 +104,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     val metrics = new TaskMetrics()
     val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
-    val task = new ShuffleMapTask(0, null, null, 0, null)
+    val task = new ShuffleMapTask(0)
     val taskType = Utils.getFormattedClassName(task)
 
     // Go through all the failure cases to make sure we are counting them as failures.
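For orientation, a hedged sketch of the per-task pattern the assertions above repeat, written as it would sit inside one of this suite's test bodies. It reuses only constructors visible in this diff and the suite's existing imports; the listener value is assumed to come from the surrounding test setup.

// Build a TaskInfo and a one-argument ShuffleMapTask, then wrap both in the
// SparkListenerTaskEnd event that the listener under test consumes.
val taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
val task = new ShuffleMapTask(0)
val taskType = Utils.getFormattedClassName(task)
val event = SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, new TaskMetrics())
// listener.onTaskEnd(event)  // `listener` is created by the suite's setup (assumed)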
