From 097e7a21e15d3adf45687bd58ff095088f0282f7 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 5 Dec 2014 17:45:41 -0800
Subject: [PATCH 1/8] [SPARK-4737] Catching task serialization exception in
 TaskSetManager

Our previous attempt at handling non-serializable tasks involved sampling
a single task from a task set and attempting to serialize it. If the
serialization succeeded, we assumed that every task in the task set would
also be serializable. Unfortunately, this is not always the case. For
example, a ParallelCollectionRDD may have both empty and non-empty
partitions; the empty partitions may be serializable even though the
non-empty partitions contain non-serializable objects. This is one of
many cases where sampling a single task for serializability breaks down.

When a task serialization exception occurred in the TaskSchedulerImpl or
TaskSetManager, the exception was not caught, and the entire scheduler
would crash. It would restart, but in a bad state.

There is no reason why the stage should not be aborted if any
serialization error occurs when submitting a task set. If any task in a
task set throws an exception upon serialization, the task set manager
informs the DAGScheduler that the stage failed and aborts the stage. The
TaskSchedulerImpl still needs to return the set of task descriptions that
were successfully submitted, but that set will be empty when a
serialization error occurs.
---
 .../spark/TaskNotSerializableException.scala  | 27 +++++++++++++++++
 .../apache/spark/scheduler/DAGScheduler.scala | 20 -------------
 .../spark/scheduler/TaskSchedulerImpl.scala   | 24 +++++++++------
 .../spark/scheduler/TaskSetManager.scala      | 25 ++++++++++++----
 .../org/apache/spark/SharedSparkContext.scala |  2 +-
 .../scala/org/apache/spark/rdd/RDDSuite.scala | 21 +++++++++++++
 .../scheduler/NotSerializableFakeTask.scala   | 23 ++++++++++++++
 .../scheduler/TaskSchedulerImplSuite.scala    | 30 +++++++++++++++++++
 .../spark/scheduler/TaskSetManagerSuite.scala | 14 +++++++++
 9 files changed, 150 insertions(+), 36 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala

