From cc90015e12a6229ce72196e4747ca3d0655abeb4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 4 Jun 2015 02:41:57 +0800
Subject: [PATCH 1/4] Add methods to facilitate equi-join on multiple joining keys.

---
 python/pyspark/sql/dataframe.py               | 32 ++++++++++----
 .../org/apache/spark/sql/DataFrame.scala      | 43 +++++++++++++++----
 .../apache/spark/sql/DataFrameJoinSuite.scala | 18 ++++++++
 3 files changed, 76 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 7673153abe0e2..79c051f618e34 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -509,30 +509,42 @@ def join(self, other, joinExprs=None, joinType=None):
         The following performs a full outer join between ``df1`` and ``df2``.
 
         :param other: Right side of the join
-        :param joinExprs: a string for join column name, or a join expression (Column).
-            If joinExprs is a string indicating the name of the join column,
-            the column must exist on both sides, and this performs an inner equi-join.
+        :param joinExprs: a string for the join column name, a list of column names,
+            a join expression (Column), or a list of Columns.
+            If joinExprs is a string or a list of strings indicating the name(s) of the join
+            column(s), the column(s) must exist on both sides, and this performs an inner
+            equi-join.
         :param joinType: str, default 'inner'.
             One of `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
 
         >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
         [Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]
 
+        >>> cond = [df.name == df3.name, df.age == df3.age]
+        >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
+        [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
+
         >>> df.join(df2, 'name').select(df.name, df2.height).collect()
         [Row(name=u'Bob', height=85)]
+
+        >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
+        [Row(name=u'Bob', age=5)]
         """
-        if joinExprs is None:
+        if joinExprs is not None and not isinstance(joinExprs, list):
+            joinExprs = [joinExprs]
+
+        if joinExprs is None or len(joinExprs) == 0:
             jdf = self._jdf.join(other._jdf)
-        elif isinstance(joinExprs, basestring):
-            jdf = self._jdf.join(other._jdf, joinExprs)
+        elif isinstance(joinExprs[0], basestring):
+            jdf = self._jdf.join(other._jdf, self._jseq(joinExprs))
         else:
-            assert isinstance(joinExprs, Column), "joinExprs should be Column"
+            assert isinstance(joinExprs[0], Column), "joinExprs should be Column or list of Column"
             if joinType is None:
-                jdf = self._jdf.join(other._jdf, joinExprs._jc)
+                jdf = self._jdf.join(other._jdf, self._jcols(joinExprs), "inner")
             else:
                 assert isinstance(joinType, basestring), "joinType should be basestring"
-                jdf = self._jdf.join(other._jdf, joinExprs._jc, joinType)
+                jdf = self._jdf.join(other._jdf, self._jcols(joinExprs), joinType)
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
@@ -1291,6 +1303,8 @@ def _test():
         .toDF(StructType([StructField('age', IntegerType()),
                           StructField('name', StringType())]))
     globs['df2'] = sc.parallelize([Row(name='Tom', height=80),
                                    Row(name='Bob', height=85)]).toDF()
+    globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
+                                   Row(name='Bob', age=5)]).toDF()
     globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
                                    Row(name='Bob', age=5, height=None),
                                    Row(name='Tom', age=None, height=None),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 034d887901975..c82a993ce9ec0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -395,22 +395,35 @@ class DataFrame private[sql](
    * @since 1.4.0
    */
   def join(right: DataFrame, usingColumn: String): DataFrame = {
+    join(right, Seq(usingColumn))
+  }
+
+  def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sqlContext.executePlan(
       Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
 
-    // Project only one of the join column.
-    val joinedCol = joined.right.resolve(usingColumn)
+    // Project only one of the join columns.
+    val joinedCols = usingColumns.map(col => joined.right.resolve(col))
+    val condition = usingColumns.map { col =>
+      catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
+    }.foldLeft[Option[catalyst.expressions.BinaryExpression]](None) { (opt, eqTo) =>
+      opt match {
+        case Some(cond) =>
+          Some(catalyst.expressions.And(cond, eqTo))
+        case None =>
+          Some(eqTo)
+      }
+    }
+
     Project(
-      joined.output.filterNot(_ == joinedCol),
+      joined.output.filterNot(joinedCols.contains(_)),
       Join(
         joined.left,
         joined.right,
         joinType = Inner,
-        Some(catalyst.expressions.EqualTo(
-          joined.left.resolve(usingColumn),
-          joined.right.resolve(usingColumn))))
+        condition)
     )
   }
@@ -425,7 +438,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
+  def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, Seq(joinExprs), "inner")
@@ -448,6 +461,10 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
+    join(right, Seq(joinExprs), joinType)
+  }
+
+  def join(right: DataFrame, joinExprs: Seq[Column], joinType: String): DataFrame = {
     // Note that in this function, we introduce a hack in the case of self-join to automatically
     // resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
     // Consider this case: df.join(df, df("key") === df("key"))
@@ -458,7 +475,17 @@ class DataFrame private[sql](
 
     // Trigger analysis so in the case of self-join, the analyzer will clone the plan.
     // After the cloning, left and right side will have distinct expression ids.
-    val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
+    val condition = joinExprs
+      .foldLeft[Option[catalyst.expressions.Expression]](None) { (opt, condNext) =>
+        opt match {
+          case Some(cond) =>
+            Some(catalyst.expressions.And(cond, condNext.expr))
+          case None =>
+            Some(condNext.expr)
+        }
+      }
+
+    val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), condition)
       .queryExecution.analyzed.asInstanceOf[Join]
 
     // If auto self join alias is disabled, return the plan.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 787f3f175fea2..0f38a171cc2cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -34,6 +34,15 @@ class DataFrameJoinSuite extends QueryTest {
       Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil)
   }
 
+  test("join - join using multiple columns") {
+    val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
+    val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")
+
+    checkAnswer(
+      df.join(df2, Seq("int", "int2")),
+      Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
+  }
+
   test("join - join using self join") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
 
@@ -52,6 +61,15 @@ class DataFrameJoinSuite extends QueryTest {
       sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
   }
 
+  test("join - self join multiple columns") {
+    val df1 = testData.as('df1)
+    val df2 = testData.as('df2)
+
+    checkAnswer(
+      df1.join(df2, Seq($"df1.key" === $"df2.key", $"df1.value" === $"df2.value"), "inner"),
+      sql("SELECT a.key, a.value, b.key, b.value FROM testData a JOIN testData b ON a.key = b.key AND a.value = b.value").collect().toSeq)
+  }
+
   test("join - using aliases after self join") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
     checkAnswer(
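Patch 1 above builds the combined join condition by folding the per-column EqualTo expressions into a left-nested chain of Ands, starting from None so that an empty column list yields no condition at all. The shape of that fold is easier to see outside Catalyst; the following is a minimal, self-contained Scala sketch in which a toy Expr ADT (hypothetical Eq and And case classes, not Spark's actual expression types) stands in for the Catalyst expressions:

object FoldConditionDemo extends App {
  // Toy stand-ins for catalyst.expressions.EqualTo and catalyst.expressions.And.
  sealed trait Expr
  case class Eq(left: String, right: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  val usingColumns = Seq("user_id", "user_name")

  // One pairwise equality per join column, as in the new Seq[String] overload.
  val equalities: Seq[Expr] = usingColumns.map(c => Eq(s"left.$c", s"right.$c"))

  // The foldLeft[Option[...]] pattern from the patch: start with no condition,
  // then AND each equality onto whatever has been accumulated so far.
  val condition: Option[Expr] = equalities.foldLeft[Option[Expr]](None) {
    case (Some(cond), eq) => Some(And(cond, eq))
    case (None, eq)       => Some(eq)
  }

  println(condition)
  // Some(And(Eq(left.user_id,right.user_id),Eq(left.user_name,right.user_name)))
}

A None result (empty usingColumns) flows into Join as "no join condition", which is also how the cross-join analysis step in the same method constructs its Join node.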
From 0400e8990d47d1a4fc8e0af6ef722bd633d2a49a Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 4 Jun 2015 02:59:43 +0800
Subject: [PATCH 2/4] Fix scala style.

---
 .../test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 0f38a171cc2cb..d31c35dda2f0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -67,7 +67,8 @@ class DataFrameJoinSuite extends QueryTest {
 
     checkAnswer(
       df1.join(df2, Seq($"df1.key" === $"df2.key", $"df1.value" === $"df2.value"), "inner"),
-      sql("SELECT a.key, a.value, b.key, b.value FROM testData a JOIN testData b ON a.key = b.key AND a.value = b.value").collect().toSeq)
+      sql("SELECT a.key, a.value, b.key, b.value FROM testData a JOIN testData b " +
+        "ON a.key = b.key AND a.value = b.value").collect().toSeq)
   }
 
   test("join - using aliases after self join") {
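The "self join multiple columns" test touched above passes the condition as a Seq[Column]. The next patch removes that overload again, after which the same join is expressed as a single Column built with &&. A sketch of that surviving form, assuming a Spark 1.4-era local-mode setup (SQLContext plus its implicits) and a small stand-in for the suite's testData fixture:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SelfJoinDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("self-join-demo").setMaster("local"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val testData = sc.parallelize(1 to 3).map(i => (i, i.toString)).toDF("key", "value")
  val df1 = testData.as('df1)
  val df2 = testData.as('df2)

  // Equivalent to Seq($"df1.key" === $"df2.key", $"df1.value" === $"df2.value"):
  // conjoin the pairwise equalities into one Column with &&.
  df1.join(df2, $"df1.key" === $"df2.key" && $"df1.value" === $"df2.value", "inner").show()

  sc.stop()
}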
From c43722ced91d4ae97cf4bad914ca197c82afd451 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 4 Jun 2015 21:16:50 +0800
Subject: [PATCH 3/4] For comments.

---
 python/pyspark/sql/dataframe.py               | 32 +++++++------
 .../org/apache/spark/sql/DataFrame.scala      | 47 ++++++++++---------
 .../apache/spark/sql/DataFrameJoinSuite.scala | 10 ----
 3 files changed, 42 insertions(+), 47 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 79c051f618e34..9a1e828fec7b0 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -503,17 +503,17 @@ def alias(self, alias):
 
     @ignore_unicode_prefix
     @since(1.3)
-    def join(self, other, joinExprs=None, joinType=None):
+    def join(self, other, on=None, how=None):
         """Joins with another :class:`DataFrame`, using the given join expression.
 
         The following performs a full outer join between ``df1`` and ``df2``.
 
         :param other: Right side of the join
-        :param joinExprs: a string for the join column name, a list of column names,
+        :param on: a string for the join column name, a list of column names,
             a join expression (Column), or a list of Columns.
-            If joinExprs is a string or a list of strings indicating the name(s) of the join
+            If `on` is a string or a list of strings indicating the name(s) of the join
             column(s), the column(s) must exist on both sides, and this performs an inner
             equi-join.
-        :param joinType: str, default 'inner'.
+        :param how: str, default 'inner'.
             One of `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
@@ -530,21 +530,25 @@ def join(self, other, on=None, how=None):
         [Row(name=u'Bob', age=5)]
         """
-        if joinExprs is not None and not isinstance(joinExprs, list):
-            joinExprs = [joinExprs]
+        if on is not None and not isinstance(on, list):
+            on = [on]
 
-        if joinExprs is None or len(joinExprs) == 0:
+        if on is None or len(on) == 0:
             jdf = self._jdf.join(other._jdf)
-        elif isinstance(joinExprs[0], basestring):
-            jdf = self._jdf.join(other._jdf, self._jseq(joinExprs))
+        elif isinstance(on[0], basestring):
+            jdf = self._jdf.join(other._jdf, self._jseq(on))
         else:
-            assert isinstance(joinExprs[0], Column), "joinExprs should be Column or list of Column"
-            if joinType is None:
-                jdf = self._jdf.join(other._jdf, self._jcols(joinExprs), "inner")
+            assert isinstance(on[0], Column), "on should be Column or list of Column"
+            if len(on) > 1:
+                on = reduce(lambda x, y: x.__and__(y), on)
             else:
-                assert isinstance(joinType, basestring), "joinType should be basestring"
-                jdf = self._jdf.join(other._jdf, self._jcols(joinExprs), joinType)
+                on = on[0]
+            if how is None:
+                jdf = self._jdf.join(other._jdf, on._jc, "inner")
+            else:
+                assert isinstance(how, basestring), "how should be basestring"
+                jdf = self._jdf.join(other._jdf, on._jc, how)
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index c82a993ce9ec0..6e3e3b001f5bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -398,6 +398,26 @@ class DataFrame private[sql](
     join(right, Seq(usingColumn))
   }
 
+  /**
+   * Inner equi-join with another [[DataFrame]] using the given columns.
+   *
+   * Different from other join functions, the join columns will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * {{{
+   *   // Joining df1 and df2 using the columns "user_id" and "user_name"
+   *   df1.join(df2, Seq("user_id", "user_name"))
+   * }}}
+   *
+   * Note that if you perform a self-join using this function without aliasing the input
+   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+   * there is no way to disambiguate which side of the join you would like to reference.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+   * @group dfops
+   * @since 1.4.0
+   */
   def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
@@ -408,13 +428,8 @@ class DataFrame private[sql](
     val joinedCols = usingColumns.map(col => joined.right.resolve(col))
     val condition = usingColumns.map { col =>
       catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
-    }.foldLeft[Option[catalyst.expressions.BinaryExpression]](None) { (opt, eqTo) =>
-      opt match {
-        case Some(cond) =>
-          Some(catalyst.expressions.And(cond, eqTo))
-        case None =>
-          Some(eqTo)
-      }
+    }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
+      catalyst.expressions.And(cond, eqTo)
     }
 
     Project(
@@ -438,7 +453,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, Seq(joinExprs), "inner")
+  def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
@@ -461,10 +476,6 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
-    join(right, Seq(joinExprs), joinType)
-  }
-
-  def join(right: DataFrame, joinExprs: Seq[Column], joinType: String): DataFrame = {
     // Note that in this function, we introduce a hack in the case of self-join to automatically
     // resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
     // Consider this case: df.join(df, df("key") === df("key"))
@@ -475,17 +486,7 @@ class DataFrame private[sql](
 
     // Trigger analysis so in the case of self-join, the analyzer will clone the plan.
     // After the cloning, left and right side will have distinct expression ids.
-    val condition = joinExprs
-      .foldLeft[Option[catalyst.expressions.Expression]](None) { (opt, condNext) =>
-        opt match {
-          case Some(cond) =>
-            Some(catalyst.expressions.And(cond, condNext.expr))
-          case None =>
-            Some(condNext.expr)
-        }
-      }
-
-    val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), condition)
+    val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
       .queryExecution.analyzed.asInstanceOf[Join]
 
     // If auto self join alias is disabled, return the plan.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index d31c35dda2f0f..f216cf150cbcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -61,16 +61,6 @@ class DataFrameJoinSuite extends QueryTest {
       sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
   }
 
-  test("join - self join multiple columns") {
-    val df1 = testData.as('df1)
-    val df2 = testData.as('df2)
-
-    checkAnswer(
-      df1.join(df2, Seq($"df1.key" === $"df2.key", $"df1.value" === $"df2.value"), "inner"),
-      sql("SELECT a.key, a.value, b.key, b.value FROM testData a JOIN testData b " +
-        "ON a.key = b.key AND a.value = b.value").collect().toSeq)
-  }
-
   test("join - using aliases after self join") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
     checkAnswer(
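Patch 3 collapses the explicit foldLeft[Option[...]] into reduceLeftOption, which encodes the same behavior directly: an empty list of equalities produces None (no join condition), a single equality is returned as-is, and longer lists are AND-ed left to right. A small Spark-free Scala sketch of the equivalence, with plain strings standing in for Catalyst expressions:

object ReduceLeftOptionDemo extends App {
  def and(cond: String, eq: String): String = s"($cond AND $eq)"

  // The verbose form from patch 1.
  def viaFold(eqs: Seq[String]): Option[String] =
    eqs.foldLeft[Option[String]](None) {
      case (Some(cond), eq) => Some(and(cond, eq))
      case (None, eq)       => Some(eq)
    }

  // The simplified form from patch 3.
  def viaReduce(eqs: Seq[String]): Option[String] =
    eqs.reduceLeftOption(and)

  for (eqs <- Seq(Seq.empty[String], Seq("a = b"), Seq("a = b", "c = d"))) {
    assert(viaFold(eqs) == viaReduce(eqs))
    println(viaReduce(eqs)) // None, then Some(a = b), then Some((a = b AND c = d))
  }
}

On the Python side, the same conjunction is now built with reduce(lambda x, y: x.__and__(y), on); on Python 3, reduce is no longer a builtin, which is what the final patch addresses.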
From cd5c888d3edcd9a7909dd406b374df123df7bb45 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 5 Jun 2015 09:16:30 +0800
Subject: [PATCH 4/4] Import reduce in python3.

---
 python/pyspark/sql/dataframe.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 9a1e828fec7b0..f6df46804b814 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -22,6 +22,7 @@
 if sys.version >= '3':
     basestring = unicode = str
     long = int
+    from functools import reduce
 else:
     from itertools import imap as map
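Taken together, the series leaves two ways to write a multi-key equi-join. A usage sketch of the final API, with data shaped like the "join using multiple columns" test above and an assumed Spark 1.4-era local-mode setup; the Seq[String] form follows SQL's JOIN USING and projects each join column once, while the Column form keeps the columns from both sides:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object MultiKeyJoinDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("multi-key-join").setMaster("local"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val df  = sc.parallelize(1 to 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
  val df2 = sc.parallelize(1 to 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")

  // JOIN USING form: the join columns appear once, so the schema is (int, int2, str, str).
  df.join(df2, Seq("int", "int2")).show()

  // Explicit-condition form: all columns from both sides are kept,
  // so the schema is (int, int2, str, int, int2, str).
  df.join(df2, df("int") === df2("int") && df("int2") === df2("int2"), "inner").show()

  sc.stop()
}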