@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst

import java.sql.Timestamp

import scala.collection.JavaConverters._

import org.apache.spark.util.BoundedPriorityQueue
@@ -61,13 +63,23 @@ object QueryPlanningTracker {
/**
* Summary of a phase, with start time and end time so we can construct a timeline.
*/
class PhaseSummary(val startTimeMs: Long, val endTimeMs: Long) {
class PhaseSummary(
val startTimeMs: Long,
val endTimeMs: Long,
val name: String = "")
extends Serializable {

def durationMs: Long = endTimeMs - startTimeMs

override def toString: String = {
s"PhaseSummary($startTimeMs, $endTimeMs)"
}

def toFormatString: String = {
def dateTimeString(timeMs: Long): String = new Timestamp(timeMs).toLocalDateTime.toString
s"$name(Start: ${dateTimeString(startTimeMs)}, " +
s"End: ${dateTimeString(endTimeMs)}, DurationMs: $durationMs)"
}
}

/**
@@ -87,6 +99,14 @@ object QueryPlanningTracker {
localTracker.set(tracker)
try f finally { localTracker.set(originalTracker) }
}

/** Execute the given function and return its result together with a phase summary labelled with the given phase name. */
def createPhaseSummary[T](f: => T, phaseName: String = ""): (T, PhaseSummary) = {
val startTime = System.currentTimeMillis()
val ret = f
val endTime = System.currentTimeMillis()
(ret, new PhaseSummary(startTime, endTime, phaseName))
}
}
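A minimal usage sketch of the new helper, for orientation (not part of the diff; slowMetastoreCall and the timestamps in the comments are made up):

def slowMetastoreCall(): Seq[String] = { Thread.sleep(100); Seq("t1", "t2") }

val (tables, summary) =
  QueryPlanningTracker.createPhaseSummary(slowMetastoreCall(), phaseName = "ListTables")
// summary.durationMs == summary.endTimeMs - summary.startTimeMs (roughly 100 here)
// summary.toFormatString renders something like:
//   ListTables(Start: 2019-01-10T12:00:00.100, End: 2019-01-10T12:00:00.200, DurationMs: 100)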


@@ -698,7 +698,10 @@ class SessionCatalog(
SubqueryAlias(table, db, viewDef)
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempViews.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
val (metadata, phaseSummary) =
QueryPlanningTracker.createPhaseSummary(
externalCatalog.getTable(db, table), phaseName = "LookUpRelation")
metadata.recordMetastoreOpsPhaseSummary(phaseSummary)
if (metadata.tableType == CatalogTableType.VIEW) {
val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
logDebug(s"'$viewText' will be used for the view($table).")
@@ -21,11 +21,13 @@ import java.net.URI
import java.util.Date

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.QueryPlanningTracker.PhaseSummary
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -249,6 +251,14 @@ case class CatalogTable(

import CatalogTable._

/** Timeline of all metastore operations performed for this table. */
private val _metastoreOpsPhaseSummaries: ArrayBuffer[PhaseSummary] = ArrayBuffer.empty

def metastoreOpsPhaseSummaries: Seq[PhaseSummary] = _metastoreOpsPhaseSummaries.toSeq
Contributor: using table metadata to carry this information is not a good idea. Can we track it at the caller side?

Member (Author):

> using table metadata to carry this information is not a good idea.

Agree. Since the CatalogTable case class is used as a cache key, I also hit some problems with this before (f82b355, b908ecb).

> Can we track it at the caller side?

How about tracking them through QueryPlanningTracker, and using the case class as the key of the phase map?

Contributor: the key is the phase name, isn't it?

Member (Author): The key should be the catalog table; it's the same problem as #23327 (comment). I'll address this after a conclusion is reached.
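For reference, the caller-side alternative discussed here might look roughly like the sketch below; recordMetastorePhase and the CatalogTable-keyed map are hypothetical names, not APIs that exist in this PR:

// Hypothetical caller-side tracking, kept on the tracker instead of on CatalogTable.
val metastorePhases = scala.collection.mutable.Map.empty[CatalogTable, Seq[PhaseSummary]]

def recordMetastorePhase(table: CatalogTable, summary: PhaseSummary): Unit =
  metastorePhases(table) = metastorePhases.getOrElse(table, Seq.empty) :+ summary

// At the call site (e.g. lookupRelation), instead of appending to the table metadata:
//   val (metadata, summary) = QueryPlanningTracker.createPhaseSummary(
//     externalCatalog.getTable(db, table), phaseName = "LookUpRelation")
//   recordMetastorePhase(metadata, summary)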


def recordMetastoreOpsPhaseSummary(elems: PhaseSummary*): Unit =
_metastoreOpsPhaseSummaries.append(elems: _*)
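A compact sketch of the intended record/read round trip (illustrative only; db, table and externalCatalog are assumed to be in scope, as in SessionCatalog):

val (meta, summary) = QueryPlanningTracker.createPhaseSummary(
  externalCatalog.getTable(db, table), phaseName = "LookUpRelation")
meta.recordMetastoreOpsPhaseSummary(summary)
// Later, e.g. when the scan node is built:
meta.metastoreOpsPhaseSummaries.foreach(s => println(s.toFormatString))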

/**
* schema of this table's partition columns
*/
@@ -51,11 +51,17 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {

// Metadata that describes more details of this scan.
protected def metadata: Map[String, String]
// Metadata keys whose values should not be abbreviated.
protected val needExpandMetadataKey: Set[String] = Set.empty
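To illustrate the effect of this set (a small sketch, not from the diff): values for keys listed here skip the 100-character abbreviation that all other metadata values still go through.

import org.apache.commons.lang3.StringUtils

val longValue = "x" * 150
// Default path for metadata values: truncated to at most 100 characters, ending in "...".
StringUtils.abbreviate(longValue, 100)   // 97 'x' characters followed by "..."
// Keys in needExpandMetadataKey (e.g. "MetastoreOperationPhaseSummary" below) are
// rendered in full, so a long phase-summary timeline is not cut off in simpleString.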

override def simpleString: String = {
val metadataEntries = metadata.toSeq.sorted.map {
case (key, value) =>
key + ": " + StringUtils.abbreviate(redact(value), 100)
if (needExpandMetadataKey.contains(key)) {
key + ": " + redact(value)
} else {
key + ": " + StringUtils.abbreviate(redact(value), 100)
}
}
val metadataStr = truncatedString(metadataEntries, " ", ", ", "")
s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]")}$metadataStr"
@@ -180,13 +186,20 @@ case class FileSourceScanExec(
metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
}

@transient private[sql] lazy val metastoreOpsPhaseSummary = relation.location match {
case fileIndex: CatalogFileIndex if fileIndex.table.metastoreOpsPhaseSummaries.nonEmpty =>
fileIndex.table.metastoreOpsPhaseSummaries
case fileIndex: InMemoryFileIndex if fileIndex.metastoreOpsPhaseSummaries.nonEmpty =>
fileIndex.metastoreOpsPhaseSummaries
case _ => Seq.empty
}

@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
driverMetrics("metadataTime") = timeTakenMs
val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
driverMetrics("metadataTime") = timeTakenMs + metastoreOpsPhaseSummary.map(_.durationMs).sum
ret
}
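For instance, if listing the files itself takes 40 ms and the recorded metastore phases took 120 ms and 80 ms, the metadataTime metric reported by this scan ends up as 40 + 120 + 80 = 240 ms (an illustrative calculation based on the code above, not a figure from the PR).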

@@ -261,6 +274,8 @@ case class FileSourceScanExec(
private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")

override val needExpandMetadataKey: Set[String] = Set("MetastoreOperationPhaseSummary")

override lazy val metadata: Map[String, String] = {
def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
val location = relation.location
@@ -274,7 +289,9 @@
"PartitionFilters" -> seqToString(partitionFilters),
"PushedFilters" -> seqToString(pushedDownFilters),
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)
"Location" -> locationDesc,
"MetastoreOperationPhaseSummary" ->
seqToString(metastoreOpsPhaseSummary.map(_.toFormatString)))
val withOptPartitionCount =
relation.partitionSchemaOption.map { _ =>
metadata + ("PartitionCount" -> selectedPartitions.size.toString)
@@ -325,7 +342,7 @@ case class FileSourceScanExec(
override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
"metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time"),
"metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))

protected override def doExecute(): RDD[InternalRow] = {
@@ -74,12 +74,13 @@ object SQLExecution {
withSQLConfPropagated(sparkSession) {
var ex: Option[Exception] = None
val startTime = System.nanoTime()
val planDescStr = queryExecution.toString
try {
sc.listenerBus.post(SparkListenerSQLExecutionStart(
executionId = executionId,
description = callSite.shortForm,
details = callSite.longForm,
physicalPlanDescription = queryExecution.toString,
physicalPlanDescription = planDescStr,
// `queryExecution.executedPlan` triggers query planning. If it fails, the exception
// will be caught and reported in the `SparkListenerSQLExecutionEnd`
sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
@@ -100,6 +101,14 @@
event.duration = endTime - startTime
event.qe = queryExecution
event.executionFailure = ex

// Check whether the physicalPlanDescription has changed, and update the UI if it has.
val newPlanDescStr = queryExecution.toString
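// Note: comparing string lengths is a cheap heuristic; a description change that
// happens to keep the same length would not be picked up here.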
if (newPlanDescStr.length != planDescStr.length) {
event.planDescUpdate = Some(newPlanDescStr)
event.planInfoUpdate = Some(SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan))
}

sc.listenerBus.post(event)
}
}
@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.QueryPlanningTracker.PhaseSummary
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType
@@ -69,23 +71,28 @@ class CatalogFileIndex(
*/
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val timeNs = System.nanoTime() - startTime
val (partitionSpec, phaseSummary) = QueryPlanningTracker.createPhaseSummary({
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
PartitionSpec(partitionSchema, partitions)
}, phaseName = "PartitionPruningInCatalogFileIndex")
table.recordMetastoreOpsPhaseSummary(phaseSummary)
new PrunedInMemoryFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
sparkSession, new Path(baseLocation.get),
fileStatusCache, partitionSpec, table.metastoreOpsPhaseSummaries)
} else {
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
sparkSession, rootPaths,
table.storage.properties,
userSpecifiedSchema = None,
metastoreOpsPhaseSummaries = table.metastoreOpsPhaseSummaries)
}
}

@@ -114,10 +121,11 @@ private class PrunedInMemoryFileIndex(
tableBasePath: Path,
fileStatusCache: FileStatusCache,
override val partitionSpec: PartitionSpec,
override val metadataOpsTimeNs: Option[Long])
metastoreOpsPhaseSummaries: Seq[PhaseSummary])
extends InMemoryFileIndex(
sparkSession,
partitionSpec.partitions.map(_.path),
Map.empty,
Some(partitionSpec.partitionColumns),
fileStatusCache)
fileStatusCache,
metastoreOpsPhaseSummaries)
@@ -72,14 +72,4 @@ trait FileIndex {

/** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
def partitionSchema: StructType

/**
* Returns an optional metadata operation time, in nanoseconds, for listing files.
*
* We do file listing in query optimization (in order to get the proper statistics) and we want
* to account for file listing time in physical execution (as metrics). To do that, we save the
* file listing time in some implementations and physical execution calls it in this method
* to update the metrics.
*/
def metadataOpsTimeNs: Option[Long] = None
}
@@ -29,6 +29,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.QueryPlanningTracker.PhaseSummary
import org.apache.spark.sql.execution.streaming.FileStreamSink
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -49,7 +50,8 @@
rootPathsSpecified: Seq[Path],
parameters: Map[String, String],
userSpecifiedSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache)
fileStatusCache: FileStatusCache = NoopCache,
val metastoreOpsPhaseSummaries: Seq[PhaseSummary] = Seq.empty)
extends PartitioningAwareFileIndex(
sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {

@@ -26,7 +26,7 @@ import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Status._
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
@@ -227,9 +227,9 @@ class SQLAppStatusListener(
}
}

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
val SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) = event
private def createAndStorePlanGraph(
executionId: Long,
planInfo: SparkPlanInfo): SparkPlanGraph = {

def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
nodes.map {
@@ -247,17 +247,25 @@
}
}

val planGraph = SparkPlanGraph(sparkPlanInfo)
val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
node.metrics.map { metric => (metric.accumulatorId, metric) }
}.toMap.values.toList

val planGraph = SparkPlanGraph(planInfo)
val graphToStore = new SparkPlanGraphWrapper(
executionId,
toStoredNodes(planGraph.nodes),
planGraph.edges)
kvstore.write(graphToStore)

planGraph
}

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
val SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) = event

val planGraph = createAndStorePlanGraph(executionId, sparkPlanInfo)
val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
node.metrics.map { metric => (metric.accumulatorId, metric) }
}.toMap.values.toList

val exec = getOrCreateExecution(executionId)
exec.description = description
exec.details = details
@@ -273,6 +281,12 @@
exec.metricsValues = aggregateMetrics(exec)
exec.completionTime = Some(new Date(time))
exec.endEvents += 1
// Update both physicalPlanDescription and planGraph when the description has changed.
Member (Author): All the changes to SQLAppStatusListener and SQLExecution in e29ad45 are there to support the metastore operation phase that happens in HiveTableScanExec.rawPartitions. Unlike the other phases, rawPartitions is initialized during execution, while the UI and the plan description are generated before execution. If we decide it is not worth changing the execution-end event, the changes and metrics for rawPartitions will be reverted together.

if (event.planDescUpdate.isDefined) {
exec.physicalPlanDescription = event.planDescUpdate.get
kvstore.delete(classOf[SparkPlanGraphWrapper], executionId)
createAndStorePlanGraph(executionId, event.planInfoUpdate.get)
}
update(exec)

// Remove stale LiveStageMetrics objects for stages that are not active anymore.
@@ -54,6 +54,12 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)

// The exception object that caused this execution to fail. None if the execution doesn't fail.
@JsonIgnore private[sql] var executionFailure: Option[Exception] = None

// The latest physical plan description, updated during query execution.
@JsonIgnore private[sql] var planDescUpdate: Option[String] = None

// The latest SparkPlanInfo, updated during query execution.
@JsonIgnore private[sql] var planInfoUpdate: Option[SparkPlanInfo] = None
}

/**
@@ -96,6 +96,7 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
assert(isIncluded(df.queryExecution, "PushedFilters"))
assert(isIncluded(df.queryExecution, "DataFilters"))
assert(isIncluded(df.queryExecution, "Location"))
assert(isIncluded(df.queryExecution, "MetastoreOperationPhaseSummary"))
}
}
