Skip to content

Commit

Permalink
[Spark]Add additional metrics regarding Vacuum to get better visibility
Browse files Browse the repository at this point in the history
This change adds additional metrics regarding Vacuum to get better visibility

Closes #2534

GitOrigin-RevId: d125486836b167c63fa0b5a0f3535fc4cfc04274
  • Loading branch information
rajeshparangi authored and allisonport-db committed Jan 31, 2024
1 parent 4308771 commit ef751d2
Showing 1 changed file with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
clock: Clock = new SystemClock): DataFrame = {
recordDeltaOperation(deltaLog, "delta.gc") {

val vacuumStartTime = System.currentTimeMillis()
val path = deltaLog.dataPath
val deltaHadoopConf = deltaLog.newDeltaHadoopConf()
val fs = path.getFileSystem(deltaHadoopConf)
Expand Down Expand Up @@ -210,6 +211,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
org.apache.spark.sql.Encoders.product[FileNameAndSize]

val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
val filesAndDirsPresentBeforeDelete = allFilesAndDirs.count()

// The logic below is as follows:
// 1. We take all the files and directories listed in our reservoir
Expand Down Expand Up @@ -264,6 +266,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
val timeTakenToIdentifyEligibleFiles =
System.currentTimeMillis() - startTimeToIdentifyEligibleFiles


val numFiles = diffFiles.count()
if (dryRun) {
val stats = DeltaVacuumStats(
Expand All @@ -272,14 +275,19 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
defaultRetentionMillis = snapshotTombstoneRetentionMillis,
minRetainedTimestamp = deleteBeforeTimestamp,
dirsPresentBeforeDelete = dirCounts,
filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
objectsDeleted = numFiles,
sizeOfDataToDelete = sizeOfDataToDelete,
timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
timeTakenForDelete = 0L)
timeTakenForDelete = 0L,
vacuumStartTime = vacuumStartTime,
vacuumEndTime = System.currentTimeMillis,
numPartitionColumns = partitionColumns.size
)

recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
s"a total of $dirCounts directories that are safe to delete.")
logInfo(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
s"a total of $dirCounts directories that are safe to delete. Vacuum stats: $stats")

return diffFiles.map(f => stringToPath(f).toString).toDF("path")
}
Expand Down Expand Up @@ -308,12 +316,18 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
defaultRetentionMillis = snapshotTombstoneRetentionMillis,
minRetainedTimestamp = deleteBeforeTimestamp,
dirsPresentBeforeDelete = dirCounts,
filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
objectsDeleted = filesDeleted,
sizeOfDataToDelete = sizeOfDataToDelete,
timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
timeTakenForDelete = timeTakenForDelete)
timeTakenForDelete = timeTakenForDelete,
vacuumStartTime = vacuumStartTime,
vacuumEndTime = System.currentTimeMillis,
numPartitionColumns = partitionColumns.size)
recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
logInfo(s"Deleted $filesDeleted files ($sizeOfDataToDelete bytes) and directories in " +
s"a total of $dirCounts directories. Vacuum stats: $stats")


spark.createDataset(Seq(basePath)).toDF("path")
Expand Down Expand Up @@ -576,7 +590,12 @@ case class DeltaVacuumStats(
defaultRetentionMillis: Long,
minRetainedTimestamp: Long,
dirsPresentBeforeDelete: Long,
filesAndDirsPresentBeforeDelete: Long,
objectsDeleted: Long,
sizeOfDataToDelete: Long,
timeTakenToIdentifyEligibleFiles: Long,
timeTakenForDelete: Long)
timeTakenForDelete: Long,
vacuumStartTime: Long,
vacuumEndTime: Long,
numPartitionColumns: Long
)

0 comments on commit ef751d2

Please sign in to comment.