From bbdeebe645d0f045d51b7e9e9adc379fbd7cdc55 Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Fri, 28 Nov 2014 19:34:32 +0530
Subject: [PATCH 1/6] Support COALESCE function in Spark SQL and Hive QL

---
 .../org/apache/spark/sql/catalyst/SqlParser.scala  |  2 ++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 13 +++++++++++++
 .../scala/org/apache/spark/sql/hive/HiveQl.scala   |  2 ++
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  9 ++++++++-
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index dc1d349f10f1b..225e33b301695 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -51,6 +51,7 @@ class SqlParser extends AbstractSparkSQLParser {
   protected val CACHE = Keyword("CACHE")
   protected val CASE = Keyword("CASE")
   protected val CAST = Keyword("CAST")
+  protected val COALESCE = Keyword("COALESCE")
   protected val COUNT = Keyword("COUNT")
   protected val DECIMAL = Keyword("DECIMAL")
   protected val DESC = Keyword("DESC")
@@ -305,6 +306,7 @@ class SqlParser extends AbstractSparkSQLParser {
       { case s ~ p ~ l => Substring(s, p, l) }
     | SQRT ~ "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) }
     | ABS ~ "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) }
+    | COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exps => Coalesce(exps) }
     | ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^
       { case udfName ~ exprs => UnresolvedFunction(udfName, exprs) }
     )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 84ee3051eb682..1383c7690197c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -992,4 +992,17 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       "nulldata2 on nulldata1.value <=> nulldata2.value"),
       (1 to 2).map(i => Seq(i)))
   }
+
+  test("Supporting Coalesce function in Spark SQL") {
+    val nullCheckData1 = TestData(1,"1") :: TestData(2,null) :: Nil
+    val rdd1 = sparkContext.parallelize((0 to 1).map(i => nullCheckData1(i)))
+    rdd1.registerTempTable("nulldata1")
+    val nullCheckData2 = TestData(1,"1") :: TestData(2,"2") :: Nil
+    val rdd2 = sparkContext.parallelize((0 to 1).map(i => nullCheckData2(i)))
+    rdd2.registerTempTable("nulldata2")
+
+    checkAnswer(sql("SELECT Coalesce(nulldata2.value,nulldata1.value) FROM nulldata1 join " +
+      "nulldata2 on nulldata1.key = nulldata2.key"),
+      (1 to 2).map(i => Seq(i.toString)))
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index b9283f668a9b5..6826348985572 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -902,6 +902,7 @@ private[hive] object HiveQl {
   val CASE = "(?i)CASE".r
   val SUBSTR = "(?i)SUBSTR(?:ING)?".r
   val SQRT = "(?i)SQRT".r
+  val COALESCE = "(?i)COALESCE".r
 
   protected def nodeToExpr(node: Node): Expression = node match {
     /* Attribute References */
@@ -1053,6 +1054,7 @@ private[hive] object HiveQl {
       Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
     case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
       Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
+    case Token("TOK_FUNCTION", Token(COALESCE(), Nil) :: args) => Coalesce(args.map(nodeToExpr))
 
     /* UDFs - Must be last otherwise will preempt built in functions */
     case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e9b1943ff8db7..0614c9c307ab6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -163,9 +163,16 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT case when ~1=-2 then 1 else 0 end FROM src"),
       sql("SELECT 1 FROM src").collect().toSeq)
   }
-  
+
   test("SPARK-4154 Query does not work if it has 'not between' in Spark SQL and HQL") {
     checkAnswer(sql("SELECT key FROM src WHERE key not between 0 and 10 order by key"),
       sql("SELECT key FROM src WHERE key between 11 and 500 order by key").collect().toSeq)
   }
+
+  test("Supporting Coalesce function in Spark SQL") {
+    sql("SELECT * FROM src where key=477").registerTempTable("src1")
+    checkAnswer(sql("SELECT COALESCE(src1.key,src.key) FROM src left outer join src1 "
+      +"on src.key=src1.key order by src.key,src1.key"),
+      sql("SELECT key FROM src order by key").collect().toSeq)
+  }
 }
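A note on semantics before the revert back-and-forth that follows: COALESCE returns its first non-null argument, or null when every argument is null. A minimal standalone Scala sketch of that contract (illustration only, with made-up names — not the Catalyst Coalesce expression):

    object CoalesceSemantics {
      // Return the first non-null argument, or null if all are null.
      def coalesce[A >: Null <: AnyRef](args: A*): A =
        args.find(_ ne null).getOrElse(null)

      def main(args: Array[String]): Unit = {
        println(coalesce(null, "1", "2"))     // prints: 1
        println(coalesce[String](null, null)) // prints: null
      }
    }

This is the behaviour the new SqlParser rule and the tests above exercise; Catalyst's own Coalesce expression additionally carries type and nullability information.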
From f2ea39b68cb3965be3eeeb9e74794778e5b884a9 Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Sat, 29 Nov 2014 00:25:18 +0530
Subject: [PATCH 2/6] Revert "Support COALESCE function in Spark SQL and Hive QL"

This reverts commit bbdeebe645d0f045d51b7e9e9adc379fbd7cdc55.

---
 .../org/apache/spark/sql/catalyst/SqlParser.scala  |  2 --
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 13 -------------
 .../scala/org/apache/spark/sql/hive/HiveQl.scala   |  2 --
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  9 +--------
 4 files changed, 1 insertion(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 225e33b301695..dc1d349f10f1b 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -51,7 +51,6 @@ class SqlParser extends AbstractSparkSQLParser {
   protected val CACHE = Keyword("CACHE")
   protected val CASE = Keyword("CASE")
   protected val CAST = Keyword("CAST")
-  protected val COALESCE = Keyword("COALESCE")
   protected val COUNT = Keyword("COUNT")
   protected val DECIMAL = Keyword("DECIMAL")
   protected val DESC = Keyword("DESC")
@@ -306,7 +305,6 @@ class SqlParser extends AbstractSparkSQLParser {
       { case s ~ p ~ l => Substring(s, p, l) }
     | SQRT ~ "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) }
     | ABS ~ "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) }
-    | COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exps => Coalesce(exps) }
     | ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^
       { case udfName ~ exprs => UnresolvedFunction(udfName, exprs) }
     )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1383c7690197c..84ee3051eb682 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -992,17 +992,4 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       "nulldata2 on nulldata1.value <=> nulldata2.value"),
       (1 to 2).map(i => Seq(i)))
   }
-
-  test("Supporting Coalesce function in Spark SQL") {
-    val nullCheckData1 = TestData(1,"1") :: TestData(2,null) :: Nil
-    val rdd1 = sparkContext.parallelize((0 to 1).map(i => nullCheckData1(i)))
-    rdd1.registerTempTable("nulldata1")
-    val nullCheckData2 = TestData(1,"1") :: TestData(2,"2") :: Nil
-    val rdd2 = sparkContext.parallelize((0 to 1).map(i => nullCheckData2(i)))
-    rdd2.registerTempTable("nulldata2")
-
-    checkAnswer(sql("SELECT Coalesce(nulldata2.value,nulldata1.value) FROM nulldata1 join " +
-      "nulldata2 on nulldata1.key = nulldata2.key"),
-      (1 to 2).map(i => Seq(i.toString)))
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 6826348985572..b9283f668a9b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -902,7 +902,6 @@ private[hive] object HiveQl {
   val CASE = "(?i)CASE".r
   val SUBSTR = "(?i)SUBSTR(?:ING)?".r
   val SQRT = "(?i)SQRT".r
-  val COALESCE = "(?i)COALESCE".r
 
   protected def nodeToExpr(node: Node): Expression = node match {
     /* Attribute References */
@@ -1054,7 +1053,6 @@ private[hive] object HiveQl {
       Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
     case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
       Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
-    case Token("TOK_FUNCTION", Token(COALESCE(), Nil) :: args) => Coalesce(args.map(nodeToExpr))
 
     /* UDFs - Must be last otherwise will preempt built in functions */
     case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 0614c9c307ab6..e9b1943ff8db7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -163,16 +163,9 @@ class SQLQuerySuite extends QueryTest {
      sql("SELECT case when ~1=-2 then 1 else 0 end FROM src"),
      sql("SELECT 1 FROM src").collect().toSeq)
   }
-
+  
   test("SPARK-4154 Query does not work if it has 'not between' in Spark SQL and HQL") {
     checkAnswer(sql("SELECT key FROM src WHERE key not between 0 and 10 order by key"),
       sql("SELECT key FROM src WHERE key between 11 and 500 order by key").collect().toSeq)
   }
-
-  test("Supporting Coalesce function in Spark SQL") {
-    sql("SELECT * FROM src where key=477").registerTempTable("src1")
-    checkAnswer(sql("SELECT COALESCE(src1.key,src.key) FROM src left outer join src1 "
-      +"on src.key=src1.key order by src.key,src1.key"),
-      sql("SELECT key FROM src order by key").collect().toSeq)
-  }
 }
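The revert is undone immediately below; patch 3 restores the same change. One piece worth calling out in the restored code is the HiveQl side, which relies on Scala's regex-as-extractor idiom: a pattern like "(?i)COALESCE".r matches the function name in any casing. A standalone sketch of the idiom (toy names, not Spark code):

    object RegexExtractorSketch {
      val COALESCE = "(?i)COALESCE".r

      // A Regex used as a pattern matches only if the whole string matches.
      def classify(name: String): String = name match {
        case COALESCE() => "built-in coalesce"
        case other      => s"unresolved function: $other"
      }

      def main(args: Array[String]): Unit = {
        println(classify("coalesce")) // built-in coalesce
        println(classify("Coalesce")) // built-in coalesce
        println(classify("myUdf"))    // unresolved function: myUdf
      }
    }

This is also why the COALESCE case in nodeToExpr must sit before the generic TOK_FUNCTION case: patterns are tried in order, and the catch-all would otherwise turn COALESCE into an unresolved UDF, as the "/* UDFs - Must be last */" comment in the diff notes.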
From a3bee8c29795ccf9954bb3988b5ff0777b59ef7d Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Sat, 29 Nov 2014 00:26:58 +0530
Subject: [PATCH 3/6] Revert "Revert "Support COALESCE function in Spark SQL and Hive QL""

This reverts commit f2ea39b68cb3965be3eeeb9e74794778e5b884a9.

---
 .../org/apache/spark/sql/catalyst/SqlParser.scala  |  2 ++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 13 +++++++++++++
 .../scala/org/apache/spark/sql/hive/HiveQl.scala   |  2 ++
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  9 ++++++++-
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index dc1d349f10f1b..225e33b301695 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -51,6 +51,7 @@ class SqlParser extends AbstractSparkSQLParser {
   protected val CACHE = Keyword("CACHE")
   protected val CASE = Keyword("CASE")
   protected val CAST = Keyword("CAST")
+  protected val COALESCE = Keyword("COALESCE")
   protected val COUNT = Keyword("COUNT")
   protected val DECIMAL = Keyword("DECIMAL")
   protected val DESC = Keyword("DESC")
@@ -305,6 +306,7 @@ class SqlParser extends AbstractSparkSQLParser {
       { case s ~ p ~ l => Substring(s, p, l) }
     | SQRT ~ "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) }
     | ABS ~ "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) }
+    | COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exps => Coalesce(exps) }
     | ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^
       { case udfName ~ exprs => UnresolvedFunction(udfName, exprs) }
     )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 84ee3051eb682..1383c7690197c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -992,4 +992,17 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       "nulldata2 on nulldata1.value <=> nulldata2.value"),
       (1 to 2).map(i => Seq(i)))
   }
+
+  test("Supporting Coalesce function in Spark SQL") {
+    val nullCheckData1 = TestData(1,"1") :: TestData(2,null) :: Nil
+    val rdd1 = sparkContext.parallelize((0 to 1).map(i => nullCheckData1(i)))
+    rdd1.registerTempTable("nulldata1")
+    val nullCheckData2 = TestData(1,"1") :: TestData(2,"2") :: Nil
+    val rdd2 = sparkContext.parallelize((0 to 1).map(i => nullCheckData2(i)))
+    rdd2.registerTempTable("nulldata2")
+
+    checkAnswer(sql("SELECT Coalesce(nulldata2.value,nulldata1.value) FROM nulldata1 join " +
+      "nulldata2 on nulldata1.key = nulldata2.key"),
+      (1 to 2).map(i => Seq(i.toString)))
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index b9283f668a9b5..6826348985572 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -902,6 +902,7 @@ private[hive] object HiveQl {
   val CASE = "(?i)CASE".r
   val SUBSTR = "(?i)SUBSTR(?:ING)?".r
   val SQRT = "(?i)SQRT".r
+  val COALESCE = "(?i)COALESCE".r
 
   protected def nodeToExpr(node: Node): Expression = node match {
     /* Attribute References */
@@ -1053,6 +1054,7 @@ private[hive] object HiveQl {
       Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
     case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
       Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
+    case Token("TOK_FUNCTION", Token(COALESCE(), Nil) :: args) => Coalesce(args.map(nodeToExpr))
 
     /* UDFs - Must be last otherwise will preempt built in functions */
     case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e9b1943ff8db7..0614c9c307ab6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -163,9 +163,16 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT case when ~1=-2 then 1 else 0 end FROM src"),
       sql("SELECT 1 FROM src").collect().toSeq)
   }
-  
+
   test("SPARK-4154 Query does not work if it has 'not between' in Spark SQL and HQL") {
     checkAnswer(sql("SELECT key FROM src WHERE key not between 0 and 10 order by key"),
       sql("SELECT key FROM src WHERE key between 11 and 500 order by key").collect().toSeq)
   }
+
+  test("Supporting Coalesce function in Spark SQL") {
+    sql("SELECT * FROM src where key=477").registerTempTable("src1")
+    checkAnswer(sql("SELECT COALESCE(src1.key,src.key) FROM src left outer join src1 "
+      +"on src.key=src1.key order by src.key,src1.key"),
+      sql("SELECT key FROM src order by key").collect().toSeq)
+  }
 }

From 25c9f855cca506ce22c812aa0677416ab049f3f5 Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Sat, 29 Nov 2014 00:35:54 +0530
Subject: [PATCH 4/6] updated

---
 .../main/scala/org/apache/spark/sql/hive/HiveQl.scala   | 2 --
 .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 9 +--------
 2 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 6826348985572..b9283f668a9b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -902,7 +902,6 @@ private[hive] object HiveQl {
   val CASE = "(?i)CASE".r
   val SUBSTR = "(?i)SUBSTR(?:ING)?".r
   val SQRT = "(?i)SQRT".r
-  val COALESCE = "(?i)COALESCE".r
 
   protected def nodeToExpr(node: Node): Expression = node match {
     /* Attribute References */
@@ -1054,7 +1053,6 @@ private[hive] object HiveQl {
       Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
     case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
       Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
-    case Token("TOK_FUNCTION", Token(COALESCE(), Nil) :: args) => Coalesce(args.map(nodeToExpr))
 
     /* UDFs - Must be last otherwise will preempt built in functions */
     case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 0614c9c307ab6..79ac743d93392 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -163,16 +163,9 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT case when ~1=-2 then 1 else 0 end FROM src"),
       sql("SELECT 1 FROM src").collect().toSeq)
   }
-
+  
   test("SPARK-4154 Query does not work if it has 'not between' in Spark SQL and HQL") {
     checkAnswer(sql("SELECT key FROM src WHERE key not between 0 and 10 order by key"),
       sql("SELECT key FROM src WHERE key between 11 and 500 order by key").collect().toSeq)
   }
-
-  test("Supporting Coalesce function in Spark SQL") {
-    sql("SELECT * FROM src where key=477").registerTempTable("src1")
-    checkAnswer(sql("SELECT COALESCE(src1.key,src.key) FROM src left outer join src1 "
-      +"on src.key=src1.key order by src.key,src1.key"),
-      sql("SELECT key FROM src order by key").collect().toSeq)
-  }
 }

From 9fa1f9e7918cc4bca20230c4310408cab2796986 Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Sat, 29 Nov 2014 00:39:36 +0530
Subject: [PATCH 5/6] style issue

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 79ac743d93392..e9b1943ff8db7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -163,7 +163,7 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT case when ~1=-2 then 1 else 0 end FROM src"),
       sql("SELECT 1 FROM src").collect().toSeq)
   }
-  
+
   test("SPARK-4154 Query does not work if it has 'not between' in Spark SQL and HQL") {
     checkAnswer(sql("SELECT key FROM src WHERE key not between 0 and 10 order by key"),
       sql("SELECT key FROM src WHERE key between 11 and 500 order by key").collect().toSeq)
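Patch 6 below goes further than re-adding the Hive QL hook: it teaches the analyzer to widen mismatched argument types, so COALESCE(stringCol, intCol) resolves to a single common type instead of failing. A simplified standalone sketch of that reduction over argument types (toy DataType values; the real rule delegates to Catalyst's findTightestCommonType):

    object CommonTypeSketch {
      sealed trait DataType
      case object IntType extends DataType
      case object DoubleType extends DataType
      case object StringType extends DataType

      // Pairwise widening: a string on either side wins, Int widens to Double.
      def widen(a: DataType, b: DataType): DataType = (a, b) match {
        case (x, y) if x == y                  => x
        case (StringType, _) | (_, StringType) => StringType
        case _                                 => DoubleType // only Int/Double remains in this toy lattice
      }

      def main(args: Array[String]): Unit = {
        val commonType = Seq(IntType, StringType, IntType).reduce(widen)
        println(commonType) // StringType: the Int arguments would be cast
      }
    }

The actual rule then wraps every argument whose type differs from the common type in a Cast, exactly as the diff shows.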
From e377a0c65e68994b0b305c24c0d6ff62128d7772 Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Sat, 29 Nov 2014 23:16:15 +0530
Subject: [PATCH 6/6] Supporting type widening in Coalesce function and replaced Coalesce UDF with local function.

---
 .../catalyst/analysis/HiveTypeCoercion.scala | 21 +++++++++++++++++++
 .../org/apache/spark/sql/hive/HiveQl.scala   |  2 ++
 .../sql/hive/execution/SQLQuerySuite.scala   |  7 +++++++
 3 files changed, 30 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index e38114ab3cf25..f772526073691 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -219,6 +219,27 @@ trait HiveTypeCoercion {
           if (b.right.dataType == widestType) b.right else Cast(b.right, widestType)
         b.makeCopy(Array(newLeft, newRight))
       }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged.
+
+    case c @ Coalesce(exps) =>
+      val valueTypes = exps.map (_.dataType)
+      if (valueTypes.distinct.size > 1) {
+        val commonType = valueTypes.reduce { (v1, v2) =>
+          // When a string is found on one side, make the other side a string too.
+          if (v1 == StringType || v2 == StringType) {
+            StringType
+          } else {
+            findTightestCommonType(v1, v2)
+              .getOrElse(sys.error(
+                s"Types in Coalesce must be the same or coercible to a common type: $v1 != $v2"))
+          }
+        }
+        val transformedExps = exps.map {value=>
+          if (value.dataType != commonType) Cast(value, commonType) else value
+        }
+        Coalesce(transformedExps)
+      } else {
+        c
+      }
     }
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index b9283f668a9b5..6826348985572 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -902,6 +902,7 @@ private[hive] object HiveQl {
   val CASE = "(?i)CASE".r
   val SUBSTR = "(?i)SUBSTR(?:ING)?".r
   val SQRT = "(?i)SQRT".r
+  val COALESCE = "(?i)COALESCE".r
 
   protected def nodeToExpr(node: Node): Expression = node match {
     /* Attribute References */
@@ -1053,6 +1054,7 @@ private[hive] object HiveQl {
       Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
     case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
       Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
+    case Token("TOK_FUNCTION", Token(COALESCE(), Nil) :: args) => Coalesce(args.map(nodeToExpr))
 
     /* UDFs - Must be last otherwise will preempt built in functions */
     case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e9b1943ff8db7..b8fe3adb241c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -168,4 +168,11 @@ class SQLQuerySuite extends QueryTest {
     checkAnswer(sql("SELECT key FROM src WHERE key not between 0 and 10 order by key"),
       sql("SELECT key FROM src WHERE key between 11 and 500 order by key").collect().toSeq)
   }
+
+  test("Supporting Coalesce function in Spark SQL") {
+    sql("SELECT * FROM src where key=477").registerTempTable("src1")
+    checkAnswer(sql("SELECT COALESCE(src1.key,src.key) FROM src left outer join src1 "
+      +"on src.key=src1.key order by src.key,src1.key"),
+      sql("SELECT key FROM src order by key").collect().toSeq)
+  }
 }
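End-to-end, the series makes a query like the following work through the plain SQLContext parser, with the coercion rule casting the Int column where it is mixed with a String column. A hedged usage sketch against the Spark 1.2-era API (the Person table, its data, and the local SparkContext setup are illustrative assumptions, not part of the patches):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object CoalesceUsageSketch {
      case class Person(id: Int, nickname: String)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("coalesce").setMaster("local"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.createSchemaRDD // implicit RDD[Product] => SchemaRDD

        sc.parallelize(Seq(Person(1, null), Person(2, "bob"))).registerTempTable("people")

        // nickname is null for id 1, so COALESCE falls back to id; the new
        // coercion rule widens the Int id to String to match nickname's type.
        sqlContext.sql("SELECT COALESCE(nickname, id) FROM people")
          .collect().foreach(println) // [1] and [bob]
      }
    }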