
[SPARK-7786][STREAMING] Allow StreamingListener to be specified in SparkConf and loaded when creating StreamingContext #6380

Status: Closed · wants to merge 21 commits

Commits (21)
73549ff
Add func setupStreamingListeners to add listeners from
Jeffrharr May 21, 2015
4eb6987
Merge branch 'master' into SparkConf_Listeners
Jeffrharr May 23, 2015
f8ad629
Use class API to add listener. Modify StatsReportListener to have a zero
Jeffrharr May 23, 2015
3c1a19d
Reduced ambiguity by removing default values from StatsReportListener…
Jeffrharr May 23, 2015
f3a3fee
Add test
Jeffrharr May 23, 2015
e8506d4
Fix minor syntax derp
Jeffrharr May 23, 2015
6453c90
Fix style issues and small type in StreamingListenerSuite
Jeffrharr May 24, 2015
d92d55b
Maintain consistancy with property naming conventions
Jeffrharr May 27, 2015
00c0409
Fix test for new format
Jeffrharr May 27, 2015
186766f
Merge remote-tracking branch 'upstream/master' into SparkConf_Listeners
Jeffrharr May 28, 2015
c94982f
Test renamed to spark.streaming.listeners
Jeffrharr May 28, 2015
40d91ed
Merge remote-tracking branch 'upstream/master' into SparkConf_Listeners
Jeffrharr Jul 10, 2015
d7e7b2e
Update name to spark.streaming.extraListeners / fix docs / change
Jeffrharr Jul 10, 2015
33d3179
Default to 100
Jeffrharr Jul 10, 2015
233335d
Update Class.forName to org.apache.spark.util.Utils.classForName to
Jeffrharr Jul 10, 2015
28e7f27
Use spark.streaming.extraListeners
Jeffrharr Jul 10, 2015
af41098
checking
Jeffrharr Jul 10, 2015
9ec68c1
Changes in clarity, removed redundant error handling
Jeffrharr Jul 13, 2015
70c31e4
Fix documentation, add more/clearer test cases
Jeffrharr Jul 16, 2015
aff08ff
Accidental insertion removed
Jeffrharr Jul 16, 2015
fa8c752
Correct test text
Jeffrharr Jul 16, 2015
core/src/main/scala/org/apache/spark/SparkContext.scala (1 addition, 1 deletion)

@@ -1968,7 +1968,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     for (className <- listenerClassNames) {
       // Use reflection to find the right constructor
       val constructors = {
-        val listenerClass = Class.forName(className)
+        val listenerClass = org.apache.spark.util.Utils.classForName(className)
         listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
       }
       val constructorTakingSparkConf = constructors.find { c =>
docs/configuration.md (11 additions, 0 deletions)

@@ -1493,6 +1493,17 @@ Apart from these, the following properties are also available, and may be useful
How many batches the Spark Streaming UI and status APIs remember before garbage collecting.
</td>
</tr>
<tr>
<td><code>spark.streaming.extraListeners</code></td>
<td>(none)</td>
<td>
A comma-separated list of classes that implement <code>StreamingListener</code>; when initializing
StreamingContext, instances of these classes will be created and registered with StreamingContext's listener
bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor
will be called; otherwise, a zero-argument constructor will be called. If no valid constructor
can be found, the StreamingContext creation will fail with an exception.
</td>
</tr>

Contributor (review comment): This is incorrect: "when initializing SparkContext"? ==> StreamingContext. "Spark's streaming listener bus" ==> "StreamingContext's listener bus"
</table>
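The property value documented above is a plain comma-separated string; whitespace around class names is tolerated and empty entries are dropped. A standalone sketch of that parsing (no Spark dependency; the listener class names are hypothetical):

```scala
// Mirrors the parsing applied to spark.streaming.extraListeners:
// split on commas, trim whitespace, drop empty entries
def parseListenerNames(value: String): Seq[String] =
  value.split(',').map(_.trim).filter(_ != "").toSeq

val names = parseListenerNames(
  " com.example.ListenerA , com.example.ListenerB,,")
```

Here `names` contains the two trimmed class names, with the empty trailing entries discarded.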

#### SparkR
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

@@ -18,6 +18,7 @@
package org.apache.spark.streaming

import java.io.{InputStream, NotSerializableException}
import java.lang.reflect.Constructor
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import scala.collection.Map
@@ -202,6 +203,7 @@ class StreamingContext private[streaming] (

private val startSite = new AtomicReference[CallSite](null)

setupStreamingListeners()
private var shutdownHookRef: AnyRef = _

/**
@@ -538,6 +540,46 @@
scheduler.listenerBus.addListener(streamingListener)
}

  /**
   * Registers streamingListeners specified in spark.streaming.extraListeners
   */
  private def setupStreamingListeners(): Unit = {
    // Use reflection to instantiate listeners specified via `spark.streaming.extraListeners`
    val listenerClassNames: Seq[String] =
      conf.get("spark.streaming.extraListeners", "").split(',').map(_.trim).filter(_ != "")
    for (className <- listenerClassNames) {
      // Use reflection to find the right constructor
      val constructors = {
        val listenerClass = org.apache.spark.util.Utils.classForName(className)
        listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]]
      }
      val constructorTakingSparkConf = constructors.find { c =>
        c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
      }
      lazy val zeroArgumentConstructor = constructors.find { c =>
        c.getParameterTypes.isEmpty
      }
      val listener: StreamingListener = {
        if (constructorTakingSparkConf.isDefined) {
          constructorTakingSparkConf.get.newInstance(conf)
        } else if (zeroArgumentConstructor.isDefined) {
          zeroArgumentConstructor.get.newInstance()
        } else {
          throw new SparkException(
            "Exception when registering Streaming Listener:" +
            s" $className did not have a zero-argument constructor or a" +
            " single-argument constructor that accepts SparkConf. Note: if the class is" +
            " defined inside of another Scala class, then its constructors may accept an" +
            " implicit parameter that references the enclosing class; in this case, you must" +
            " define the listener as a top-level class in order to prevent this extra" +
            " parameter from breaking Spark's ability to find a valid constructor.")
        }
      }
      addStreamingListener(listener)
      logInfo(s"Registered StreamingListener $className")
    }
  }
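The constructor-selection rule in the method above (prefer a single-argument SparkConf constructor, fall back to a zero-argument one, otherwise fail) can be sketched without any Spark dependency; `Settings` and `Listener` below are stand-ins for `SparkConf` and `StreamingListener`, and all class names are illustrative:

```scala
import java.lang.reflect.Constructor

class Settings                                      // stand-in for SparkConf
trait Listener                                      // stand-in for StreamingListener
class TakesSettings(val settings: Settings) extends Listener
class ZeroArg extends Listener
class NoUsableCtor(a: Settings, b: Int) extends Listener

def instantiate(className: String, settings: Settings): Listener = {
  val ctors = Class.forName(className).getConstructors
    .asInstanceOf[Array[Constructor[_ <: Listener]]]
  // Prefer a single-argument constructor accepting Settings
  val takingSettings =
    ctors.find(_.getParameterTypes.sameElements(Array(classOf[Settings])))
  // Fall back to a zero-argument constructor, evaluated only if needed
  lazy val zeroArg = ctors.find(_.getParameterTypes.isEmpty)
  takingSettings.map(_.newInstance(settings))
    .orElse(zeroArg.map(_.newInstance()))
    .getOrElse(throw new IllegalArgumentException(
      s"$className has no Settings-taking or zero-argument constructor"))
}
```

`instantiate` succeeds for `TakesSettings` and `ZeroArg` and throws for `NoUsableCtor`, mirroring the SparkException path in the real method.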

private def validate() {
assert(graph != null, "Graph is null")
graph.validate()
streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala

@@ -17,6 +17,8 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.scheduler.StatsReportListener
Contributor (review comment): Why was this needed?

Contributor (review comment): You have not addressed this. Why was this needed?? I don't see anything that required this.


import scala.collection.mutable.Queue

import org.apache.spark.util.Distribution
@@ -81,10 +83,13 @@ trait StreamingListener {
/**
* :: DeveloperApi ::
* A simple StreamingListener that logs summary statistics across Spark Streaming batches
- * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
+ * @param numBatchInfos Number of last batches to consider for generating statistics (default: 100)
*/
@DeveloperApi
-class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
+class StatsReportListener(numBatchInfos: Int) extends StreamingListener {

def this() = this(100)

// Queue containing latest completed batches
val batchInfos = new Queue[BatchInfo]()
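The switch above from a default parameter to an explicit auxiliary constructor is what makes the zero-argument path visible to reflection: Scala compiles default arguments as synthetic methods, not extra constructors, so `getConstructors` only sees the one-argument form. A standalone sketch (class names illustrative):

```scala
// With a default argument, reflection sees only the one-arg constructor
class WithDefault(n: Int = 100)

// With an auxiliary constructor, reflection sees both a zero-arg
// and a one-arg constructor
class WithAuxiliary(n: Int) {
  def this() = this(100)
}

// Arities of the public constructors each class exposes
val defaultArity   = classOf[WithDefault].getConstructors.map(_.getParameterCount).sorted.toSeq
val auxiliaryArity = classOf[WithAuxiliary].getConstructors.map(_.getParameterCount).sorted.toSeq
```

`defaultArity` is `Seq(1)` while `auxiliaryArity` is `Seq(0, 1)`, which is why the zero-argument fallback in `setupStreamingListeners` requires the auxiliary-constructor form.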

streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala

@@ -18,6 +18,7 @@
package org.apache.spark.streaming

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

@@ -26,10 +27,10 @@ import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler._

-import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.apache.spark.Logging
+import org.scalatest.{Matchers, Assertions}
+import org.apache.spark.{SparkException, SparkConf, Logging}

class StreamingListenerSuite extends TestSuiteBase with Matchers {

@@ -140,6 +141,37 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
true
}

  test("registering listeners via spark.streaming.extraListeners") {
    // Test for success with zero-argument constructor and sparkConf constructor
    val conf = new SparkConf().setMaster("local").setAppName("test")
      .set("spark.streaming.extraListeners",
        classOf[StreamingListenerThatAcceptsSparkConf].getName + "," +
        classOf[ReceiverInfoCollector].getName)
    val scc = new StreamingContext(conf, Seconds(1))

Contributor (review comment): This can lead to a leak in the StreamingContext and underlying SparkContext. Please put the whole thing in:

    var ssc: StreamingContext = null
    try {
      ssc = new StreamingContext...
    } finally {
      if (ssc != null) {
        ssc.stop()
      }
    }

Contributor (review comment): StreamingListenerSuite extends TestSuiteBase, so I think we can use the withStreamingContext(ssc) { } construct to manage cleanup for us: https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala#L270

Contributor Author (review comment): Is this still the case when the context is never started?

Contributor (review comment): Yes, the StreamingContext is being created, which means an underlying SparkContext is being created, which is not being cleaned up.


    assert(scc.scheduler.listenerBus.listeners.exists {
      _.isInstanceOf[StreamingListenerThatAcceptsSparkConf] })
    assert(scc.scheduler.listenerBus.listeners.exists {
      _.isInstanceOf[ReceiverInfoCollector] })

    // Test for failure with too many arguments in constructor
    val failingConf = new SparkConf().setMaster("local").setAppName("failingTest")
      .set("spark.streaming.extraListeners", classOf[StreamingListenerTooManyArguments].getName)
    val thrown = intercept[SparkException] {
      val failingScc = new StreamingContext(failingConf, Seconds(1))
    }
    val expectedErrorMessage =
      "Exception when registering Streaming Listener:" +
      " org.apache.spark.streaming.StreamingListenerTooManyArguments" +
      " did not have a zero-argument constructor or a" +
      " single-argument constructor that accepts SparkConf. Note: if the class is" +
      " defined inside of another Scala class, then its constructors may accept an" +
      " implicit parameter that references the enclosing class; in this case, you must" +
      " define the listener as a top-level class in order to prevent this extra" +
      " parameter from breaking Spark's ability to find a valid constructor."
    assert(thrown.getMessage === expectedErrorMessage)

Contributor (review comment): Lets not be so specific, the exact text can change. Just verify whether the message has the name of the class inside it.
}
}
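The reviewer's point about brittle full-message assertions can be illustrated standalone; the registration function, exception type, and class name below are all hypothetical stand-ins for the real Spark code:

```scala
// Hypothetical registration that fails the way the real code does,
// embedding the offending class name in the exception message
def register(className: String): Unit =
  throw new IllegalArgumentException(
    s"Exception when registering Streaming Listener: $className" +
      " did not have a usable constructor")

val thrown =
  try { register("com.example.BadListener"); None }
  catch { case e: IllegalArgumentException => Some(e) }

// Per the review suggestion: assert only that the class name appears,
// rather than matching the exact wording of the whole message
assert(thrown.exists(_.getMessage.contains("com.example.BadListener")))
```

This way the test keeps passing if the error wording is later reworded, as long as the offending class name is still reported.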

/** Listener that collects information on processed batches */
@@ -196,3 +228,11 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
}
def onStop() { }
}

class StreamingListenerThatAcceptsSparkConf(conf: SparkConf) extends StreamingListener {
// Empty dummy class used for testing
}

class StreamingListenerTooManyArguments(conf: SparkConf, failInt: Int) extends StreamingListener {
// Empty dummy class used for testing
}