-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-7786][STREAMING] Allow StreamingListener to be specified in SparkConf and loaded when creating StreamingContext #6380
Changes from 17 commits
73549ff
4eb6987
f8ad629
3c1a19d
f3a3fee
e8506d4
6453c90
d92d55b
00c0409
186766f
c94982f
40d91ed
d7e7b2e
33d3179
233335d
28e7f27
af41098
9ec68c1
70c31e4
aff08ff
fa8c752
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package org.apache.spark.streaming | ||
|
||
import java.io.{InputStream, NotSerializableException} | ||
import java.lang.reflect.Constructor | ||
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} | ||
|
||
import scala.collection.Map | ||
|
@@ -202,6 +203,7 @@ class StreamingContext private[streaming] ( | |
|
||
private val startSite = new AtomicReference[CallSite](null) | ||
|
||
setupStreamingListeners() | ||
private var shutdownHookRef: AnyRef = _ | ||
|
||
/** | ||
|
@@ -538,6 +540,54 @@ class StreamingContext private[streaming] ( | |
scheduler.listenerBus.addListener(streamingListener) | ||
} | ||
|
||
/** | ||
* Registers the StreamingListeners specified in `spark.streaming.extraListeners` | ||
*/ | ||
private def setupStreamingListeners(): Unit = { | ||
// Use reflection to instantiate listeners specified via `spark.streaming.extraListeners` | ||
try { | ||
val listenerClassNames: Seq[String] = | ||
conf.get("spark.streaming.extraListeners", "").split(',').map(_.trim).filter(_ != "") | ||
for (className <- listenerClassNames) { | ||
// Use reflection to find the right constructor | ||
val constructors = { | ||
val listenerClass = org.apache.spark.util.Utils.classForName(className) | ||
listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]] | ||
} | ||
val constructorTakingSparkConf = constructors.find { c => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. |
||
c.getParameterTypes.sameElements(Array(classOf[SparkConf])) | ||
} | ||
lazy val zeroArgumentConstructor = constructors.find { c => | ||
c.getParameterTypes.isEmpty | ||
} | ||
val listener: StreamingListener = { | ||
if (constructorTakingSparkConf.isDefined) { | ||
constructorTakingSparkConf.get.newInstance(conf) | ||
} else if (zeroArgumentConstructor.isDefined) { | ||
zeroArgumentConstructor.get.newInstance() | ||
} else { | ||
throw new SparkException( | ||
s"$className did not have a zero-argument constructor or a" + | ||
" single-argument constructor that accepts SparkConf. Note: if the class is" + | ||
" defined inside of another Scala class, then its constructors may accept an" + | ||
" implicit parameter that references the enclosing class; in this case, you must" + | ||
" define the listener as a top-level class in order to prevent this extra" + | ||
" parameter from breaking Spark's ability to find a valid constructor.") | ||
} | ||
} | ||
addStreamingListener(listener) | ||
logInfo(s"Registered StreamingListener $className") | ||
} | ||
} catch { | ||
case e: Exception => | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
stop() | ||
} finally { | ||
throw new SparkException(s"Exception when registering StreamingListener", e) | ||
} | ||
} | ||
} | ||
|
||
private def validate() { | ||
assert(graph != null, "Graph is null") | ||
graph.validate() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ | |
|
||
package org.apache.spark.streaming.scheduler | ||
|
||
import org.apache.spark.scheduler.StatsReportListener | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why was this needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You have not addressed this. Why was this needed?? I dont see anything that required this. |
||
|
||
import scala.collection.mutable.Queue | ||
|
||
import org.apache.spark.util.Distribution | ||
|
@@ -81,10 +83,13 @@ trait StreamingListener { | |
/** | ||
* :: DeveloperApi :: | ||
* A simple StreamingListener that logs summary statistics across Spark Streaming batches | ||
* @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) | ||
* @param numBatchInfos Number of last batches to consider for generating statistics (default: 100) | ||
*/ | ||
@DeveloperApi | ||
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { | ||
class StatsReportListener(numBatchInfos: Int) extends StreamingListener { | ||
|
||
def this() = this(100) | ||
|
||
// Queue containing latest completed batches | ||
val batchInfos = new Queue[BatchInfo]() | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package org.apache.spark.streaming | ||
|
||
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} | ||
import scala.collection.JavaConversions._ | ||
import scala.concurrent.Future | ||
import scala.concurrent.ExecutionContext.Implicits.global | ||
|
||
|
@@ -29,7 +30,7 @@ import org.apache.spark.streaming.scheduler._ | |
import org.scalatest.Matchers | ||
import org.scalatest.concurrent.Eventually._ | ||
import org.scalatest.time.SpanSugar._ | ||
import org.apache.spark.Logging | ||
import org.apache.spark.{SparkConf, Logging} | ||
|
||
class StreamingListenerSuite extends TestSuiteBase with Matchers { | ||
|
||
|
@@ -140,6 +141,18 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { | |
} | ||
true | ||
} | ||
|
||
test("registering listeners via spark.streaming.extraListeners") { | ||
val conf = new SparkConf().setMaster("local").setAppName("test") | ||
.set("spark.streaming.extraListeners", classOf[BatchInfoCollector].getName + "," + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I missed it earlier, this unit does not cover the two cases, one without constructor and one with constructor. Also, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also please include a third case where the incorrect StreamingListener with 2 args is specified, which is expected throw an exception. Test with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The change is already in StreamingListenerSuite.scala (and the class On Mon, Jul 13, 2015 at 5:36 PM, Tathagata Das notifications@github.com
|
||
classOf[ReceiverInfoCollector].getName) | ||
val scc = new StreamingContext(conf, Seconds(1)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can lead to a leak in the StreamingContext and underlying SparkContext. Please put the whole in
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. StreamingListenerSuite extends TestSuiteBase, so I think we can use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this still the case when the context is never started? On Fri, Jul 17, 2015 at 1:55 AM, Tathagata Das notifications@github.com
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the StreamingContext is being created, which means an underlying SparkContext is being created, which is not being cleaned up. |
||
|
||
scc.scheduler.listenerBus.listeners.collect { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its easier to understand if this check is replaced with |
||
case x: BatchInfoCollector => x }.size should be (1) | ||
scc.scheduler.listenerBus.listeners.collect { | ||
case x: ReceiverInfoCollector => x }.size should be (1) | ||
} | ||
} | ||
|
||
/** Listener that collects information on processed batches */ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is incorrect:
"when initializing SparkContext"? ==> StreamingContext
"Spark's streaming listener bus" ==> "StreamingContext's listener bus"