From 3b44c5978bd44db986621d3e8511e9165b66926b Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Wed, 20 Apr 2016 11:06:30 -0700 Subject: [PATCH 1/7] adding testcase --- .../org/apache/spark/sql/DataFrameSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index e953a6e8ef0c2..009c101e746d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1429,4 +1429,23 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { getMessage() assert(e1.startsWith("Path does not exist")) } + + test("SPARK-12987: drop column ") { + val df = Seq((1, 2)).toDF("a_b", "a.c") + val df1 = df.drop("a_b") + checkAnswer(df1, Row(2)) + assert(df1.schema.map(_.name) === Seq("a.c")) + } + + test("SPARK-14759: drop column ") { + val df1 = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("any", "hour") + val df2 = sqlContext.createDataFrame(Seq((1, 3))).toDF("any").withColumn("hour", lit(10)) + val j = df1.join(df2, $"df1.hour" === $"df2.hour", "left") + assert(j.schema.map(_.name) === Seq("any","hour","any","hour")) + print("Columns after join:{0}".format(j.columns)) + val jj = j.drop($"df2.hour") + assert(jj.schema.map(_.name) === Seq("any")) + print("Columns after drop 'hour':{0}".format(jj.columns)) + } + } From 99027fa9cfd3e968bd5dc3808e8af7f8456e1f2d Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Tue, 3 May 2016 20:51:36 -0700 Subject: [PATCH 2/7] fix --- .../src/main/scala/org/apache/spark/sql/Column.scala | 11 +++++++++++ .../scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- .../org/apache/spark/sql/DatasetAggregatorSuite.scala | 8 ++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index bd96941da798d..3eb7ec1993ad0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -68,6 +68,17 @@ class TypedColumn[-T, U]( } new TypedColumn[T, U](newExpr, encoder) } + + /** Creates a TypedColumn based on the given expression. */ + private def withExpr(newExpr: Expression): TypedColumn[Any, String] = + new TypedColumn[Any, String](newExpr, ExpressionEncoder[String]) + + override def as(alias: String): TypedColumn[Any, String] = withExpr { + expr match { + case ne: NamedExpression => Alias (expr, alias)(explicitMetadata = Some(ne.metadata)) + case other => Alias(other, alias)() + } + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index fb2e9e781b417..b1a215b5cc11c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1442,7 +1442,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val df1 = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("any", "hour") val df2 = sqlContext.createDataFrame(Seq((1, 3))).toDF("any").withColumn("hour", lit(10)) val j = df1.join(df2, $"df1.hour" === $"df2.hour", "left") - assert(j.schema.map(_.name) === Seq("any","hour","any","hour")) + assert(j.schema.map(_.name) === Seq("any", "hour", "any", "hour")) print("Columns after join:{0}".format(j.columns)) val jj = j.drop($"df2.hour") assert(jj.schema.map(_.name) === Seq("any")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index 6eae3ed7ad6c0..57f65da72c6c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -232,4 +232,12 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { "a" -> Seq(1, 2) ) } + + test("spark-15051 aggregator in DataFrame/Dataset[Row]") { + val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j") + checkAnswer(df1.agg(RowAgg.toColumn as "b"), Row(6) :: Nil) + + val df2 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j") + checkAnswer(df2.agg(RowAgg.toColumn as "b").select("b"), Row(6) :: Nil) + } } From 0a348415e708464ba101fb0eafa0306c01f23aee Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Wed, 4 May 2016 00:54:00 -0700 Subject: [PATCH 3/7] fixing the typeColumn --- .../scala/org/apache/spark/sql/Column.scala | 14 +++++++++++--- .../org/apache/spark/sql/DataFrameSuite.scala | 19 ------------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index eba8332346a08..83d81548d5fbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -70,10 +70,18 @@ class TypedColumn[-T, U]( } /** Creates a TypedColumn based on the given expression. */ - private def withExpr(newExpr: Expression): TypedColumn[Any, String] = - new TypedColumn[Any, String](newExpr, ExpressionEncoder[String]) + private def withExpr(newExpr: Expression): TypedColumn[T, U] = + new TypedColumn[T, U](newExpr, encoder) - override def as(alias: String): TypedColumn[Any, String] = withExpr { + /** + * Gives the TypedColumn a name (alias). + * If the current TypedColumn has metadata associated with it, this metadata will be propagated + * to the new column. + * + * @group expr_ops + * @since 2.0.0 + */ + override def as(alias: String): TypedColumn[T, U] = withExpr { expr match { case ne: NamedExpression => Alias (expr, alias)(explicitMetadata = Some(ne.metadata)) case other => Alias(other, alias)() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 3e9092ec6927f..80a93ee6d4f3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1475,23 +1475,4 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { getMessage() assert(e1.startsWith("Path does not exist")) } - - test("SPARK-12987: drop column ") { - val df = Seq((1, 2)).toDF("a_b", "a.c") - val df1 = df.drop("a_b") - checkAnswer(df1, Row(2)) - assert(df1.schema.map(_.name) === Seq("a.c")) - } - - test("SPARK-14759: drop column ") { - val df1 = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("any", "hour") - val df2 = sqlContext.createDataFrame(Seq((1, 3))).toDF("any").withColumn("hour", lit(10)) - val j = df1.join(df2, $"df1.hour" === $"df2.hour", "left") - assert(j.schema.map(_.name) === Seq("any", "hour", "any", "hour")) - print("Columns after join:{0}".format(j.columns)) - val jj = j.drop($"df2.hour") - assert(jj.schema.map(_.name) === Seq("any")) - print("Columns after drop 'hour':{0}".format(jj.columns)) - } - } From c17970e0d02fd28b426aef1702520a8a6d5015cd Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Wed, 4 May 2016 22:57:08 -0700 Subject: [PATCH 4/7] override the name function --- .../main/scala/org/apache/spark/sql/Column.scala | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 83d81548d5fbd..7fee4dd0ec01e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -69,10 +69,6 @@ class TypedColumn[-T, U]( new TypedColumn[T, U](newExpr, encoder) } - /** Creates a TypedColumn based on the given expression. */ - private def withExpr(newExpr: Expression): TypedColumn[T, U] = - new TypedColumn[T, U](newExpr, encoder) - /** * Gives the TypedColumn a name (alias). * If the current TypedColumn has metadata associated with it, this metadata will be propagated @@ -81,12 +77,9 @@ class TypedColumn[-T, U]( * @group expr_ops * @since 2.0.0 */ - override def as(alias: String): TypedColumn[T, U] = withExpr { - expr match { - case ne: NamedExpression => Alias (expr, alias)(explicitMetadata = Some(ne.metadata)) - case other => Alias(other, alias)() - } - } + override def name(alias: String): TypedColumn[T, U] = + new TypedColumn[T, U](super.name(alias).expr, encoder) + } /** From 796b89a10b43290f14580f8d54c8dd2e031067f8 Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Thu, 5 May 2016 13:46:19 -0700 Subject: [PATCH 5/7] delegate as to name --- .../scala/org/apache/spark/sql/Column.scala | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 7fee4dd0ec01e..fa63334a10923 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -895,7 +895,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @group expr_ops * @since 1.4.0 */ - def as(aliases: Seq[String]): Column = withExpr { MultiAlias(expr, aliases) } + def as(aliases: Seq[String]): Column = name(aliases) /** * Assigns the given aliases to the results of a table generating function. @@ -907,7 +907,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @group expr_ops * @since 1.4.0 */ - def as(aliases: Array[String]): Column = withExpr { MultiAlias(expr, aliases) } + def as(aliases: Array[String]): Column = name(aliases) /** * Gives the column an alias. @@ -922,12 +922,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @group expr_ops * @since 1.3.0 */ - def as(alias: Symbol): Column = withExpr { - expr match { - case ne: NamedExpression => Alias(expr, alias.name)(explicitMetadata = Some(ne.metadata)) - case other => Alias(other, alias.name)() - } - } + def as(alias: Symbol): Column = name(alias.name) /** * Gives the column an alias with metadata. @@ -939,9 +934,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @group expr_ops * @since 1.3.0 */ - def as(alias: String, metadata: Metadata): Column = withExpr { - Alias(expr, alias)(explicitMetadata = Some(metadata)) - } + def as(alias: String, metadata: Metadata): Column = name(alias, metadata) /** * Gives the column a name (alias). @@ -958,11 +951,30 @@ class Column(protected[sql] val expr: Expression) extends Logging { */ def name(alias: String): Column = withExpr { expr match { - case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata)) + case ne: NamedExpression => + Alias(expr, alias)(explicitMetadata = Some(ne.metadata)) case other => Alias(other, alias)() } } + /** + * Gives the column a name (alias). + * * @group expr_ops + * @since 2.0.0 + */ + def name(aliases: Seq[String]): Column = withExpr { + MultiAlias(expr, aliases) + } + + /** + * Gives the column an alias with metadata. + * @group expr_ops + * @since 2.0.0 + */ + def name(alias: String, metadata: Metadata): Column = withExpr { + Alias(expr, alias)(explicitMetadata = Some(metadata)) + } + /** * Casts the column to a different data type. * {{{ From d66dc360da31b04635427c8c7f9c071e56306eaa Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Thu, 5 May 2016 13:59:16 -0700 Subject: [PATCH 6/7] fixing style --- sql/core/src/main/scala/org/apache/spark/sql/Column.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index fa63334a10923..c84376c4528ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -951,8 +951,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { */ def name(alias: String): Column = withExpr { expr match { - case ne: NamedExpression => - Alias(expr, alias)(explicitMetadata = Some(ne.metadata)) + case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata)) case other => Alias(other, alias)() } } @@ -963,7 +962,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @since 2.0.0 */ def name(aliases: Seq[String]): Column = withExpr { - MultiAlias(expr, aliases) + MultiAlias(expr, aliases) } /** From e408fdf43c207a189f6316a80599e7f54eb832b6 Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Thu, 5 May 2016 18:48:20 -0700 Subject: [PATCH 7/7] revert back name --- .../scala/org/apache/spark/sql/Column.scala | 26 ++++--------------- .../spark/sql/DatasetAggregatorSuite.scala | 2 +- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index c84376c4528ab..9b8334d334e4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -895,7 +895,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @group expr_ops * @since 1.4.0 */ - def as(aliases: Seq[String]): Column = name(aliases) + def as(aliases: Seq[String]): Column = withExpr { MultiAlias(expr, aliases) } /** * Assigns the given aliases to the results of a table generating function. @@ -907,7 +907,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @group expr_ops * @since 1.4.0 */ - def as(aliases: Array[String]): Column = name(aliases) + def as(aliases: Array[String]): Column = withExpr { MultiAlias(expr, aliases) } /** * Gives the column an alias. @@ -934,7 +934,9 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @group expr_ops * @since 1.3.0 */ - def as(alias: String, metadata: Metadata): Column = name(alias, metadata) + def as(alias: String, metadata: Metadata): Column = withExpr { + Alias(expr, alias)(explicitMetadata = Some(metadata)) + } /** * Gives the column a name (alias). @@ -956,24 +958,6 @@ class Column(protected[sql] val expr: Expression) extends Logging { } } - /** - * Gives the column a name (alias). - * * @group expr_ops - * @since 2.0.0 - */ - def name(aliases: Seq[String]): Column = withExpr { - MultiAlias(expr, aliases) - } - - /** - * Gives the column an alias with metadata. - * @group expr_ops - * @since 2.0.0 - */ - def name(alias: String, metadata: Metadata): Column = withExpr { - Alias(expr, alias)(explicitMetadata = Some(metadata)) - } - /** * Casts the column to a different data type. * {{{ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index 57f65da72c6c5..b2a0f3d67e5a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -233,7 +233,7 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { ) } - test("spark-15051 aggregator in DataFrame/Dataset[Row]") { + test("spark-15051 alias of aggregator in DataFrame/Dataset[Row]") { val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j") checkAnswer(df1.agg(RowAgg.toColumn as "b"), Row(6) :: Nil)