From d2440f297380dd5fcfc2d7da497b47fcbc8e56d6 Mon Sep 17 00:00:00 2001 From: Ahmed Mahran Date: Thu, 23 Jun 2016 02:06:30 +0200 Subject: [PATCH 1/3] Add a test case to capture the issue --- .../streaming/StreamingContextSuite.scala | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 806e181f61980..6a0622999056f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -18,10 +18,12 @@ package org.apache.spark.streaming import java.io.{File, NotSerializableException} +import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.AtomicInteger -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.Queue +import org.apache.spark.rdd.BlockRDD + +import scala.collection.mutable.{ArrayBuffer, Queue} import org.apache.commons.io.FileUtils import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester} @@ -327,6 +329,50 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo } } + test("stop gracefully - clear all metadata") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + conf.set("spark.dummyTimeConfig", "3600s") + conf.set("spark.streaming.gracefulStopTimeout", "15s") + sc = new SparkContext(conf) + for (i <- 1 to 4) { + logInfo("==================================\n\n\n") + ssc = new StreamingContext(sc, Milliseconds(100)) + val checkpointDirectory = Utils.createTempDir().getAbsolutePath() + ssc.checkpoint(checkpointDirectory) + val rddsArray = new CopyOnWriteArrayList[BlockRDD[_]]() + TestReceiver.counter.set(1) + val input = ssc.receiverStream(new TestReceiver) + input.checkpoint(Seconds(1)).foreachRDD { rdd => + rdd match { + case blockRDD: BlockRDD[_] => + rddsArray.add(blockRDD) + blockRDD.count + } + } + ssc.start() + // wait for enough jobs to be generated + Thread.sleep(5000) + eventually(timeout(5.seconds), interval(10.millis)) { + assert(rddsArray.size > 0) + } + ssc.stop(stopSparkContext = false, stopGracefully = true) + Utils.deleteRecursively(new File(checkpointDirectory)) + val rdds = rddsArray.toArray.map(_.asInstanceOf[BlockRDD[_]]).toSeq + def validRDDsCount = rdds.count(_.isValid) + def invalidRDDsCount = rdds.count(!_.isValid) + def validRDDs = s"[${rdds.filter(_.isValid).mkString(", ")}]" + logInfo(s"Number of generated rdds: ${rdds.length}") + logInfo(s"Number of valid rdds: $validRDDsCount") + logInfo(s"Number of invalid rdds: $invalidRDDsCount") + logInfo(s"Valid rdds: $validRDDs") + assert(validRDDsCount == 0, s"All rdds should be invalid, " + + s"number of valid rdds = $validRDDsCount, " + + s"number of invalid rdds = $invalidRDDsCount, " + + s"valid rdds: $validRDDs") + Thread.sleep(100) + } + } + test("stop gracefully even if a receiver misses StopReceiver") { // This is not a deterministic unit. But if this unit test is flaky, then there is definitely // something wrong. See SPARK-5681 From c55a286278ae4403b850b9cea8b779d1e8f0abc3 Mon Sep 17 00:00:00 2001 From: Ahmed Mahran Date: Thu, 23 Jun 2016 02:07:32 +0200 Subject: [PATCH 2/3] Do a final cleaning of metadata when stopping streaming context gracefully --- .../org/apache/spark/streaming/scheduler/JobGenerator.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 19c88f1ee0114..ad8cc878731b0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -146,6 +146,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { Thread.sleep(pollTime) } logInfo("Waited for jobs to be processed and checkpoints to be written") + + // do final cleanup in the same thread + clearMetadata(Time(clock.getTimeMillis) + graph.getMaxInputStreamRememberDuration) } else { logInfo("Stopping JobGenerator immediately") // Stop timer and graph immediately, ignore unprocessed data and pending jobs From a12c9c8595dde7473e372542bace98045585aa7b Mon Sep 17 00:00:00 2001 From: Ahmed Mahran Date: Thu, 23 Jun 2016 23:50:50 +0200 Subject: [PATCH 3/3] Apply code review comments https://github.com/apache/spark/pull/13866 reviewers @HyukjinKwon @srowen --- .../streaming/StreamingContextSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 6a0622999056f..9c6a9b68bfc7f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -21,8 +21,7 @@ import java.io.{File, NotSerializableException} import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.AtomicInteger -import org.apache.spark.rdd.BlockRDD - +import scala.collection.JavaConversions import scala.collection.mutable.{ArrayBuffer, Queue} import org.apache.commons.io.FileUtils @@ -36,6 +35,7 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source +import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver @@ -339,13 +339,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ssc = new StreamingContext(sc, Milliseconds(100)) val checkpointDirectory = Utils.createTempDir().getAbsolutePath() ssc.checkpoint(checkpointDirectory) - val rddsArray = new CopyOnWriteArrayList[BlockRDD[_]]() + val generatedRDDs = new CopyOnWriteArrayList[BlockRDD[_]]() TestReceiver.counter.set(1) - val input = ssc.receiverStream(new TestReceiver) + val input = ssc.receiverStream(new TestReceiver()) input.checkpoint(Seconds(1)).foreachRDD { rdd => rdd match { case blockRDD: BlockRDD[_] => - rddsArray.add(blockRDD) + generatedRDDs.add(blockRDD) blockRDD.count } } @@ -353,15 +353,15 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo // wait for enough jobs to be generated Thread.sleep(5000) eventually(timeout(5.seconds), interval(10.millis)) { - assert(rddsArray.size > 0) + assert(!generatedRDDs.isEmpty) } ssc.stop(stopSparkContext = false, stopGracefully = true) Utils.deleteRecursively(new File(checkpointDirectory)) - val rdds = rddsArray.toArray.map(_.asInstanceOf[BlockRDD[_]]).toSeq - def validRDDsCount = rdds.count(_.isValid) - def invalidRDDsCount = rdds.count(!_.isValid) - def validRDDs = s"[${rdds.filter(_.isValid).mkString(", ")}]" - logInfo(s"Number of generated rdds: ${rdds.length}") + val rdds = JavaConversions.collectionAsScalaIterable(generatedRDDs) + val validRDDsCount = rdds.count(_.isValid) + val invalidRDDsCount = rdds.count(!_.isValid) + val validRDDs = s"[${rdds.filter(_.isValid).mkString(", ")}]" + logInfo(s"Number of generated rdds: ${rdds.size}") logInfo(s"Number of valid rdds: $validRDDsCount") logInfo(s"Number of invalid rdds: $invalidRDDsCount") logInfo(s"Valid rdds: $validRDDs")