From fe4c0fd2c4aab0e631e3d485714ed20e6c64da2c Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Tue, 25 Aug 2015 21:05:37 -0700
Subject: [PATCH] add more java-friendly DF API

---
 .../org/apache/spark/sql/DataFrame.scala      | 112 +++++++++++-------
 .../apache/spark/sql/JavaDataFrameSuite.java  |   5 +
 .../apache/spark/sql/DataFrameJoinSuite.scala |   4 +
 .../org/apache/spark/sql/DataFrameSuite.scala |  20 ++++
 4 files changed, 99 insertions(+), 42 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 791c10c3d7ce7..3e330391435e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -446,8 +446,8 @@ class DataFrame private[sql](
    * i.e. similar to SQL's `JOIN USING` syntax.
    *
    * {{{
-   *   // Joining df1 and df2 using the column "user_id"
-   *   df1.join(df2, "user_id")
+   *   // Joining df1 and df2 using the columns "user_name" and "user_id"
+   *   df1.join(df2, "user_name", "user_id")
    * }}}
    *
    * Note that if you perform a self-join using this function without aliasing the input
@@ -455,43 +455,26 @@ class DataFrame private[sql](
    * there is no way to disambiguate which side of the join you would like to reference.
    *
    * @param right Right side of the join operation.
-   * @param usingColumn Name of the column to join on. This column must exist on both sides.
+   * @param usingColumns Names of the columns to join on. Can be one or more column names.
+   *                     These columns must exist on both sides.
    * @group dfops
-   * @since 1.4.0
+   * @since 1.6.0
    */
-  def join(right: DataFrame, usingColumn: String): DataFrame = {
-    join(right, Seq(usingColumn))
-  }
+  @scala.annotation.varargs
+  def join(right: DataFrame, colName: String, usingColumns: String*): DataFrame = {
+    // TODO ideally the API signature should be like def join(right: DataFrame, colNames: String*)
+    // However, the compiler reports duplicated functions after erasure with
+    // def join(right: DataFrame, usingColumns: Seq[String])
 
-  /**
-   * Inner equi-join with another [[DataFrame]] using the given columns.
-   *
-   * Different from other join functions, the join columns will only appear once in the output,
-   * i.e. similar to SQL's `JOIN USING` syntax.
-   *
-   * {{{
-   *   // Joining df1 and df2 using the columns "user_id" and "user_name"
-   *   df1.join(df2, Seq("user_id", "user_name"))
-   * }}}
-   *
-   * Note that if you perform a self-join using this function without aliasing the input
-   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
-   * there is no way to disambiguate which side of the join you would like to reference.
-   *
-   * @param right Right side of the join operation.
-   * @param usingColumns Names of the columns to join on. This columns must exist on both sides.
-   * @group dfops
-   * @since 1.4.0
-   */
-  def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sqlContext.executePlan(
       Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
 
     // Project only one of the join columns.
-    val joinedCols = usingColumns.map(col => joined.right.resolve(col))
-    val condition = usingColumns.map { col =>
+    val allColumns = colName +: usingColumns
+    val joinedCols = allColumns.map(col => joined.right.resolve(col))
+    val condition = allColumns.map { col =>
       catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
     }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
       catalyst.expressions.And(cond, eqTo)
@@ -1238,10 +1221,11 @@ class DataFrame private[sql](
    * the subset of columns.
    *
    * @group dfops
-   * @since 1.4.0
+   * @since 1.6.0
    */
-  def dropDuplicates(colNames: Seq[String]): DataFrame = {
-    val groupCols = colNames.map(resolve)
+  @scala.annotation.varargs
+  def dropDuplicates(colName: String, colNames: String*): DataFrame = {
+    val groupCols = (colName +: colNames).map(resolve)
     val groupColExprIds = groupCols.map(_.exprId)
     val aggCols = logicalPlan.output.map { attr =>
       if (groupColExprIds.contains(attr.exprId)) {
@@ -1253,15 +1237,6 @@ class DataFrame private[sql](
     Aggregate(groupCols, aggCols, logicalPlan)
   }
 
-  /**
-   * Returns a new [[DataFrame]] with duplicate rows removed, considering only
-   * the subset of columns.
-   *
-   * @group dfops
-   * @since 1.4.0
-   */
-  def dropDuplicates(colNames: Array[String]): DataFrame = dropDuplicates(colNames.toSeq)
-
   /**
    * Computes statistics for numeric columns, including count, mean, stddev, min, and max.
    * If no columns are given, this function computes statistics for all numerical columns.
@@ -1913,6 +1888,59 @@ class DataFrame private[sql](
     write.mode(SaveMode.Append).insertInto(tableName)
   }
 
+  /**
+   * (Scala-specific) Returns a new [[DataFrame]] with duplicate rows removed, considering only
+   * the subset of columns.
+   *
+   * @group dfops
+   * @deprecated As of 1.6.0, replaced by
+   *             `dropDuplicates(colName1, colName2...)`.
+   */
+  @deprecated("Use dropDuplicates(colName1, colName2...)", "1.6.0")
+  def dropDuplicates(colNames: Seq[String]): DataFrame = {
+    dropDuplicates(colNames.head, colNames.tail: _*)
+  }
+
+  /**
+   * Returns a new [[DataFrame]] with duplicate rows removed, considering only
+   * the subset of columns.
+   *
+   * @group dfops
+   * @deprecated As of 1.6.0, replaced by
+   *             `dropDuplicates(colName1, colName2...)`.
+   */
+  @deprecated("Use dropDuplicates(colName1, colName2...)", "1.6.0")
+  def dropDuplicates(colNames: Array[String]): DataFrame = {
+    val seqNames = colNames.toSeq
+    dropDuplicates(seqNames.head, seqNames.tail: _*)
+  }
+
+  /**
+   * Inner equi-join with another [[DataFrame]] using the given columns.
+   *
+   * Different from other join functions, the join columns will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * {{{
+   *   // Joining df1 and df2 using the columns "user_id" and "user_name"
+   *   df1.join(df2, Seq("user_id", "user_name"))
+   * }}}
+   *
+   * Note that if you perform a self-join using this function without aliasing the input
+   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+   * there is no way to disambiguate which side of the join you would like to reference.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+   * @group dfops
+   * @deprecated As of 1.6.0, replaced by
+   *             `join(right, colName1, colName2, ...)`.
+   */
+  @deprecated("Use join(right, colName1, colName2, ...)", "1.6.0")
+  def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
+    join(right, usingColumns.head, usingColumns.tail: _*)
+  }
+
   /**
    * Wrap a DataFrame action to track all Spark jobs in the body so that we can connect them with
    * an execution.
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 7abdd3db80341..6ebe33f4b18e7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -107,6 +107,11 @@ public void testVarargMethods() {
     df2.select(rand(), acos("b"));
 
     df2.select(col("*"), randn(5L));
+
+    DataFrame l = df.toDF("a", "b");
+    DataFrame r = df2.toDF("a", "b");
+    l.join(r, "a", "b");
+    l.join(r, "a");
   }
 
   @Ignore
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index e2716d7841d85..4074a2387b39b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -40,6 +40,10 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     checkAnswer(
       df.join(df2, Seq("int", "int2")),
       Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
+
+    checkAnswer(
+      df.join(df2, "int", "int2"),
+      Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
   }
 
   test("join - join using self join") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 284fff184085a..4e6caab4e778e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -678,6 +678,26 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     checkAnswer(
       testData.dropDuplicates(Seq("value2")),
       Seq(Row(2, 1, 2), Row(1, 1, 1)))
+
+    checkAnswer(
+      testData.dropDuplicates("key", "value1"),
+      Seq(Row(2, 1, 2), Row(1, 2, 1), Row(1, 1, 1), Row(2, 2, 2)))
+
+    checkAnswer(
+      testData.dropDuplicates("value1", "value2"),
+      Seq(Row(2, 1, 2), Row(1, 2, 1), Row(1, 1, 1), Row(2, 2, 2)))
+
+    checkAnswer(
+      testData.dropDuplicates("key"),
+      Seq(Row(2, 1, 2), Row(1, 1, 1)))
+
+    checkAnswer(
+      testData.dropDuplicates("value1"),
+      Seq(Row(2, 1, 2), Row(1, 2, 1)))
+
+    checkAnswer(
+      testData.dropDuplicates("value2"),
+      Seq(Row(2, 1, 2), Row(1, 1, 1)))
  }
 
  test("SPARK-7150 range api") {
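
Editor's note: to make the new surface concrete, here is a minimal usage sketch of the
varargs join introduced by this patch. It is illustrative only: it assumes a Spark
1.6-style SQLContext bound to a stable identifier `sqlContext` (so its implicits can be
imported), and the `users`/`logins` frames and their columns are invented for the example.

  import org.apache.spark.sql.DataFrame
  import sqlContext.implicits._

  // Hypothetical inputs; only the column names matter here.
  val users: DataFrame = Seq((1, "alice"), (2, "bob")).toDF("user_id", "user_name")
  val logins: DataFrame = Seq((1, "alice"), (3, "carol")).toDF("user_id", "user_name")

  // New Java-friendly varargs form: one or more USING columns, which must exist
  // on both sides and appear only once in the output.
  val joined = users.join(logins, "user_id", "user_name")

  // The Seq overload still compiles but is deprecated; per this patch it forwards to
  // join(right, usingColumns.head, usingColumns.tail: _*).
  val joinedOld = users.join(logins, Seq("user_id", "user_name"))

The split signature (colName: String, usingColumns: String*) rather than a single String*
is what the in-code TODO alludes to: a pure-varargs overload would erase to the same JVM
signature as the deprecated Seq[String] version.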
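
Editor's note: the reworked dropDuplicates follows the same pattern. A short sketch under
the same assumptions (`sqlContext` in scope; the `events` frame is hypothetical):

  // Hypothetical input containing one exact duplicate row.
  val events = Seq((1, "a", 10), (1, "a", 10), (1, "b", 20)).toDF("key", "value1", "value2")

  // New varargs form, callable from Java without constructing a Seq or Array.
  val deduped = events.dropDuplicates("key", "value1")

  // The deprecated Seq and Array overloads forward via colNames.head +: colNames.tail,
  // so they now fail on an empty collection (Seq.empty.head throws) instead of
  // aggregating over an empty grouping set as the pre-patch versions did -- a small
  // behavior change worth flagging in review.
  val dedupedOld = events.dropDuplicates(Seq("key", "value1"))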