From 5a8dc877d5d279983754919f3e44a4ba7cb3c2fa Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 09:37:37 -0500 Subject: [PATCH 1/8] improve the doc for "spark.memory.offHeap.size" --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 6600cb6c0ac09..c686304bd30da 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -902,7 +902,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory in bytes which can be used for off-heap allocation. + The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 46788bd26737a53c5bc76089012ecb2515f0260b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 14:00:16 -0500 Subject: [PATCH 2/8] fix --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index c686304bd30da..6600cb6c0ac09 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -902,7 +902,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. + The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 766c67d4d1609782bf7558a2b7c742df72bff620 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 28 Oct 2016 16:20:47 -0400 Subject: [PATCH 3/8] postToAll has to be called in the same thread --- .../execution/streaming/StreamingQueryListenerBus.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index fc2190d39da4f..9891a7d89d38b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -39,12 +39,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) * be dispatched to all StreamingQueryListener in the thread of the Spark listener bus. */ def post(event: StreamingQueryListener.Event) { - event match { - case s: QueryStartedEvent => - postToAll(s) - case _ => - sparkListenerBus.post(event) - } + sparkListenerBus.post(event) } override def onOtherEvent(event: SparkListenerEvent): Unit = { From 24bcd7b700627dff961ac6e405922babf5d1851f Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 28 Oct 2016 21:51:57 -0400 Subject: [PATCH 4/8] fix the misunderstanding of the problem --- .../execution/streaming/StreamingQueryListenerBus.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 9891a7d89d38b..2623f2e55a6c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -39,7 +39,14 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) * be dispatched to all StreamingQueryListener in the thread of the Spark listener bus. */ def post(event: StreamingQueryListener.Event) { - sparkListenerBus.post(event) + event match { + case s: QueryStartedEvent => + sparkListenerBus.postToAll(s) + // post to local listeners to trigger callbacks + postToAll(event) + case _ => + sparkListenerBus.post(event) + } } override def onOtherEvent(event: SparkListenerEvent): Unit = { From 2c2147e4015d639f81206b3f3d2a68d3a623e1aa Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 31 Oct 2016 16:46:00 -0400 Subject: [PATCH 5/8] guarantee thread-safety --- .../sql/execution/streaming/StreamingQueryListenerBus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 2623f2e55a6c6..f68167b545d33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -41,7 +41,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) def post(event: StreamingQueryListener.Event) { event match { case s: QueryStartedEvent => - sparkListenerBus.postToAll(s) + sparkListenerBus.post(s) // post to local listeners to trigger callbacks postToAll(event) case _ => From 8d2b3a63f8d14acb8bd33ce15a9b7a593b703f97 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 31 Oct 2016 16:46:40 -0400 Subject: [PATCH 6/8] fix --- .../sql/execution/streaming/StreamingQueryListenerBus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index f68167b545d33..04397f5abd3dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -43,7 +43,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) case s: QueryStartedEvent => sparkListenerBus.post(s) // post to local listeners to trigger callbacks - postToAll(event) + postToAll(s) case _ => sparkListenerBus.post(event) } From 4b29d8f7be52f230f3e5dc726f11599e86ee4e07 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 31 Oct 2016 20:32:45 -0400 Subject: [PATCH 7/8] de-duplicate QueryStartedEvent and add test --- .../execution/streaming/StreamingQueryListenerBus.scala | 9 ++++++++- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 7 ++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 04397f5abd3dd..825b37d647a1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -52,7 +52,14 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case e: StreamingQueryListener.Event => - postToAll(e) + // SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus + // synchronously and to listeners attached to LiveListenerBus asynchronously. Therefore, + // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus + // thread + if (Thread.currentThread().getName != "SparkListenerBus" || + !e.isInstanceOf[QueryStartedEvent]) { + postToAll(e) + } case _ => } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 464c443beb6e7..98ad95cc9b77b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } + var queryStartedEvent = 0 + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { + queryStartedEvent += 1 + } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { if (firstStatus == null) firstStatus = queryProgress.queryStatus } @@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { q.processAllAvailable() eventually(timeout(streamingTimeout)) { assert(listener.firstStatus != null) + // test if QueryStartedEvent callback is called for only once + assert(listener.queryStartedEvent === 1) } listener.firstStatus } finally { From 00dd3ceadf24c17b2d16283a86644a22e4102a88 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 1 Nov 2016 21:08:49 -0400 Subject: [PATCH 8/8] address shixiong's comments --- .../sql/execution/streaming/StreamingQueryListenerBus.scala | 5 ++--- .../org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 825b37d647a1c..22e4c6380fcd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -53,11 +53,10 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) event match { case e: StreamingQueryListener.Event => // SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus - // synchronously and to listeners attached to LiveListenerBus asynchronously. Therefore, + // synchronously and the ones attached to LiveListenerBus asynchronously. Therefore, // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus // thread - if (Thread.currentThread().getName != "SparkListenerBus" || - !e.isInstanceOf[QueryStartedEvent]) { + if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) { postToAll(e) } case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 98ad95cc9b77b..31b7fe0b04da9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,7 +290,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - var queryStartedEvent = 0 + @volatile var queryStartedEvent = 0 override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { queryStartedEvent += 1 }