Fix Replace Where Operation Metrics
* replaceWhere is a Delta write mode that overwrites a table by appending the data in the DataFrame and deleting the rows that match the specified condition (see the usage sketch at the end of this message).
* Operation metrics are per-operation history entries, such as numRemovedFiles and numAddedFiles, retrievable via the `DESC HISTORY` command.
* A new variant of replaceWhere was added recently that allows conditions on non-partition columns. replaceWhere is now implemented as an append plus a Delete (the Delete command can write files too).
* Modified the metrics collection to look at file stats and also to collect metrics through a UDF.

replaceWhere
- writeFiles(append) -> basicWriteStatsTracker (conflicts with the step below when registerSQLMetrics is called)
- DeleteCommand (delete + write out rows) -> basicWriteStatsTracker

Metrics from the first write are not reported.

- added unit tests
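
For context, here is a minimal usage sketch (table path, data, and predicate are hypothetical) showing a replaceWhere write on a data column and how its operation metrics are read back:

    // Overwrite only the rows matching the condition; everything else is kept.
    df.write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", "date >= '2022-08-01'")
      .save("/tmp/delta/events")

    // The operation metrics fixed by this change show up in the table history.
    spark.sql("DESC HISTORY delta.`/tmp/delta/events`")
      .select("version", "operation", "operationMetrics")
      .show(truncate = false)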

GitOrigin-RevId: b9c6d1036aa8d4a707fb649ab8fa0498558cc0d3
rahulsmahadev authored and tdas committed Aug 11, 2022
1 parent d1c9a2f commit 39aff15
Showing 5 changed files with 410 additions and 3 deletions.
@@ -17,9 +17,11 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.constraints.Constraint
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils

-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeIntoClause
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -66,7 +68,46 @@ object DeltaOperations {
partitionBy.map("partitionBy" -> JsonUtils.toJson(_)) ++
predicate.map("predicate" -> _)

-override val operationMetrics: Set[String] = DeltaOperationMetrics.WRITE
val replaceWhereMetricsEnabled = SparkSession.active.conf.get(
DeltaSQLConf.REPLACEWHERE_METRICS_ENABLED)

override def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
// Need special handling for replaceWhere as it is implemented as a Write + Delete.
if (predicate.nonEmpty && replaceWhereMetricsEnabled) {
var strMetrics = super.transformMetrics(metrics)
// find the case where deletedRows are not captured
if (strMetrics.get("numDeletedRows").exists(_ == "0") &&
strMetrics.get("numRemovedFiles").exists(_ != "0")) {
// Identify when row-level metrics are unavailable. This happens when the entire
// table or partition is deleted.
strMetrics -= "numDeletedRows"
strMetrics -= "numCopiedRows"
strMetrics -= "numAddedFiles"
}

// In the case when stats are not collected we need to remove all row-based metrics.
// If the DataFrame provided to replaceWhere is empty and we don't have stats,
// we won't return row-level metrics.
if (strMetrics.get("numOutputRows").exists(_ == "0") &&
strMetrics.get("numFiles").exists(_ != "0")) {
strMetrics -= "numDeletedRows"
strMetrics -= "numOutputRows"
strMetrics -= "numCopiedRows"
}

strMetrics
} else {
super.transformMetrics(metrics)
}
}

override val operationMetrics: Set[String] = if (predicate.isEmpty ||
!replaceWhereMetricsEnabled) {
DeltaOperationMetrics.WRITE
} else {
// Need special handling for replaceWhere as rows/files are deleted as well.
DeltaOperationMetrics.WRITE_REPLACE_WHERE
}
override def changesData: Boolean = true
}
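
To make the pruning above concrete, an illustrative walk-through with hypothetical metric values (string-valued, as produced by super.transformMetrics):

    // Case 1: a whole partition was deleted. DeleteCommand removed files without
    // rewriting any rows, so the row-level delete metrics would be misleading zeros.
    val wholePartitionDelete = Map(
      "numFiles" -> "2", "numOutputRows" -> "50", "numOutputBytes" -> "2048",
      "numRemovedFiles" -> "3", "numDeletedRows" -> "0", "numCopiedRows" -> "0")
    // transformMetrics drops numDeletedRows, numCopiedRows, and numAddedFiles here.

    // Case 2: files were written without stats, so numOutputRows reads 0 even
    // though numFiles is non-zero; all row-level metrics are dropped.
    val statslessWrite = Map(
      "numFiles" -> "2", "numOutputRows" -> "0", "numOutputBytes" -> "2048",
      "numRemovedFiles" -> "1", "numDeletedRows" -> "10", "numCopiedRows" -> "5")
    // transformMetrics drops numDeletedRows, numOutputRows, and numCopiedRows here.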
/** Recorded during streaming inserts. */
@@ -428,6 +469,24 @@ private[delta] object DeltaOperationMetrics {
"rewriteTimeMs" // time taken to rewrite the matched files
)

val WRITE_REPLACE_WHERE = Set(
"numFiles", // number of files written
"numOutputBytes", // size in bytes of the written
"numOutputRows", // number of rows written
"numRemovedFiles", // number of files removed
"numAddedChangeFiles", // number of CDC files
"numDeletedRows", // number of rows removed
"numCopiedRows" // number of rows copied in the process of deleting files
)

val WRITE_REPLACE_WHERE_PARTITIONS = Set(
"numFiles", // number of files written
"numOutputBytes", // size in bytes of the written contents
"numOutputRows", // number of rows written
"numAddedChangeFiles", // number of CDC files
"numRemovedFiles" // number of files removed
)
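
Side by side, the partition variant is the data-column set minus the row-level delete metrics, matching the config note below that row-level metrics are not reported for partitioned tables. A quick illustrative check:

    assert(WRITE_REPLACE_WHERE -- WRITE_REPLACE_WHERE_PARTITIONS ==
      Set("numDeletedRows", "numCopiedRows"))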

/**
* Deleting the entire table or partition would prevent row level metrics from being recorded.
* This is used only in tests to verify specific delete cases.
@@ -308,6 +308,22 @@ case class AddFile(
}
}

@JsonIgnore
@transient
lazy val numLogicalRecords: Option[Long] = {
if (stats == null || stats.isEmpty) {
None
} else {
// Parse the per-file stats JSON and pull out numRecords when present.
val node = new ObjectMapper().readTree(stats)
if (node.has("numRecords") && !node.get("numRecords").isNull) {
Some(node.get("numRecords").asLong())
} else {
None
}
}
}

}
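
A quick sketch of the new field's behavior (constructor arguments are hypothetical and assume AddFile's usual signature):

    val withStats = AddFile(
      path = "part-00000.parquet", partitionValues = Map.empty, size = 1024L,
      modificationTime = 0L, dataChange = true, stats = """{"numRecords": 100}""")
    assert(withStats.numLogicalRecords == Some(100L))

    // No stats recorded -> no row count, rather than a misleading zero.
    assert(withStats.copy(stats = null).numLogicalRecords.isEmpty)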

object AddFile {
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
import org.apache.spark.sql.types.{StringType, StructType}

@@ -117,6 +118,73 @@ case class WriteIntoDelta(
case _ => dt
}

/**
* replaceWhere operation metrics need to be recorded separately.
* @param newFiles - AddFile and AddCDCFile actions added by the write job
* @param deleteActions - AddFile, RemoveFile, and AddCDCFile actions added by the Delete job
*/
private def registerReplaceWhereMetrics(
spark: SparkSession,
txn: OptimisticTransaction,
newFiles: Seq[Action],
deleteActions: Seq[Action]): Unit = {
var numFiles = 0L
var numCopiedRows = 0L
var numOutputBytes = 0L
var numNewRows = 0L
var numAddedChangedFiles = 0L
var hasRowLevelMetrics = true

newFiles.foreach {
case a: AddFile =>
numFiles += 1
numOutputBytes += a.size
if (a.numLogicalRecords.isEmpty) {
hasRowLevelMetrics = false
} else {
numNewRows += a.numLogicalRecords.get
}
case cdc: AddCDCFile =>
numAddedChangedFiles += 1
case _ =>
}

deleteActions.foreach {
case a: AddFile =>
numFiles += 1
numOutputBytes += a.size
if (a.numLogicalRecords.isEmpty) {
hasRowLevelMetrics = false
} else {
numCopiedRows += a.numLogicalRecords.get
}
case cdc: AddCDCFile =>
numAddedChangedFiles += 1
// Remove metrics will be handled by the delete command.
case _ =>
}

var sqlMetrics = Map(
"numFiles" -> new SQLMetric("number of files written", numFiles),
"numOutputBytes" -> new SQLMetric("number of output bytes", numOutputBytes),
"numAddedChangeFiles" -> new SQLMetric(
"number of change files added", numAddedChangedFiles)
)
if (hasRowLevelMetrics) {
sqlMetrics ++= Map(
"numOutputRows" -> new SQLMetric("number of rows added", numNewRows + numCopiedRows),
"numCopiedRows" -> new SQLMetric("number of copied rows", numCopiedRows)
)
} else {
// These placeholder zeros will get filtered out in DeltaOperations.Write's transformMetrics.
sqlMetrics ++= Map(
"numOutputRows" -> new SQLMetric("number of rows added", 0L),
"numCopiedRows" -> new SQLMetric("number of copied rows", 0L)
)
}
txn.registerSQLMetrics(spark, sqlMetrics)
}
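
As a worked illustration of the aggregation above (all actions and numbers are hypothetical):

    val newFiles = Seq(
      AddFile("a", Map.empty, 100L, 0L, dataChange = true, stats = """{"numRecords": 10}"""),
      AddFile("b", Map.empty, 200L, 0L, dataChange = true, stats = """{"numRecords": 5}"""))
    val deleteActions = Seq(
      AddFile("c", Map.empty, 50L, 0L, dataChange = true, stats = """{"numRecords": 3}"""))
    // Registered metrics: numFiles = 3, numOutputBytes = 350, numAddedChangeFiles = 0,
    // numOutputRows = 10 + 5 + 3 = 18 (new rows plus copied rows), numCopiedRows = 3.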

def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
import sparkSession.implicits._
if (txn.readVersion > -1) {
@@ -263,6 +331,12 @@
(newFiles, newFiles.collect { case a: AddFile => a }, Nil)
}

// Need to handle replace where metrics separately.
if (replaceWhere.nonEmpty && replaceOnDataColsEnabled &&
sparkSession.conf.get(DeltaSQLConf.REPLACEWHERE_METRICS_ENABLED)) {
registerReplaceWhereMetrics(sparkSession, txn, newFiles, deletedFiles)
}

val fileActions = if (rearrangeOnly) {
val changeFiles = newFiles.collect { case c: AddCDCFile => c }
if (changeFiles.nonEmpty) {
@@ -576,6 +576,17 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val REPLACEWHERE_METRICS_ENABLED =
buildConf("replaceWhere.dataColumns.metrics.enabled")
.internal()
.doc(
"""
|When enabled, operation metrics are reported for replaceWhere on arbitrary
|expressions and arbitrary columns. Row-level metrics are not reported for
|partitioned tables or for tables with no stats.""".stripMargin)
.booleanConf
.createWithDefault(true)
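
A usage sketch for the flag (assuming the standard "spark.databricks.delta" prefix that buildConf applies to Delta SQL confs):

    spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.metrics.enabled", "false")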

val REPLACEWHERE_CONSTRAINT_CHECK_ENABLED =
buildConf("replaceWhere.constraintCheck.enabled")
.doc(