From ded210be4bb7c01f6df1ede6269cfb2b1db325d2 Mon Sep 17 00:00:00 2001 From: Jian Feng Date: Wed, 16 Sep 2015 10:18:55 +0800 Subject: [PATCH 1/7] [SPARK-10577] [PySpark] DataFrame hint for broadcast join https://issues.apache.org/jira/browse/SPARK-10577 --- python/pyspark/sql/functions.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 26b8662718a60..fe5a5b5e6a5be 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -29,6 +29,7 @@ from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.sql.types import StringType from pyspark.sql.column import Column, _to_java_column, _to_seq +from pyspark.sql.dataframe import DataFrame def _create_function(name, doc=""): @@ -189,6 +190,14 @@ def approxCountDistinct(col, rsd=None): return Column(jc) +@since(1.6) +def broadcast(df): + """Marks a DataFrame as small enough for use in broadcast joins.""" + + sc = SparkContext._active_spark_context + return DataFrame(sc._jvm.functions.broadcast(df._jdf),sc._jsc) + + @since(1.4) def coalesce(*cols): """Returns the first column that is not null. From d1d8881e4117abe9a3a46155a793f3d2ce624ca0 Mon Sep 17 00:00:00 2001 From: Jian Feng Date: Wed, 16 Sep 2015 11:20:51 +0800 Subject: [PATCH 2/7] [SPARK-10577] [PySpark] DataFrame hint for broadcast join #8777 https://issues.apache.org/jira/browse/SPARK-10577 add space after comma --- python/pyspark/sql/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index fe5a5b5e6a5be..bb8d4fbc5a5cc 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -195,7 +195,7 @@ def broadcast(df): """Marks a DataFrame as small enough for use in broadcast joins.""" sc = SparkContext._active_spark_context - return DataFrame(sc._jvm.functions.broadcast(df._jdf),sc._jsc) + return DataFrame(sc._jvm.functions.broadcast(df._jdf), sc._jsc) @since(1.4) From 8f3753cfe2f8655473d8e70802307c99f1af3640 Mon Sep 17 00:00:00 2001 From: Jian Feng Date: Wed, 16 Sep 2015 22:07:17 +0800 Subject: [PATCH 3/7] [SPARK-10577] [PySpark] DataFrame hint for broadcast join https://issues.apache.org/jira/browse/SPARK-10577 Adding a test for broadcast join hint --- python/pyspark/sql/tests.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index f2172b7a27d88..0aa229615570d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1075,6 +1075,23 @@ def foo(): self.assertRaises(TypeError, foo) + # add test for SPARK-10577 (test broadcast join hint) + def test_functions_broadcast(self): + from pyspark.sql.functions import broadcast + + df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value")) + df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value")) + + # equijoin - should be converted into broadcast join + self.assertEqual(1, df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan().toString().count("BroadcastHashJoin")) + + # no join key -- should not be a broadcast join + self.assertEqual(0, df1.join(broadcast(df2))._jdf.queryExecution().executedPlan().toString().count("BroadcastHashJoin")) + + # planner should not crash without a join + broadcast(df1)._jdf.queryExecution().executedPlan() + + class HiveContextSQLTests(ReusedPySparkTestCase): From b8daa763229b65735640fac35438f34e12a50dbe Mon Sep 17 00:00:00 2001 From: Jian Feng Date: Thu, 17 Sep 2015 21:58:14 +0800 Subject: [PATCH 4/7] [SPARK-10663] [ML] [MLLIB] [DOCS] Change test.toDF to test in Spark ML Programming Guide https://issues.apache.org/jira/browse/SPARK-10663 --- docs/ml-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 78c93a95c7807..a103892125540 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -466,7 +466,7 @@ val test = sqlContext.createDataFrame(Seq( )).toDF("id", "text") // Make predictions on test documents. -model.transform(test.toDF) +model.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => From 8de159b94bed029d4223961809545fb1025cf8b1 Mon Sep 17 00:00:00 2001 From: Jianfeng-chs Date: Thu, 17 Sep 2015 22:11:26 +0800 Subject: [PATCH 5/7] Update ml-guide.md --- docs/ml-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ml-guide.md b/docs/ml-guide.md index a103892125540..78c93a95c7807 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -466,7 +466,7 @@ val test = sqlContext.createDataFrame(Seq( )).toDF("id", "text") // Make predictions on test documents. -model.transform(test) +model.transform(test.toDF) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => From 198f0e652770569746ffbc3537d91a30dfa2eee1 Mon Sep 17 00:00:00 2001 From: Jian Feng Date: Thu, 17 Sep 2015 22:31:23 +0800 Subject: [PATCH 6/7] Revert "[SPARK-10663] [ML] [MLLIB] [DOCS] Change test.toDF to test in Spark ML Programming Guide" This reverts commit b8daa763229b65735640fac35438f34e12a50dbe. --- docs/ml-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 26aeca70b9fb7..c5d7f990021f1 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -475,7 +475,7 @@ val test = sqlContext.createDataFrame(Seq( )).toDF("id", "text") // Make predictions on test documents. -model.transform(test) +model.transform(test.toDF) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => From 9958c647375a3069a5c017e11d32e64ce2a1230c Mon Sep 17 00:00:00 2001 From: Jian Feng Date: Thu, 17 Sep 2015 22:31:56 +0800 Subject: [PATCH 7/7] Revert "Revert "[SPARK-10663] [ML] [MLLIB] [DOCS] Change test.toDF to test in Spark ML Programming Guide"" This reverts commit 198f0e652770569746ffbc3537d91a30dfa2eee1. --- docs/ml-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ml-guide.md b/docs/ml-guide.md index c5d7f990021f1..26aeca70b9fb7 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -475,7 +475,7 @@ val test = sqlContext.createDataFrame(Seq( )).toDF("id", "text") // Make predictions on test documents. -model.transform(test.toDF) +model.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>