[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator

When speculative execution is enabled, consider a scenario where the authorized committer of a particular output partition fails during the OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on committing once that task fails. However, due to a unit mismatch (we used the task attempt number in one place and the task attempt id in another), the lock is never released, causing Spark to go into an infinite retry loop.
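
Conceptually, the stuck lock boils down to an equality check between two different kinds of identifiers. The sketch below is a simplified, illustrative Scala snippet (the names and values are hypothetical, not the actual coordinator code or API):

    // Illustrative sketch of the pre-fix behavior -- not the actual Spark code.
    import scala.collection.mutable

    val authorizedCommitters = mutable.Map.empty[Int, Long]   // partition -> committer identifier

    // The commit path records the authorized committer using one unit
    // (here the per-task attempt number, which starts at 0 for every task)...
    val partition = 2
    authorizedCommitters(partition) = 0L

    // ...but the failure path compares against the other unit (a globally
    // unique task attempt id, e.g. 47), so the check below never matches and
    // the exclusive lock on the partition is never cleared.
    val failedTaskAttemptId = 47L
    if (authorizedCommitters.get(partition).exists(_ == failedTaskAttemptId)) {
      authorizedCommitters.remove(partition)   // unreachable before this patch
    }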

This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end tests (the current tests rely heavily on mocks). Another contributing factor is that we have many similarly-named identifiers with different semantics but the same data types (e.g. attemptNumber and taskAttemptId), and the inconsistent variable naming makes them difficult to distinguish.

This patch adds a regression test and fixes this bug by always using task attempt numbers throughout this code.
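
With the fix, both the canCommit() request and the taskCompleted() clean-up identify a task attempt by the same per-task attempt number, so the comparison after a failure matches and the lock is released for the next attempt. Again, a simplified illustrative sketch rather than the literal patched code:

    // Illustrative sketch of the post-fix behavior -- both paths use the attempt number.
    import scala.collection.mutable

    val authorizedCommitters = mutable.Map.empty[Int, Int]   // partition -> attemptNumber

    // canCommit(stage, partition, attemptNumber): the first committer wins.
    val partition = 2
    authorizedCommitters.getOrElseUpdate(partition, 0)       // attempt 0 becomes the authorized committer

    // taskCompleted(stage, partition, attemptNumber, reason): when the authorized
    // committer fails, the same attempt number is compared, so the lock is cleared...
    val failedAttemptNumber = 0
    if (authorizedCommitters.get(partition).exists(_ == failedAttemptNumber)) {
      authorizedCommitters.remove(partition)
    }

    // ...and a later attempt can now be authorized to commit.
    authorizedCommitters.getOrElseUpdate(partition, 1)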

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#8544 from JoshRosen/SPARK-10381.

(cherry picked from commit 38700ea)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
(cherry picked from commit 2bbcbc6)
JoshRosen authored and Marcelo Vanzin committed Sep 29, 2015
1 parent 33fa88a commit 1a2cadc
Showing 17 changed files with 157 additions and 69 deletions.
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -104,8 +104,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}

def commit() {
SparkHadoopMapRedUtil.commitTask(
getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
}

def commitJob() {
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -191,9 +191,12 @@ case object TaskKilled extends TaskFailedReason {
* Task requested the driver to commit, but was denied.
*/
@DeveloperApi
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
case class TaskCommitDenied(
jobID: Int,
partitionID: Int,
attemptNumber: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
}

/**
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
msg: String,
jobID: Int,
splitID: Int,
attemptID: Int)
attemptNumber: Int)
extends Exception(msg) {

def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
}
@@ -91,8 +91,7 @@ object SparkHadoopMapRedUtil extends Logging {
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
jobId: Int,
splitId: Int,
attemptId: Int): Unit = {
splitId: Int): Unit = {

val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)

@@ -122,7 +121,8 @@

if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
val taskAttemptNumber = TaskContext.get().attemptNumber()
val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)

if (canCommit) {
performCommit()
@@ -132,7 +132,7 @@
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
throw new CommitDeniedException(message, jobId, splitId, attemptId)
throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -143,16 +143,4 @@
logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
}
}

def commitTask(
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
sparkTaskContext: TaskContext): Unit = {
commitTask(
committer,
mrTaskContext,
sparkTaskContext.stageId(),
sparkTaskContext.partitionId(),
sparkTaskContext.attemptNumber())
}
}
@@ -970,8 +970,11 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)

outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
event.taskInfo.attempt, event.reason)
outputCommitCoordinator.taskCompleted(
stageId,
task.partitionId,
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint
private sealed trait OutputCommitCoordinationMessage extends Serializable

private case object StopCoordinator extends OutputCommitCoordinationMessage
private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)

/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
var coordinatorRef: Option[RpcEndpointRef] = None

private type StageId = Int
private type PartitionId = Long
private type TaskAttemptId = Long
private type PartitionId = Int
private type TaskAttemptNumber = Int

/**
* Map from active stages's id => partition id => task attempt with exclusive lock on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
private type CommittersByStageMap =
mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]

/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* @param stage the stage number
* @param partition the partition number
* @param attempt a unique identifier for this task attempt
* @param attemptNumber how many times this task has been attempted
* (see [[TaskContext.attemptNumber()]])
* @return true if this task is authorized to commit, false otherwise
*/
def canCommit(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId): Boolean = {
val msg = AskPermissionToCommitOutput(stage, partition, attempt)
attemptNumber: TaskAttemptNumber): Boolean = {
val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
coordinatorRef match {
case Some(endpointRef) =>
endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)

// Called by DAGScheduler
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
}

// Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def taskCompleted(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId,
attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
case Success =>
// The task output has been committed successfully
case denied: TaskCommitDenied =>
logInfo(
s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
s"attempt: $attemptNumber")
case otherReason =>
if (authorizedCommitters.get(partition).exists(_ == attempt)) {
logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
s" clearing lock")
if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
authorizedCommitters.remove(partition)
}
}
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def handleAskPermissionToCommit(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId): Boolean = synchronized {
attemptNumber: TaskAttemptNumber): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(partition) match {
case Some(existingCommitter) =>
logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
s"existingCommitter = $existingCommitter")
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition; existingCommitter = $existingCommitter")
false
case None =>
logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
authorizedCommitters(partition) = attempt
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
authorizedCommitters(partition) = attemptNumber
true
}
case None =>
logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
s"partition $partition to commit")
false
}
}
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
context.reply(
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
}
}
}
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
val attempt: Int,
val attemptNumber: Int,
val launchTime: Long,
val executorId: String,
val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
}
}

def id: String = s"$index.$attempt"
@deprecated("Use attemptNumber", "1.6.0")
def attempt: Int = attemptNumber

def id: String = s"$index.$attemptNumber"

def duration: Long = {
if (!finished) {
@@ -127,7 +127,7 @@ private[v1] object AllStagesResource {
new TaskData(
taskId = uiData.taskInfo.taskId,
index = uiData.taskInfo.index,
attempt = uiData.taskInfo.attempt,
attempt = uiData.taskInfo.attemptNumber,
launchTime = new Date(uiData.taskInfo.launchTime),
executorId = uiData.taskInfo.executorId,
host = uiData.taskInfo.host,
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -621,7 +621,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
serializationTimeProportionPos + serializationTimeProportion

val index = taskInfo.index
val attempt = taskInfo.attempt
val attempt = taskInfo.attemptNumber

val svgTag =
if (totalExecutionTime == 0) {
Expand Down Expand Up @@ -967,7 +967,7 @@ private[ui] class TaskDataSource(
new TaskTableRowData(
info.index,
info.taskId,
info.attempt,
info.attemptNumber,
info.speculative,
info.status,
info.taskLocality.toString,
@@ -266,7 +266,7 @@ private[spark] object JsonProtocol {
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
("Attempt" -> taskInfo.attempt) ~
("Attempt" -> taskInfo.attemptNumber) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.{Span, Seconds}

import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
import org.apache.spark.util.Utils

/**
* Integration tests for the OutputCommitCoordinator.
*
* See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
*/
class OutputCommitCoordinatorIntegrationSuite
extends SparkFunSuite
with LocalSparkContext
with Timeouts {

override def beforeAll(): Unit = {
super.beforeAll()
val conf = new SparkConf()
.set("master", "local[2,4]")
.set("spark.speculation", "true")
.set("spark.hadoop.mapred.output.committer.class",
classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
sc = new SparkContext("local[2, 4]", "test", conf)
}

test("exception thrown in OutputCommitter.commitTask()") {
// Regression test for SPARK-10381
failAfter(Span(60, Seconds)) {
val tempDir = Utils.createTempDir()
try {
sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
} finally {
Utils.deleteRecursively(tempDir)
}
}
}
}

private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
override def commitTask(context: TaskAttemptContext): Unit = {
val ctx = TaskContext.get()
if (ctx.attemptNumber < 1) {
throw new java.io.FileNotFoundException("Intentional exception")
}
super.commitTask(context)
}
}
@@ -63,6 +63,9 @@ import scala.language.postfixOps
* was not in SparkHadoopWriter, the tests would still pass because only one of the
* increments would be captured even though the commit in both tasks was executed
* erroneously.
*
* See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
* not use mocks.
*/
class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
val stage: Int = 1
val partition: Long = 2
val authorizedCommitter: Long = 3
val nonAuthorizedCommitter: Long = 100
val partition: Int = 2
val authorizedCommitter: Int = 3
val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage)
assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))

assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
// The non-authorized committer fails
outputCommitCoordinator.taskCompleted(
stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
// New tasks should still not be able to commit because the authorized committer has not failed
assert(
!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(
stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
// A new task should now be allowed to become the authorized committer
assert(
outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
// There can only be one authorized committer
assert(
!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}
}
