Skip to content

Commit

Permalink
rollback LogicalPlan, support dot operation on nested array type
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Sep 10, 2014
1 parent a58df40 commit ee8a724
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
expression ~ "[" ~ expression <~ "]" ^^ {
case base ~ _ ~ ordinal => GetItem(base, ordinal)
} |
expression ~ "." ~ ident ^^ {
case base ~ _ ~ fieldName => GetField(base, fieldName)
(expression <~ ".") ~ ident ^^ {
case base ~ fieldName => GetField(base, fieldName)
} |
TRUE ^^^ Literal(true, BooleanType) |
FALSE ^^^ Literal(false, BooleanType) |
Expand All @@ -372,8 +372,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
literal

protected lazy val dotExpressionHeader: Parser[Expression] =
ident ~ "." ~ ident ^^ {
case i1 ~ _ ~ i2 => UnresolvedAttribute(i1 + "." + i2)
(ident <~ ".") ~ ident ~ rep("." ~> ident) ^^ {
case i1 ~ i2 ~ rest => UnresolvedAttribute(i1 + "." + i2 + rest.mkString(".", ".", ""))
}

protected lazy val dataType: Parser[DataType] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,74 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
}

/**
* Returns the value of fields in the Struct `child`.
* Returns the value of fields in the `child`.
* The type of `child` can be struct, or array of struct,
* or array of array of struct, or array of array ... of struct.
*/
case class GetField(child: Expression, fieldName: String) extends UnaryExpression {
type EvaluatedType = Any

def dataType = field.dataType
lazy val dataType = {
structType
buildDataType(field.dataType)
}

override def nullable = child.nullable || field.nullable
override def foldable = child.foldable

protected def structType = child.dataType match {
private var _buildDataType = identity[DataType] _
private lazy val buildDataType = {
structType
_buildDataType
}

private var _nestedArrayCount = 0
private lazy val nestedArrayCount = {
structType
_nestedArrayCount
}

private def getStructType(t: DataType): StructType = t match {
case ArrayType(elementType, containsNull) =>
_buildDataType = {(t: DataType) => ArrayType(t, containsNull)} andThen _buildDataType
_nestedArrayCount += 1
getStructType(elementType)
case s: StructType => s
case otherType => sys.error(s"GetField is not valid on fields of type $otherType")
}

protected lazy val structType: StructType = {
child match {
case n: GetField =>
this._buildDataType = n._buildDataType
this._nestedArrayCount = n._nestedArrayCount
getStructType(n.field.dataType)
case _ => getStructType(child.dataType)
}
}

lazy val field =
structType.fields
.find(_.name == fieldName)
.getOrElse(sys.error(s"No such field $fieldName in ${child.dataType}"))

lazy val ordinal = structType.fields.indexOf(field)

override lazy val resolved = childrenResolved && child.dataType.isInstanceOf[StructType]
override lazy val resolved = childrenResolved

override def eval(input: Row): Any = {
val baseValue = child.eval(input).asInstanceOf[Row]
if (baseValue == null) null else baseValue(ordinal)
val baseValue = child.eval(input)
evaluateValue(baseValue, nestedArrayCount)
}

private def evaluateValue(v: Any, count: Int): Any = {
if (v == null) {
null
} else if (count > 0) {
v.asInstanceOf[Seq[_]].map(r => evaluateValue(r, count - 1))
} else {
v.asInstanceOf[Row](ordinal)
}
}

override def toString = s"$child.$fieldName"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,36 +86,29 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
def resolve(name: String): Option[NamedExpression] =
resolve(name, output)

/**
* Performs attribute resolution given a name and a sequence of possible attributes.
* The only possible formats of name are "ident" and "ident.ident".
*/
/** Performs attribute resolution given a name and a sequence of possible attributes. */
protected def resolve(name: String, input: Seq[Attribute]): Option[NamedExpression] = {
def handleResult[A <: NamedExpression](result: Seq[A]) = {
result.distinct match {
case Seq(a) => Some(a)
case Seq() => None
case ambiguousReferences =>
throw new TreeNodeException(
this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
}
val parts = name.split("\\.")
// Collect all attributes that are output by this nodes children where either the first part
// matches the name or where the first part matches the scope and the second part matches the
// name. Return these matches along with any remaining parts, which represent dotted access to
// struct fields.
val options = input.flatMap { option =>
// If the first part of the desired name matches a qualifier for this possible match, drop it.
val remainingParts =
if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts
if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
}

name.split("\\.") match {
// the format of name is "ident", it should match the name of a possible attribute
case Array(s) => handleResult(input.filter(_.name == s))

// The format of name is "ident.ident", the first part should matches the scope
// and the second part should matches the name. Or the first part matches the
// name and this attribute is struct type.
case Array(s1, s2) =>
handleResult(input.collect {
case a if (a.qualifiers.contains(s1) && a.name == s2) => a
case a if (a.name == s1 && a.dataType.isInstanceOf[StructType]) =>
Alias(GetField(a, s2), s2)()
})

case _ => None
options.distinct match {
case Seq((a, Nil)) => Some(a) // One match, no nested fields, use it.
// One match, but we also need to extract the requested nested field.
case Seq((a, nestedFields)) =>
Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
case Seq() => None // No matches.
case ambiguousReferences =>
throw new TreeNodeException(
this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ class JsonSuite extends QueryTest {
test("SPARK-2096 Correctly parse dot notations") {
val jsonSchemaRDD = jsonRDD(complexFieldAndType2)
jsonSchemaRDD.registerTempTable("jsonTable")

checkAnswer(
sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
(true, "str1") :: Nil
Expand All @@ -593,5 +594,33 @@ class JsonSuite extends QueryTest {
sql("select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1] from jsonTable"),
("str2", 6) :: Nil
)

checkAnswer(
sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"),
(Seq(true, false, null), Seq("str1", null, null)) :: Nil
)

checkAnswer(
sql("select complexNestedArray.field, complexNestedArray.field.innerField from jsonTable"),
(
Seq(
Seq(
Seq("str1", null),
Seq("str2", null)
),
Seq(
Seq("str3", null),
Seq(null, "str4")
),
null
),

Seq(
Seq("str1", "str2"),
Seq("str3", null),
null
)
) :: Nil
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,31 @@ object TestJsonData {
"inner1": "str4"
}],
"field2": [[5, 6], [7, 8]]
}]
}],
"complexNestedArray": [
{
"field": [
{
"innerField": "str1"
},
{
"innerField": "str2"
}
]
},
{
"field": [
{
"innerField": "str3"
},
{
"otherInner": "str4"
}
]
},
{
"otherField": "str5"
}
]
}""" :: Nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

package org.apache.spark.sql.hive.execution

import scala.reflect.ClassTag

import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHive._

case class Nested1(f1: Nested2)
Expand Down

0 comments on commit ee8a724

Please sign in to comment.