
[SPARK-25158][SQL]Executor accidentally exit because ScriptTransformationWriterThread throw Exception. #22149

Closed

Conversation

@LuciferYang (Contributor) commented Aug 20, 2018

What changes were proposed in this pull request?

When running a Spark SQL job that uses the transform feature (ScriptTransformationExec) with spark.speculation = true, the job sometimes fails and we found many dead executors in the Executor tab. By analyzing the logs and code we found:

ScriptTransformationExec starts a new thread (ScriptTransformationWriterThread). When speculation is on, this new thread is very likely to throw TaskKilledException (from the iter.map.foreach part). The exception is then captured by the SparkUncaughtExceptionHandler registered during executor startup, and SparkUncaughtExceptionHandler calls System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) to shut down the executor. This is unexpected.

We should not kill the executor just because ScriptTransformationWriterThread fails. Logging the error (not only TaskKilledException) instead of rethrowing it is enough: the exception is already passed to ScriptTransformationExec and handled by TaskRunner.
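In essence, the writer thread's catch block stops rethrowing and records the error instead. A condensed sketch of the pattern, assembled from the diff hunks quoted later in this conversation (not the complete method):

```scala
// Tail of ScriptTransformationWriterThread.run() after this patch (sketch).
try {
  iter.map(outputProjection).foreach { row =>
    // ... write each row to the external script's stdin ...
  }
  threwException = false
} catch {
  // SPARK-25158: the exception must not escape this thread, otherwise it is
  // captured by SparkUncaughtExceptionHandler and the executor exits.
  case t: Throwable =>
    _exception = t   // hand the error to ScriptTransformationExec
    proc.destroy()   // kill the child process
    logError("Thread-ScriptTransformation-Feed exit cause by: ", t)  // log instead of rethrow
}
```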

How was this patch tested?

Registered a TestUncaughtExceptionHandler for the test cases in ScriptTransformationSuite, then asserted that no uncaught exception was handled.

Before this patch, "script transformation should not swallow errors from upstream operators (no serde)" and "script transformation should not swallow errors from upstream operators (with serde)" threw IllegalArgumentException, which was handled by TestUncaughtExceptionHandler.

@LuciferYang (Contributor, Author)

ping @xuanyuanking

@xuanyuanking (Member) left a comment

LGTM, we have tested this in our production env. cc @cloud-fan for further review.

@xuanyuanking (Member)

Gentle ping @gatorsmile.

@gatorsmile (Member)

Is it possible to add a test case?

@gatorsmile (Member)

ok to test

@SparkQA commented Aug 28, 2018

Test build #95320 has finished for PR 22149 at commit 412497f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member)

retest this please.

@xuanyuanking (Member)

Is it possible to add a test case?

Thanks for your reply Xiao. We encountered some difficulties with the test case, because it needs to mock speculative behavior. We will keep looking into this.

@SparkQA commented Aug 28, 2018

Test build #95323 has finished for PR 22149 at commit 412497f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LuciferYang (Contributor, Author) commented Aug 30, 2018

Is it possible to add a test case?

Thanks for your reply Xiao. I tried to add a test case to reproduce the bad case, but it only reproduces with small probability: we need to ensure the task is still running in the iter.map(outputProjection).foreach { row => xxxx } part of the ScriptTransformationWriterThread.run method, and has not completed, when TaskRunner responds to the KillTask command. That timing is difficult to control accurately.

@LuciferYang (Contributor, Author)

@gatorsmile should we fix this problem?

```diff
@@ -308,6 +308,12 @@ private class ScriptTransformationWriterThread(
       }
       threwException = false
     } catch {
+      // TaskKilledException should not be thrown again, otherwise it will be captured by
+      // SparkUncaughtExceptionHandler, then Executor will exit because of TaskKilledException.
+      case e: TaskKilledException =>
```
@cloud-fan (Contributor):

which line of code may throw this exception? and how is this exception handled in other operators?

@LuciferYang (Contributor, Author):

Thanks for your reply @cloud-fan.
I pasted the error log and problem analysis of one bad case in https://issues.apache.org/jira/browse/SPARK-25158.
In that case, UnsafeInMemorySorter$SortedIterator.loadNext threw TaskKilledException.
In another case, TaskKilledException was thrown by InterruptibleIterator.hasNext, and from the error stack we can see that line 281, iter.map(outputProjection).foreach, is the stack root.

@LuciferYang (Contributor, Author):

Actually ScriptTransformationExec works like a streaming pipe, while other operators basically don't have this characteristic.

@LuciferYang (Contributor, Author):

On the other hand, perhaps ScriptTransformationWriterThread should not throw any exception at all, because we already mark the exception and handle it in ScriptTransformationExec, but I'm not sure.

@cloud-fan (Contributor):

Actually ScriptTransformationExec works like a streaming pipe, while other operators basically don't have this characteristic.

Can you elaborate on it? How is TaskKilledException handled in other operators?

@LuciferYang (Contributor, Author):

Sorry, maybe I didn't express it clearly.

There is no essential difference between ScriptTransformationExec and other operators in exception handling: exceptions are handled by the catch block of the Executor.TaskRunner#run method.

The key point is that ScriptTransformationExec creates a new thread named ScriptTransformationWriterThread.

An exception thrown from the ScriptTransformationWriterThread thread cannot be caught by ScriptTransformationExec or by the catch block of Executor.TaskRunner#run; instead it is handled by the SparkUncaughtExceptionHandler that we register in the Executor when running in !isLocal mode.

SparkUncaughtExceptionHandler then calls System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) in its case _ if exitOnUncaughtException branch.
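As a standalone illustration of that mechanism (a minimal sketch, not Spark code; all names here are made up for the demo): an exception thrown on a child thread never reaches the parent thread's catch block, only the default uncaught-exception handler.

```scala
object UncaughtDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for SparkUncaughtExceptionHandler, which in a real executor
    // would call System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) here.
    Thread.setDefaultUncaughtExceptionHandler((t: Thread, e: Throwable) =>
      println(s"uncaught handler got '${e.getMessage}' from thread ${t.getName}"))

    try {
      // Stand-in for ScriptTransformationWriterThread.
      val writer = new Thread(() => throw new RuntimeException("task killed"), "writer")
      writer.start()
      writer.join()
    } catch {
      // Never reached: the throw happened on the child thread, so this
      // parent-thread catch block (think TaskRunner#run) cannot see it.
      case e: Throwable => println(s"parent caught ${e.getMessage}")
    }
    // Prints: uncaught handler got 'task killed' from thread writer
  }
}
```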

@cloud-fan (Contributor) commented Nov 21, 2018:

I see. So ScriptTransformationExec is special because

  1. it starts a new thread
  2. the new thread is very likely to throw TaskKilledException, when speculation is on.

I think we should not kill the executor just because ScriptTransformationWriterThread fails. We should log the error (not only TaskKilledException) instead of throwing it.

@LuciferYang (Contributor, Author):

Couldn't agree more ~

@SparkQA commented Nov 21, 2018

Test build #99112 has finished for PR 22149 at commit db7167c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
def cleanStatus: Unit = _exception = null

override def uncaughtException(t: Thread, e: Throwable): Unit = {
  logError(s"Thread ${t.getName} handle by TestUncaughtExceptionHandler")
```
Reviewer (Member):
Do we need to do logging in the test?

@LuciferYang (Contributor, Author):

No need ~


```scala
protected override def beforeAll(): Unit = {
  super.beforeAll()
  Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler)
```
Reviewer (Member):

Shall we rather get the default handler first and restore it afterwards?

@LuciferYang (Contributor, Author):

ok~ I'll get and store the handler via getDefaultUncaughtExceptionHandler in the beforeAll method, then restore the saved defaultUncaughtExceptionHandler in the afterAll method.
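A minimal sketch of that save-and-restore pattern, assuming the usual ScalaTest beforeAll/afterAll hooks:

```scala
private val uncaughtExceptionHandler = new TestUncaughtExceptionHandler

// Saved so the suite doesn't leak its handler to other suites in the same JVM.
private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _

protected override def beforeAll(): Unit = {
  super.beforeAll()
  defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler
  Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler)
}

protected override def afterAll(): Unit = {
  super.afterAll()
  Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler)
}
```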

```diff
 case t: Throwable =>
   // An error occurred while writing input, so kill the child process. According to the
   // Javadoc this call will not throw an exception:
   _exception = t
   proc.destroy()
-  throw t
+  logError(s"Thread-ScriptTransformation-Feed exit cause by: ", t)
```
Reviewer (Member):

nit: no need to use string interpolation?

@LuciferYang (Contributor, Author):

thx ~ no need~

@SparkQA commented Nov 21, 2018

Test build #99116 has finished for PR 22149 at commit 6a90e8c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 21, 2018

Test build #99113 has finished for PR 22149 at commit 077f16a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 21, 2018

Test build #99114 has finished for PR 22149 at commit 82cb013.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 21, 2018

Test build #99115 has finished for PR 22149 at commit 941fd9e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LuciferYang (Contributor, Author)

@cloud-fan should we fix this problem?

@cloud-fan (Contributor)

the fix LGTM, but can we provide a better test? IIUC the test can be:

  1. create a UDF which would throw exception when producing data
  2. use spark.range(...).select(myUDF(...)) as input plan and do script transform
  3. assert that the query fails, but the executor is still alive.
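A sketch of what that test could look like, under the assumption that it runs inside a Hive test suite with a registered TestUncaughtExceptionHandler (the exact query, error message, and handler accessor here are placeholders; the test that actually landed is reviewed below):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.functions.{col, udf}

// 1. A UDF that always fails while producing data.
val badUDF = udf((id: Long) =>
  if (id < 0) id else throw new RuntimeException("Failed to produce data."))

// 2. Use it as the input of a script transform.
spark.range(5).select(badUDF(col("id")).as("a")).createOrReplaceTempView("test")
val e = intercept[SparkException] {
  sql("SELECT TRANSFORM(a) USING 'cat' AS (a) FROM test").collect()
}

// 3. The query fails, but nothing reached the uncaught-exception handler,
//    i.e. the executor would still be alive.
assert(e.getMessage.contains("Failed to produce data."))
assert(uncaughtExceptionHandler.exception.isEmpty)
```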

@LuciferYang (Contributor, Author)

@cloud-fan ok~

@LuciferYang (Contributor, Author)

@cloud-fan There are two things to confirm before submitting the test case:

  1. This case is about Hive transform, not UDF, so I need to add a Python script file for this case. Will we accept this?

  2. SparkUncaughtExceptionHandler is not registered with the Executor in local mode, so the test case can probably only add a TestUncaughtExceptionHandler like the one I added to ScriptTransformationSuite and register it manually, then verify that the job fails but no exception is caught by TestUncaughtExceptionHandler, like the asserts in ScriptTransformationSuite (see the sketch below).
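For reference, a minimal sketch of such a handler; the cleanStatus and uncaughtException members mirror the snippet reviewed above, while the exception accessor is an assumed name for what the asserts read:

```scala
class TestUncaughtExceptionHandler extends Thread.UncaughtExceptionHandler {

  @volatile private var _exception: Throwable = _

  // The test asserts on this after the query runs: it should remain None.
  def exception: Option[Throwable] = Option(_exception)

  def cleanStatus: Unit = _exception = null

  override def uncaughtException(t: Thread, e: Throwable): Unit = {
    _exception = e
  }
}
```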

@LuciferYang (Contributor, Author)

I found there are some Python scripts in the spark-hive module; I'll try to reuse them.

@LuciferYang LuciferYang changed the title [SPARK-25158][SQL]Executor accidentally exit because ScriptTransformationWriterThread throws TaskKilledException. [SPARK-25158][SQL]Executor accidentally exit because ScriptTransformationWriterThread throw Exception. Feb 3, 2019
@SparkQA commented Feb 3, 2019

Test build #102019 has finished for PR 22149 at commit 401f7d0.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • class TestUncaughtExceptionHandler extends Thread.UncaughtExceptionHandler

@SparkQA commented Feb 3, 2019

Test build #102020 has finished for PR 22149 at commit 4ed4c9c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 3, 2019

Test build #102021 has finished for PR 22149 at commit 33e4343.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

retest this please

@SparkQA commented Feb 3, 2019

Test build #102025 has finished for PR 22149 at commit 33e4343.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -308,12 +308,15 @@ private class ScriptTransformationWriterThread(
       }
       threwException = false
     } catch {
+      // SPARK-25158 Exception should not be thrown again, otherwise it will be captured by
```
Reviewer (Member):

All exceptions? How about the fatal exceptions? Should we do it only when it is NonFatal?

@LuciferYang (Contributor, Author):

All exceptions are already raised to ScriptTransformationExec via _exception; they're all dealt with by the catch block of TaskRunner, whether it's a NonFatal exception or a fatal one.
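A sketch of that handoff, simplified from the surrounding code (checkFailure is a hypothetical name for the check the consuming side performs):

```scala
// Writer-thread side: record the failure instead of letting it escape the thread.
@volatile private var _exception: Throwable = null
def exception: Option[Throwable] = Option(_exception)

// Task-thread side: ScriptTransformationExec's output iterator rethrows the
// recorded error so TaskRunner's catch block sees it, fatal or not.
private def checkFailure(writerThread: ScriptTransformationWriterThread): Unit = {
  writerThread.exception.foreach(e => throw e)
}
```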

```scala
spark
  .range(5)
  .selectExpr("id AS a")
  .createOrReplaceTempView("test")
```
Reviewer (Contributor):

wrap the code with withTempView

@LuciferYang (Contributor, Author):

done~

test("SPARK-25158: " +
"Executor accidentally exit because ScriptTransformationWriterThread throw Exception") {
withSQLConf("hive.script.recordwriter" ->
classOf[NonWritableRecordWriter].getCanonicalName) {
@cloud-fan (Contributor) commented Feb 11, 2019:

this works, but I'm wondering if we can replace it with a UDF. e.g.

```scala
val badUDF = udf { (id: Long): Long => throw new RuntimeException(...) }
spark
  .range(5)
  .select(badUDF('id).as("a"))
  .createOrReplaceTempView("test")
```

@LuciferYang (Contributor, Author):

done

@LuciferYang (Contributor, Author):

@cloud-fan Now when we run this case we can see the expected log as follows:

```
23:32:45.800 ERROR org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread: Thread-ScriptTransformation-Feed exit cause by:
org.apache.spark.SparkException: Failed to execute user defined function(SQLQuerySuite$$Lambda$754/1644529474: (int) => int)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:106)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:730)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.$anonfun$run$1(ScriptTransformationExec.scala:281)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1899)
	at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformationExec.scala:270)
Caused by: java.lang.RuntimeException: Failed to produce data.
	at org.apache.spark.sql.hive.execution.SQLQuerySuite.$anonfun$new$458(SQLQuerySuite.scala:2370)
	at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:104)
	... 10 more
```

and uncaughtExceptionHandler catches nothing; with the original code, uncaughtExceptionHandler would catch a SparkException.

@SparkQA commented Feb 11, 2019

Test build #102179 has finished for PR 22149 at commit 8b30ebc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 11, 2019

Test build #102184 has finished for PR 22149 at commit c4887e5.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please


```scala
// Use a bad udf to generate failed inputs.
import org.apache.spark.sql.functions.udf
val badUDF = udf({x: Int =>
```
Reviewer (Contributor):

nit:

```scala
val badUDF = org.apache.spark.sql.functions.udf((x: Int) => {
  if ... else throw ...
})
```

@LuciferYang (Contributor, Author):

done~

```scala
    |USING 'python $scriptFilePath/scripts/test_transform.py "\t"'
  """.stripMargin).collect()
}
assert(e.getMessage.contains("Failed to produce data."))
```
Reviewer (Contributor):

what's the result if we run this test without your fix and the TestUncaughtExceptionHandler?

@LuciferYang (Contributor, Author):

If we keep throw t, TestUncaughtExceptionHandler will catch a SparkException.
If we keep throw t but do not set TestUncaughtExceptionHandler as the DefaultUncaughtExceptionHandler, this case always succeeds but we see the following log:

```
ERROR org.apache.spark.util.Utils: Uncaught exception in thread Thread-ScriptTransformation-Feed
```

and the uncaught exception would be caught by SparkUncaughtExceptionHandler in non-local mode.

@LuciferYang (Contributor, Author):

And if we keep throw t and use SparkUncaughtExceptionHandler instead of TestUncaughtExceptionHandler, the case always succeeds but we see the following log:

```
ERROR org.apache.spark.util.Utils: Uncaught exception in thread Thread-ScriptTransformation-Feed
ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Thread-ScriptTransformation-Feed,5,main]
Process finished with exit code 50
```

The test process finished with exit code 50, not 0.

Reviewer (Contributor):

The test process finished with exit code 50, not 0.

What's the return data of the query then?

@LuciferYang (Contributor, Author):

No return data... we intercept a SparkException, and the query should fail in local mode. I think what we should be concerned about is that no uncaught exceptions like TaskKilledException get caught by the UncaughtExceptionHandler in non-local mode.

@SparkQA commented Feb 11, 2019

Test build #102191 has finished for PR 22149 at commit a5e3d11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 11, 2019

Test build #102190 has finished for PR 22149 at commit c4887e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan cloud-fan closed this in 5864e8e Feb 12, 2019
@LuciferYang (Contributor, Author)

@cloud-fan thx~

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019

Closes apache#22149 from LuciferYang/fix-transformation-task-kill.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@LuciferYang LuciferYang deleted the fix-transformation-task-kill branch June 10, 2020 08:42