From 7d76472db2770845f813381a4a810759dc745203 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 1 Dec 2016 18:44:43 +0900 Subject: [PATCH 1/5] Close first --- .../internal/io/SparkHadoopMapReduceWriter.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index aaeb3d003829a..b2ede160f2ad9 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -146,7 +146,7 @@ object SparkHadoopMapReduceWriter extends Logging { case c: Configurable => c.setConf(hadoopConf) case _ => () } - val writer = taskFormat.getRecordWriter(taskContext) + var writer = taskFormat.getRecordWriter(taskContext) .asInstanceOf[RecordWriter[K, V]] require(writer != null, "Unable to obtain RecordWriter") var recordsWritten = 0L @@ -163,12 +163,20 @@ object SparkHadoopMapReduceWriter extends Logging { outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } - + if (writer != null) { + writer.close(taskContext) + writer = null + } committer.commitTask(taskContext) }(catchBlock = { committer.abortTask(taskContext) logError(s"Task ${taskContext.getTaskAttemptID} aborted.") - }, finallyBlock = writer.close(taskContext)) + }, finallyBlock = { + if (writer != null) { + writer.close(taskContext) + writer = null + } + }) outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => From 3a66a74b1be08106d036419e9859bed1adc55636 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 1 Dec 2016 22:31:26 +0900 Subject: [PATCH 2/5] Resemble FileFormatWriter --- .../internal/io/SparkHadoopMapReduceWriter.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index b2ede160f2ad9..ca995e9910b1c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -169,12 +169,14 @@ object SparkHadoopMapReduceWriter extends Logging { } committer.commitTask(taskContext) }(catchBlock = { - committer.abortTask(taskContext) - logError(s"Task ${taskContext.getTaskAttemptID} aborted.") - }, finallyBlock = { - if (writer != null) { - writer.close(taskContext) - writer = null + try { + if (writer != null) { + writer.close(taskContext) + writer = null + } + } finally { + committer.abortTask(taskContext) + logError(s"Task ${taskContext.getTaskAttemptID} aborted.") } }) From 66ff918847a57b3ce4b5abbdf56498e3b871bbf0 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 1 Dec 2016 22:35:36 +0900 Subject: [PATCH 3/5] Even resemble comments --- .../apache/spark/internal/io/SparkHadoopMapReduceWriter.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index ca995e9910b1c..6de1fc06858e4 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -154,6 +154,7 @@ object SparkHadoopMapReduceWriter extends Logging { // Write all rows in RDD partition. try { val ret = Utils.tryWithSafeFinallyAndFailureCallbacks { + // Write rows out, release resource and commit the task. while (iterator.hasNext) { val pair = iterator.next() writer.write(pair._1, pair._2) @@ -169,6 +170,7 @@ object SparkHadoopMapReduceWriter extends Logging { } committer.commitTask(taskContext) }(catchBlock = { + // If there is an error, release resource and then abort the task. try { if (writer != null) { writer.close(taskContext) From 9654fab9756350cc5dcb7c402673437a3b19e464 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 3 Dec 2016 01:49:47 +0900 Subject: [PATCH 4/5] Remove impossible null check --- .../spark/internal/io/SparkHadoopMapReduceWriter.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index 6de1fc06858e4..f713ea2f5818d 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -164,10 +164,7 @@ object SparkHadoopMapReduceWriter extends Logging { outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } - if (writer != null) { - writer.close(taskContext) - writer = null - } + writer.close(taskContext) committer.commitTask(taskContext) }(catchBlock = { // If there is an error, release resource and then abort the task. From 48048622067f092ed247bc555e5461c073894a9c Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 3 Dec 2016 02:01:43 +0900 Subject: [PATCH 5/5] Let's check null for sure --- .../spark/internal/io/SparkHadoopMapReduceWriter.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index f713ea2f5818d..6de1fc06858e4 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -164,7 +164,10 @@ object SparkHadoopMapReduceWriter extends Logging { outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } - writer.close(taskContext) + if (writer != null) { + writer.close(taskContext) + writer = null + } committer.commitTask(taskContext) }(catchBlock = { // If there is an error, release resource and then abort the task.