From db15de5661ba9f5dbf94caedd02f5fdd90a8fe61 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 24 Apr 2016 15:05:15 +0800 Subject: [PATCH 1/4] fix call site for continuous queries --- .../apache/spark/sql/execution/SQLExecution.scala | 6 +++++- .../sql/execution/streaming/StreamExecution.scala | 12 ++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 0a11b16d0ed35..c85f98aa9b463 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -45,7 +45,11 @@ private[sql] object SQLExecution { val executionId = SQLExecution.nextExecutionId sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) val r = try { - val callSite = Utils.getCallSite() + // We first try to pick up any call site that was set previously, then fall back to + // Utils.getCallSite(); because call Utils.getCallSite() on continuous queries directly + // would give us call site like "run at :0" + val callSite = sqlContext.sparkContext.getCallSite() + sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( executionId, callSite.shortForm, callSite.longForm, queryExecution.toString, SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 2a1fa1ba627c8..a11293e2a2026 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.ContinuousQueryListener import org.apache.spark.sql.util.ContinuousQueryListener._ -import org.apache.spark.util.UninterruptibleThread +import org.apache.spark.util.{Utils, CallSite, UninterruptibleThread} /** * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread. @@ -98,7 +98,15 @@ class StreamExecution( /** The thread that runs the micro-batches of this stream. */ private[sql] val microBatchThread = new UninterruptibleThread(s"stream execution thread for $name") { - override def run(): Unit = { runBatches() } + + private val callSite = Utils.getCallSite() + + override def run(): Unit = { + // To fix call site like "run at :0", we bridge the call site from the calling + // thread to this micro batch thread + sqlContext.sparkContext.setCallSite(callSite) + runBatches() + } } /** From 44d93c780cbeb97937f211b50c79bd0bf8291909 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 24 Apr 2016 15:10:19 +0800 Subject: [PATCH 2/4] fix style --- .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index c85f98aa9b463..804eccf44de06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -49,7 +49,7 @@ private[sql] object SQLExecution { // Utils.getCallSite(); because call Utils.getCallSite() on continuous queries directly // would give us call site like "run at :0" val callSite = sqlContext.sparkContext.getCallSite() - + sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( executionId, callSite.shortForm, callSite.longForm, queryExecution.toString, SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis())) From 70c3f4ed456ddbabf05c21ab5637f04b95b975ea Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 24 Apr 2016 15:40:31 +0800 Subject: [PATCH 3/4] fix imports --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index a11293e2a2026..093095b84e63d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.ContinuousQueryListener import org.apache.spark.sql.util.ContinuousQueryListener._ -import org.apache.spark.util.{Utils, CallSite, UninterruptibleThread} +import org.apache.spark.util.{UninterruptibleThread, Utils} /** * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread. From f7d84609417957e0472306f3eb3bf55c0f4c5112 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Tue, 3 May 2016 11:23:15 +0800 Subject: [PATCH 4/4] address comments --- .../org/apache/spark/sql/execution/SQLExecution.scala | 6 +++--- .../spark/sql/execution/streaming/StreamExecution.scala | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index b585b7b5abde1..31c9f1aef2f3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -45,9 +45,9 @@ private[sql] object SQLExecution { val executionId = SQLExecution.nextExecutionId sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) val r = try { - // We first try to pick up any call site that was set previously, then fall back to - // Utils.getCallSite(); because call Utils.getCallSite() on continuous queries directly - // would give us call site like "run at :0" + // sparkContext.getCallSite() would first try to pick up any call site that was previously + // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on + // continuous queries would give us call site like "run at :0" val callSite = sparkSession.sparkContext.getCallSite() sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 0e7010169406d..3c5ced2af73f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -101,14 +101,14 @@ class StreamExecution( @volatile private[sql] var streamDeathCause: ContinuousQueryException = null + /* Get the call site in the caller thread; will pass this into the micro batch thread */ + private val callSite = Utils.getCallSite() + /** The thread that runs the micro-batches of this stream. */ private[sql] val microBatchThread = new UninterruptibleThread(s"stream execution thread for $name") { - - private val callSite = Utils.getCallSite() - override def run(): Unit = { - // To fix call site like "run at :0", we bridge the call site from the calling + // To fix call site like "run at :0", we bridge the call site from the caller // thread to this micro batch thread sparkSession.sparkContext.setCallSite(callSite) runBatches()