From 9d2098db3e082de0deb1c6cfea7527d04f3f5d03 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 12 Mar 2018 21:43:06 +0800 Subject: [PATCH 1/6] [SPARK-23653][SQL] Show sql statement in spark SQL UI --- .../org/apache/spark/sql/SparkSession.scala | 1 + .../spark/sql/execution/SQLExecution.scala | 15 +++++++++-- .../sql/execution/ui/ExecutionPage.scala | 20 +++++++++++++++ .../execution/ui/SQLAppStatusListener.scala | 7 ++++-- .../sql/execution/ui/SQLAppStatusStore.scala | 3 ++- .../spark/sql/execution/ui/SQLListener.scala | 3 ++- .../sql/execution/SQLJsonProtocolSuite.scala | 5 ++-- .../ui/SQLAppStatusListenerSuite.scala | 25 +++++++++++++------ 8 files changed, 63 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 734573ba31f71..7d9c3dd57b99f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -635,6 +635,7 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = { + SQLExecution.setSqlText(sqlText) Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index e991da7df0bde..89a37dcb1afcc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -34,6 +34,16 @@ object SQLExecution { private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() + private val executionIdToSqlText = new ConcurrentHashMap[Long, String]() + + def setSqlText(sqlText: String): Unit = { + executionIdToSqlText.putIfAbsent(_nextExecutionId.get(), sqlText) + } + + def getSqlText(executionId: Long): String = { + executionIdToSqlText.get(executionId) + } + def getQueryExecution(executionId: Long): QueryExecution = { executionIdToQueryExecution.get(executionId) } @@ -63,16 +73,17 @@ object SQLExecution { val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY) val executionId = SQLExecution.nextExecutionId sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) + val sqlText = getSqlText(executionId) executionIdToQueryExecution.put(executionId, queryExecution) try { // sparkContext.getCallSite() would first try to pick up any call site that was previously // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on // streaming queries would give us call site like "run at :0" val callSite = sparkSession.sparkContext.getCallSite() - sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( executionId, callSite.shortForm, callSite.longForm, queryExecution.toString, - SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis())) + SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), + System.currentTimeMillis(), sqlText)) try { body } finally { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index e0554f0c4d337..1b83cd2c95baf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -78,6 +78,7 @@ class ExecutionPage(parent: SQLTab) extends 
WebUIPage("execution") with Logging summary ++ planVisualization(metrics, graph) ++ + showSQLText(executionUIData.sqlText) ++ physicalPlanDescription(executionUIData.physicalPlanDescription) }.getOrElse {
No information to display for query {executionId}
@@ -120,6 +121,25 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
   private def jobURL(jobId: Long): String =
     "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)
 
+  private def showSQLText(sqlText: String): Seq[Node] = {
+    <div>
+      <span style="font-weight: bold;">SQL text</span>
+    </div>
+    <div>
+      <pre>{sqlText}</pre>
+    </div>
+    <br/>
+  }
+
   private def physicalPlanDescription(physicalPlanDescription: String): Seq[Node] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 53fb9a0cc21cf..f2750787fe8a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -230,7 +230,7 @@ class SQLAppStatusListener( private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { val SparkListenerSQLExecutionStart(executionId, description, details, - physicalPlanDescription, sparkPlanInfo, time) = event + physicalPlanDescription, sparkPlanInfo, time, sqlText) = event def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { nodes.map { @@ -265,6 +265,7 @@ class SQLAppStatusListener( exec.physicalPlanDescription = physicalPlanDescription exec.metrics = sqlPlanMetrics exec.submissionTime = time + exec.sqlText = sqlText update(exec) } @@ -351,6 +352,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { var jobs = Map[Int, JobExecutionStatus]() var stages = Set[Int]() var driverAccumUpdates = Map[Long, Long]() + var sqlText: String = null @volatile var metricsValues: Map[Long, String] = null @@ -369,7 +371,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { completionTime, jobs, stages, - metricsValues) + metricsValues, + sqlText) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 9a76584717f42..4022238e1c9f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -91,7 +91,8 @@ class SQLExecutionUIData( * from the SQL listener instance. 
*/ @JsonDeserialize(keyAs = classOf[JLong]) - val metricValues: Map[Long, String]) { + val metricValues: Map[Long, String], + val sqlText: String) { @JsonIgnore @KVIndex("completionTime") private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index b58b8c6d45e5b..3c69db6572158 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -34,7 +34,8 @@ case class SparkListenerSQLExecutionStart( details: String, physicalPlanDescription: String, sparkPlanInfo: SparkPlanInfo, - time: Long) + time: Long, + sqlText: String) extends SparkListenerEvent @DeveloperApi diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index c2e62b987e0cc..75da2de95215a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -41,12 +41,13 @@ class SQLJsonProtocolSuite extends SparkFunSuite { | "metadata":{}, | "metrics":[] | }, - | "time":0 + | "time":0, + | "sqlText":"select 1 as a" |} """.stripMargin val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString)) val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan", - new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0) + new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0, "select 1 as a") assert(reconstructedEvent == expectedEvent) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 85face3994fd4..582dcd04dfb86 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -166,7 +166,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) + System.currentTimeMillis(), + "select 1 as a")) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -298,7 +299,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) + System.currentTimeMillis(), + "select 1 as a")) listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), @@ -327,7 +329,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) + System.currentTimeMillis(), + "select 1 as a")) listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), @@ -367,7 +370,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - 
System.currentTimeMillis())) + System.currentTimeMillis(), + "select 1 as a")) listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), @@ -396,7 +400,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) + System.currentTimeMillis(), + "select 1 as a")) var stageId = 0 def twoStageJob(jobId: Int): Unit = { @@ -521,13 +526,15 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with val df = createTestDataFrame // Start execution 1 and execution 2 time += 1 + val sqlText = "select 1 as a" listener.onOtherEvent(SparkListenerSQLExecutionStart( 1, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - time)) + time, + sqlText)) time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( 2, @@ -535,7 +542,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - time)) + time, + sqlText)) // Stop execution 2 before execution 1 time += 1 @@ -551,7 +559,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - time)) + time, + sqlText)) assert(statusStore.executionsCount === 2) assert(statusStore.execution(2) === None) } From 4712379415ad660eb15088401a537df95e05650d Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Wed, 14 Mar 2018 20:03:27 +0800 Subject: [PATCH 2/6] fix the variable substitution problem like `${var}`, `${env:var}` --- .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 7d9c3dd57b99f..f5dba6b8eb306 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -146,6 +146,8 @@ class SparkSession private( } } + lazy private val substitutor = new VariableSubstitution(sessionState.conf) + /** * A wrapped version of this session in the form of a [[SQLContext]], for backward compatibility. 
* @@ -635,7 +637,7 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = { - SQLExecution.setSqlText(sqlText) + SQLExecution.setSqlText(substitutor.substitute(sqlText)) Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) } From 6f8bc0d4bc536213dcf7415dd90152362ae06124 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 15 Mar 2018 15:15:44 +0800 Subject: [PATCH 3/6] change sql text to DF and fix the problems mentioned in review --- .../apache/spark/sql/DataFrameWriter.scala | 2 +- .../scala/org/apache/spark/sql/Dataset.scala | 19 ++++++++++++++----- .../org/apache/spark/sql/SparkSession.scala | 4 ++-- .../spark/sql/execution/SQLExecution.scala | 14 ++------------ .../streaming/MicroBatchExecution.scala | 2 +- .../hive/thriftserver/SparkSQLDriver.scala | 2 +- .../hive/execution/HiveComparisonTest.scala | 4 +++- 7 files changed, 24 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ed7a9100cc7f1..75cb5fd52cc91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -651,7 +651,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { try { val start = System.nanoTime() // call `QueryExecution.toRDD` to trigger the execution of commands. - SQLExecution.withNewExecutionId(session, qe)(qe.toRdd) + SQLExecution.withNewExecutionId(session, qe, ds.sqlText)(qe.toRdd) val end = System.nanoTime() session.listenerManager.onSuccess(name, qe, end - start) } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0aee1d7be5788..8158255879c6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -182,6 +182,15 @@ class Dataset[T] private[sql]( this(sqlContext.sparkSession, logicalPlan, encoder) } + private var _sqlText: String = _ + + def setSqlText(sqlText: String): Dataset[T] = { + _sqlText = sqlText + this + } + + def sqlText: String = _sqlText + @transient private[sql] val logicalPlan: LogicalPlan = { // For various commands (like DDL) and queries with side effects, we force query execution // to happen right away to let these side effects take place eagerly. @@ -251,7 +260,7 @@ class Dataset[T] private[sql]( Column(col).cast(StringType) } } - val takeResult = newDf.select(castCols: _*).take(numRows + 1) + val takeResult = newDf.select(castCols: _*).setSqlText(sqlText).take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) @@ -1457,7 +1466,7 @@ class Dataset[T] private[sql]( */ def filter(condition: Column): Dataset[T] = withTypedPlan { Filter(condition.expr, planWithBarrier) - } + }.setSqlText(sqlText) /** * Filters rows using the given SQL expression. @@ -3222,7 +3231,7 @@ class Dataset[T] private[sql]( * an execution. */ private def withNewExecutionId[U](body: => U): U = { - SQLExecution.withNewExecutionId(sparkSession, queryExecution)(body) + SQLExecution.withNewExecutionId(sparkSession, queryExecution, sqlText)(body) } /** @@ -3231,7 +3240,7 @@ class Dataset[T] private[sql]( * reset. 
*/ private def withNewRDDExecutionId[U](body: => U): U = { - SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) { + SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution, sqlText) { rddQueryExecution.executedPlan.foreach { plan => plan.resetMetrics() } @@ -3249,7 +3258,7 @@ class Dataset[T] private[sql]( plan.resetMetrics() } val start = System.nanoTime() - val result = SQLExecution.withNewExecutionId(sparkSession, qe) { + val result = SQLExecution.withNewExecutionId(sparkSession, qe, sqlText) { action(qe.executedPlan) } val end = System.nanoTime() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index f5dba6b8eb306..34a0c38170b80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -637,8 +637,8 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = { - SQLExecution.setSqlText(substitutor.substitute(sqlText)) - Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) + val df = Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) + df.setSqlText(substitutor.substitute(sqlText)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 89a37dcb1afcc..c2f418af4d765 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -34,16 +34,6 @@ object SQLExecution { private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() - private val executionIdToSqlText = new ConcurrentHashMap[Long, String]() - - def setSqlText(sqlText: String): Unit = { - executionIdToSqlText.putIfAbsent(_nextExecutionId.get(), sqlText) - } - - def getSqlText(executionId: Long): String = { - executionIdToSqlText.get(executionId) - } - def getQueryExecution(executionId: Long): QueryExecution = { executionIdToQueryExecution.get(executionId) } @@ -68,12 +58,12 @@ object SQLExecution { */ def withNewExecutionId[T]( sparkSession: SparkSession, - queryExecution: QueryExecution)(body: => T): T = { + queryExecution: QueryExecution, + sqlText: String = "")(body: => T): T = { val sc = sparkSession.sparkContext val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY) val executionId = SQLExecution.nextExecutionId sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) - val sqlText = getSqlText(executionId) executionIdToQueryExecution.put(executionId, queryExecution) try { // sparkContext.getCallSite() would first try to pick up any call site that was previously diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 6e231970f4a22..b43ff0654940d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -503,7 +503,7 @@ class MicroBatchExecution( new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema)) reportTimeTaken("addBatch") { - SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) { + SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution, 
nextBatch.sqlText) { sink match { case s: Sink => s.addBatch(currentBatchId, nextBatch) case _: StreamWriteSupport => diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 6775902173444..5c665a93adcf3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -60,7 +60,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont try { context.sparkContext.setJobDescription(command) val execution = context.sessionState.executePlan(context.sql(command).logicalPlan) - hiveResponse = SQLExecution.withNewExecutionId(context.sparkSession, execution) { + hiveResponse = SQLExecution.withNewExecutionId(context.sparkSession, execution, command) { execution.hiveResultString() } tableSchema = getResultSetSchema(execution) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 272e6f51f5002..5c043b26a2cdf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -345,7 +345,9 @@ abstract class HiveComparisonTest val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) => val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath)) def getResult(): Seq[String] = { - SQLExecution.withNewExecutionId(query.sparkSession, query)(query.hiveResultString()) + SQLExecution.withNewExecutionId(query.sparkSession, query, queryString) { + query.hiveResultString() + } } try { (query, prepareAnswer(query, getResult())) } catch { case e: Throwable => From 92293c6b0890eeb439dddf2157122bc850a8a711 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 15 Mar 2018 22:00:59 +0800 Subject: [PATCH 4/6] simplify the code about sql text in DF --- .../scala/org/apache/spark/sql/Dataset.scala | 64 +++++++++---------- .../org/apache/spark/sql/SparkSession.scala | 4 +- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 8158255879c6e..897e3c2621e74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -61,18 +61,20 @@ import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.util.Utils private[sql] object Dataset { - def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = { - val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]]) + def apply[T: Encoder](sparkSession: SparkSession, + logicalPlan: LogicalPlan, sqlText: String = ""): Dataset[T] = { + val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]], sqlText) // Eagerly bind the encoder so we verify that the encoder matches the underlying // schema. The user will get an error if this is not the case. 
dataset.deserializer dataset } - def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = { + def ofRows(sparkSession: SparkSession, + logicalPlan: LogicalPlan, sqlText: String = ""): DataFrame = { val qe = sparkSession.sessionState.executePlan(logicalPlan) qe.assertAnalyzed() - new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)) + new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema), sqlText) } } @@ -166,7 +168,8 @@ private[sql] object Dataset { class Dataset[T] private[sql]( @transient val sparkSession: SparkSession, @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution, - encoder: Encoder[T]) + encoder: Encoder[T], + val sqlText: String = "") extends Serializable { queryExecution.assertAnalyzed() @@ -174,23 +177,16 @@ class Dataset[T] private[sql]( // Note for Spark contributors: if adding or updating any action in `Dataset`, please make sure // you wrap it with `withNewExecutionId` if this actions doesn't call other action. - def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = { - this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder) + def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, + encoder: Encoder[T], sqlText: String = "") = { + this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder, sqlText) } - def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = { - this(sqlContext.sparkSession, logicalPlan, encoder) + def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, + encoder: Encoder[T], sqlText: String = "") = { + this(sqlContext.sparkSession, logicalPlan, encoder, sqlText) } - private var _sqlText: String = _ - - def setSqlText(sqlText: String): Dataset[T] = { - _sqlText = sqlText - this - } - - def sqlText: String = _sqlText - @transient private[sql] val logicalPlan: LogicalPlan = { // For various commands (like DDL) and queries with side effects, we force query execution // to happen right away to let these side effects take place eagerly. @@ -260,7 +256,7 @@ class Dataset[T] private[sql]( Column(col).cast(StringType) } } - val takeResult = newDf.select(castCols: _*).setSqlText(sqlText).take(numRows + 1) + val takeResult = newDf.select(castCols: _*).take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) @@ -399,7 +395,8 @@ class Dataset[T] private[sql]( */ // This is declared with parentheses to prevent the Scala compiler from treating // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. 
- def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema)) + def toDF(): DataFrame = + new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema), sqlText) /** * :: Experimental :: @@ -631,7 +628,8 @@ class Dataset[T] private[sql]( outputPartitioning, physicalPlan.outputOrdering, isStreaming - )(sparkSession)).as[T] + )(sparkSession), + sqlText).as[T] } /** @@ -1373,10 +1371,11 @@ class Dataset[T] private[sql]( planWithBarrier) if (encoder.flat) { - new Dataset[U1](sparkSession, project, encoder) + new Dataset[U1](sparkSession, project, encoder, sqlText) } else { // Flattens inner fields of U1 - new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1) + new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder), sqlText) + .map(_._1) } } @@ -1390,7 +1389,7 @@ class Dataset[T] private[sql]( val namedColumns = columns.map(_.withInputType(exprEnc, planWithBarrier.output).named) val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier)) - new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders)) + new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders), sqlText) } /** @@ -1466,7 +1465,7 @@ class Dataset[T] private[sql]( */ def filter(condition: Column): Dataset[T] = withTypedPlan { Filter(condition.expr, planWithBarrier) - }.setSqlText(sqlText) + } /** * Filters rows using the given SQL expression. @@ -2032,7 +2031,7 @@ class Dataset[T] private[sql]( val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) normalizedCumWeights.sliding(2).map { x => new Dataset[T]( - sparkSession, Sample(x(0), x(1), withReplacement = false, seed, plan), encoder) + sparkSession, Sample(x(0), x(1), withReplacement = false, seed, plan), encoder, sqlText) }.toArray } @@ -2592,7 +2591,7 @@ class Dataset[T] private[sql]( new Dataset[U]( sparkSession, MapPartitions[T, U](func, planWithBarrier), - implicitly[Encoder[U]]) + implicitly[Encoder[U]], sqlText) } /** @@ -2622,7 +2621,8 @@ class Dataset[T] private[sql]( val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]] Dataset.ofRows( sparkSession, - MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier)) + MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier), + sqlText) } /** @@ -3301,21 +3301,21 @@ class Dataset[T] private[sql]( /** A convenient function to wrap a logical plan and produce a DataFrame. */ @inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = { - Dataset.ofRows(sparkSession, logicalPlan) + Dataset.ofRows(sparkSession, logicalPlan, sqlText) } /** A convenient function to wrap a logical plan and produce a Dataset. */ @inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = { - Dataset(sparkSession, logicalPlan) + Dataset(sparkSession, logicalPlan, sqlText) } /** A convenient function to wrap a set based logical plan and produce a Dataset. */ @inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = { if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) { // Set operators widen types (change the schema), so we cannot reuse the row encoder. 
- Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]] + Dataset.ofRows(sparkSession, logicalPlan, sqlText).asInstanceOf[Dataset[U]] } else { - Dataset(sparkSession, logicalPlan) + Dataset(sparkSession, logicalPlan, sqlText) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 34a0c38170b80..349f292517fbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -637,8 +637,8 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = { - val df = Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) - df.setSqlText(substitutor.substitute(sqlText)) + Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText), + substitutor.substitute(sqlText)) } /** From 89e8e74923eea894402aa4dc2b1cb2fa2ff9a738 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Fri, 16 Mar 2018 11:09:07 +0800 Subject: [PATCH 5/6] workaround for the Scala bug --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 897e3c2621e74..5c0361f8c7079 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -178,12 +178,17 @@ class Dataset[T] private[sql]( // you wrap it with `withNewExecutionId` if this actions doesn't call other action. def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, - encoder: Encoder[T], sqlText: String = "") = { + encoder: Encoder[T]) = { + this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder, "") + } + + def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, + encoder: Encoder[T], sqlText: String) = { this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder, sqlText) } def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, - encoder: Encoder[T], sqlText: String = "") = { + encoder: Encoder[T], sqlText: String) = { this(sqlContext.sparkSession, logicalPlan, encoder, sqlText) } From df98d83a2ebc194864716f08636163905e2fd114 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 19 Mar 2018 15:04:13 +0800 Subject: [PATCH 6/6] count() shouldn't show sql text --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5c0361f8c7079..1f91cbe2b573a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2780,7 +2780,7 @@ class Dataset[T] private[sql]( * @group action * @since 1.6.0 */ - def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => + def count(): Long = withAction("count", groupBy().count().queryExecution, true) { plan => plan.executeCollect().head.getLong(0) } @@ -3257,13 +3257,17 @@ class Dataset[T] private[sql]( * Wrap a Dataset action to track the QueryExecution and time cost, then report to the * user-registered callback functions. 
*/ - private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { + private def withAction[U]( + name: String, + qe: QueryExecution, + hideSqlText: Boolean = false)(action: SparkPlan => U) = { try { qe.executedPlan.foreach { plan => plan.resetMetrics() } val start = System.nanoTime() - val result = SQLExecution.withNewExecutionId(sparkSession, qe, sqlText) { + val result = SQLExecution.withNewExecutionId(sparkSession, qe, + if (hideSqlText) "" else sqlText) { action(qe.executedPlan) } val end = System.nanoTime()
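
Usage sketch (not part of the patch; for illustration only). With these changes applied, the
statement passed to SparkSession.sql travels with SparkListenerSQLExecutionStart as the new
sqlText field, so it can be read from any SparkListener as well as shown in the SQL UI. The
event, field, and package names below come from the patch; the object and application names
are made up for the example.

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

object SqlTextListenerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sql-text-demo").getOrCreate()

    // Log the original SQL statement whenever a SQL execution starts.
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: SparkListenerSQLExecutionStart =>
          // With this patch, e.sqlText carries the (variable-substituted) text given to
          // SparkSession.sql; executions not started from a SQL string report "".
          println(s"SQL execution ${e.executionId} started for: ${e.sqlText}")
        case _ => // ignore other events
      }
    })

    spark.sql("select 1 as a").collect()
    spark.stop()
  }
}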