From c1a635e722e36714582ab10ec04a361ff67c3aa5 Mon Sep 17 00:00:00 2001 From: Liu Shaohui Date: Fri, 5 May 2017 16:58:23 +0800 Subject: [PATCH 1/2] FileFormatWriter wrap the FetchFailedException which breaks the failure recovery chain --- .../spark/sql/execution/datasources/FileFormatWriter.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 4ec09bff429c5..eac789e74353d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -31,6 +31,7 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -259,8 +260,10 @@ object FileFormatWriter extends Logging { } }) } catch { - case t: Throwable => - throw new SparkException("Task failed while writing rows", t) + case e: FetchFailedException => + throw e + case _ => + throw new SparkException("Task failed while writing rows.", e) } } From c869d9c7acfe4fe9c43070185cbe303241248f08 Mon Sep 17 00:00:00 2001 From: Liu Shaohui Date: Mon, 8 May 2017 09:19:20 +0800 Subject: [PATCH 2/2] Fix bugs --- .../spark/sql/execution/datasources/FileFormatWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index eac789e74353d..23d5efeddf4a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -262,8 +262,8 @@ object FileFormatWriter extends Logging { } catch { case e: FetchFailedException => throw e - case _ => - throw new SparkException("Task failed while writing rows.", e) + case t: Throwable => + throw new SparkException("Task failed while writing rows.", t) } }