[SPARK-7786][STREAMING] Allow StreamingListener to be specified in SparkConf and loaded when creating StreamingContext #6380
spark.streamingListeners
argument constructor in order to make it compatible with config option.
…'s single-valued constructor. Added docs for config option spark.streamingListeners
Jenkins, this is ok to test. |
Test build #33425 has finished for PR 6380 at commit
|
How can I run Scala style checks? By the way, thanks for writing the code that I essentially lifted for this :P |
Test build #33445 has finished for PR 6380 at commit
|
@@ -17,6 +17,8 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.scheduler.StatsReportListener
Why was this needed?
You have not addressed this. Why was this needed? I don't see anything that required this.
@His-name-is-Joof you can do |
@@ -1481,6 +1481,17 @@ Apart from these, the following properties are also available, and may be useful
How many batches the Spark Streaming UI and status APIs remember before garbage collecting.
</td>
</tr>
<tr>
<td><code>spark.streamingListeners</code></td>
This should be under the spark.streaming namespace instead, so a better name would be spark.streaming.listeners, or spark.streaming.extraListeners to be more consistent with the existing spark.extraListeners.
Test build #33612 has finished for PR 6380 at commit
|
Test build #33625 has finished for PR 6380 at commit
|
Test build #33635 has finished for PR 6380 at commit
|
val listenerClass = Class.forName(className)
listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]]
}
val constructorTakingSparkConf = constructors.find { c =>
listenerClass.getConstructor(classOf[SparkConf]) is simpler.
I see. getConstructor will throw NoSuchMethodException and make the code complex.
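The trade-off discussed in this thread can be illustrated with a minimal, self-contained sketch. It assumes hypothetical stand-in types (`Conf`, `Listener`, `WithConf`, `NoArg`) in place of `SparkConf` and `StreamingListener`, so it is not the PR's code; it only shows that scanning `getConstructors` yields an `Option` directly, while `getConstructor` needs the `NoSuchMethodException` handled (here with `Try`).

```scala
import java.lang.reflect.Constructor
import scala.util.Try

object ConstructorLookupSketch {
  // Hypothetical stand-ins for SparkConf and StreamingListener.
  class Conf
  trait Listener
  class WithConf(conf: Conf) extends Listener // only a Conf-taking constructor
  class NoArg extends Listener                // only a zero-argument constructor

  // Scan every public constructor; a missing match is simply None.
  def byScanning(cls: Class[_]): Option[Constructor[_]] =
    cls.getConstructors.find { c =>
      val params = c.getParameterTypes
      params.length == 1 && params(0) == classOf[Conf]
    }

  // Ask for the exact signature; a missing match throws
  // NoSuchMethodException, which Try converts into None.
  def byExactSignature(cls: Class[_]): Option[Constructor[_]] =
    Try[Constructor[_]](cls.getConstructor(classOf[Conf])).toOption
}
```

Both helpers return the same `Option` shape, so the choice is mostly about whether the exception handling is considered noise at the call site.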
retest this please |
Test build #34479 has finished for PR 6380 at commit
|
Hey, so I thought about this feature a little bit. I am not sure this is an essential feature. The spark.extraListeners option was added so that listeners can be attached before the SparkContext is started, even in environments where the user does not explicitly start the SparkContext, e.g. the Spark shell. Since the SparkContext in spark-shell is started automatically before the user gets any opportunity to attach their own listener, the extraListeners conf was added to make sure the system attaches them automatically before any listener event is created. But a StreamingContext is never started automatically like the SparkContext in spark-shell. One can always add listeners before starting the StreamingContext. So what is the use case where this is really essential? |
Any thoughts on this? |
Sorry, been unusually busy for quite a while. You make an excellent point. The downside being that streamingListeners must be formatted properly to avoid problems with classloaders. |
Test build #36995 has finished for PR 6380 at commit
|
Jenkins, test this please. |
Test build #37027 has finished for PR 6380 at commit
|
your test is failing :) |
test("registering listeners via spark.streaming.listeners") {
val conf = new SparkConf().setMaster("local").setAppName("test")
.set("spark.streaming.listeners", classOf[BatchInfoCollector].getName + "," +
This needs to be changed too to extraListeners.
Test build #37056 has finished for PR 6380 at commit
|
Test build #37069 has finished for PR 6380 at commit
|
}
} catch {
case e: Exception =>
try {
This stop() is not needed as the StreamingContext has not been started yet at this point. In that case the whole try ... catch is not needed either, and the whole thing can be simplified to a single throw new SparkException("Exception when registering StreamingListener: $className did not have a zero-argument constructor or a ..."
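As a rough sketch of the simplification suggested here, the loader can pick a constructor and fail with one exception when none fits, with no nested try/catch and no stop() call. Everything below is an assumption for illustration: `Conf`, `Listener`, `RegistrationException` and the sample listener classes are hypothetical stand-ins for `SparkConf`, `StreamingListener` and `SparkException`, and the error wording is this sketch's own, not the PR's.

```scala
import java.lang.reflect.Constructor
import scala.util.Try

object ListenerLoaderSketch {
  // Hypothetical stand-ins; the PR itself works with SparkConf,
  // StreamingListener and SparkException.
  class Conf
  trait Listener
  class RegistrationException(msg: String) extends Exception(msg)

  class ConfAware(val conf: Conf) extends Listener // Conf-taking constructor
  class Plain extends Listener                     // zero-argument constructor
  class Unsupported(n: Int) extends Listener       // neither of the two

  // Prefer a Conf-taking constructor, fall back to a zero-argument one,
  // otherwise raise a single exception that names the offending class.
  def instantiate(className: String, conf: Conf): Listener = {
    val cls = Class.forName(className)
    val withConf = Try[Constructor[_]](cls.getConstructor(classOf[Conf])).toOption
    val noArg = Try[Constructor[_]](cls.getConstructor()).toOption
    val instance = withConf.map(_.newInstance(conf))
      .orElse(noArg.map(_.newInstance()))
      .getOrElse(throw new RegistrationException(
        s"Exception when registering listener: $className has neither a " +
          "zero-argument constructor nor one taking Conf"))
    instance.asInstanceOf[Listener]
  }

  // Parse a comma-separated list of class names and instantiate each one.
  def loadAll(commaSeparated: String, conf: Conf): Seq[Listener] =
    commaSeparated.split(',').map(_.trim).filter(_.nonEmpty).toSeq
      .map(instantiate(_, conf))
}
```

Only the constructor lookup is wrapped in `Try`, so an exception thrown by a listener's own constructor still propagates instead of being mistaken for a missing constructor.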
<td><code>spark.streaming.extraListeners</code></td>
<td>(none)</td>
<td>
A comma-separated list of classes that implement <code>StreamingListener</code>; when initializing
This is incorrect:
"when initializing SparkContext"? ==> StreamingContext
"Spark's streaming listener bus" ==> "StreamingContext's listener bus"
Test build #37157 has finished for PR 6380 at commit
|
Couple of more comments to address and it is good to go. |
@His-name-is-Joof ping! |
Working on it now. In the middle of moving apartments. Thanks for your On Wed, Jul 15, 2015 at 7:07 PM, Tathagata Das notifications@github.com
|
Test build #37548 has finished for PR 6380 at commit
|
@His-name-is-Joof I am not sure you realized, but the PR does not merge cleanly! It has conflicts, please update with the master branch |
val failingScc = new StreamingContext(failingConf, Seconds(1))
}
val expectedErrorMessage =
"Exception when registering Streaming Listener:" +
Let's not be so specific; the exact text can change. Just verify that the message contains the name of the class.
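A looser assertion of this kind might look like the following sketch. It is plain Scala with hypothetical names (`register`, `messageOf`, the class name `com.example.BadListener`) and does not use the PR's test suite or ScalaTest; it only checks that the failure message mentions the class, not its exact wording.

```scala
import scala.util.{Failure, Success, Try}

object MessageAssertionSketch {
  // Hypothetical failing registration whose wording may change over time.
  def register(className: String): Nothing =
    throw new IllegalArgumentException(
      s"Exception when registering listener: $className has no usable constructor")

  // Run the body and return the message of the exception it throws.
  def messageOf(body: => Any): String =
    Try(body) match {
      case Failure(e) => e.getMessage
      case Success(_) => sys.error("expected an exception")
    }

  // Assert only on the stable part of the message: the class name.
  def demo(): Boolean = {
    val badClass = "com.example.BadListener" // hypothetical class name
    messageOf(register(badClass)).contains(badClass)
  }
}
```

With this shape, rewording the rest of the error text does not break the test.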
@His-name-is-Joof Mind updating this soon? It would be good to get this merged before the underlying code diverges further. |
Hey @His-name-is-Joof if you are not able to update this PR, mind if I take over merging it? |
Let's close this PR for now and reopen it later if needed. @tdas can take over. |
Add SparkConf option spark.streamingListeners to implement functionality similar to spark.extraListeners, defined in [SPARK-5411].
Also modifies StatsReportListener to have a zero-argument constructor and adds a simple test.