Skip to content

Commit

Permalink
Changes in clarity, removed redundant error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffrharr committed Jul 13, 2015
1 parent af41098 commit 9ec68c1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -544,47 +544,39 @@ class StreamingContext private[streaming] (
* Registers streamingListeners specified in spark.streaming.extraListeners
*/
private def setupStreamingListeners(): Unit = {
// Use reflection to instantiate listeners specified via `spark.extraListeners`
try {
val listenerClassNames: Seq[String] =
conf.get("spark.streaming.extraListeners", "").split(',').map(_.trim).filter(_ != "")
for (className <- listenerClassNames) {
// Use reflection to find the right constructor
val constructors = {
val listenerClass = org.apache.spark.util.Utils.classForName(className)
listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]]
}
val constructorTakingSparkConf = constructors.find { c =>
c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
}
lazy val zeroArgumentConstructor = constructors.find { c =>
c.getParameterTypes.isEmpty
}
val listener: StreamingListener = {
if (constructorTakingSparkConf.isDefined) {
constructorTakingSparkConf.get.newInstance(conf)
} else if (zeroArgumentConstructor.isDefined) {
zeroArgumentConstructor.get.newInstance()
} else {
throw new SparkException(
s"$className did not have a zero-argument constructor or a" +
" single-argument constructor that accepts SparkConf. Note: if the class is" +
" defined inside of another Scala class, then its constructors may accept an" +
" implicit parameter that references the enclosing class; in this case, you must" +
" define the listener as a top-level class in order to prevent this extra" +
" parameter from breaking Spark's ability to find a valid constructor.")
}
}
addStreamingListener(listener)
logInfo(s"Registered StreamingListener $className")
// Use reflection to instantiate listeners specified via `spark.streaming.extraListeners`
val listenerClassNames: Seq[String] =
conf.get("spark.streaming.extraListeners", "").split(',').map(_.trim).filter(_ != "")
for (className <- listenerClassNames) {
// Use reflection to find the right constructor
val constructors = {
val listenerClass = org.apache.spark.util.Utils.classForName(className)
listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]]
}
val constructorTakingSparkConf = constructors.find { c =>
c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
}
} catch {
case e: Exception =>
try {
stop()
} finally {
throw new SparkException(s"Exception when registering StreamingListener", e)
lazy val zeroArgumentConstructor = constructors.find { c =>
c.getParameterTypes.isEmpty
}
val listener: StreamingListener = {
if (constructorTakingSparkConf.isDefined) {
constructorTakingSparkConf.get.newInstance(conf)
} else if (zeroArgumentConstructor.isDefined) {
zeroArgumentConstructor.get.newInstance()
} else {
throw new SparkException(
s"Exception when registering Streaming Listener:" +
" $className did not have a zero-argument constructor or a" +
" single-argument constructor that accepts SparkConf. Note: if the class is" +
" defined inside of another Scala class, then its constructors may accept an" +
" implicit parameter that references the enclosing class; in this case, you must" +
" define the listener as a top-level class in order to prevent this extra" +
" parameter from breaking Spark's ability to find a valid constructor.")
}
}
addStreamingListener(listener)
logInfo(s"Registered StreamingListener $className")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
classOf[ReceiverInfoCollector].getName)
val scc = new StreamingContext(conf, Seconds(1))

scc.scheduler.listenerBus.listeners.collect {
case x: BatchInfoCollector => x }.size should be (1)
scc.scheduler.listenerBus.listeners.collect {
case x: ReceiverInfoCollector => x }.size should be (1)
scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[BatchInfoCollector] }
scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[ReceiverInfoCollector] }
}
}

Expand Down

0 comments on commit 9ec68c1

Please sign in to comment.