From aafe7b62f80ff1e86f6c528711d24e7de54536c5 Mon Sep 17 00:00:00 2001 From: wangjiasheng Date: Mon, 27 Nov 2017 16:28:48 +0800 Subject: [PATCH 1/2] Revert "[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset from JobScheduler.jobSets" This reverts commit f8db8945f25cb884278ff8841bac5f6f28f0dec6. --- .../spark/streaming/scheduler/JobScheduler.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 2fa3bf7d5230b..b7d114bc16d48 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -201,20 +201,18 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo)) logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) if (jobSet.hasCompleted) { + jobSets.remove(jobSet.time) + jobGenerator.onBatchCompletion(jobSet.time) + logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( + jobSet.totalDelay / 1000.0, jobSet.time.toString, + jobSet.processingDelay / 1000.0 + )) listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo)) } job.result match { case Failure(e) => reportError("Error running job " + job, e) case _ => - if (jobSet.hasCompleted) { - jobSets.remove(jobSet.time) - jobGenerator.onBatchCompletion(jobSet.time) - logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( - jobSet.totalDelay / 1000.0, jobSet.time.toString, - jobSet.processingDelay / 1000.0 - )) - } } } From e4e57cca9b0d21db8ad6292f8fcbde2dd316d7b7 Mon Sep 17 00:00:00 2001 From: wangjiasheng Date: Mon, 27 Nov 2017 16:31:24 +0800 Subject: [PATCH 2/2] [SPARK][STREAMING] Invoke onBatchCompletion() only when all jobs in the JobSet are Success --- .../scala/org/apache/spark/streaming/scheduler/JobSet.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 0baedaf275d67..4f4deb08856de 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -44,7 +44,9 @@ case class JobSet( } def handleJobCompletion(job: Job) { - incompleteJobs -= job + if (job.result.isSuccess) { + incompleteJobs -= job + } if (hasCompleted) processingEndTime = System.currentTimeMillis() }