diff --git a/core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala b/core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala
new file mode 100644
index 0000000000000..ddaed7836902b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark + +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * Exception thrown when a task cannot be serialized + */ +@DeveloperApi +class TaskNotSerializableException(error: Throwable) extends Exception(error) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb8ccfbdbdcbb..4f119873bafe0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -865,26 +865,6 @@ class DAGScheduler( } if (tasks.size > 0) { - // Preemptively serialize a task to make sure it can be serialized. We are catching this - // exception here because it would be fairly hard to catch the non-serializable exception - // down the road, where we have several different implementations for local scheduler and - // cluster schedulers. - // - // We've already serialized RDDs and closures in taskBinary, but here we check for all other - // objects such as Partition. - try { - closureSerializer.serialize(tasks.head) - } catch { - case e: NotSerializableException => - abortStage(stage, "Task not serializable: " + e.toString) - runningStages -= stage - return - case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo. - abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}") - runningStages -= stage - return - } - logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") stage.pendingTasks ++= tasks logDebug("New pending tasks: " + stage.pendingTasks) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index cd3c015321e85..e262d2dd54136 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -256,15 +256,21 @@ private[spark] class TaskSchedulerImpl( val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host if (availableCpus(i) >= CPUS_PER_TASK) { - for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { - tasks(i) += task - val tid = task.taskId - taskIdToTaskSetId(tid) = taskSet.taskSet.id - taskIdToExecutorId(tid) = execId - executorsByHost(host) += execId - availableCpus(i) -= CPUS_PER_TASK - assert(availableCpus(i) >= 0) - launchedTask = true + try { + for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { + tasks(i) += task + val tid = task.taskId + taskIdToTaskSetId(tid) = taskSet.taskSet.id + taskIdToExecutorId(tid) = execId + executorsByHost(host) += execId + availableCpus(i) -= CPUS_PER_TASK + assert(availableCpus(i) >= 0) + launchedTask = true + } + } catch { + case e: TaskNotSerializableException => { + return Seq.empty[Seq[TaskDescription]] + } } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index cabdc655f89bf..0a19d64f8a911 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.io.NotSerializableException +import java.nio.ByteBuffer import java.util.Arrays import scala.collection.mutable.ArrayBuffer @@ -417,6 +418,7 @@ private[spark] class TaskSetManager( * @param 
host the host Id of the offered resource * @param maxLocality the maximum locality we want to schedule the tasks at */ + @throws[TaskNotSerializableException] def resourceOffer( execId: String, host: String, @@ -456,12 +458,23 @@ private[spark] class TaskSetManager( } // Serialize and return the task val startTime = clock.getTime() - // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here - // we assume the task can be serialized without exceptions. - val serializedTask = Task.serializeWithDependencies( - task, sched.sc.addedFiles, sched.sc.addedJars, ser) + var serializedTaskOpt : Option[ByteBuffer] = None + try { + serializedTaskOpt = Some(Task.serializeWithDependencies( + task, sched.sc.addedFiles, sched.sc.addedJars, ser)) + } catch { + // If the task cannot be serialized, then there's no point to re-attempt the task, + // as it will always fail. So just abort the whole task-set. + case e : Throwable => { + logError(s"Failed to serialize task $taskId, not attempting to retry it.", e) + abort(s"Failed to serialize task $taskId, not attempt to retry it. Exception" + + s"duringserialization is: $e") + throw new TaskNotSerializableException(e) + } + } + val serializedTask = serializedTaskOpt.get if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && - !emittedTaskSizeWarning) { + !emittedTaskSizeWarning) { emittedTaskSizeWarning = true logWarning(s"Stage ${task.stageId} contains a task of very large size " + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + @@ -474,7 +487,7 @@ private[spark] class TaskSetManager( // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format( - taskName, taskId, host, taskLocality, serializedTask.limit)) + taskName, taskId, host, taskLocality, serializedTask.limit)) sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala index 0b6511a80df1d..3d2700b7e6be4 100644 --- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala @@ -30,7 +30,7 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => var conf = new SparkConf(false) override def beforeAll() { - _sc = new SparkContext("local", "test", conf) + _sc = new SparkContext("local[4]", "test", conf) super.beforeAll() } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 46fcb80fa1845..e50425aabb2a6 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -17,6 +17,10 @@ package org.apache.spark.rdd +import java.io.{ObjectInputStream, ObjectOutputStream, IOException} + +import com.esotericsoftware.kryo.KryoException + import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -887,6 +891,23 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3) } + test("parallelize with exception thrown on serialization should not hang") { + class BadSerializable extends Serializable { + @throws(classOf[IOException]) + private def writeObject(out: 
ObjectOutputStream): Unit = throw new KryoException("Bad serialization") + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream): Unit = {} + } + // Note that in the original bug, SPARK-4349, that this verifies, the job would only hang if there were + // more threads in the Spark Context than there were number of objects in this sequence. + intercept[Throwable] { + sc.parallelize(Seq(new BadSerializable, new BadSerializable)).collect + } + // Check that the context has not crashed + sc.parallelize(1 to 100).map(x => x*2).collect + } + /** A contrived RDD that allows the manual addition of dependencies after creation. */ private class CyclicalDependencyRDD[T: ClassTag] extends RDD[T](sc, Nil) { private val mutableDependencies: ArrayBuffer[Dependency[_]] = ArrayBuffer.empty diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala new file mode 100644 index 0000000000000..01682661657a3 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala @@ -0,0 +1,23 @@ +package org.apache.spark.scheduler + +import java.io.{ObjectInputStream, ObjectOutputStream, IOException} + +import org.apache.spark.TaskContext + +/** + * A Task implementation that fails to serialize. + */ +class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) { + override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte] + override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]() + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream): Unit = { + if (stageId == 0) { + throw new IllegalStateException("Cannot serialize") + } + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream): Unit = {} +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 7532da88c6065..71b501c64e86d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -331,4 +331,34 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin assert(1 === taskDescriptions.length) assert("executor0" === taskDescriptions(0).executorId) } + + test("Scheduler does not crash when tasks are not serializable") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskCpus = 2 + + sc.conf.set("spark.task.cpus", taskCpus.toString) + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. 
+ val dagScheduler = new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + val numFreeCores = 1 + taskScheduler.setDAGScheduler(dagScheduler) + var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) + val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus), + new WorkerOffer("executor1", "host1", numFreeCores)) + taskScheduler.submitTasks(taskSet) + var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten + assert(0 === taskDescriptions.length) + + // Now check that we can still submit tasks + taskSet = FakeTask.createTaskSet(1) + taskScheduler.submitTasks(taskSet) + taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten + assert(1 === taskDescriptions.length) + assert("executor0" === taskDescriptions(0).executorId) + } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 472191551a01f..84b9b788237bf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import java.io.{ObjectInputStream, ObjectOutputStream, IOException} import java.util.Random import scala.collection.mutable.ArrayBuffer @@ -563,6 +564,19 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.emittedTaskSizeWarning) } + test("Not serializable exception thrown if the task cannot be serialized") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + + val taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + + intercept[TaskNotSerializableException] { + manager.resourceOffer("exec1", "host1", ANY) + } + assert(manager.isZombie) + } + test("abort the job if total size of results is too large") { val conf = new SparkConf().set("spark.driver.maxResultSize", "2m") sc = new SparkContext("local", "test", conf) From bf5e706918d92c761fa537a88bc15ec2c4cc7838 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 8 Dec 2014 12:39:45 -0800 Subject: [PATCH 2/8] Fixing indentation. 
--- .../org/apache/spark/scheduler/TaskSetManager.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 0a19d64f8a911..cd449b4c82b54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -458,9 +458,9 @@ private[spark] class TaskSetManager( } // Serialize and return the task val startTime = clock.getTime() - var serializedTaskOpt : Option[ByteBuffer] = None + var serializedTask : ByteBuffer = null try { - serializedTaskOpt = Some(Task.serializeWithDependencies( + serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser)) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, @@ -472,9 +472,8 @@ private[spark] class TaskSetManager( throw new TaskNotSerializableException(e) } } - val serializedTask = serializedTaskOpt.get if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && - !emittedTaskSizeWarning) { + !emittedTaskSizeWarning) { emittedTaskSizeWarning = true logWarning(s"Stage ${task.stageId} contains a task of very large size " + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + @@ -487,7 +486,7 @@ private[spark] class TaskSetManager( // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format( - taskName, taskId, host, taskLocality, serializedTask.limit)) + taskName, taskId, host, taskLocality, serializedTask.limit)) sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) From 5f486f462233ae63987aa483e6d6eab342feef96 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 8 Dec 2014 12:59:38 -0800 Subject: [PATCH 3/8] Adding license header for fake task class --- .../scheduler/NotSerializableFakeTask.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala index 01682661657a3..c456d69c22157 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.scheduler import java.io.{ObjectInputStream, ObjectOutputStream, IOException} From 94844d736ed0d8322e2e0dda762961a9170d6a1d Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 8 Dec 2014 13:13:13 -0800 Subject: [PATCH 4/8] Fixing compilation error, one brace too many --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index cd449b4c82b54..7bd4e7411163d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -460,17 +460,16 @@ private[spark] class TaskSetManager( val startTime = clock.getTime() var serializedTask : ByteBuffer = null try { - serializedTask = Task.serializeWithDependencies( - task, sched.sc.addedFiles, sched.sc.addedJars, ser)) + serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, + sched.sc.addedJars, ser) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. - case e : Throwable => { + case e : Throwable => logError(s"Failed to serialize task $taskId, not attempting to retry it.", e) abort(s"Failed to serialize task $taskId, not attempt to retry it. Exception" + s"duringserialization is: $e") throw new TaskNotSerializableException(e) - } } if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { From b2a430d9f3fc8dac3c2a20aab6bd07bae8f17691 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 5 Jan 2015 16:41:59 -0500 Subject: [PATCH 5/8] Not returning empty seq when a task set cannot be serialized. Addressing Josh Rosen's comments. --- .../spark/scheduler/TaskSchedulerImpl.scala | 49 ++++++++++--------- .../spark/scheduler/TaskSetManager.scala | 12 ++--- .../scala/org/apache/spark/rdd/RDDSuite.scala | 4 +- .../scheduler/TaskSchedulerImplSuite.scala | 6 +-- 4 files changed, 38 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index e262d2dd54136..c64507f348ffa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -21,6 +21,8 @@ import java.nio.ByteBuffer import java.util.{TimerTask, Timer} import java.util.concurrent.atomic.AtomicLong +import org.apache.spark.scheduler.TaskLocality.TaskLocality + import scala.concurrent.duration._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -249,31 +251,34 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. 
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY var launchedTask = false - for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { - do { - launchedTask = false - for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { - try { - for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { - tasks(i) += task - val tid = task.taskId - taskIdToTaskSetId(tid) = taskSet.taskSet.id - taskIdToExecutorId(tid) = execId - executorsByHost(host) += execId - availableCpus(i) -= CPUS_PER_TASK - assert(availableCpus(i) >= 0) - launchedTask = true - } - } catch { - case e: TaskNotSerializableException => { - return Seq.empty[Seq[TaskDescription]] - } + def resourceOfferSingleTaskSet(taskSet: TaskSetManager, maxLocality: TaskLocality) : Unit = { + for (i <- 0 until shuffledOffers.size) { + val execId = shuffledOffers(i).executorId + val host = shuffledOffers(i).host + if (availableCpus(i) >= CPUS_PER_TASK) { + try { + for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { + tasks(i) += task + val tid = task.taskId + taskIdToTaskSetId(tid) = taskSet.taskSet.id + taskIdToExecutorId(tid) = execId + executorsByHost(host) += execId + availableCpus(i) -= CPUS_PER_TASK + assert(availableCpus(i) >= 0) + launchedTask = true + } + } catch { + case e: TaskNotSerializableException => { + return } } } + } + } + for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { + do { + launchedTask = false + resourceOfferSingleTaskSet(taskSet, maxLocality) } while (launchedTask) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 7bd4e7411163d..c39425e624d02 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.math.{min, max} +import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.executor.TaskMetrics @@ -458,17 +459,16 @@ private[spark] class TaskSetManager( } // Serialize and return the task val startTime = clock.getTime() - var serializedTask : ByteBuffer = null - try { - serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, + val serializedTask : ByteBuffer = try { + Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. - case e : Throwable => + case NonFatal(e) => logError(s"Failed to serialize task $taskId, not attempting to retry it.", e) - abort(s"Failed to serialize task $taskId, not attempt to retry it. Exception" + - s"duringserialization is: $e") + abort(s"Failed to serialize task $taskId, not attempt to retry it. 
Exception " + + s"during serialization is: $e") throw new TaskNotSerializableException(e) } if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index e50425aabb2a6..62c7b1b066dab 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -894,10 +894,10 @@ class RDDSuite extends FunSuite with SharedSparkContext { test("parallelize with exception thrown on serialization should not hang") { class BadSerializable extends Serializable { @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream): Unit = throw new KryoException("Bad serialization") + private def writeObject(out: ObjectOutputStream) : Unit = throw new KryoException("Bad serialization") @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = {} + private def readObject(in: ObjectInputStream) : Unit = {} } // Note that in the original bug, SPARK-4349, that this verifies, the job would only hang if there were // more threads in the Spark Context than there were number of objects in this sequence. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 71b501c64e86d..12a3df5d6afe4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -354,11 +354,11 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin assert(0 === taskDescriptions.length) // Now check that we can still submit tasks - taskSet = FakeTask.createTaskSet(1) + // Even if one of the tasks has not-serializable tasks, the other task set should still be processed without error taskScheduler.submitTasks(taskSet) + taskScheduler.submitTasks(FakeTask.createTaskSet(1)) taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten - assert(1 === taskDescriptions.length) - assert("executor0" === taskDescriptions(0).executorId) + assert(taskDescriptions.map(_.executorId) === Seq("executor0")) } } From dfa145b6cd968d3cc5d341b2e5a2f85c62c27112 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 6 Jan 2015 13:56:46 -0500 Subject: [PATCH 6/8] Fixing style from Josh Rosen's feedback --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 6 ++++-- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index c64507f348ffa..6fc9d475ae722 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -21,8 +21,6 @@ import java.nio.ByteBuffer import java.util.{TimerTask, Timer} import java.util.concurrent.atomic.AtomicLong -import org.apache.spark.scheduler.TaskLocality.TaskLocality - import scala.concurrent.duration._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -33,6 +31,7 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.SchedulingMode.SchedulingMode +import 
org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.util.Utils import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId @@ -269,6 +268,9 @@ private[spark] class TaskSchedulerImpl( } } catch { case e: TaskNotSerializableException => { + logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") + // Do not offer resources for this task, but don't throw an error to allow other + // task sets to be submitted. return } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c39425e624d02..8c116eb6137cb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -459,7 +459,7 @@ private[spark] class TaskSetManager( } // Serialize and return the task val startTime = clock.getTime() - val serializedTask : ByteBuffer = try { + val serializedTask: ByteBuffer = try { Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) } catch { From 5267929054cce06dd1c422a6a010e82b81b22a13 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 8 Jan 2015 16:45:36 -0800 Subject: [PATCH 7/8] Fixing style suggestions from Andrew Or. --- .../spark/TaskNotSerializableException.scala | 6 +- .../spark/scheduler/TaskSchedulerImpl.scala | 67 +++++++++++-------- .../spark/scheduler/TaskSetManager.scala | 9 ++- .../scala/org/apache/spark/rdd/RDDSuite.scala | 6 +- .../scheduler/NotSerializableFakeTask.scala | 2 +- 5 files changed, 48 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala b/core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala index ddaed7836902b..9df61062e1f85 100644 --- a/core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala +++ b/core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala @@ -20,8 +20,6 @@ package org.apache.spark import org.apache.spark.annotation.DeveloperApi /** - * :: DeveloperApi :: - * Exception thrown when a task cannot be serialized + * Exception thrown when a task cannot be serialized. 
*/ -@DeveloperApi -class TaskNotSerializableException(error: Throwable) extends Exception(error) +private[spark] class TaskNotSerializableException(error: Throwable) extends Exception(error) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6fc9d475ae722..c2374875a8c27 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -210,6 +210,42 @@ private[spark] class TaskSchedulerImpl( .format(manager.taskSet.id, manager.parent.name)) } + private def resourceOfferSingleTaskSet( + taskSet: TaskSetManager, + maxLocality: TaskLocality, + shuffledOffers: Seq[WorkerOffer], + availableCpus: Array[Int], + tasks: Seq[ArrayBuffer[TaskDescription]]) + : Boolean = + { + var launchedTask = false + for (i <- 0 until shuffledOffers.size) { + val execId = shuffledOffers(i).executorId + val host = shuffledOffers(i).host + if (availableCpus(i) >= CPUS_PER_TASK) { + try { + for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { + tasks(i) += task + val tid = task.taskId + taskIdToTaskSetId(tid) = taskSet.taskSet.id + taskIdToExecutorId(tid) = execId + executorsByHost(host) += execId + availableCpus(i) -= CPUS_PER_TASK + assert(availableCpus(i) >= 0) + launchedTask = true + } + } catch { + case e: TaskNotSerializableException => + logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") + // Do not offer resources for this task, but don't throw an error to allow other + // task sets to be submitted. + return launchedTask + } + } + } + return launchedTask + } + /** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so @@ -250,37 +286,10 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY var launchedTask = false - def resourceOfferSingleTaskSet(taskSet: TaskSetManager, maxLocality: TaskLocality) : Unit = { - for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { - try { - for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { - tasks(i) += task - val tid = task.taskId - taskIdToTaskSetId(tid) = taskSet.taskSet.id - taskIdToExecutorId(tid) = execId - executorsByHost(host) += execId - availableCpus(i) -= CPUS_PER_TASK - assert(availableCpus(i) >= 0) - launchedTask = true - } - } catch { - case e: TaskNotSerializableException => { - logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") - // Do not offer resources for this task, but don't throw an error to allow other - // task sets to be submitted. 
- return - } - } - } - } - } for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { - launchedTask = false - resourceOfferSingleTaskSet(taskSet, maxLocality) + launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers, + availableCpus, tasks) } while (launchedTask) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8c116eb6137cb..2c4125310ad95 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -460,15 +460,14 @@ private[spark] class TaskSetManager( // Serialize and return the task val startTime = clock.getTime() val serializedTask: ByteBuffer = try { - Task.serializeWithDependencies(task, sched.sc.addedFiles, - sched.sc.addedJars, ser) + Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. case NonFatal(e) => - logError(s"Failed to serialize task $taskId, not attempting to retry it.", e) - abort(s"Failed to serialize task $taskId, not attempt to retry it. Exception " + - s"during serialization is: $e") + val msg = s"Failed to serialize task $taskId, not attempting to retry it." + logError(msg, e) + abort(s"$msg Exception during serialization: $e") throw new TaskNotSerializableException(e) } if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 62c7b1b066dab..4fbef3f73a2f3 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -891,13 +891,13 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3) } - test("parallelize with exception thrown on serialization should not hang") { + test("task serialization exception should not hang scheduler") { class BadSerializable extends Serializable { @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream) : Unit = throw new KryoException("Bad serialization") + private def writeObject(out: ObjectOutputStream): Unit = throw new KryoException("Bad serialization") @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream) : Unit = {} + private def readObject(in: ObjectInputStream): Unit = {} } // Note that in the original bug, SPARK-4349, that this verifies, the job would only hang if there were // more threads in the Spark Context than there were number of objects in this sequence. diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala index c456d69c22157..6b75c98839e03 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala @@ -24,7 +24,7 @@ import org.apache.spark.TaskContext /** * A Task implementation that fails to serialize. 
*/ -class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) { +private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) { override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte] override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]() From 154598404f181fbb761f38385b2e844c792c1b03 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 8 Jan 2015 18:23:13 -0800 Subject: [PATCH 8/8] Some more style fixes from Andrew Or. --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index c2374875a8c27..78b655095c359 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -215,9 +215,7 @@ private[spark] class TaskSchedulerImpl( maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], - tasks: Seq[ArrayBuffer[TaskDescription]]) - : Boolean = - { + tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId @@ -288,8 +286,8 @@ private[spark] class TaskSchedulerImpl( var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { - launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers, - availableCpus, tasks) + launchedTask = resourceOfferSingleTaskSet( + taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) }