From 521ce977f30d115d41280f330b21d9b087f627e4 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 26 Aug 2015 17:36:33 +0900 Subject: [PATCH 1/3] [SPARK-9034] [SQL] Reflect field names defined in GenericUDTF --- .../spark/sql/catalyst/analysis/Analyzer.scala | 7 ++++--- .../spark/sql/catalyst/expressions/generators.scala | 12 +++++++----- .../main/scala/org/apache/spark/sql/DataFrame.scala | 5 +++-- .../sql/hive/execution/HiveCompatibilitySuite.scala | 1 + .../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 2 +- ...GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e | 1 + ...GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 | 1 + ...l_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada | 1 + ...l_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7 | 0 ...l_view_noalias-2-6d5806dd1d2511911a5de1e205523f42 | 2 ++ ...l_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a | 0 ...l_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56 | 2 ++ ...al_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe | 0 ...l_view_noalias-6-16d227442dd775615c6ecfceedc6c612 | 0 ...l_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3 | 2 ++ .../spark/sql/hive/execution/HiveQuerySuite.scala | 6 ++++++ 16 files changed, 31 insertions(+), 11 deletions(-) create mode 100644 sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e create mode 100644 sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index beabacfc88e32..887c80c8bf263 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -147,7 +147,7 @@ class Analyzer( case u @ UnresolvedAlias(child) => child match { case ne: NamedExpression => ne case e if !e.resolved => u - case g: Generator if g.elementTypes.size > 1 => MultiAlias(g, Nil) + case g: Generator => MultiAlias(g, Nil) case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)() case other => Alias(other, s"_c$i")() } @@ -715,13 +715,14 @@ class Analyzer( if (names.length == elementTypes.length) { names.zip(elementTypes).map { - case (name, (t, nullable)) => + case (name, (t, nullable, _)) => AttributeReference(name, t, nullable)() } } else if (names.isEmpty) { elementTypes.zipWithIndex.map { // keep the default column names as Hive does _c0, _c1, _cN - case ((t, nullable), i) => AttributeReference(s"_c$i", t, nullable)() + case ((t, nullable, None), i) => AttributeReference(s"_c$i", t, nullable)() + case ((t, nullable, Some(name)), i) => AttributeReference(name, t, nullable)() } } else { failAnalysis( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 1a2092c909c56..fb254ef8338ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -53,7 +53,7 @@ trait Generator extends Expression { * The output element data types in structure of Seq[(DataType, Nullable)] * TODO we probably need to add more information like metadata etc. */ - def elementTypes: Seq[(DataType, Boolean)] + def elementTypes: Seq[(DataType, Boolean, Option[String])] /** Should be implemented by child classes to perform specific Generators. */ override def eval(input: InternalRow): TraversableOnce[InternalRow] @@ -69,7 +69,7 @@ trait Generator extends Expression { * A generator that produces its output using the provided lambda function. */ case class UserDefinedGenerator( - elementTypes: Seq[(DataType, Boolean)], + elementTypes: Seq[(DataType, Boolean, Option[String])], function: Row => TraversableOnce[InternalRow], children: Seq[Expression]) extends Generator with CodegenFallback { @@ -112,9 +112,11 @@ case class Explode(child: Expression) extends UnaryExpression with Generator wit } } - override def elementTypes: Seq[(DataType, Boolean)] = child.dataType match { - case ArrayType(et, containsNull) => (et, containsNull) :: Nil - case MapType(kt, vt, valueContainsNull) => (kt, false) :: (vt, valueContainsNull) :: Nil + // hive-compatible default alias for explode function ("col" for array, "key", "value" for map) + override def elementTypes: Seq[(DataType, Boolean, Option[String])] = child.dataType match { + case ArrayType(et, containsNull) => (et, containsNull, Some("col")) :: Nil + case MapType(kt, vt, valueContainsNull) => + (kt, false, Some("key")) :: (vt, valueContainsNull, Some("value")) :: Nil } override def eval(input: InternalRow): TraversableOnce[InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 53ad3c0266cdb..79c2aa6956b99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1175,7 +1175,8 @@ class DataFrame private[sql]( def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = { val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] - val elementTypes = schema.toAttributes.map { attr => (attr.dataType, attr.nullable) } + val elementTypes = schema.toAttributes.map { + attr => (attr.dataType, attr.nullable, Some(attr.name)) } val names = schema.toAttributes.map(_.name) val convert = CatalystTypeConverters.createToCatalystConverter(schema) @@ -1203,7 +1204,7 @@ class DataFrame private[sql]( val dataType = ScalaReflection.schemaFor[B].dataType val attributes = AttributeReference(outputColumn, dataType)() :: Nil // TODO handle the metadata? - val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable) } + val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable, Some(attr.name)) } val names = attributes.map(_.name) def rowFunction(row: Row): TraversableOnce[InternalRow] = { diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 6ed40b03975d0..2d0d7b8af3581 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -661,6 +661,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "join_star", "lateral_view", "lateral_view_cp", + "lateral_view_noalias", "lateral_view_ppd", "leftsemijoin", "leftsemijoin_mr", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 0b5e863506142..aa0458d49955b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -511,7 +511,7 @@ private[hive] case class HiveGenericUDTF( protected lazy val collector = new UDTFCollector override lazy val elementTypes = outputInspector.getAllStructFieldRefs.asScala.map { - field => (inspectorToDataType(field.getFieldObjectInspector), true) + field => (inspectorToDataType(field.getFieldObjectInspector), true, Some(field.getFieldName)) } @transient diff --git a/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e new file mode 100644 index 0000000000000..1cf253f92c055 --- /dev/null +++ b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e @@ -0,0 +1 @@ +238 diff --git a/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 new file mode 100644 index 0000000000000..60878ffb77064 --- /dev/null +++ b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 @@ -0,0 +1 @@ +238 val_238 diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada b/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7 b/sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42 b/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42 new file mode 100644 index 0000000000000..0da0d93886e01 --- /dev/null +++ b/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42 @@ -0,0 +1,2 @@ +key1 100 +key2 200 diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a b/sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56 b/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56 new file mode 100644 index 0000000000000..0da0d93886e01 --- /dev/null +++ b/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56 @@ -0,0 +1,2 @@ +key1 100 +key2 200 diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe b/sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612 b/sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3 b/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3 new file mode 100644 index 0000000000000..4ba46bbda5b04 --- /dev/null +++ b/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3 @@ -0,0 +1,2 @@ +key1 100 key1 100 +key2 200 key2 200 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index e597d6865f67a..fc72e3c7dc6aa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -563,6 +563,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { createQueryTest("Specify the udtf output", "SELECT d FROM (SELECT explode(array(1,1)) d FROM src LIMIT 1) t") + createQueryTest("SPARK-9034 Reflect field names defined in GenericUDTF #1", + "SELECT col FROM (SELECT explode(array(key,value)) FROM src LIMIT 1) t") + + createQueryTest("SPARK-9034 Reflect field names defined in GenericUDTF #2", + "SELECT key,value FROM (SELECT explode(map(key,value)) FROM src LIMIT 1) t") + test("sampling") { sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s") sql("SELECT * FROM src TABLESAMPLE(100 PERCENT) s") From cea49ff19872e0f2a9a44e262e08a5a3786f8d0f Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 28 Oct 2015 18:01:40 +0900 Subject: [PATCH 2/3] replace UnresolvedAttribute --- sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 79c2aa6956b99..897649627cb7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1185,7 +1185,7 @@ class DataFrame private[sql]( val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr)) Generate(generator, join = true, outer = false, - qualifier = None, names.map(UnresolvedAttribute(_)), logicalPlan) + qualifier = None, generatorOutput = Nil, logicalPlan) } /** @@ -1205,7 +1205,6 @@ class DataFrame private[sql]( val attributes = AttributeReference(outputColumn, dataType)() :: Nil // TODO handle the metadata? val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable, Some(attr.name)) } - val names = attributes.map(_.name) def rowFunction(row: Row): TraversableOnce[InternalRow] = { val convert = CatalystTypeConverters.createToCatalystConverter(dataType) @@ -1214,7 +1213,7 @@ class DataFrame private[sql]( val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil) Generate(generator, join = true, outer = false, - qualifier = None, names.map(UnresolvedAttribute(_)), logicalPlan) + qualifier = None, generatorOutput = Nil, logicalPlan) } ///////////////////////////////////////////////////////////////////////////// From 27d867515b57eea5914056f1ceed015e9c6723ce Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Tue, 3 Nov 2015 10:51:36 +0900 Subject: [PATCH 3/3] remove unnecessary Option for field name --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 8 +++----- .../spark/sql/catalyst/expressions/generators.scala | 10 +++++----- .../main/scala/org/apache/spark/sql/DataFrame.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 2 +- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 887c80c8bf263..2415d4c9871cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -706,7 +706,7 @@ class Analyzer( /** * Construct the output attributes for a [[Generator]], given a list of names. If the list of - * names is empty names are assigned by ordinal (i.e., _c0, _c1, ...) to match Hive's defaults. + * names is empty names are assigned from field names in generator. */ private def makeGeneratorOutput( generator: Generator, @@ -719,10 +719,8 @@ class Analyzer( AttributeReference(name, t, nullable)() } } else if (names.isEmpty) { - elementTypes.zipWithIndex.map { - // keep the default column names as Hive does _c0, _c1, _cN - case ((t, nullable, None), i) => AttributeReference(s"_c$i", t, nullable)() - case ((t, nullable, Some(name)), i) => AttributeReference(name, t, nullable)() + elementTypes.map { + case (t, nullable, name) => AttributeReference(name, t, nullable)() } } else { failAnalysis( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index fb254ef8338ae..894a0730d1c2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -53,7 +53,7 @@ trait Generator extends Expression { * The output element data types in structure of Seq[(DataType, Nullable)] * TODO we probably need to add more information like metadata etc. */ - def elementTypes: Seq[(DataType, Boolean, Option[String])] + def elementTypes: Seq[(DataType, Boolean, String)] /** Should be implemented by child classes to perform specific Generators. */ override def eval(input: InternalRow): TraversableOnce[InternalRow] @@ -69,7 +69,7 @@ trait Generator extends Expression { * A generator that produces its output using the provided lambda function. */ case class UserDefinedGenerator( - elementTypes: Seq[(DataType, Boolean, Option[String])], + elementTypes: Seq[(DataType, Boolean, String)], function: Row => TraversableOnce[InternalRow], children: Seq[Expression]) extends Generator with CodegenFallback { @@ -113,10 +113,10 @@ case class Explode(child: Expression) extends UnaryExpression with Generator wit } // hive-compatible default alias for explode function ("col" for array, "key", "value" for map) - override def elementTypes: Seq[(DataType, Boolean, Option[String])] = child.dataType match { - case ArrayType(et, containsNull) => (et, containsNull, Some("col")) :: Nil + override def elementTypes: Seq[(DataType, Boolean, String)] = child.dataType match { + case ArrayType(et, containsNull) => (et, containsNull, "col") :: Nil case MapType(kt, vt, valueContainsNull) => - (kt, false, Some("key")) :: (vt, valueContainsNull, Some("value")) :: Nil + (kt, false, "key") :: (vt, valueContainsNull, "value") :: Nil } override def eval(input: InternalRow): TraversableOnce[InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 897649627cb7f..fc0ab632f9930 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1176,7 +1176,7 @@ class DataFrame private[sql]( val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val elementTypes = schema.toAttributes.map { - attr => (attr.dataType, attr.nullable, Some(attr.name)) } + attr => (attr.dataType, attr.nullable, attr.name) } val names = schema.toAttributes.map(_.name) val convert = CatalystTypeConverters.createToCatalystConverter(schema) @@ -1204,7 +1204,7 @@ class DataFrame private[sql]( val dataType = ScalaReflection.schemaFor[B].dataType val attributes = AttributeReference(outputColumn, dataType)() :: Nil // TODO handle the metadata? - val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable, Some(attr.name)) } + val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable, attr.name) } def rowFunction(row: Row): TraversableOnce[InternalRow] = { val convert = CatalystTypeConverters.createToCatalystConverter(dataType) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index aa0458d49955b..a9db70119d011 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -511,7 +511,7 @@ private[hive] case class HiveGenericUDTF( protected lazy val collector = new UDTFCollector override lazy val elementTypes = outputInspector.getAllStructFieldRefs.asScala.map { - field => (inspectorToDataType(field.getFieldObjectInspector), true, Some(field.getFieldName)) + field => (inspectorToDataType(field.getFieldObjectInspector), true, field.getFieldName) } @transient