From bd77f3cd5a2fac5d66dd4d89b0db176345cad9e3 Mon Sep 17 00:00:00 2001 From: Jacker Hu Date: Wed, 23 Sep 2015 15:02:09 +0800 Subject: [PATCH 1/9] Fix SPARK-10772: using Option in TransformedDStream instead of Some --- .../spark/streaming/dstream/TransformedDStream.scala | 2 +- .../apache/spark/streaming/BasicOperationsSuite.scala | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 5d46ca0715ffd..2199407e46b0a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -38,6 +38,6 @@ class TransformedDStream[U: ClassTag] ( override def compute(validTime: Time): Option[RDD[U]] = { val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq - Some(transformFunc(parentRDDs, validTime)) + Option(transformFunc(parentRDDs, validTime)) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 255376807c957..6dd13a207c027 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -210,6 +210,16 @@ class BasicOperationsSuite extends TestSuiteBase { input.map(_.map(_.toString)) ) } + + test("transform with NULL") { + val input = Seq(1 to 4, Seq(), 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.transform(rdd => {if (rdd != null && !rdd.isEmpty()) rdd.map(_.toString) else null}), // RDD.map in transform + input.filter(!_.isEmpty).map(_.map(_.toString)) + ) + } + test("transformWith") { val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) From c1e00ec88ac3c506f78d1e98fcfcc9a1c8e55a70 Mon Sep 17 00:00:00 2001 From: Jacker Hu Date: Wed, 23 Sep 2015 15:22:57 +0800 Subject: [PATCH 2/9] Fix unit test failure --- .../org/apache/spark/streaming/BasicOperationsSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 6dd13a207c027..9617377fa6504 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -216,7 +216,9 @@ class BasicOperationsSuite extends TestSuiteBase { testOperation( input, (r: DStream[Int]) => r.transform(rdd => {if (rdd != null && !rdd.isEmpty()) rdd.map(_.toString) else null}), // RDD.map in transform - input.filter(!_.isEmpty).map(_.map(_.toString)) + input.filter(!_.isEmpty).map(_.map(_.toString)), + 4, + false ) } From 4e5a97f84809937bfc0376182c624140e6cd1ff0 Mon Sep 17 00:00:00 2001 From: Jacker Hu Date: Fri, 25 Sep 2015 14:23:04 +0800 Subject: [PATCH 3/9] Refix the tranform NPE, throw exception when tranform function returns NULL --- .../dstream/TransformedDStream.scala | 9 ++++++++- .../streaming/BasicOperationsSuite.scala | 19 ++++++++++--------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 2199407e46b0a..0e1037e71c8c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.dstream +import org.apache.spark.SparkException import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag @@ -38,6 +39,12 @@ class TransformedDStream[U: ClassTag] ( override def compute(validTime: Time): Option[RDD[U]] = { val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq - Option(transformFunc(parentRDDs, validTime)) + val transformedRDD = transformFunc(parentRDDs, validTime) + if (transformedRDD == null) { + throw new SparkException("Transform function return null instead of an RDD; " + + "this should be avoided . You can return an empty RDD using RDD.empty " + + "if you dont want to return anything.") + } + Some(transformedRDD) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 9617377fa6504..cfbbf93c1d696 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -212,17 +212,18 @@ class BasicOperationsSuite extends TestSuiteBase { } test("transform with NULL") { - val input = Seq(1 to 4, Seq(), 5 to 8, 9 to 12) - testOperation( - input, - (r: DStream[Int]) => r.transform(rdd => {if (rdd != null && !rdd.isEmpty()) rdd.map(_.toString) else null}), // RDD.map in transform - input.filter(!_.isEmpty).map(_.map(_.toString)), - 4, - false - ) + val input = Seq(1 to 4) + intercept[SparkException] { + testOperation( + input, + (r: DStream[Int]) => r.transform(rdd => null.asInstanceOf[RDD[Int]]), + Seq(Seq()), + 1, + false + ) + } } - test("transformWith") { val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) From d068000a234bf47f7d5a38c8a7474ad66c87e086 Mon Sep 17 00:00:00 2001 From: Jacker Hu Date: Mon, 28 Sep 2015 11:17:00 +0800 Subject: [PATCH 4/9] Change the exception message according @srowen's comments --- .../apache/spark/streaming/dstream/TransformedDStream.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 0e1037e71c8c9..50ab6d1422ba6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -41,9 +41,8 @@ class TransformedDStream[U: ClassTag] ( val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq val transformedRDD = transformFunc(parentRDDs, validTime) if (transformedRDD == null) { - throw new SparkException("Transform function return null instead of an RDD; " + - "this should be avoided . You can return an empty RDD using RDD.empty " + - "if you dont want to return anything.") + throw new SparkException("Transform function may not return null. " + + " Return RDD.empty to return no elements as the result of the transformation.") } Some(transformedRDD) } From 81b09167c6dd3b412ffb229fe557075e75cf97dd Mon Sep 17 00:00:00 2001 From: Jacker Hu Date: Mon, 28 Sep 2015 15:32:12 +0800 Subject: [PATCH 5/9] Fix the space --- .../org/apache/spark/streaming/dstream/TransformedDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 50ab6d1422ba6..57ccd0a2b6573 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -42,7 +42,7 @@ class TransformedDStream[U: ClassTag] ( val transformedRDD = transformFunc(parentRDDs, validTime) if (transformedRDD == null) { throw new SparkException("Transform function may not return null. " + - " Return RDD.empty to return no elements as the result of the transformation.") + "Return RDD.empty to return no elements as the result of the transformation.") } Some(transformedRDD) } From 0d660ce1c8ad953b20ffc78f5056e701f3e45e21 Mon Sep 17 00:00:00 2001 From: jhu-chang Date: Thu, 8 Oct 2015 12:40:56 +0800 Subject: [PATCH 6/9] Fix scala style and change the may to must in msg --- .../apache/spark/streaming/dstream/TransformedDStream.scala | 2 +- .../org/apache/spark/streaming/BasicOperationsSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 57ccd0a2b6573..b48ea895104db 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -41,7 +41,7 @@ class TransformedDStream[U: ClassTag] ( val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq val transformedRDD = transformFunc(parentRDDs, validTime) if (transformedRDD == null) { - throw new SparkException("Transform function may not return null. " + + throw new SparkException("Transform function must not return null. " + "Return RDD.empty to return no elements as the result of the transformation.") } Some(transformedRDD) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index cfbbf93c1d696..9988f410f0bc1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -210,7 +210,7 @@ class BasicOperationsSuite extends TestSuiteBase { input.map(_.map(_.toString)) ) } - + test("transform with NULL") { val input = Seq(1 to 4) intercept[SparkException] { @@ -218,7 +218,7 @@ class BasicOperationsSuite extends TestSuiteBase { input, (r: DStream[Int]) => r.transform(rdd => null.asInstanceOf[RDD[Int]]), Seq(Seq()), - 1, + 1, false ) } From 119ad81f8f3c49949012b3d509032f2a2ee24261 Mon Sep 17 00:00:00 2001 From: jhu-chang Date: Thu, 8 Oct 2015 17:34:16 +0800 Subject: [PATCH 7/9] rearrange the import order of SparkException --- .../org/apache/spark/streaming/dstream/TransformedDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index b48ea895104db..e4c1bfdd98a20 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -17,8 +17,8 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.SparkException import org.apache.spark.rdd.{PairRDDFunctions, RDD} +import org.apache.spark.SparkException import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag From 11036e1d70d02ccf7a09bda4b87a9717bb7dead8 Mon Sep 17 00:00:00 2001 From: jhu-chang Date: Fri, 9 Oct 2015 11:20:45 +0800 Subject: [PATCH 8/9] correct the exception message --- .../apache/spark/streaming/dstream/TransformedDStream.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index e4c1bfdd98a20..c6afd0d12f4b9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -42,7 +42,8 @@ class TransformedDStream[U: ClassTag] ( val transformedRDD = transformFunc(parentRDDs, validTime) if (transformedRDD == null) { throw new SparkException("Transform function must not return null. " + - "Return RDD.empty to return no elements as the result of the transformation.") + "Return SparkContext.emptyRDD() instead to represent no element " + + "as the result of transformation.") } Some(transformedRDD) } From 2cc4faba0da2f8a137ab3c00ce9da32cbf37126e Mon Sep 17 00:00:00 2001 From: jhu-chang Date: Fri, 9 Oct 2015 11:28:27 +0800 Subject: [PATCH 9/9] reorder the import --- .../apache/spark/streaming/dstream/TransformedDStream.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index c6afd0d12f4b9..ab01f47d5cf99 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -17,10 +17,11 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.rdd.{PairRDDFunctions, RDD} +import scala.reflect.ClassTag + import org.apache.spark.SparkException +import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming.{Duration, Time} -import scala.reflect.ClassTag private[streaming] class TransformedDStream[U: ClassTag] (