From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 16 Nov 2015 18:30:26 -0800 Subject: [PATCH 1/6] SPARK-11633 --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- .../spark/sql/hive/execution/SQLQuerySuite.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2f4670b55bdba..5a5b71e52dd79 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -425,7 +425,8 @@ class Analyzer( */ j case Some((oldRelation, newRelation)) => - val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) + val attributeRewrites = + AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2)) val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3427152b2da02..5e00546a74c00 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -51,6 +51,8 @@ case class Order( state: String, month: Int) +case class Individual(F1: Integer, F2: Integer) + case class WindowData( month: Int, area: String, @@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } + + test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") { + val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1))) + val df = hiveContext.createDataFrame(rdd1) + df.registerTempTable("foo") + val df2 = sql("select f1, F2 as F2 from foo") + df2.registerTempTable("foo2") + df2.registerTempTable("foo3") + + checkAnswer(sql( + """ + SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2 + """.stripMargin), Row(2) :: Row(1) :: Nil) + } } From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 20 Nov 2015 10:56:45 -0800 Subject: [PATCH 2/6] converge --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 15 --------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7c9512fbd00aa..47962ebe6ef82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -417,8 +417,7 @@ class Analyzer( */ j case Some((oldRelation, newRelation)) => - val attributeRewrites = - AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2)) + val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 5e00546a74c00..61d9dcd37572b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -51,8 +51,6 @@ case class Order( state: String, month: Int) -case class Individual(F1: Integer, F2: Integer) - case class WindowData( month: Int, area: String, @@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } - - test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") { - val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1))) - val df = hiveContext.createDataFrame(rdd1) - df.registerTempTable("foo") - val df2 = sql("select f1, F2 as F2 from foo") - df2.registerTempTable("foo2") - df2.registerTempTable("foo3") - - checkAnswer(sql( - """ - SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2 - """.stripMargin), Row(2) :: Row(1) :: Nil) } } From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 20 Nov 2015 10:58:37 -0800 Subject: [PATCH 3/6] converge --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 61d9dcd37572b..3427152b2da02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } - } } From a39def82e2ad09c1c311e9ca062d7ac16304eee5 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 3 Mar 2016 15:52:01 -0800 Subject: [PATCH 4/6] SQL generation for script transformation --- .../apache/spark/sql/hive/SQLBuilder.scala | 79 +++++++++++++++++++ .../sql/hive/LogicalPlanToSQLSuite.scala | 57 +++++++++++++ 2 files changed, 136 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 13a78c609e014..d68241f564f80 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.execution.HiveScriptIOSchema /** * A builder class used to convert a resolved logical plan into a SQL query string. Note that this @@ -87,6 +88,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case a: Aggregate => toSQL(node) case s: Sort => toSQL(node) case r: RepartitionByExpression => toSQL(node) + case t: ScriptTransformation => toSQL(node) case _ => build( "SELECT", @@ -196,6 +198,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi p.partitionExpressions.map(_.sql).mkString(", ") ) + case p: ScriptTransformation => + scriptTransformationToSQL(p) + case OneRowRelation => "" @@ -221,6 +226,80 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi ) } + /** + * Get the row format specification + * Note: + * 1. Changes are needed when readerClause and writerClause are supported. + * 2. Changes are needed when "ESCAPED BY" is supported. + */ + private def getRowFormat( + schemaLess: Boolean, + rowFormat: Seq[(String, String)], + serdeClass: Option[String], + serdeProps: Seq[(String, String)]): String = { + if (schemaLess) return "" + + val rowFormatDelimited = + rowFormat.map { + case ("TOK_TABLEROWFORMATFIELD", value) => + "FIELDS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATCOLLITEMS", value) => + "COLLECTION ITEMS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATMAPKEYS", value) => + "MAP KEYS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATLINES", value) => + "LINES TERMINATED BY " + value + case ("TOK_TABLEROWFORMATNULL", value) => + "NULL DEFINED AS " + value + case o => + throw new UnsupportedOperationException( + s"Row format $o doesn't have a SQL representation") + } + + val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("") + val serdePropsSQL = + if (serdeClass.nonEmpty) { + val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ") + if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else "" + } else { + "" + } + if (rowFormat.nonEmpty) { + "ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" ") + } else { + "ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL + } + } + + private def scriptTransformationToSQL(plan: ScriptTransformation): String = { + val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema] + val inputRowFormat = getRowFormat( + ioSchema.schemaLess, + ioSchema.inputRowFormat, + ioSchema.inputSerdeClass, + ioSchema.inputSerdeProps) + val outputRowFormat = getRowFormat( + ioSchema.schemaLess, + ioSchema.outputRowFormat, + ioSchema.outputSerdeClass, + ioSchema.outputSerdeProps) + + val outputSchema = plan.output.map { attr => + s"${attr.sql} ${attr.dataType.simpleString}" + }.mkString(", ") + + build( + "SELECT TRANSFORM", + "(" + plan.input.map(_.sql).mkString(", ") + ")", + inputRowFormat, + s"USING \'${plan.script}\'", + "AS (" + outputSchema + ")", + outputRowFormat, + if (plan.child == OneRowRelation) "" else "FROM", + toSQL(plan.child) + ) + } + private def aggregateToSQL(plan: Aggregate): String = { val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ") build( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index d708fcf8dd4d9..d5cec21b8b136 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -281,6 +281,63 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { } } + test("script transformation - schemaless") { + checkHiveQl("SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2") + checkHiveQl("SELECT TRANSFORM (*) USING 'cat' FROM parquet_t2") + } + + test("script transformation - alias list") { + checkHiveQl("SELECT TRANSFORM (a, b, c, d) USING 'cat' AS (d1, d2, d3, d4) FROM parquet_t2") + } + + test("script transformation - alias list with type") { + checkHiveQl( + """FROM + |(FROM parquet_t1 SELECT TRANSFORM(key, value) USING 'cat' AS (thing1 int, thing2 string)) t + |SELECT thing1 + 1 + """.stripMargin) + } + + test("script transformation - row format delimited clause with only one format property") { + checkHiveQl( + """SELECT TRANSFORM (key) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' + |USING 'cat' AS (tKey) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' + |FROM parquet_t1 + """.stripMargin) + } + + test("script transformation - row format delimited clause with multiple format properties") { + checkHiveQl( + """SELECT TRANSFORM (key) + |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t' + |USING 'cat' AS (tKey) + |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t' + |FROM parquet_t1 + """.stripMargin) + } + + test("script transformation - row format serde clauses with SERDEPROPERTIES") { + checkHiveQl( + """SELECT TRANSFORM (key, value) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |WITH SERDEPROPERTIES('field.delim' = '|') + |USING 'cat' AS (tKey, tValue) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |WITH SERDEPROPERTIES('field.delim' = '|') + |FROM parquet_t1 + """.stripMargin) + } + + test("script transformation - row format serde clauses without SERDEPROPERTIES") { + checkHiveQl( + """SELECT TRANSFORM (key, value) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |USING 'cat' AS (tKey, tValue) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |FROM parquet_t1 + """.stripMargin) + } + test("plans with non-SQL expressions") { sqlContext.udf.register("foo", (_: Int) * 2) intercept[UnsupportedOperationException](new SQLBuilder(sql("SELECT foo(id) FROM t0")).toSQL) From 04def724850b1ea163cd2648afca772fbab98dfd Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 7 Mar 2016 12:58:35 -0800 Subject: [PATCH 5/6] moved rowFormatToSQL into ScriptTransformation.scala --- .../apache/spark/sql/hive/SQLBuilder.scala | 65 +++---------------- .../hive/execution/ScriptTransformation.scala | 48 ++++++++++++++ 2 files changed, 56 insertions(+), 57 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 6e7b0438ecfaf..f1616463b1aec 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -218,63 +218,14 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi ) } - /** - * Get the row format specification - * Note: - * 1. Changes are needed when readerClause and writerClause are supported. - * 2. Changes are needed when "ESCAPED BY" is supported. - */ - private def getRowFormat( - schemaLess: Boolean, - rowFormat: Seq[(String, String)], - serdeClass: Option[String], - serdeProps: Seq[(String, String)]): String = { - if (schemaLess) return "" - - val rowFormatDelimited = - rowFormat.map { - case ("TOK_TABLEROWFORMATFIELD", value) => - "FIELDS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATCOLLITEMS", value) => - "COLLECTION ITEMS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATMAPKEYS", value) => - "MAP KEYS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATLINES", value) => - "LINES TERMINATED BY " + value - case ("TOK_TABLEROWFORMATNULL", value) => - "NULL DEFINED AS " + value - case o => - throw new UnsupportedOperationException( - s"Row format $o doesn't have a SQL representation") - } - - val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("") - val serdePropsSQL = - if (serdeClass.nonEmpty) { - val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ") - if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else "" - } else { - "" - } - if (rowFormat.nonEmpty) { - "ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" ") - } else { - "ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL - } - } - private def scriptTransformationToSQL(plan: ScriptTransformation): String = { val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema] - val inputRowFormat = getRowFormat( - ioSchema.schemaLess, - ioSchema.inputRowFormat, - ioSchema.inputSerdeClass, - ioSchema.inputSerdeProps) - val outputRowFormat = getRowFormat( - ioSchema.schemaLess, - ioSchema.outputRowFormat, - ioSchema.outputSerdeClass, - ioSchema.outputSerdeProps) + val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse( + throw new UnsupportedOperationException( + s"unsupported row format ${ioSchema.inputRowFormat}")) + val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse( + throw new UnsupportedOperationException( + s"unsupported row format ${ioSchema.outputRowFormat}")) val outputSchema = plan.output.map { attr => s"${attr.sql} ${attr.dataType.simpleString}" @@ -283,10 +234,10 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi build( "SELECT TRANSFORM", "(" + plan.input.map(_.sql).mkString(", ") + ")", - inputRowFormat, + inputRowFormatSQL, s"USING \'${plan.script}\'", "AS (" + outputSchema + ")", - outputRowFormat, + outputRowFormatSQL, if (plan.child == OneRowRelation) "" else "FROM", toSQL(plan.child) ) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 5e6641693798f..a9542668c549e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -399,4 +399,52 @@ case class HiveScriptIOSchema ( instance } } + + def inputRowFormatSQL: Option[String] = + getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps) + + def outputRowFormatSQL: Option[String] = + getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps) + + /** + * Get the row format specification + * Note: + * 1. Changes are needed when readerClause and writerClause are supported. + * 2. Changes are needed when "ESCAPED BY" is supported. + */ + private def getRowFormatSQL( + rowFormat: Seq[(String, String)], + serdeClass: Option[String], + serdeProps: Seq[(String, String)]): Option[String] = { + if (schemaLess) return Some("") + + val rowFormatDelimited = + rowFormat.map { + case ("TOK_TABLEROWFORMATFIELD", value) => + "FIELDS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATCOLLITEMS", value) => + "COLLECTION ITEMS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATMAPKEYS", value) => + "MAP KEYS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATLINES", value) => + "LINES TERMINATED BY " + value + case ("TOK_TABLEROWFORMATNULL", value) => + "NULL DEFINED AS " + value + case o => return None + } + + val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("") + val serdePropsSQL = + if (serdeClass.nonEmpty) { + val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ") + if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else "" + } else { + "" + } + if (rowFormat.nonEmpty) { + Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" ")) + } else { + Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL) + } + } } From 5dba0e77e65e2ffbfb70b3835fe0b13e3d5ed807 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 7 Mar 2016 14:38:52 -0800 Subject: [PATCH 6/6] style fix. --- .../src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index f1616463b1aec..74efe6eb50820 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -30,8 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType} import org.apache.spark.sql.hive.execution.HiveScriptIOSchema +import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType} /** * A place holder for generated SQL for subquery expression.