From 46f15334236d2c68c35a3eadf3de52ec4cbaa4e7 Mon Sep 17 00:00:00 2001 From: ArcherShao Date: Thu, 26 Mar 2015 21:58:02 +0800 Subject: [PATCH 1/4] Check whether cores > receivers in local mode --- .../spark/streaming/StreamingContext.scala | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 543224d4b07bc..e51787a8e714e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -488,6 +488,8 @@ class StreamingContext private[streaming] ( assert(graph != null, "Graph is null") graph.validate() + assert(isCoresEnoughInLocalMode(), "No enough cores, Spark jobs will not get resources to " + + "process the received data") assert( checkpointDir == null || checkpointDuration != null, "Checkpoint directory has been set, but the graph checkpointing interval has " + @@ -495,6 +497,45 @@ class StreamingContext private[streaming] ( ) } + private def isCoresEnoughInLocalMode(): Boolean = { + val risNum = graph.getReceiverInputStreams().size + if (risNum == 0){ + return true + } + + // Regular expression used for local[N] and local[*] master formats + val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r + // Regular expression for local[N, maxRetries], used in tests with failing tasks + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r + // Regular expression for simulating a Spark cluster of [N, cores, memory] locally + val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r + + val master = sc.conf.get("spark.master") + val coresPerTask = sc.conf.getInt("spark.task.cpus", 1) + val maxTaskNum = master match { + case "local" => 1 + + case LOCAL_N_REGEX(threads) => + def localCpuCount = Runtime.getRuntime.availableProcessors() + // local[*] 
estimates the number of cores on the machine; local[N] uses exactly N threads. + val threadCount = if (threads == "*") localCpuCount else threads.toInt + threadCount / coresPerTask + + case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => + def localCpuCount = Runtime.getRuntime.availableProcessors() + // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads. + val threadCount = if (threads == "*") localCpuCount else threads.toInt + threadCount / coresPerTask + + case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => + coresPerSlave / coresPerTask * numSlaves + + case _ => Int.MaxValue + } + + maxTaskNum > risNum + } + /** * Start the execution of the streams. * From f5976d7a9a83f145045501cad13fb91b63cb0dd1 Mon Sep 17 00:00:00 2001 From: Chuan Shao Date: Thu, 26 Mar 2015 22:02:59 +0800 Subject: [PATCH 2/4] update --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index e51787a8e714e..1ba09d5bfcbc6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -513,7 +513,7 @@ class StreamingContext private[streaming] ( val master = sc.conf.get("spark.master") val coresPerTask = sc.conf.getInt("spark.task.cpus", 1) val maxTaskNum = master match { - case "local" => 1 + case "local" => 1 / coresPerTask case LOCAL_N_REGEX(threads) => def localCpuCount = Runtime.getRuntime.availableProcessors() From 8cf26b1a1a4b1e311077146e992f64cd701aa7b8 Mon Sep 17 00:00:00 2001 From: Chuan Shao Date: Fri, 27 Mar 2015 10:09:55 +0800 Subject: [PATCH 3/4] resolved build error --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1ba09d5bfcbc6..69fe30d0bfbcc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -489,7 +489,7 @@ class StreamingContext private[streaming] ( graph.validate() assert(isCoresEnoughInLocalMode(), "No enough cores, Spark jobs will not get resources to " + - "process the received data") + "process the received data.") assert( checkpointDir == null || checkpointDuration != null, "Checkpoint directory has been set, but the graph checkpointing interval has " + @@ -528,7 +528,7 @@ class StreamingContext private[streaming] ( threadCount / coresPerTask case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => - coresPerSlave / coresPerTask * numSlaves + coresPerSlave.toInt / coresPerTask * numSlaves.toInt case _ => Int.MaxValue } From 197a0304c70ef5262e30065fbec12595c1c72cb1 Mon Sep 17 00:00:00 2001 From: ArcherShao Date: Fri, 27 Mar 2015 11:40:29 +0800 Subject: [PATCH 4/4] modify related UT --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 2e5005ef6ff14..f449558b8d96f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -404,7 +404,7 @@ object SlowTestReceiver { /** Streaming application for testing DStream and RDD creation sites */ package object testPackage extends Assertions { def test() { - val conf = new SparkConf().setMaster("local").setAppName("CreationSite test") + val conf = new
SparkConf().setMaster("local[2]").setAppName("CreationSite test") val ssc = new StreamingContext(conf , Milliseconds(100)) try { val inputStream = ssc.receiverStream(new TestReceiver)