From 151015a0839e1ecfcce9b3668fd35ace2626dd41 Mon Sep 17 00:00:00 2001
From: Long Tian
Date: Tue, 26 Jun 2018 16:46:52 +0800
Subject: [PATCH] [SPARK-22425][CORE][SQL] Record inputs/outputs imported/generated by DataFrameReader/DataFrameWriter in the event log

---
 .../spark/scheduler/SparkListener.scala       | 18 ++++++++++++++++++
 .../spark/status/AppStatusListener.scala      | 29 +++++++++++++++++++
 .../apache/spark/status/AppStatusStore.scala  |  8 +++++
 .../org/apache/spark/status/storeTypes.scala  | 12 ++++++++
 .../org/apache/spark/util/JsonProtocol.scala  | 19 +++++++++++-
 .../apache/spark/sql/DataFrameReader.scala    | 29 ++++++++++++++-----
 .../apache/spark/sql/DataFrameWriter.scala    |  8 +++++
 7 files changed, 114 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8a112f6a37b96..18ef294b7ad60 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
+/**
+ * An event that describes the input data recorded in the event log.
+ */
+@DeveloperApi
+case class SparkListenerInputUpdate(format: String,
+    options: Map[String, String],
+    locations: Seq[String] = Seq.empty[String])
+  extends SparkListenerEvent
+
+/**
+ * An event that describes the non-table output recorded in the event log.
+ */
+@DeveloperApi
+case class SparkListenerOutputUpdate(format: String,
+    mode: String,
+    options: Map[String, String])
+  extends SparkListenerEvent
+
 /**
  * Interface for listening to events from the Spark scheduler. Most applications should probably
  * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 5ea161cd0d151..1798062708f6c 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.status
 
 import java.util.Date
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
 import java.util.function.Function
 
 import scala.collection.JavaConverters._
@@ -73,6 +74,10 @@ private[spark] class AppStatusListener(
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
 
+  private val inputDataSetId = new AtomicLong(0)
+  private val outputDataSetId = new AtomicLong(0)
+  private val maxRecords = conf.getInt("spark.data.maxRecords", 1000)
+
   kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) { count =>
     cleanupExecutors(count)
   }
@@ -92,9 +97,33 @@ private[spark] class AppStatusListener(
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case SparkListenerLogStart(version) =>
       sparkVersion = version
+    case update: SparkListenerInputUpdate =>
+      mayUpdateInput(update)
+    case update: SparkListenerOutputUpdate =>
+      mayUpdateOutput(update)
     case _ =>
   }
 
+  private def mayUpdateInput(inputUpdateEvent: SparkListenerInputUpdate): Unit = {
+    val id = inputDataSetId.getAndIncrement
+    if (id < maxRecords) {
+      kvstore.write(new InputDataWrapper(id,
+        inputUpdateEvent.format,
+        inputUpdateEvent.options.toMap,
+        inputUpdateEvent.locations))
+    }
+  }
+
+  private def mayUpdateOutput(outputUpdateEvent: SparkListenerOutputUpdate): Unit = {
+    val id = outputDataSetId.getAndIncrement
+    if (id < maxRecords) {
+      kvstore.write(new OutputDataWrapper(id,
+        outputUpdateEvent.format,
+        outputUpdateEvent.mode,
+        outputUpdateEvent.options.toMap))
+    }
+  }
+
   override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
     assert(event.appId.isDefined, "Application without IDs are not supported.")
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 688f25a9fdea1..ddb022973343a 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -482,6 +482,14 @@ private[spark] class AppStatusStore(
     }
   }
 
+  def inputs(): Seq[InputDataWrapper] = {
+    store.view(classOf[InputDataWrapper]).asScala.toSeq
+  }
+
+  def outputs(): Seq[OutputDataWrapper] = {
+    store.view(classOf[OutputDataWrapper]).asScala.toSeq
+  }
+
   def pool(name: String): PoolData = {
     store.read(classOf[PoolData], name)
   }
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 646cf25880e37..17cb28a2c25f6 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -438,6 +438,18 @@ private[spark] class AppSummary(
 
 }
 
+private[spark] class InputDataWrapper(
+    @KVIndexParam @JsonIgnore val inputSetId: Long,
+    val format: String,
+    val options: Map[String, String],
+    val locations: Seq[String])
+
+private[spark] class OutputDataWrapper(
+    @KVIndexParam @JsonIgnore val outputSetId: Long,
+    val format: String,
+    val mode: String,
+    val options: Map[String, String])
+
 /**
  * A cached view of a specific quantile for one stage attempt's metrics.
  */
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 50c6461373dee..43714e8c78797 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -252,6 +252,20 @@ private[spark] object JsonProtocol {
     ("Block Updated Info" -> blockUpdatedInfo)
   }
 
+  def inputEventFromJson(json: JValue): SparkListenerInputUpdate = {
+    val inputFormat = (json \ "format").extract[String]
+    val options = mapFromJson(json \ "options")
+    val locations = (json \ "locations").extract[Seq[JValue]].map(_.extract[String])
+    SparkListenerInputUpdate(inputFormat, options, locations)
+  }
+
+  def outputEventFromJson(json: JValue): SparkListenerOutputUpdate = {
+    val mode = (json \ "mode").extract[String]
+    val outputFormat = (json \ "format").extract[String]
+    val options = mapFromJson(json \ "options")
+    SparkListenerOutputUpdate(outputFormat, mode, options)
+  }
+
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */
@@ -508,7 +522,6 @@ private[spark] object JsonProtocol {
     ("Stack Trace" -> stackTraceToJson(exception.getStackTrace))
   }
 
-
   /** --------------------------------------------------- *
    * JSON deserialization methods for SparkListenerEvents |
    * ---------------------------------------------------- */
@@ -532,6 +545,8 @@ private[spark] object JsonProtocol {
     val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
     val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
     val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated)
+    val inputUpdate = Utils.getFormattedClassName(SparkListenerInputUpdate)
+    val outputUpdate = Utils.getFormattedClassName(SparkListenerOutputUpdate)
   }
 
   def sparkEventFromJson(json: JValue): SparkListenerEvent = {
@@ -556,6 +571,8 @@ private[spark] object JsonProtocol {
       case `logStart` => logStartFromJson(json)
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
       case `blockUpdate` => blockUpdateFromJson(json)
+      case `inputUpdate` => inputEventFromJson(json)
+      case `outputUpdate` => outputEventFromJson(json)
       case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ec9352a7fa055..c23035ba1e274 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,9 +29,10 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.SparkListenerInputUpdate
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
+import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
@@ -214,13 +215,25 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
   private def loadV1Source(paths: String*) = {
     // Code path for data source v1.
-    sparkSession.baseRelationToDataFrame(
-      DataSource.apply(
-        sparkSession,
-        paths = paths,
-        userSpecifiedSchema = userSpecifiedSchema,
-        className = source,
-        options = extraOptions.toMap).resolveRelation())
+
+    val relation = DataSource.apply(
+      sparkSession,
+      paths = paths,
+      userSpecifiedSchema = userSpecifiedSchema,
+      className = source,
+      options = extraOptions.toMap).resolveRelation()
+
+    relation match {
+      case hs: HadoopFsRelation =>
+        sparkSession
+          .sparkContext
+          .listenerBus
+          .post(SparkListenerInputUpdate(this.source,
+            this.extraOptions,
+            hs.location.rootPaths.map(_.toString)))
+      case _ =>
+    }
+    sparkSession.baseRelationToDataFrame(relation)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 90bea2d676e22..6d6af31ffd4e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,6 +23,7 @@ import java.util.{Date, Locale, Properties, UUID}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.InterfaceStability
+import org.apache.spark.scheduler.SparkListenerOutputUpdate
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
@@ -266,6 +267,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     } else {
       saveToV1Source()
     }
+    ds.sparkSession
+      .sparkContext
+      .listenerBus
+      .post(SparkListenerOutputUpdate(
+        this.source,
+        this.mode.toString,
+        this.extraOptions))
   }
 
   private def saveToV1Source(): Unit = {
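
Usage sketch (not part of the patch): because SparkListenerInputUpdate and SparkListenerOutputUpdate are posted on the listener bus, any registered SparkListener can observe them through onOtherEvent, in addition to the AppStatusStore.inputs()/outputs() views added above. A minimal, hypothetical consumer might look like the following; the class name DataLineageListener and the println reporting are placeholders introduced here for illustration only:

  import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
    SparkListenerInputUpdate, SparkListenerOutputUpdate}

  // Hypothetical listener that logs every input/output event recorded by this patch.
  class DataLineageListener extends SparkListener {
    override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
      case in: SparkListenerInputUpdate =>
        // Format, options and root locations as posted from DataFrameReader.loadV1Source.
        println(s"read ${in.format} from ${in.locations.mkString(", ")} options=${in.options}")
      case out: SparkListenerOutputUpdate =>
        // Format, save mode and options as posted from DataFrameWriter.save.
        println(s"wrote ${out.format} mode=${out.mode} options=${out.options}")
      case _ => // ignore all other events
    }
  }

  // Register it programmatically:
  //   spark.sparkContext.addSparkListener(new DataLineageListener)
  // or through the existing conf key:
  //   --conf spark.extraListeners=com.example.DataLineageListener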