From 30aa83ab791cd3bb3856d5b71322e66981054008 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Fri, 18 Sep 2015 21:46:23 +0800
Subject: [PATCH 1/7] Report the batch error to StreamingListener.onBatchCompleted

---
 .../spark/streaming/scheduler/BatchInfo.scala |  4 +-
 .../streaming/scheduler/JobScheduler.scala    | 11 +++--
 .../spark/streaming/scheduler/JobSet.scala    |  5 ++-
 .../streaming/StreamingListenerSuite.scala    | 45 +++++++++++++++++++
 4 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 9922b6bc1201b..74b94a7ef69a7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -36,8 +36,8 @@ case class BatchInfo(
     streamIdToInputInfo: Map[Int, StreamInputInfo],
     submissionTime: Long,
     processingStartTime: Option[Long],
-    processingEndTime: Option[Long]
-  ) {
+    processingEndTime: Option[Long],
+    errorMessage: Option[String] = None) {
 
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 0cd39594ee923..8d7414b26c6e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,7 +25,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
-import org.apache.spark.util.{EventLoop, ThreadUtils}
+import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
 
 
 private[scheduler] sealed trait JobSchedulerEvent
@@ -120,7 +120,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (jobSet.jobs.isEmpty) {
       logInfo("No jobs added for time " + jobSet.time)
     } else {
-      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
+      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo()))
       jobSets.put(jobSet.time, jobSet)
       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
       logInfo("Added jobs for time " + jobSet.time)
@@ -159,7 +159,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (isFirstJobOfJobSet) {
       // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
       // correct "jobSet.processingStartTime".
-      listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
+      listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo()))
     }
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
   }
@@ -177,9 +177,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           jobSet.totalDelay / 1000.0, jobSet.time.toString,
           jobSet.processingDelay / 1000.0
         ))
-        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
+        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
         }
       case Failure(e) =>
+        val jobSet = jobSets.get(job.time)
+        val errorMessage = Some(Utils.exceptionString(e))
+        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo(errorMessage)))
         reportError("Error running job " + job, e)
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 95833efc9417f..043a5973a8340 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -61,13 +61,14 @@ case class JobSet(
     processingEndTime - time.milliseconds
   }
 
-  def toBatchInfo: BatchInfo = {
+  def toBatchInfo(errorMessage: Option[String] = None): BatchInfo = {
     new BatchInfo(
       time,
       streamIdToInputInfo,
       submissionTime,
       if (processingStartTime >= 0 ) Some(processingStartTime) else None,
-      if (processingEndTime >= 0 ) Some(processingEndTime) else None
+      if (processingEndTime >= 0 ) Some(processingEndTime) else None,
+      errorMessage
     )
   }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index d840c349bbbc4..f42ab03584e55 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -140,6 +140,51 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     }
   }
 
+  test("onBatchCompleted") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD(_.count)
+
+    @volatile var errorMessage: Option[String] = None
+    ssc.addStreamingListener(new StreamingListener {
+      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+        errorMessage = batchCompleted.batchInfo.errorMessage
+      }
+    })
+    val batchCounter = new BatchCounter(ssc)
+    ssc.start()
+    // Make sure running at least one batch
+    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    ssc.stop()
+    assert(errorMessage.isEmpty, "A successful batch should not set errorMessage")
+  }
+
+  test("onBatchCompleted: error") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is an unsuccessful batch")
+    }
+
+    @volatile var errorMessage: Option[String] = None
+    ssc.addStreamingListener(new StreamingListener {
+      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+        errorMessage = batchCompleted.batchInfo.errorMessage
+      }
+    })
+    val batchCounter = new BatchCounter(ssc)
+    ssc.start()
+    // Make sure running at least one batch
+    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    val e = intercept[RuntimeException] {
+      ssc.awaitTerminationOrTimeout(10000)
+    }
+    assert(e.getMessage === "This is an unsuccessful batch")
+    ssc.stop()
+    assert(errorMessage.nonEmpty && errorMessage.get.contains("This is an unsuccessful batch"),
+      "An unsuccessful batch should set errorMessage")
+  }
+
   /** Check if a sequence of numbers is in increasing order */
   def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
     for (i <- 1 until seq.size) {
From e87a1ca63f026a2af08a2d7aa26a40bfb907f451 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 23 Sep 2015 15:48:46 +0800
Subject: [PATCH 2/7] Expose failureReasons in BatchInfo

---
 .../spark/streaming/scheduler/BatchInfo.scala |  4 +-
 .../streaming/scheduler/JobScheduler.scala    | 31 +++++-----
 .../spark/streaming/scheduler/JobSet.scala    | 18 ++++--
 .../streaming/StreamingListenerSuite.scala    | 60 +++++++++++++++----
 4 files changed, 80 insertions(+), 33 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 74b94a7ef69a7..7992a16c2fd70 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -29,6 +29,8 @@ import org.apache.spark.streaming.Time
  *   the streaming scheduler queue
  * @param processingStartTime Clock time of when the first job of this batch started processing
  * @param processingEndTime Clock time of when the last job of this batch finished processing
+ * @param failureReasons The failure reasons if any jobs in this batch failed. The key is
+ *                       `outputOpId` and the value is the failure reason.
  */
 @DeveloperApi
 case class BatchInfo(
@@ -37,7 +39,7 @@ case class BatchInfo(
     submissionTime: Long,
     processingStartTime: Option[Long],
     processingEndTime: Option[Long],
-    errorMessage: Option[String] = None) {
+    failureReasons: Map[Int, String] = Map.empty) {
 
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 8d7414b26c6e1..b259dfa7cfa8f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -120,7 +120,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (jobSet.jobs.isEmpty) {
       logInfo("No jobs added for time " + jobSet.time)
     } else {
-      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo()))
+      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
       jobSets.put(jobSet.time, jobSet)
       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
       logInfo("Added jobs for time " + jobSet.time)
@@ -159,30 +159,27 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (isFirstJobOfJobSet) {
       // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
       // correct "jobSet.processingStartTime".
-      listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo()))
+      listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
     }
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
   }
 
   private def handleJobCompletion(job: Job) {
+    val jobSet = jobSets.get(job.time)
+    jobSet.handleJobCompletion(job)
+    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+    if (jobSet.hasCompleted) {
+      jobSets.remove(jobSet.time)
+      jobGenerator.onBatchCompletion(jobSet.time)
+      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+        jobSet.totalDelay / 1000.0, jobSet.time.toString,
+        jobSet.processingDelay / 1000.0
+      ))
+      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
+    }
     job.result match {
       case Success(_) =>
-        val jobSet = jobSets.get(job.time)
-        jobSet.handleJobCompletion(job)
-        logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
-        if (jobSet.hasCompleted) {
-          jobSets.remove(jobSet.time)
-          jobGenerator.onBatchCompletion(jobSet.time)
-          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
-            jobSet.totalDelay / 1000.0, jobSet.time.toString,
-            jobSet.processingDelay / 1000.0
-          ))
-          listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
-        }
       case Failure(e) =>
-        val jobSet = jobSets.get(job.time)
-        val errorMessage = Some(Utils.exceptionString(e))
-        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo(errorMessage)))
         reportError("Error running job " + job, e)
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 043a5973a8340..f888a54eef036 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -18,8 +18,10 @@
 package org.apache.spark.streaming.scheduler
 
 import scala.collection.mutable.HashSet
+import scala.util.Failure
 
 import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
 
 /** Class representing a set of Jobs
   * belong to the same batch.
@@ -61,14 +63,22 @@ case class JobSet(
     processingEndTime - time.milliseconds
   }
 
-  def toBatchInfo(errorMessage: Option[String] = None): BatchInfo = {
+  def toBatchInfo: BatchInfo = {
+    val failureReasons: Map[Int, String] =
+      if (hasCompleted) {
+        jobs.filter(_.result.isFailure).map { job =>
+          (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
+        }.toMap
+      } else {
+        Map.empty
+      }
     new BatchInfo(
       time,
       streamIdToInputInfo,
       submissionTime,
-      if (processingStartTime >= 0 ) Some(processingStartTime) else None,
-      if (processingEndTime >= 0 ) Some(processingEndTime) else None,
-      errorMessage
+      if (processingStartTime >= 0) Some(processingStartTime) else None,
+      if (processingEndTime >= 0) Some(processingEndTime) else None,
+      failureReasons
     )
   }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index f42ab03584e55..82611a7c99ee9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -140,15 +140,15 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     }
   }
 
-  test("onBatchCompleted") {
+  test("onBatchCompleted with successful batch") {
     ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count)
 
-    @volatile var errorMessage: Option[String] = None
+    @volatile var failureReasons: Map[Int, String] = null
     ssc.addStreamingListener(new StreamingListener {
       override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
-        errorMessage = batchCompleted.batchInfo.errorMessage
+        failureReasons = batchCompleted.batchInfo.failureReasons
       }
     })
     val batchCounter = new BatchCounter(ssc)
@@ -156,20 +156,21 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     // Make sure running at least one batch
     batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
     ssc.stop()
-    assert(errorMessage.isEmpty, "A successful batch should not set errorMessage")
+    assert(failureReasons != null && failureReasons.isEmpty,
+      "A successful batch should not set errorMessage")
   }
 
-  test("onBatchCompleted: error") {
+  test("onBatchCompleted with failed batch and one failed job") {
     ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD { _ =>
-      throw new RuntimeException("This is an unsuccessful batch")
+      throw new RuntimeException("This is a failed job")
     }
 
-    @volatile var errorMessage: Option[String] = None
+    @volatile var failureReasons: Map[Int, String] = null
     ssc.addStreamingListener(new StreamingListener {
       override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
-        errorMessage = batchCompleted.batchInfo.errorMessage
+        failureReasons = batchCompleted.batchInfo.failureReasons
       }
     })
     val batchCounter = new BatchCounter(ssc)
@@ -179,10 +180,47 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val e = intercept[RuntimeException] {
       ssc.awaitTerminationOrTimeout(10000)
     }
-    assert(e.getMessage === "This is an unsuccessful batch")
+    assert(e.getMessage === "This is a failed job")
     ssc.stop()
-    assert(errorMessage.nonEmpty && errorMessage.get.contains("This is an unsuccessful batch"),
-      "An unsuccessful batch should set errorMessage")
+    // Check if failureReasons contains the correct error message
+    assert(failureReasons != null)
+    assert(failureReasons.size === 1)
+    assert(failureReasons.contains(0))
+    assert(failureReasons(0).contains("This is a failed job"))
+  }
+
+  test("onBatchCompleted with failed batch and multiple failed jobs") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is a failed job")
+    }
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is another failed job")
+    }
+
+    @volatile var failureReasons: Map[Int, String] = null
+    ssc.addStreamingListener(new StreamingListener {
+      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+        failureReasons = batchCompleted.batchInfo.failureReasons
+      }
+    })
+    val batchCounter = new BatchCounter(ssc)
+    ssc.start()
+    // Make sure running at least one batch
+    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    val e = intercept[RuntimeException] {
+      ssc.awaitTerminationOrTimeout(10000)
+    }
+    assert(e.getMessage === "This is another failed job")
+    ssc.stop()
+    // Check if failureReasons contains the correct error messages
+    assert(failureReasons != null)
+    assert(failureReasons.size === 2)
+    assert(failureReasons.contains(0))
+    assert(failureReasons.contains(1))
+    assert(failureReasons(0).contains("This is a failed job"))
+    assert(failureReasons(1).contains("This is another failed job"))
   }
 
   /** Check if a sequence of numbers is in increasing order */
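[Editorial sketch, not part of the patch series.] The toBatchInfo change above is the heart of this commit: once a batch has completed, every failed job is folded into a Map keyed by its output operation id. A standalone Scala illustration of that mapping, where FakeJob is a hypothetical stand-in for the scheduler's Job class and getMessage stands in for Utils.exceptionString (which the real code uses to capture the full stack trace):

    import scala.util.{Failure, Try}

    // Stand-in for scheduler.Job: just an output op id plus a Try result.
    case class FakeJob(outputOpId: Int, result: Try[Unit])

    // Same shape as the failureReasons computation in toBatchInfo:
    // keep only the failed jobs, key each failure reason by output op id.
    def failureReasons(jobs: Seq[FakeJob]): Map[Int, String] =
      jobs.filter(_.result.isFailure).map { job =>
        (job.outputOpId, job.result.asInstanceOf[Failure[_]].exception.getMessage)
      }.toMap

    // failureReasons(Seq(FakeJob(0, Try(())), FakeJob(1, Failure(new RuntimeException("boom")))))
    // returns Map(1 -> "boom"); a fully successful batch yields Map.empty.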
should set errorMessage") + // Check if failureReasons contains the correct error message + assert(failureReasons != null) + assert(failureReasons.size === 1) + assert(failureReasons.contains(0)) + assert(failureReasons(0).contains("This is a failed job")) + } + + test("onBatchCompleted with failed batch and multiple failed jobs") { + ssc = new StreamingContext("local[2]", "test", Milliseconds(1000)) + val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) + inputStream.foreachRDD { _ => + throw new RuntimeException("This is a failed job") + } + inputStream.foreachRDD { _ => + throw new RuntimeException("This is another failed job") + } + + @volatile var failureReasons: Map[Int, String] = null + ssc.addStreamingListener(new StreamingListener { + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { + failureReasons = batchCompleted.batchInfo.failureReasons + } + }) + val batchCounter = new BatchCounter(ssc) + ssc.start() + // Make sure running at least one batch + batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000) + val e = intercept[RuntimeException] { + ssc.awaitTerminationOrTimeout(10000) + } + assert(e.getMessage === "This is another failed job") + ssc.stop() + // Check if failureReasons contains the correct error messages + assert(failureReasons != null) + assert(failureReasons.size === 2) + assert(failureReasons.contains(0)) + assert(failureReasons.contains(1)) + assert(failureReasons(0).contains("This is a failed job")) + assert(failureReasons(1).contains("This is another failed job")) } /** Check if a sequence of numbers is in increasing order */ From 693ff74da410ec14954774e75445a42980951bc8 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 23 Sep 2015 16:15:15 +0800 Subject: [PATCH 3/7] case Success(_) -> case _ --- .../org/apache/spark/streaming/scheduler/JobScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 901b8efe38d43..66afbf1b11764 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -179,9 +179,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo)) } job.result match { - case Success(_) => case Failure(e) => reportError("Error running job " + job, e) + case _ => } } From a1439e0d87755c80b0f6b756ec8a76345c1f1453 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 23 Sep 2015 16:37:36 +0800 Subject: [PATCH 4/7] Keep failureReasons private[streaming] --- .../scala/org/apache/spark/streaming/scheduler/BatchInfo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 7992a16c2fd70..c4640a6006b64 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -39,7 +39,7 @@ case class BatchInfo( submissionTime: Long, processingStartTime: Option[Long], processingEndTime: Option[Long], - failureReasons: Map[Int, String] = Map.empty) { + private[streaming] val failureReasons: Map[Int, String] = 
From a1439e0d87755c80b0f6b756ec8a76345c1f1453 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 23 Sep 2015 16:37:36 +0800
Subject: [PATCH 4/7] Keep failureReasons private[streaming]

---
 .../scala/org/apache/spark/streaming/scheduler/BatchInfo.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 7992a16c2fd70..c4640a6006b64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -39,7 +39,7 @@ case class BatchInfo(
     submissionTime: Long,
     processingStartTime: Option[Long],
     processingEndTime: Option[Long],
-    failureReasons: Map[Int, String] = Map.empty) {
+    private[streaming] val failureReasons: Map[Int, String] = Map.empty) {
 
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)

From f33cd9c6d529ced98dc16c75210db49c595e1ee3 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 23 Sep 2015 16:39:48 +0800
Subject: [PATCH 5/7] Fix the code style

---
 .../scala/org/apache/spark/streaming/scheduler/JobSet.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index f888a54eef036..39580d02d02fb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -64,7 +64,7 @@ case class JobSet(
   }
 
   def toBatchInfo: BatchInfo = {
-    val failureReasons: Map[Int, String] =
+    val failureReasons: Map[Int, String] = {
       if (hasCompleted) {
         jobs.filter(_.result.isFailure).map { job =>
           (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
@@ -72,6 +72,7 @@ case class JobSet(
       } else {
         Map.empty
       }
+    }
     new BatchInfo(
       time,
       streamIdToInputInfo,
From fdf39eed9a745ced40fd36fccd3351e2f449ff4a Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 23 Sep 2015 16:46:12 +0800
Subject: [PATCH 6/7] Add constructor and apply for binary compatibility

---
 .../spark/streaming/scheduler/BatchInfo.scala | 32 ++++++++++++++++++-
 .../spark/streaming/scheduler/JobSet.scala    |  2 +-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index c4640a6006b64..a0bb4371b760f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -39,7 +39,20 @@ case class BatchInfo(
     submissionTime: Long,
     processingStartTime: Option[Long],
     processingEndTime: Option[Long],
-    private[streaming] val failureReasons: Map[Int, String] = Map.empty) {
+    private[streaming] val failureReasons: Map[Int, String]) {
+
+  /**
+   * Create `BatchInfo`. This is for binary compatibility.
+   */
+  def this(
+      batchTime: Time,
+      streamIdToInputInfo: Map[Int, StreamInputInfo],
+      submissionTime: Long,
+      processingStartTime: Option[Long],
+      processingEndTime: Option[Long]) {
+    this(batchTime, streamIdToInputInfo, submissionTime, processingStartTime, processingEndTime,
+      Map.empty)
+  }
 
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@@ -70,3 +83,20 @@
    */
   def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
 }
+
+@DeveloperApi
+object BatchInfo {
+
+  /**
+   * Create `BatchInfo`. This is for binary compatibility.
+   */
+  def apply(
+      batchTime: Time,
+      streamIdToInputInfo: Map[Int, StreamInputInfo],
+      submissionTime: Long,
+      processingStartTime: Option[Long],
+      processingEndTime: Option[Long]): BatchInfo = {
+    BatchInfo(batchTime, streamIdToInputInfo, submissionTime, processingStartTime,
+      processingEndTime, Map.empty)
+  }
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 39580d02d02fb..90530220e1f3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -73,7 +73,7 @@ case class JobSet(
         Map.empty
       }
     }
-    new BatchInfo(
+    BatchInfo(
       time,
       streamIdToInputInfo,
       submissionTime,
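[Editorial sketch, not part of the patch series.] Patch 6 exists because adding a sixth field to a case class changes the signatures of both the generated apply method and the primary constructor, so code compiled against the old five-argument forms would fail to link. The auxiliary constructor and the companion apply restore both old shapes. A sketch of the call sites they keep working (the value names here are hypothetical):

    // Given values time: Time, inputInfo: Map[Int, StreamInputInfo],
    // submitted: Long, started: Option[Long], ended: Option[Long]:
    val viaConstructor = new BatchInfo(time, inputInfo, submitted, started, ended)
    val viaApply = BatchInfo(time, inputInfo, submitted, started, ended)
    // Both forms leave failureReasons as Map.empty.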
From af6562f41a45e682d61eb9b7abffdd5708d11a82 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 23 Sep 2015 17:09:50 +0800
Subject: [PATCH 7/7] Refactor unit tests to eliminate duplicate codes

---
 .../spark/streaming/scheduler/BatchInfo.scala |  2 +-
 .../streaming/StreamingListenerSuite.scala    | 75 +++++++++----------
 2 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index a0bb4371b760f..cef34e417c1cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -99,4 +99,4 @@ object BatchInfo {
     BatchInfo(batchTime, streamIdToInputInfo, submissionTime, processingStartTime,
       processingEndTime, Map.empty)
   }
-}
\ No newline at end of file
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 82611a7c99ee9..d8fd2ced3bf3b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -145,17 +145,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count)
 
-    @volatile var failureReasons: Map[Int, String] = null
-    ssc.addStreamingListener(new StreamingListener {
-      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
-        failureReasons = batchCompleted.batchInfo.failureReasons
-      }
-    })
-    val batchCounter = new BatchCounter(ssc)
-    ssc.start()
-    // Make sure running at least one batch
-    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
-    ssc.stop()
+    val failureReasons = startStreamingContextAndCollectFailureReasons(ssc)
     assert(failureReasons != null && failureReasons.isEmpty,
       "A successful batch should not set errorMessage")
   }
@@ -167,22 +157,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       throw new RuntimeException("This is a failed job")
     }
 
-    @volatile var failureReasons: Map[Int, String] = null
-    ssc.addStreamingListener(new StreamingListener {
-      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
-        failureReasons = batchCompleted.batchInfo.failureReasons
-      }
-    })
-    val batchCounter = new BatchCounter(ssc)
-    ssc.start()
-    // Make sure running at least one batch
-    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
-    val e = intercept[RuntimeException] {
-      ssc.awaitTerminationOrTimeout(10000)
-    }
-    assert(e.getMessage === "This is a failed job")
-    ssc.stop()
     // Check if failureReasons contains the correct error message
+    val failureReasons = startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
     assert(failureReasons != null)
     assert(failureReasons.size === 1)
     assert(failureReasons.contains(0))
@@ -199,22 +175,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       throw new RuntimeException("This is another failed job")
     }
 
-    @volatile var failureReasons: Map[Int, String] = null
-    ssc.addStreamingListener(new StreamingListener {
-      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
-        failureReasons = batchCompleted.batchInfo.failureReasons
-      }
-    })
-    val batchCounter = new BatchCounter(ssc)
-    ssc.start()
-    // Make sure running at least one batch
-    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
-    val e = intercept[RuntimeException] {
-      ssc.awaitTerminationOrTimeout(10000)
-    }
-    assert(e.getMessage === "This is another failed job")
-    ssc.stop()
     // Check if failureReasons contains the correct error messages
+    val failureReasons =
+      startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
     assert(failureReasons != null)
     assert(failureReasons.size === 2)
     assert(failureReasons.contains(0))
@@ -223,6 +186,23 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     assert(failureReasons(1).contains("This is another failed job"))
   }
 
+  private def startStreamingContextAndCollectFailureReasons(
+      _ssc: StreamingContext, isFailed: Boolean = false): Map[Int, String] = {
+    val failureReasonsCollector = new FailureReasonsCollector()
+    _ssc.addStreamingListener(failureReasonsCollector)
+    val batchCounter = new BatchCounter(_ssc)
+    _ssc.start()
+    // Make sure running at least one batch
+    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    if (isFailed) {
+      intercept[RuntimeException] {
+        _ssc.awaitTerminationOrTimeout(10000)
+      }
+    }
+    _ssc.stop()
+    failureReasonsCollector.failureReasons
+  }
+
   /** Check if a sequence of numbers is in increasing order */
   def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
     for (i <- 1 until seq.size) {
@@ -288,3 +268,16 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
   }
   def onStop() { }
 }
+
+/**
+ * A StreamingListener that saves the latest `failureReasons` in `BatchInfo` to the `failureReasons`
+ * field.
+ */
+class FailureReasonsCollector extends StreamingListener {
+
+  @volatile var failureReasons: Map[Int, String] = null
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+    failureReasons = batchCompleted.batchInfo.failureReasons
+  }
+}
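[Editorial sketch, not part of the patch series.] End-to-end, the series lets a listener observe per-output-operation failures when a batch completes. A minimal usage sketch: because patch 4 narrows failureReasons to private[streaming], such a listener has to live in the org.apache.spark.streaming package, exactly as the test suite's FailureReasonsCollector does. BatchFailureLogger is a hypothetical name.

    package org.apache.spark.streaming

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Logs every failed output operation of a completed batch;
    // failureReasons is empty when the batch succeeded.
    class BatchFailureLogger extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val info = batchCompleted.batchInfo
        info.failureReasons.foreach { case (outputOpId, reason) =>
          println(s"Batch ${info.batchTime}: output op $outputOpId failed: $reason")
        }
      }
    }

    // Registered like any other listener, given a StreamingContext ssc:
    //   ssc.addStreamingListener(new BatchFailureLogger)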