Remove local execution tests.
JoshRosen committed Jul 17, 2015
1 parent ffa8c9b commit 8975d96
Showing 1 changed file with 9 additions and 70 deletions.
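For context on what the hunks below delete: "local execution" was an opt-in scheduler path, gated by the spark.localExecution.enabled setting removed from the suite's setup, that let jobs submitted with allowLocal = true run in a thread on the driver rather than being handed to executors. A minimal sketch of that opt-in from user code, written against the public SparkConf/SparkContext API of the same era; the object name and the first() call are illustrative assumptions, not lines from this suite:

import org.apache.spark.{SparkConf, SparkContext}

object LocalExecutionOptInSketch {
  def main(args: Array[String]): Unit = {
    // The same flag the removed test setup used, plus a local master as in the suite.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("LocalExecutionOptInSketch")
      .set("spark.localExecution.enabled", "true")
    val sc = new SparkContext(conf)
    try {
      // On the old code path, a small single-partition action such as first()
      // was eligible to run in a local thread on the driver when this flag was on.
      assert(sc.parallelize(1 to 10, 2).first() == 1)
    } finally {
      sc.stop()
    }
  }
}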
@@ -147,9 +147,7 @@ class DAGSchedulerSuite
   }
 
   before {
-    // Enable local execution for this test
-    val conf = new SparkConf().set("spark.localExecution.enabled", "true")
-    sc = new SparkContext("local", "DAGSchedulerSuite", conf)
+    sc = new SparkContext("local", "DAGSchedulerSuite")
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
     failure = null
@@ -165,12 +163,7 @@ class DAGSchedulerSuite
       sc.listenerBus,
       mapOutputTracker,
       blockManagerMaster,
-      sc.env) {
-      override def runLocally(job: ActiveJob) {
-        // don't bother with the thread while unit testing
-        runLocallyWithinThread(job)
-      }
-    }
+      sc.env)
     dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
   }
 
@@ -277,37 +270,6 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
-  test("local job") {
-    val rdd = new PairOfIntsRDD(sc, Nil) {
-      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
-        Array(42 -> 0).iterator
-      override def getPartitions: Array[Partition] =
-        Array( new Partition { override def index: Int = 0 } )
-      override def getPreferredLocations(split: Partition): List[String] = Nil
-      override def toString: String = "DAGSchedulerSuite Local RDD"
-    }
-    val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(
-      JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
-    assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty()
-  }
-
-  test("local job oom") {
-    val rdd = new PairOfIntsRDD(sc, Nil) {
-      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
-        throw new java.lang.OutOfMemoryError("test local job oom")
-      override def getPartitions = Array( new Partition { override def index = 0 } )
-      override def getPreferredLocations(split: Partition) = Nil
-      override def toString = "DAGSchedulerSuite Local RDD"
-    }
-    val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(
-      JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
-    assert(results.size == 0)
-    assertDataStructuresEmpty()
-  }
-
   test("run trivial job w/ dependency") {
     val baseRdd = new MyRDD(sc, 1, Nil)
     val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
@@ -445,12 +407,7 @@ class DAGSchedulerSuite
       sc.listenerBus,
       mapOutputTracker,
       blockManagerMaster,
-      sc.env) {
-      override def runLocally(job: ActiveJob) {
-        // don't bother with the thread while unit testing
-        runLocallyWithinThread(job)
-      }
-    }
+      sc.env)
     dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(noKillScheduler)
     val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
     cancel(jobId)
@@ -748,27 +705,12 @@ class DAGSchedulerSuite
     // Run this on executors
     sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
 
-    // Run this within a local thread
-    sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-
-    // Make sure we can still run local commands as well as cluster commands.
+    // Make sure we can still run commands
     assert(sc.parallelize(1 to 10, 2).count() === 10)
-    assert(sc.parallelize(1 to 10, 2).first() === 1)
   }
 
   test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
-    val e1 = intercept[SparkDriverExecutionException] {
-      val rdd = sc.parallelize(1 to 10, 2)
-      sc.runJob[Int, Int](
-        rdd,
-        (context: TaskContext, iter: Iterator[Int]) => iter.size,
-        Seq(0),
-        allowLocal = true,
-        (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
-    }
-    assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
-
-    val e2 = intercept[SparkDriverExecutionException] {
+    val e = intercept[SparkDriverExecutionException] {
       val rdd = sc.parallelize(1 to 10, 2)
       sc.runJob[Int, Int](
         rdd,
@@ … @@ class DAGSchedulerSuite
         allowLocal = false,
         (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
     }
-    assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+    assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
 
-    // Make sure we can still run local commands as well as cluster commands.
+    // Make sure we can still run commands
     assert(sc.parallelize(1 to 10, 2).count() === 10)
-    assert(sc.parallelize(1 to 10, 2).first() === 1)
   }
 
   test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
@@ … @@ class DAGSchedulerSuite
       rdd.reduceByKey(_ + _, 1).count()
     }
 
-    // Make sure we can still run local commands as well as cluster commands.
+    // Make sure we can still run commands
     assert(sc.parallelize(1 to 10, 2).count() === 10)
-    assert(sc.parallelize(1 to 10, 2).first() === 1)
   }
 
   test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
@@ … @@ class DAGSchedulerSuite
     }
     assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))
 
-    // Make sure we can still run local commands as well as cluster commands.
+    // Make sure we can still run commands
     assert(sc.parallelize(1 to 10, 2).count() === 10)
-    assert(sc.parallelize(1 to 10, 2).first() === 1)
   }
 
   test("accumulator not calculated for resubmitted result stage") {
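The surviving assertions above keep only the count() checks; the companion first() checks (the "local commands" in the removed comments) go away along with the two dedicated local-job tests. A small standalone sketch of the user-visible behavior that remains once local execution itself is removed (this commit only drops its tests): actions such as take(1) and first() are scheduled as ordinary jobs and return the same results. The object name and the use of a local master are assumptions for illustration, not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}

object PostLocalExecutionSketch {
  def main(args: Array[String]): Unit = {
    // No spark.localExecution.enabled flag: assumes a build where the driver-side shortcut is gone.
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("PostLocalExecutionSketch"))
    try {
      val rdd = sc.parallelize(1 to 10, 2)
      // Mirrors the checks the suite keeps: results are unchanged even when
      // these actions run through the normal DAGScheduler/executor path.
      assert(rdd.take(1).head == 1)
      assert(rdd.first() == 1)
      assert(rdd.count() == 10)
    } finally {
      sc.stop()
    }
  }
}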
