From 240aea1057c6670eed159e5f8ee951258e12f34c Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 10:15:23 -0700
Subject: [PATCH 01/18] Log error when spill file wasn't deleted

---
 .../collection/unsafe/sort/UnsafeSorterSpillReader.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 4989b05d63e23..d2284de4fe97e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -24,12 +24,15 @@
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
  * of the file format).
  */
 final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
+  private final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
 
   private final File file;
   private InputStream in;
@@ -73,7 +76,9 @@ public void loadNext() throws IOException {
     numRecordsRemaining--;
     if (numRecordsRemaining == 0) {
       in.close();
-      file.delete();
+      if (!file.delete()) {
+        logger.error("Unable to delete spill file {}", file.getPath());
+      }
       in = null;
       din = null;
     }

From 6abdb8a272f899bf1640ad84ef9e6a1390ef3861 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 12:28:27 -0700
Subject: [PATCH 02/18] Check that the file exists before logging error

---
 .../util/collection/unsafe/sort/UnsafeSorterSpillReader.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index d2284de4fe97e..c43010ecf9976 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -76,7 +76,7 @@ public void loadNext() throws IOException {
     numRecordsRemaining--;
     if (numRecordsRemaining == 0) {
       in.close();
-      if (!file.delete()) {
+      if (!file.delete() && file.exists()) {
         logger.error("Unable to delete spill file {}", file.getPath());
       }
       in = null;

From 69624087bcff59b62271280b3355557811bc05fc Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 13:34:35 -0700
Subject: [PATCH 03/18] Log error if file cannot be deleted in ExternalShuffleBlockResolver

---
 .../network/shuffle/ExternalShuffleBlockResolver.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index c5f93bb47f55c..822b46c1f2629 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -114,10 +114,14 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
         "recover state for existing applications", registeredExecutorFile, e);
       if (registeredExecutorFile.isDirectory()) {
         for (File f : registeredExecutorFile.listFiles()) {
-          f.delete();
+          if (!f.delete()) {
+            logger.error("error deleting " + f.getPath());
+          }
         }
       }
-      registeredExecutorFile.delete();
+      if (!registeredExecutorFile.delete()) {
+        logger.error("error deleting " + registeredExecutorFile.getPath());
+      }
       options.createIfMissing(true);
       try {
         tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
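The three patches above set the pattern the rest of the series applies across the codebase: test the boolean returned by File#delete() instead of discarding it, and log when deletion fails, skipping the log when the file is already gone. Distilled into a minimal standalone Scala sketch — the DeleteOrWarn name is illustrative, not part of any patch:

    import java.io.File
    import org.slf4j.LoggerFactory

    object DeleteOrWarn {
      private val logger = LoggerFactory.getLogger(getClass)

      // Returns true when the file is gone afterwards, logging only on a real failure
      // (mirrors patch 02's `!file.delete() && file.exists()` guard).
      def apply(file: File): Boolean = {
        if (file.delete() || !file.exists()) {
          true
        } else {
          logger.warn("Unable to delete file {}", file.getPath)
          false
        }
      }
    }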
From cd46f2850998eb38c46916190d5af059afd28844 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 14:01:18 -0700
Subject: [PATCH 04/18] Add check for return value from file.delete()

---
 .../scala/org/apache/spark/api/python/PythonRDD.scala    |  4 +++-
 .../scala/org/apache/spark/deploy/RPackageUtils.scala    | 10 +++++++---
 .../spark/deploy/history/FsHistoryProvider.scala         |  8 ++++++--
 .../deploy/master/FileSystemPersistenceEngine.scala      |  5 ++++-
 4 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 69da180593bb5..8cb6468c6af8a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -875,7 +875,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     if (!path.isEmpty) {
       val file = new File(path)
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning("Error deleting " + file.getPath())
+        }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 4b28866dcaa7c..7d160b6790eaa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -175,8 +175,10 @@ private[deploy] object RPackageUtils extends Logging {
           print(s"ERROR: Failed to build R package in $file.", printStream)
           print(RJarDoc, printStream)
         }
-      } finally {
-        rSource.delete() // clean up
+      } finally { // clean up
+        if (!rSource.delete()) {
+          logWarning(s"Error deleting ${rSource.getPath()}")
+        }
       }
     } else {
       if (verbose) {
@@ -211,7 +213,9 @@ private[deploy] object RPackageUtils extends Logging {
     val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
     // create a zip file from scratch, do not append to existing file.
     val zipFile = new File(dir, name)
-    zipFile.delete()
+    if (!zipFile.delete()) {
+      logWarning(s"Error deleting ${zipFile.getPath()}")
+    }
     val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
     try {
       filesToBundle.foreach { file =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 8eb2ba1e8683b..f7303f50036fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -242,7 +242,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         logError("Exception encountered when attempting to update last scan time", e)
         lastScanTime
       } finally {
-        fs.delete(path)
+        if (!fs.delete(path)) {
+          logWarning(s"Error deleting ${path.getPath()}")
+        }
       }
     }
@@ -405,7 +407,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       try {
         val path = new Path(logDir, attempt.logPath)
         if (fs.exists(path)) {
-          fs.delete(path, true)
+          if (!fs.delete(path, true)) {
+            logWarning(s"Error deleting ${path.getPath()}")
+          }
         }
       } catch {
         case e: AccessControlException =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index aa379d4cd61e7..13d96b246ff65 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -45,7 +45,10 @@ private[master] class FileSystemPersistenceEngine(
   }
 
   override def unpersist(name: String): Unit = {
-    new File(dir + File.separator + name).delete()
+    File f = new File(dir + File.separator + name)
+    if (!f.delete()) {
+      logWarning(s"Error deleting ${f.getPath()}")
+    }
   }
 
   override def read[T: ClassTag](prefix: String): Seq[T] = {

From 411a7c1dc41f923a3dcd5f7f9b5fec76245ca5ea Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 14:48:46 -0700
Subject: [PATCH 05/18] Add check for return value from file.delete()

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +-
 .../scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala  | 4 +++-
 .../org/apache/spark/rdd/ReliableRDDCheckpointData.scala    | 4 +++-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index f7303f50036fb..b13df01c5588d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -408,7 +408,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         val path = new Path(logDir, attempt.logPath)
         if (fs.exists(path)) {
           if (!fs.delete(path, true)) {
-          logWarning(s"Error deleting ${path.getPath()}")
+            logWarning(s"Error deleting ${path.getPath()}")
           }
         }
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 1c3b5da19ceba..b210d9d3c4fa0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -144,7 +144,9 @@
     } else {
       // Some other copy of this task must've finished before us and renamed it
       logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
-      fs.delete(tempOutputPath, false)
+      if (!fs.delete(tempOutputPath, false)) {
+        logWarning("Error deleting ${tempOutputPath.getPath()}")
+      }
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index e9f6060301ba3..08aa4b55e73ac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -101,7 +101,9 @@ private[spark] object ReliableRDDCheckpointData {
     checkpointPath(sc, rddId).foreach { path =>
       val fs = path.getFileSystem(sc.hadoopConfiguration)
       if (fs.exists(path)) {
-        fs.delete(path, true)
+        if (!fs.delete(path, true)) {
+          logWarning("Error deleting ${path.getPath()}")
+        }
       }
     }
   }

From e44577c7e43b6ef326eb20f3b0383bf58cced023 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 15:06:11 -0700
Subject: [PATCH 06/18] Add check for return value from file.delete()

---
 .../org/apache/spark/rdd/ReliableCheckpointRDD.scala      | 2 +-
 .../org/apache/spark/scheduler/EventLoggingListener.scala | 8 ++++++--
 .../spark/scheduler/cluster/SimrSchedulerBackend.scala    | 4 +++-
 .../apache/spark/shuffle/FileShuffleBlockResolver.scala   | 5 ++++-
 .../apache/spark/shuffle/IndexShuffleBlockResolver.scala  | 8 ++++++--
 .../network/shuffle/ExternalShuffleBlockResolver.java     | 2 +-
 6 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index b210d9d3c4fa0..823de381415a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -145,7 +145,7 @@
       // Some other copy of this task must've finished before us and renamed it
       logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
       if (!fs.delete(tempOutputPath, false)) {
-        logWarning("Error deleting ${tempOutputPath.getPath()}")
+        logWarning(s"Error deleting ${tempOutputPath.getPath()}")
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 5a06ef02f5c57..000a021a528cf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -109,7 +109,9 @@ private[spark] class EventLoggingListener(
 
     if (shouldOverwrite && fileSystem.exists(path)) {
       logWarning(s"Event log $path already exists. Overwriting...")
-      fileSystem.delete(path, true)
+      if (!fileSystem.delete(path, true)) {
+        logWarning(s"Error deleting $path")
+      }
     }
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -216,7 +218,9 @@
     if (fileSystem.exists(target)) {
       if (shouldOverwrite) {
         logWarning(s"Event log $target already exists. Overwriting...")
-        fileSystem.delete(target, true)
+        if (!fileSystem.delete(target, true)) {
+          logWarning(s"Error deleting $target")
+        }
       } else {
         throw new IOException("Target log file already exists (%s)".format(logPath))
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0324c9dab910b..641638a77d5f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -65,7 +65,9 @@ private[spark] class SimrSchedulerBackend(
   override def stop() {
     val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
-    fs.delete(new Path(driverFilePath), false)
+    if (!fs.delete(new Path(driverFilePath), false)) {
+      logWarning(s"error deleting ${driverFilePath}")
+    }
     super.stop()
   }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index d9902f96dfd4e..cd253a78c2b19 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -127,7 +127,10 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
       case Some(state) =>
         for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
           val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-          blockManager.diskBlockManager.getFile(blockId).delete()
+          val file = blockManager.diskBlockManager.getFile(blockId)
+          if (!file.delete()) {
+            logWarning(s"Error deleting ${file.getPath()}")
+          }
         }
         logInfo("Deleted all files for shuffle " + shuffleId)
         true
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index d0163d326dba7..67e5f99896467 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -60,12 +60,16 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
   def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
     var file = getDataFile(shuffleId, mapId)
     if (file.exists()) {
-      file.delete()
+      if (!file.delete()) {
+        logWarning(s"Error deleting data ${file.getPath()}")
+      }
     }
 
     file = getIndexFile(shuffleId, mapId)
     if (file.exists()) {
-      file.delete()
+      if (!file.delete()) {
+        logWarning(s"Error deleting index ${file.getPath()}")
+      }
     }
   }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 822b46c1f2629..1a4fb890cd372 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -120,7 +120,7 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
       }
     }
     if (!registeredExecutorFile.delete()) {
-        logger.error("error deleting " + registeredExecutorFile.getPath());
+      logger.error("error deleting " + registeredExecutorFile.getPath());
     }
     options.createIfMissing(true);
     try {
From 740de99a25a53e66f5214718e8c861a9681b3c8b Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 15:19:46 -0700
Subject: [PATCH 07/18] Fix compilation error

---
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala   | 2 +-
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala  | 4 ++--
 .../spark/deploy/master/FileSystemPersistenceEngine.scala    | 2 +-
 .../org/apache/spark/rdd/ReliableRDDCheckpointData.scala     | 2 +-
 .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 2 +-
 5 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8cb6468c6af8a..f4115a8331802 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -839,7 +839,7 @@ private class PythonAccumulatorParam(@transient private val serverHost: String,
  * write the data into disk after deserialization, then Python can read it from disks.
  */
 // scalastyle:off no.finalize
-private[spark] class PythonBroadcast(@transient var path: String) extends Serializable {
+private[spark] class PythonBroadcast(@transient var path: String) extends Serializable, Logging {
 
   /**
    * Read data from disks, then copy it to `out`
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index b13df01c5588d..5eb8adf97d90b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -243,7 +243,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         lastScanTime
       } finally {
         if (!fs.delete(path)) {
-          logWarning(s"Error deleting ${path.getPath()}")
+          logWarning(s"Error deleting ${path}")
         }
       }
     }
@@ -408,7 +408,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         val path = new Path(logDir, attempt.logPath)
         if (fs.exists(path)) {
           if (!fs.delete(path, true)) {
-            logWarning(s"Error deleting ${path.getPath()}")
+            logWarning(s"Error deleting ${path}")
           }
         }
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 13d96b246ff65..1aa8cd5013b49 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -45,7 +45,7 @@ private[master] class FileSystemPersistenceEngine(
   }
 
   override def unpersist(name: String): Unit = {
-    File f = new File(dir + File.separator + name)
+    val f = new File(dir + File.separator + name)
     if (!f.delete()) {
       logWarning(s"Error deleting ${f.getPath()}")
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 08aa4b55e73ac..8b98ffcebfbb7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -89,7 +89,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
 }
 
-private[spark] object ReliableRDDCheckpointData {
+private[spark] object ReliableRDDCheckpointData with Logging {
 
   /** Return the path of the directory to which this RDD's checkpoint data is written. */
   def checkpointPath(sc: SparkContext, rddId: Int): Option[Path] = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 67e5f99896467..27bec76666649 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -40,7 +40,7 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
  */
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
-private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver, Logging {
 
   private lazy val blockManager = SparkEnv.get.blockManager

From a9148c0f7c080fc914a6e3a3fdafeac33383a6b3 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 15:44:05 -0700
Subject: [PATCH 08/18] fix compilation error

---
 .../src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 3 ++-
 .../scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala | 2 +-
 .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala   | 3 ++-
 3 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f4115a8331802..9506950e98b2f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -839,7 +839,8 @@ private class PythonAccumulatorParam(@transient private val serverHost: String,
  * write the data into disk after deserialization, then Python can read it from disks.
  */
 // scalastyle:off no.finalize
-private[spark] class PythonBroadcast(@transient var path: String) extends Serializable, Logging {
+private[spark] class PythonBroadcast(@transient var path: String) extends Serializable
+  with Logging {
 
   /**
    * Read data from disks, then copy it to `out`
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 8b98ffcebfbb7..0a3ec3abcf764 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -89,7 +89,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
 }
 
-private[spark] object ReliableRDDCheckpointData with Logging {
+private[spark] object ReliableRDDCheckpointData extends Logging {
 
   /** Return the path of the directory to which this RDD's checkpoint data is written. */
   def checkpointPath(sc: SparkContext, rddId: Int): Option[Path] = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 27bec76666649..1e64722d30218 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -40,7 +40,8 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
 */
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
-private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver, Logging {
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
+  with Logging {
 
   private lazy val blockManager = SparkEnv.get.blockManager

From 68edb55f36cba86340c84eb94e39069e4933ee5e Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 16:05:07 -0700
Subject: [PATCH 09/18] Add import for Logging

---
 .../main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala | 2 +-
 .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 823de381415a3..a69be6a068bbf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -145,7 +145,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       // Some other copy of this task must've finished before us and renamed it
       logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
       if (!fs.delete(tempOutputPath, false)) {
-        logWarning(s"Error deleting ${tempOutputPath.getPath()}")
+        logWarning(s"Error deleting ${tempOutputPath}")
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 1e64722d30218..65887d119d65a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,7 +21,7 @@ import java.io._
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._

From 8fface7cab7c07f2c9d6b87a655c38de6779d062 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 17:51:59 -0700
Subject: [PATCH 10/18] Change error to warn

---
 .../util/collection/unsafe/sort/UnsafeSorterSpillReader.java | 2 +-
 .../spark/network/shuffle/ExternalShuffleBlockResolver.java  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index c43010ecf9976..566a3621829f8 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -77,7 +77,7 @@ public void loadNext() throws IOException {
     if (numRecordsRemaining == 0) {
       in.close();
       if (!file.delete() && file.exists()) {
-        logger.error("Unable to delete spill file {}", file.getPath());
+        logger.warn("Unable to delete spill file {}", file.getPath());
       }
       in = null;
       din = null;
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 1a4fb890cd372..3bc897c499108 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -115,12 +115,12 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
       if (registeredExecutorFile.isDirectory()) {
         for (File f : registeredExecutorFile.listFiles()) {
           if (!f.delete()) {
-            logger.error("error deleting " + f.getPath());
+            logger.warn("error deleting " + f.getPath());
           }
         }
       }
       if (!registeredExecutorFile.delete()) {
-        logger.error("error deleting " + registeredExecutorFile.getPath());
+        logger.warn("error deleting " + registeredExecutorFile.getPath());
       }
       options.createIfMissing(true);
       try {

From d0cdd1249148580f280d36cb518b9029bb904aaf Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sun, 20 Sep 2015 18:08:35 -0700
Subject: [PATCH 11/18] Add check for return value from file.delete()

---
 .../main/scala/org/apache/spark/storage/DiskStore.scala   | 8 ++++++--
 .../spark/util/collection/ExternalAppendOnlyMap.scala     | 8 ++++++--
 .../org/apache/spark/util/collection/ExternalSorter.scala | 4 +++-
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index feb9533604ffb..7dad464013c81 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -86,7 +86,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     } catch {
       case e: Throwable =>
         if (file.exists()) {
-          file.delete()
+          if (!file.delete()) {
+            logWarning("Error deleting ${file}")
+          }
         }
         throw e
     }
@@ -155,7 +157,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
-      file.delete()
+      f (!file.delete()) {
+        logWarning("Error deleting ${file}")
+      }
     } else {
       false
     }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f929b12606f0a..545b2cb40aab2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -208,7 +208,9 @@ class ExternalAppendOnlyMap[K, V, C](
         writer.revertPartialWritesAndClose()
       }
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning("Error deleting ${file}")
+        }
       }
     }
   }
@@ -489,7 +491,9 @@ class ExternalAppendOnlyMap[K, V, C](
         fileStream = null
       }
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning("Error deleting ${file}")
+        }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 2a30f751ff03d..b52b2daec5679 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -318,7 +318,9 @@ private[spark] class ExternalSorter[K, V, C](
         writer.revertPartialWritesAndClose()
       }
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning("Error deleting ${file}")
+        }
       }
     }
   }
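Patch 11 above plants two problems in DiskStore.remove that patches 13–15 then unwind: the `f (` typo, and the loss of the method's Boolean result, since `file.delete()` was previously the last expression of the branch and wrapping it in a bare `if` changes what remove returns. A minimal sketch of the shape the series eventually lands on in patch 15, with the logging call stubbed out:

    import java.io.File

    def remove(file: File): Boolean = {
      if (file.exists()) {
        // capture the result so it can be both tested for logging and returned
        val ret = file.delete()
        if (!ret) {
          Console.err.println(s"Error deleting ${file.getPath}")  // stand-in for logWarning
        }
        ret
      } else {
        false
      }
    }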
From 2b4bdaca4b0f473baa38677be6daa638429179cc Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 22 Sep 2015 06:28:10 -0700
Subject: [PATCH 12/18] Address review comments

---
 .../util/collection/unsafe/sort/UnsafeSorterSpillReader.java | 2 +-
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala   | 2 +-
 .../spark/network/shuffle/ExternalShuffleBlockResolver.java  | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 566a3621829f8..ac1101aeab182 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -32,7 +32,7 @@
  * of the file format).
  */
 final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
-  private final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
+  private final static Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
 
   private final File file;
   private InputStream in;
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 9506950e98b2f..fab258f3e0639 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -877,7 +877,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
       val file = new File(path)
       if (file.exists()) {
         if (!file.delete()) {
-          logWarning("Error deleting " + file.getPath())
+          logWarning("Error deleting ${file.getPath}")
         }
       }
     }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 3bc897c499108..0d4dd6afac769 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -115,12 +115,12 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
       if (registeredExecutorFile.isDirectory()) {
         for (File f : registeredExecutorFile.listFiles()) {
           if (!f.delete()) {
-            logger.warn("error deleting " + f.getPath());
+            logger.warn("error deleting {}", f.getPath());
          }
        }
      }
      if (!registeredExecutorFile.delete()) {
-        logger.warn("error deleting " + registeredExecutorFile.getPath());
+        logger.warn("error deleting {}", registeredExecutorFile.getPath());
      }
      options.createIfMissing(true);
      try {

From 84e74671c3f29c3626dd9898387781dcc9a94bf4 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 22 Sep 2015 06:50:07 -0700
Subject: [PATCH 13/18] Fix typo

---
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 7dad464013c81..96fca7c4ddd39 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -157,7 +157,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
-      f (!file.delete()) {
+      if (!file.delete()) {
        logWarning("Error deleting ${file}")
      }
    } else {

From 47640f6bf50ade32ef651d70c8915613a67a0454 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 22 Sep 2015 07:15:48 -0700
Subject: [PATCH 14/18] Address compilation error

---
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 96fca7c4ddd39..883d30e881a38 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -158,7 +158,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
       if (!file.delete()) {
-        logWarning("Error deleting ${file}")
+        logWarning("Error deleting ${file.getPath()}")
       }
     } else {
       false

From dc069768b704f4873eac81a7f4adf625d373b9d9 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 22 Sep 2015 09:02:32 -0700
Subject: [PATCH 15/18] Address review comments

---
 .../main/scala/org/apache/spark/storage/DiskStore.scala   | 8 +++++---
 .../spark/util/collection/ExternalAppendOnlyMap.scala     | 4 ++--
 .../org/apache/spark/util/collection/ExternalSorter.scala | 2 +-
 3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 883d30e881a38..c008b9dc16327 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -87,7 +87,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
       case e: Throwable =>
         if (file.exists()) {
           if (!file.delete()) {
-            logWarning("Error deleting ${file}")
+            logWarning(s"Error deleting ${file}")
           }
         }
         throw e
@@ -157,9 +157,11 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
-      if (!file.delete()) {
-        logWarning("Error deleting ${file.getPath()}")
+      val ret = file.delete()
+      if (!ret) {
+        logWarning(s"Error deleting ${file.getPath()}")
       }
+      ret
     } else {
       false
     }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 545b2cb40aab2..29c5732f5a8c1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -209,7 +209,7 @@ class ExternalAppendOnlyMap[K, V, C](
       }
       if (file.exists()) {
         if (!file.delete()) {
-          logWarning("Error deleting ${file}")
+          logWarning(s"Error deleting ${file}")
         }
       }
     }
@@ -492,7 +492,7 @@ class ExternalAppendOnlyMap[K, V, C](
       }
       if (file.exists()) {
         if (!file.delete()) {
-          logWarning("Error deleting ${file}")
+          logWarning(s"Error deleting ${file}")
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b52b2daec5679..749be34d8e8fd 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -319,7 +319,7 @@ private[spark] class ExternalSorter[K, V, C](
       }
       if (file.exists()) {
         if (!file.delete()) {
-          logWarning("Error deleting ${file}")
+          logWarning(s"Error deleting ${file}")
         }
       }
     }

From 2885c7018d7619cfb58dee32bbd1845e7be536d7 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 22 Sep 2015 12:37:47 -0700
Subject: [PATCH 16/18] Address review comments

---
 .../util/collection/unsafe/sort/UnsafeSorterSpillReader.java   | 2 +-
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index ac1101aeab182..501dfe77d13cb 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -32,7 +32,7 @@
  * of the file format).
  */
 final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
-  private final static Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
 
   private final File file;
   private InputStream in;
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fab258f3e0639..a6c34bd98b9ff 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -877,7 +877,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
       val file = new File(path)
       if (file.exists()) {
         if (!file.delete()) {
-          logWarning("Error deleting ${file.getPath}")
+          logWarning(s"Error deleting ${file.getPath}")
         }
       }
     }

From c059526f768578536165cdcc003398ba81de2d31 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 22 Sep 2015 12:54:15 -0700
Subject: [PATCH 17/18] Address review comments

---
 .../scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 0a3ec3abcf764..6b0dc3128d7b0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -102,7 +102,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
       val fs = path.getFileSystem(sc.hadoopConfiguration)
       if (fs.exists(path)) {
         if (!fs.delete(path, true)) {
-          logWarning("Error deleting ${path.getPath()}")
+          logWarning(s"Error deleting ${path.getPath()}")
         }
       }
     }

From 8561388ea9cbedc608f1ba0953d09c0ec06179a6 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 22 Sep 2015 13:16:20 -0700
Subject: [PATCH 18/18] Fix compilation error

---
 .../scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 6b0dc3128d7b0..91cad6662e4d2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -102,7 +102,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
       val fs = path.getFileSystem(sc.hadoopConfiguration)
       if (fs.exists(path)) {
         if (!fs.delete(path, true)) {
-          logWarning(s"Error deleting ${path.getPath()}")
+          logWarning(s"Error deleting ${path.toString()}")
         }
       }
     }
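A pitfall that recurs through patches 12–17 is Scala string interpolation: "${file.getPath}" without the s prefix is a plain string literal, so the compiler accepts it and the placeholder text ends up verbatim in the logs. A short illustration of the difference:

    // Without the interpolator the placeholder is emitted as-is:
    val file = new java.io.File("/tmp/spill0")
    println("Error deleting ${file.getPath}")   // prints: Error deleting ${file.getPath}
    println(s"Error deleting ${file.getPath}")  // prints: Error deleting /tmp/spill0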