From 42af1db00ae827f89c140d9f4274ab60620d76b1 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 21 Jan 2015 12:39:51 +0800
Subject: [PATCH] remove unnecessary use of existential type in DAGScheduler

---
 .../apache/spark/scheduler/ActiveJob.scala    |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala | 22 ++++++++++------------
 .../spark/scheduler/DAGSchedulerEvent.scala   |  2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala   |  6 +++---
 4 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
index b755d8fb15757..f8a86637e91dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.CallSite
 private[spark] class ActiveJob(
     val jobId: Int,
     val finalStage: Stage,
-    val func: (TaskContext, Iterator[_]) => _,
+    val func: (TaskContext, Iterator[Any]) => Any,
     val partitions: Array[Int],
     val callSite: CallSite,
     val listener: JobListener,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1cfe98673773a..37738aa4134da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
 import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import akka.pattern.ask
@@ -477,18 +476,17 @@ class DAGScheduler(
     val jobId = nextJobId.getAndIncrement()
     if (partitions.size == 0) {
-      return new JobWaiter[U](this, jobId, 0, resultHandler)
+      new JobWaiter(this, jobId, 0, resultHandler)
+    } else {
+      val func2 = func.asInstanceOf[(TaskContext, Iterator[Any]) => Any]
+      val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
+      eventProcessLoop.post(JobSubmitted(
+        jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
+      waiter
     }
-
-    assert(partitions.size > 0)
-    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
-    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
-    eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
-    waiter
   }
 
-  def runJob[T, U: ClassTag](
+  def runJob[T, U](
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
       callSite: CallSite,
@@ -521,7 +519,7 @@ class DAGScheduler(
     : PartialResult[R] =
   {
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
-    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
+    val func2 = func.asInstanceOf[(TaskContext, Iterator[Any]) => Any]
     val partitions = (0 until rdd.partitions.size).toArray
     val jobId = nextJobId.getAndIncrement()
     eventProcessLoop.post(JobSubmitted(
@@ -708,7 +706,7 @@ class DAGScheduler(
   private[scheduler] def handleJobSubmitted(jobId: Int,
       finalRDD: RDD[_],
-      func: (TaskContext, Iterator[_]) => _,
+      func: (TaskContext, Iterator[Any]) => Any,
       partitions: Array[Int],
       allowLocal: Boolean,
       callSite: CallSite,
       listener: JobListener,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 2b6f7e4205c32..e97a35e48bf38 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -38,7 +38,7 @@ private[scheduler] sealed trait DAGSchedulerEvent
 private[scheduler] case class JobSubmitted(
     jobId: Int,
     finalRDD: RDD[_],
-    func: (TaskContext, Iterator[_]) => _,
+    func: (TaskContext, Iterator[Any]) => Any,
     partitions: Array[Int],
     allowLocal: Boolean,
     callSite: CallSite,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index eb116213f69fc..ceabf29ffd5f3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -200,8 +200,8 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
    * below, we do not expect this function to ever be executed; instead, we will return results
    * directly through CompletionEvents.
    */
-  private val jobComputeFunc = (context: TaskContext, it: Iterator[(_)]) =>
-    it.next.asInstanceOf[Tuple2[_, _]]._1
+  private val jobComputeFunc = (context: TaskContext, it: Iterator[Any]) =>
+    it.next.asInstanceOf[(Any, Any)]._1
 
   /** Send the given CompletionEvent messages for the tasks in the TaskSet. */
   private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
@@ -228,7 +228,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
   private def submit(
       rdd: RDD[_],
       partitions: Array[Int],
-      func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
+      func: (TaskContext, Iterator[Any]) => Any = jobComputeFunc,
       allowLocal: Boolean = false,
       listener: JobListener = jobListener): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
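
Note on the pattern: the existential `(TaskContext, Iterator[_]) => _` and the
plain `(TaskContext, Iterator[Any]) => Any` erase to the same bytecode, so the
unchecked `asInstanceOf` casts above behave identically either way; spelling
the type with `Any` simply drops the existential-type machinery. The cast is
safe in practice because the scheduler only ever applies a stored function to
the iterator of the RDD it was submitted with. A minimal standalone sketch of
the same trick, with hypothetical names (not code from this patch):

    // Hypothetical sketch of the func2 cast in DAGScheduler.submitJob.
    object ErasedFuncSketch {
      // Store any job function with its element type forgotten, as the
      // scheduler does when it builds func2 from the user's func.
      def erase[T, U](func: Iterator[T] => U): Iterator[Any] => Any =
        func.asInstanceOf[Iterator[Any] => Any]

      def main(args: Array[String]): Unit = {
        val f = erase[String, Int](_.map(_.length).sum)
        // Applying the erased function to matching data still works; only
        // the static type is gone, so the result comes back as Any.
        println(f(Iterator("a", "bc")))  // prints 3
      }
    }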