[SPARK-7786][STREAMING] Allow StreamingListener to be specified in SparkConf and loaded when creating StreamingContext #6380
Changes from 11 commits
@@ -1481,6 +1481,17 @@ Apart from these, the following properties are also available, and may be useful
      How many batches the Spark Streaming UI and status APIs remember before garbage collecting.
    </td>
  </tr>
  <tr>
    <td><code>spark.streaming.listeners</code></td>
    <td>(none)</td>
    <td>
      A comma-separated list of classes that implement <code>StreamingListener</code>; when initializing

Review comment: This is incorrect: "Spark's streaming listener bus" ==> "StreamingContext's listener bus"

      SparkContext, instances of these classes will be created and registered with Spark's listener
      bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor
      will be called; otherwise, a zero-argument constructor will be called. If no valid constructor
      can be found, the SparkContext creation will fail with an exception.

Review comment: Please update the documentation correctly. There are still references to the Spark listener.

    </td>
  </tr>
  </table>
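As a usage sketch of the key this diff adds, the property could be set in spark-defaults.conf like any other Spark configuration. The listener class names below are placeholders, and note the open review question of whether the key should instead be spark.streaming.extraListeners:

```
# Hypothetical spark-defaults.conf entry; the listener classes are placeholders
# and must implement StreamingListener with a zero-argument or
# single-SparkConf-argument constructor.
spark.streaming.listeners  com.example.MyStreamingListener,com.example.OtherListener
```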
#### Cluster Managers
@@ -18,6 +18,7 @@
package org.apache.spark.streaming

import java.io.{InputStream, NotSerializableException}
import java.lang.reflect.Constructor
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import scala.collection.Map

@@ -201,6 +202,7 @@ class StreamingContext private[streaming] (

  private val startSite = new AtomicReference[CallSite](null)

  setupStreamingListeners()
  private var shutdownHookRef: AnyRef = _

  /**

@@ -527,6 +529,54 @@ class StreamingContext private[streaming] (
    scheduler.listenerBus.addListener(streamingListener)
  }

  /**
   * Registers StreamingListeners specified in spark.streaming.listeners.
   */
  private def setupStreamingListeners(): Unit = {
    // Use reflection to instantiate listeners specified via `spark.streaming.listeners`
    try {
      val listenerClassNames: Seq[String] =
        conf.get("spark.streaming.listeners", "").split(',').map(_.trim).filter(_ != "")
      for (className <- listenerClassNames) {
        // Use reflection to find the right constructor
        val constructors = {
          val listenerClass = Class.forName(className)

Review comment: Please use …

          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]]
        }
        val constructorTakingSparkConf = constructors.find { c =>

Review comment: I see.

          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
        }
        lazy val zeroArgumentConstructor = constructors.find { c =>
          c.getParameterTypes.isEmpty
        }
        val listener: StreamingListener = {
          if (constructorTakingSparkConf.isDefined) {
            constructorTakingSparkConf.get.newInstance(conf)
          } else if (zeroArgumentConstructor.isDefined) {
            zeroArgumentConstructor.get.newInstance()
          } else {
            throw new SparkException(
              s"$className did not have a zero-argument constructor or a" +
                " single-argument constructor that accepts SparkConf. Note: if the class is" +
                " defined inside of another Scala class, then its constructors may accept an" +
                " implicit parameter that references the enclosing class; in this case, you must" +
                " define the listener as a top-level class in order to prevent this extra" +
                " parameter from breaking Spark's ability to find a valid constructor.")
          }
        }
        addStreamingListener(listener)
        logInfo(s"Registered streaming listener $className")
      }
    } catch {
      case e: Exception =>
        try {

Review comment: This …

          stop()
        } finally {
          throw new SparkException(s"Exception when registering StreamingListener", e)
        }
    }
  }

  private def validate() {
    assert(graph != null, "Graph is null")
    graph.validate()
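The constructor-selection rule in setupStreamingListeners above can be sketched without any Spark classes. In the sketch below, `Conf`, `Listener`, `ZeroArg`, and `TakesConf` are stand-ins invented for illustration; they are not Spark types, and `Class.forName` from the patch is replaced by passing a `Class[_]` directly:

```scala
// Stand-ins invented for this sketch; not Spark classes.
class Conf
trait Listener
class ZeroArg extends Listener
class TakesConf(val conf: Conf) extends Listener

// Mirror of the selection rule: prefer a single-argument constructor taking
// the configuration type, fall back to a zero-argument constructor, else fail.
def instantiate(cls: Class[_ <: Listener], conf: Conf): Listener = {
  val constructors = cls.getConstructors
  val confCtor = constructors.find(_.getParameterTypes.sameElements(Array(classOf[Conf])))
  lazy val zeroCtor = constructors.find(_.getParameterTypes.isEmpty)
  confCtor.map(_.newInstance(conf))
    .orElse(zeroCtor.map(_.newInstance()))
    .getOrElse(throw new IllegalArgumentException(
      s"${cls.getName} needs a zero-arg or Conf-arg constructor"))
    .asInstanceOf[Listener]
}
```

The real patch additionally wraps the loop in a try/catch that stops the context on any failure, and a reviewer suggests a Spark class-loading helper instead of bare `Class.forName`.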
@@ -17,6 +17,8 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.scheduler.StatsReportListener

Review comment: Why was this needed?
Review comment: You have not addressed this. Why was this needed? I don't see anything that required this.

import scala.collection.mutable.Queue

import org.apache.spark.util.Distribution

@@ -84,7 +86,10 @@ trait StreamingListener {
 * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
 */
@DeveloperApi
-class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
+class StatsReportListener(numBatchInfos: Int) extends StreamingListener {

  def this() = this(10)

Review comment: How is this different from the previous one? With optional params, you already had a zero-arg constructor before.
Reply: The previous one could not be instantiated by reflection due to having no …
Review comment: Why is it 10 batches only? Can you make it 100 batches?

  // Queue containing latest completed batches
  val batchInfos = new Queue[BatchInfo]()
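The thread above hinges on a Scala subtlety worth making concrete: a default parameter value does not produce a real zero-argument JVM constructor (only a synthetic default-value method), so `getConstructors` never sees one, while an explicit auxiliary constructor does. A minimal sketch, with both class names invented for illustration:

```scala
// WithDefault mimics the old StatsReportListener(numBatchInfos: Int = 10);
// WithAuxiliary mimics the new version that adds `def this() = this(10)`.
class WithDefault(val n: Int = 10)
class WithAuxiliary(val n: Int) {
  def this() = this(10)
}

// True only if reflection can see a genuine zero-argument constructor.
def hasZeroArgCtor(cls: Class[_]): Boolean =
  cls.getConstructors.exists(_.getParameterTypes.isEmpty)
```

When these are compiled as top-level classes, `hasZeroArgCtor(classOf[WithDefault])` is false while `hasZeroArgCtor(classOf[WithAuxiliary])` is true, which is why the reflection-based registration needed the auxiliary constructor.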
@@ -18,6 +18,7 @@
package org.apache.spark.streaming

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

@@ -29,7 +30,7 @@ import org.apache.spark.streaming.scheduler._
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}

class StreamingListenerSuite extends TestSuiteBase with Matchers {

@@ -138,6 +139,18 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
    }
    true
  }

  test("registering listeners via spark.streaming.listeners") {
    val conf = new SparkConf().setMaster("local").setAppName("test")
      .set("spark.streaming.listeners", classOf[BatchInfoCollector].getName + "," +

Review comment: This needs to be changed to extraListeners too.

        classOf[ReceiverInfoCollector].getName)
    val scc = new StreamingContext(conf, Seconds(1))

Review comment: This can lead to a leak in the StreamingContext and underlying SparkContext. Please put the whole thing in …
Reply: StreamingListenerSuite extends TestSuiteBase, so I think we can use the …
Reply: Is this still the case when the context is never started? (replying to Tathagata Das, Fri, Jul 17, 2015, 1:55 AM)
Reply: Yes, the StreamingContext is being created, which means an underlying SparkContext is being created, which is not being cleaned up.

    scc.scheduler.listenerBus.listeners.collect {

Review comment: It's easier to understand if this check is replaced with …

      case x: BatchInfoCollector => x }.size should be (1)
    scc.scheduler.listenerBus.listeners.collect {
      case x: ReceiverInfoCollector => x }.size should be (1)
  }
}

/** Listener that collects information on processed batches */

Review comment: Why is this not spark.streaming.extraListeners, similar to spark.extraListeners?
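The leak concern raised above is the usual try/finally discipline: stop the context in a finally block so it is cleaned up even when an assertion fails. A generic, Spark-free sketch of that pattern, with `FakeContext` invented as a stand-in for StreamingContext:

```scala
// Stand-in resource invented for this sketch; not a Spark class.
class FakeContext {
  var stopped = false
  def stop(): Unit = { stopped = true }
}

// Run `body` against the context and stop the context no matter what,
// mirroring the try/finally wrapper the reviewer asks the test to use.
def withContext[T](ctx: FakeContext)(body: FakeContext => T): T =
  try body(ctx) finally ctx.stop()
```

With this wrapper, the context is stopped both on normal completion and when the body throws, so the underlying resources cannot leak across tests.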