
[SPARK-48292][CORE] Revert [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status #46696
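This reverts SPARK-39195. After the revert, when a task that was authorized to commit later fails, the OutputCommitCoordinator no longer fails the whole stage through the DAGScheduler to guard against possible data duplication; it just logs the event and clears the commit lock for that partition so a later attempt can be authorized. The SparkContext handle that SPARK-39195 threaded through SparkEnv into the coordinator is removed accordingly, and the affected tests are restored to their pre-SPARK-39195 expectations.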

Closed · wants to merge 2 commits

Changes from 1 commit
7 changes: 1 addition & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -287,12 +287,7 @@ class SparkContext(config: SparkConf) extends Logging {
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(
-      conf,
-      isLocal,
-      listenerBus,
-      SparkContext.numDriverCores(master, conf),
-      this)
+    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}

private[spark] def env: SparkEnv = _env
12 changes: 1 addition & 11 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -258,7 +258,6 @@ object SparkEnv extends Logging {
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
-      sparkContext: SparkContext,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -281,7 +280,6 @@
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
-      Option(sparkContext),
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }
@@ -317,7 +315,6 @@ object SparkEnv extends Logging {
  /**
   * Helper method to create a SparkEnv for a driver or an executor.
   */
-  // scalastyle:off argcount
  private def create(
      conf: SparkConf,
      executorId: String,
@@ -328,9 +325,7 @@
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
-      sc: Option[SparkContext] = None,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
-    // scalastyle:on argcount

    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

@@ -473,12 +468,7 @@ object SparkEnv extends Logging {
    }

    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      if (isDriver) {
-        new OutputCommitCoordinator(conf, isDriver, sc)
-      } else {
-        new OutputCommitCoordinator(conf, isDriver)
-      }
-
+      new OutputCommitCoordinator(conf, isDriver)
    }
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
11 changes: 3 additions & 8 deletions core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -44,10 +44,7 @@ private case class AskPermissionToCommitOutput(
 * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
 * for an extensive design discussion.
 */
-private[spark] class OutputCommitCoordinator(
-    conf: SparkConf,
-    isDriver: Boolean,
-    sc: Option[SparkContext] = None) extends Logging {
+private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {

// Initialized by SparkEnv
var coordinatorRef: Option[RpcEndpointRef] = None
@@ -158,10 +155,9 @@ private[spark] class OutputCommitCoordinator(
        val taskId = TaskIdentifier(stageAttempt, attemptNumber)
        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
        if (stageState.authorizedCommitters(partition) == taskId) {
-          sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
-            s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
-            s"but task commit success, data duplication may happen. " +
-            s"reason=$reason"))
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
stageState.authorizedCommitters(partition) = null
}
}
}
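To make the reverted behavior concrete: the coordinator hands out at most one commit lock per partition, and this change only affects what happens when the lock holder's task fails. Below is a minimal, self-contained model of the post-revert arbitration logic; `CommitArbiter` and its members are hypothetical names for illustration, while the real class keeps this state per stage and stage attempt behind an RPC endpoint.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the coordinator's per-partition bookkeeping.
case class TaskIdentifier(stageAttempt: Int, attemptNumber: Int)

class CommitArbiter {
  // partition -> attempt currently holding the commit lock
  private val authorized = mutable.Map.empty[Int, TaskIdentifier]
  // partition -> attempts that have failed and may never commit (SPARK-19631)
  private val failures = mutable.Map.empty[Int, mutable.Set[TaskIdentifier]]

  def canCommit(stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean = {
    val id = TaskIdentifier(stageAttempt, attemptNumber)
    if (failures.getOrElse(partition, mutable.Set.empty).contains(id)) {
      false // a failed attempt is never authorized
    } else {
      authorized.get(partition) match {
        case None =>
          authorized(partition) = id // first asker takes the lock
          true
        case Some(holder) =>
          holder == id // only the current holder may commit
      }
    }
  }

  def taskFailed(stageAttempt: Int, partition: Int, attemptNumber: Int): Unit = {
    val id = TaskIdentifier(stageAttempt, attemptNumber)
    failures.getOrElseUpdate(partition, mutable.Set.empty) += id
    // Post-revert behavior: clear the lock so a later attempt can be
    // authorized, instead of failing the stage via DAGScheduler.stageFailed.
    if (authorized.get(partition).contains(id)) {
      authorized.remove(partition)
    }
  }
}
```

With SPARK-39195 in place, a failure of the lock holder would instead abort the stage, trading retry-ability for protection against a committed-but-failed task duplicating data.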
11 changes: 5 additions & 6 deletions core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler

import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
+import org.scalatest.time.{Seconds, Span}

-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}

/**
* Integration tests for the OutputCommitCoordinator.
@@ -44,15 +45,13 @@ class OutputCommitCoordinatorIntegrationSuite
sc = new SparkContext("local[2, 4]", "test", conf)
}

test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
test("exception thrown in OutputCommitter.commitTask()") {
// Regression test for SPARK-10381
val e = intercept[SparkException] {
failAfter(Span(60, Seconds)) {
Contributor: shall we still check the error?

Contributor Author: It won't throw an error after the revert; the job now runs successfully.

      withTempDir { tempDir =>
        sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
      }
-    }.getCause.getMessage
-    assert(e.contains("failed; but task commit success, data duplication may happen.") &&
-      e.contains("Intentional exception"))
+    }
  }
}
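For context on the "Intentional exception" string in the removed assertion: the suite configures, in setup code outside this diff, a Hadoop OutputCommitter that fails each task's first commit attempt. A sketch from memory of that committer, which may differ in detail from the actual file:

```scala
// Sketch of the committer the suite wires in through the job configuration
// (details from memory; check the full suite for the exact code). Because the
// master is "local[2, 4]", up to 4 task failures are tolerated, so once the
// coordinator merely clears the commit lock on failure, the retried attempt
// can commit and saveAsTextFile succeeds.
private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
  override def commitTask(context: TaskAttemptContext): Unit = {
    val ctx = TaskContext.get()
    if (ctx.attemptNumber() < 1) {
      throw new java.io.FileNotFoundException("Intentional exception")
    }
    super.commitTask(context)
  }
}
```

This is why the author's reply above holds: after the revert the first commit attempt still throws, but the stage is no longer failed, the lock is cleared, and the retry commits cleanly, so there is no SparkException left to intercept.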

19 changes: 9 additions & 10 deletions core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -87,12 +87,11 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
outputCommitCoordinator =
-        spy[OutputCommitCoordinator](
-          new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
+        spy[OutputCommitCoordinator](new OutputCommitCoordinator(conf, isDriver = true))
// Use Mockito.spy() to maintain the default infrastructure everywhere else.
// This mocking allows us to control the coordinator responses in test cases.
SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
-        SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
+        SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
}
}
// Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -190,9 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should not be allowed to become stage failed because of potential data duplication
-    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+    // A new task should now be allowed to become the authorized committer
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
      nonAuthorizedCommitter + 2))
+    // There can only be one authorized committer
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 3))
}

test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
Expand Down Expand Up @@ -226,8 +228,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))

// Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
-    // then fail the 1st attempt and since stage failed because of potential data duplication,
-    // make sure fail the 4th attempt.
+    // then fail the 1st attempt and make sure the 4th one can commit again.
stage += 1
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
ExecutorLostFailure("0", exitCausedByApp = true, None))
-    // A new task should not be allowed to become the authorized committer since stage failed
-    // because of potential data duplication
-    assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
}

test("SPARK-24589: Make sure stage state is cleaned up") {
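Read against the `CommitArbiter` model sketched after the OutputCommitCoordinator.scala diff (hypothetical names, for illustration only), the restored test expectations amount to:

```scala
val arb = new CommitArbiter
assert(arb.canCommit(stageAttempt = 1, partition = 0, attemptNumber = 1))  // takes the lock
assert(!arb.canCommit(stageAttempt = 1, partition = 0, attemptNumber = 2)) // one committer at a time
arb.taskFailed(stageAttempt = 1, partition = 0, attemptNumber = 1)         // holder fails: lock cleared
assert(!arb.canCommit(stageAttempt = 1, partition = 0, attemptNumber = 1)) // failed attempts stay barred
assert(arb.canCommit(stageAttempt = 1, partition = 0, attemptNumber = 2))  // a fresh attempt may commit
```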