From 39719d8e1a8004dfecd4a1724a548b147b60780a Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Fri, 18 Sep 2020 10:24:29 -0400
Subject: [PATCH] Added config option to enable parallel deletes for vacuum command

Signed-off-by: Adam Binford
---
 .../sql/delta/commands/VacuumCommand.scala | 29 +++++++++++++++----
 .../sql/delta/sources/DeltaSQLConf.scala   |  8 +++++
 .../spark/sql/delta/DeltaVacuumSuite.scala | 20 +++++++++++++
 3 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 84525c8d7c..8509c0bb18 100644
--- a/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
 
@@ -43,7 +44,7 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
  * will be ignored. Then we take a diff of the files and delete directories that were already empty,
  * and all files that are within the table that are no longer tracked.
  */
-object VacuumCommand extends VacuumCommandImpl {
+object VacuumCommand extends VacuumCommandImpl with Serializable {
 
   /**
    * Additional check on retention duration to prevent people from shooting themselves in the foot.
@@ -120,6 +121,8 @@ object VacuumCommand extends VacuumCommandImpl {
         new SerializableConfiguration(sessionHadoopConf))
     val basePath = fs.makeQualified(path).toString
     var isBloomFiltered = false
+    val parallelDeleteEnabled =
+      spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)
 
     val validFiles = snapshot.state
       .mapPartitions { actions =>
@@ -224,7 +227,7 @@ object VacuumCommand extends VacuumCommandImpl {
       }
 
       logInfo(s"Deleting untracked files and empty directories in $path")
-      val filesDeleted = delete(diff, fs)
+      val filesDeleted = delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled)
 
       val stats = DeltaVacuumStats(
         isDryRun = false,
@@ -270,9 +273,25 @@ trait VacuumCommandImpl extends DeltaCommand {
   /**
    * Attempts to delete the list of candidate files. Returns the number of files deleted.
   */
-  protected def delete(diff: Dataset[String], fs: FileSystem): Long = {
-    val fileResultSet = diff.toLocalIterator().asScala
-    fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+  protected def delete(
+      diff: Dataset[String],
+      spark: SparkSession,
+      basePath: String,
+      hadoopConf: Broadcast[SerializableConfiguration],
+      parallel: Boolean): Long = {
+    import spark.implicits._
+    if (parallel) {
+      diff.mapPartitions { files =>
+        val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
+        val filesDeletedPerPartition =
+          files.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+        Iterator(filesDeletedPerPartition)
+      }.reduce(_ + _)
+    } else {
+      val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
+      val fileResultSet = diff.toLocalIterator().asScala
+      fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+    }
   }
 
   protected def stringToPath(path: String): Path = new Path(new URI(path))
diff --git a/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index e9f10aa689..6b78fa600c 100644
--- a/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -230,6 +230,14 @@ object DeltaSQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val DELTA_VACUUM_PARALLEL_DELETE_ENABLED =
+    buildConf("vacuum.parallelDelete.enabled")
+      .doc("Enables parallelizing the deletion of files during a vacuum command. Enabling " +
+        "this may result in hitting rate limits on some storage backends. When enabled, " +
+        "parallelization is controlled by the default number of shuffle partitions.")
+      .booleanConf
+      .createWithDefault(false)
+
   val DELTA_SCHEMA_AUTO_MIGRATE =
     buildConf("schema.autoMerge.enabled")
       .doc("If true, enables schema merging on appends and on overwrites.")
diff --git a/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
index 60fa725c0e..42ccbb5fc4 100644
--- a/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
@@ -242,6 +242,26 @@ trait DeltaVacuumSuiteBase extends QueryTest
     }
   }
 
+  test("parallel file delete") {
+    withEnvironment { (tempDir, clock) =>
+      val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath, clock)
+      withSQLConf("spark.databricks.delta.vacuum.parallelDelete.enabled" -> "true") {
+        gcTest(deltaLog, clock)(
+          CreateFile("file1.txt", commitToActionLog = true),
+          CreateFile("file2.txt", commitToActionLog = true),
+          LogicallyDeleteFile("file1.txt"),
+          CheckFiles(Seq("file1.txt", "file2.txt")),
+          AdvanceClock(defaultTombstoneInterval + 1000),
+          GC(dryRun = false, Seq(tempDir)),
+          CheckFiles(Seq("file1.txt"), exist = false),
+          CheckFiles(Seq("file2.txt")),
+          GC(dryRun = false, Seq(tempDir)), // shouldn't throw an error with no files to delete
+          CheckFiles(Seq("file2.txt"))
+        )
+      }
+    }
+  }
+
   test("retention duration must be greater than 0") {
     withEnvironment { (tempDir, clock) =>
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath, clock)
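For anyone who wants to exercise the new flag end to end, here is a minimal usage sketch; it is not part of the patch. The session setup and the table path `/tmp/delta/events` are assumptions for illustration only; the config key `spark.databricks.delta.vacuum.parallelDelete.enabled` and its `false` default come from the `DeltaSQLConf` change above.

```scala
// Illustrative only: session settings and table path are assumptions, not part of this patch.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("vacuum-parallel-delete-example")
  // Delta's SQL extension; standard setup for Delta Lake on Spark 3.x.
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .getOrCreate()

// Opt in to executor-side deletes; the new option defaults to false.
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")

// With the flag on, deletes run through Dataset.mapPartitions, so the degree of
// parallelism follows the default shuffle partition count.
spark.conf.set("spark.sql.shuffle.partitions", "200")

// Hypothetical table path; remove files older than 168 hours (7 days).
DeltaTable.forPath(spark, "/tmp/delta/events").vacuum(168)
```

Because the parallel path deletes inside `mapPartitions`, each task builds its own `FileSystem` from the broadcast Hadoop configuration, which is why `delete()` now takes `basePath` and `hadoopConf` instead of a driver-side `FileSystem` handle.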