SPARK-2096 Correctly parse dot notations
cloud-fan committed Sep 10, 2014
1 parent 25b5b86 commit dc31698
Showing 5 changed files with 90 additions and 103 deletions.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -357,6 +357,10 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     expression ~ "[" ~ expression <~ "]" ^^ {
       case base ~ _ ~ ordinal => GetItem(base, ordinal)
     } |
+    dotExpressionHeader |
+    expression ~ "." ~ ident ^^ {
+      case base ~ _ ~ fieldName => GetField(base, fieldName)
+    } |
     TRUE ^^^ Literal(true, BooleanType) |
     FALSE ^^^ Literal(false, BooleanType) |
     cast |
@@ -367,6 +371,11 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     "*" ^^^ Star(None) |
     literal
 
+  protected lazy val dotExpressionHeader: Parser[Expression] =
+    ident ~ "." ~ ident ^^ {
+      case i1 ~ _ ~ i2 => UnresolvedAttribute(i1 + "." + i2)
+    }
+
   protected lazy val dataType: Parser[DataType] =
     STRING ^^^ StringType | TIMESTAMP ^^^ TimestampType
 }
@@ -380,7 +389,7 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {
 
   delimiters += (
     "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
-    ",", ";", "%", "{", "}", ":", "[", "]"
+    ",", ";", "%", "{", "}", ":", "[", "]", "."
   )
 
   override lazy val token: Parser[Token] = (
@@ -401,7 +410,7 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {
     | failure("illegal character")
   )
 
-  override def identChar = letter | elem('_') | elem('.')
+  override def identChar = letter | elem('_')
 
   override def whitespace: Parser[Any] = rep(
     whitespaceChar
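
Note: taken together, the lexer and parser changes move dot handling out of the lexer ('.' is now a delimiter rather than an identChar, so a.b lexes as three tokens) and into the grammar, where dotExpressionHeader turns a two-part name into a single UnresolvedAttribute and longer chains fold into GetField via packrat left recursion. Below is a minimal, self-contained sketch of the same technique -- not the Spark code itself; the mini-AST and the DotParser object are hypothetical stand-ins:

    import scala.util.parsing.combinator.PackratParsers
    import scala.util.parsing.combinator.syntactical.StandardTokenParsers

    // Hypothetical mini-AST standing in for Catalyst's expression nodes.
    sealed trait Expr
    case class UnresolvedAttribute(name: String) extends Expr
    case class GetField(base: Expr, field: String) extends Expr

    object DotParser extends StandardTokenParsers with PackratParsers {
      lexical.delimiters += "."  // '.' is a delimiter now, not an identifier character

      // Two-part names become one UnresolvedAttribute ("table.column" or
      // "struct.field" -- resolution decides which later); deeper chains fold
      // into GetField through packrat left recursion.
      lazy val dotExpressionHeader: PackratParser[Expr] =
        ident ~ "." ~ ident ^^ { case i1 ~ _ ~ i2 => UnresolvedAttribute(i1 + "." + i2) }

      lazy val expr: PackratParser[Expr] =
        dotExpressionHeader |
        expr ~ "." ~ ident ^^ { case base ~ _ ~ field => GetField(base, field) } |
        ident ^^ UnresolvedAttribute

      def parse(s: String): Expr =
        phrase(expr)(new PackratReader(new lexical.Scanner(s))).get

      def main(args: Array[String]): Unit = {
        println(parse("col"))    // UnresolvedAttribute(col)
        println(parse("t.col"))  // UnresolvedAttribute(t.col)
        println(parse("t.s.f"))  // GetField(UnresolvedAttribute(t.s),f)
      }
    }
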
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

@@ -88,31 +88,24 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
 
   /** Performs attribute resolution given a name and a sequence of possible attributes. */
   protected def resolve(name: String, input: Seq[Attribute]): Option[NamedExpression] = {
-    val parts = name.split("\\.")
-    // Collect all attributes that are output by this nodes children where either the first part
-    // matches the name or where the first part matches the scope and the second part matches the
-    // name. Return these matches along with any remaining parts, which represent dotted access to
-    // struct fields.
-    val options = input.flatMap { option =>
-      // If the first part of the desired name matches a qualifier for this possible match, drop it.
-      val remainingParts =
-        if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts
-      if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
+    def handleResult[A <: NamedExpression](result: Seq[A]) = {
+      result match {
+        case Seq(a) => Some(a)
+        case Seq() => None
+        case ambiguousReferences =>
+          throw new TreeNodeException(
+            this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
+      }
     }
 
-    options.distinct match {
-      case Seq((a, Nil)) => Some(a) // One match, no nested fields, use it.
-      // One match, but we also need to extract the requested nested field.
-      case Seq((a, nestedFields)) =>
-        a.dataType match {
-          case StructType(fields) =>
-            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
-          case _ => None // Don't know how to resolve these field references
-        }
-      case Seq() => None // No matches.
-      case ambiguousReferences =>
-        throw new TreeNodeException(
-          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
+    name.split("\\.") match {
+      case Array(s) => handleResult(input.filter(_.name == s))
+      case Array(s1, s2) =>
+        handleResult(input.collect {
+          case a if a.qualifiers.contains(s1) && a.name == s2 => a
+          case a if a.name == s1 && a.dataType.isInstanceOf[StructType] => Alias(GetField(a, s2), s2)()
+        })
+      case _ => None
     }
   }
 }
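
Note: the rewritten resolve handles exactly one or two name parts -- a bare column, or qualifier.column versus structColumn.field -- and still raises ambiguity as an exception. A standalone sketch of that logic under hypothetical simplified types (the real code operates on Catalyst NamedExpressions):

    // Hypothetical flattened model of the patched resolve logic.
    case class Attr(name: String, qualifiers: Seq[String], isStruct: Boolean = false)

    def resolve(name: String, input: Seq[Attr]): Option[String] = {
      // One match wins, zero yields None, several is an ambiguity error (handleResult).
      def handleResult(result: Seq[String]): Option[String] = result match {
        case Seq(r) => Some(r)
        case Seq()  => None
        case many   => sys.error(s"Ambiguous references to $name: ${many.mkString(",")}")
      }
      name.split("\\.") match {
        case Array(s) => handleResult(input.filter(_.name == s).map(_.name))
        case Array(s1, s2) =>
          handleResult(input.collect {
            case a if a.qualifiers.contains(s1) && a.name == s2 => s"$s1.$s2"  // table.column
            case a if a.name == s1 && a.isStruct => s"GetField($s1, $s2)"      // struct.field
          })
        case _ => None // deeper access is folded into GetField by the parser, never resolved here
      }
    }

    // resolve("t.age", Seq(Attr("age", Seq("t"))))                      == Some("t.age")
    // resolve("person.name", Seq(Attr("person", Nil, isStruct = true))) == Some("GetField(person, name)")
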
sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala

@@ -581,4 +581,17 @@ class JsonSuite extends QueryTest {
         "this is a simple string.") :: Nil
     )
   }
+
+  test("SPARK-2096 Correctly parse dot notations") {
+    val jsonSchemaRDD = jsonRDD(complexFieldAndType2)
+    jsonSchemaRDD.registerTempTable("jsonTable")
+    checkAnswer(
+      sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
+      (true, "str1") :: Nil
+    )
+    checkAnswer(
+      sql("select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1] from jsonTable"),
+      ("str2", 6) :: Nil
+    )
+  }
 }
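
Note: under the new grammar, the first projection of the second query above parses into alternating GetItem/GetField nodes around a single UnresolvedAttribute. The tree below is an illustration inferred from the grammar, not output taken from the commit (Catalyst wraps ordinals in Literal expressions; that detail is elided here):

    // complexArrayOfStruct[0].field1[1].inner2[0]
    GetItem(
      GetField(
        GetItem(
          GetField(
            GetItem(UnresolvedAttribute("complexArrayOfStruct"), 0),
            "field1"),
          1),
        "inner2"),
      0)
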
sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala

@@ -82,4 +82,30 @@ object TestJsonData {
       """{"c":[33, 44]}""" ::
       """{"d":{"field":true}}""" ::
       """{"e":"str"}""" :: Nil)
+
+  val complexFieldAndType2 =
+    TestSQLContext.sparkContext.parallelize(
+      """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
+          "complexArrayOfStruct": [
+          {
+            "field1": [
+            {
+              "inner1": "str1"
+            },
+            {
+              "inner2": ["str2", "str22"]
+            }],
+            "field2": [[1, 2], [3, 4]]
+          },
+          {
+            "field1": [
+            {
+              "inner2": ["str3", "str33"]
+            },
+            {
+              "inner1": "str4"
+            }],
+            "field2": [[5, 6], [7, 8]]
+          }]
+        }""" :: Nil)
 }
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

@@ -17,19 +17,14 @@
 
 package org.apache.spark.sql.parquet
 
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
 import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
-
 import parquet.hadoop.ParquetFileWriter
 import parquet.hadoop.util.ContextUtil
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.SparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
-import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
+import org.apache.spark.sql.catalyst.types.IntegerType
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
@@ -87,11 +82,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
 
   var testRDD: SchemaRDD = null
 
-  // TODO: remove this once SqlParser can parse nested select statements
-  var nestedParserSqlContext: NestedParserSQLContext = null
-
   override def beforeAll() {
-    nestedParserSqlContext = new NestedParserSQLContext(TestSQLContext.sparkContext)
     ParquetTestData.writeFile()
     ParquetTestData.writeFilterFile()
     ParquetTestData.writeNestedFile1()
@@ -718,11 +709,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("Projection in addressbook") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir1.toString)
-      .toSchemaRDD
+    val data = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
     data.registerTempTable("data")
-    val query = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM data")
+    val query = sql("SELECT owner, contacts[1].name FROM data")
     val tmp = query.collect()
     assert(tmp.size === 2)
     assert(tmp(0).size === 2)
@@ -733,21 +722,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("Simple query on nested int data") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir2.toString)
-      .toSchemaRDD
+    val data = parquetFile(ParquetTestData.testNestedDir2.toString).toSchemaRDD
     data.registerTempTable("data")
-    val result1 = nestedParserSqlContext.sql("SELECT entries[0].value FROM data").collect()
+    val result1 = sql("SELECT entries[0].value FROM data").collect()
     assert(result1.size === 1)
     assert(result1(0).size === 1)
     assert(result1(0)(0) === 2.5)
-    val result2 = nestedParserSqlContext.sql("SELECT entries[0] FROM data").collect()
+    val result2 = sql("SELECT entries[0] FROM data").collect()
     assert(result2.size === 1)
     val subresult1 = result2(0)(0).asInstanceOf[CatalystConverter.StructScalaType[_]]
     assert(subresult1.size === 2)
     assert(subresult1(0) === 2.5)
     assert(subresult1(1) === false)
-    val result3 = nestedParserSqlContext.sql("SELECT outerouter FROM data").collect()
+    val result3 = sql("SELECT outerouter FROM data").collect()
     val subresult2 = result3(0)(0)
       .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
       .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
@@ -760,19 +747,18 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("nested structs") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir3.toString)
+    val data = parquetFile(ParquetTestData.testNestedDir3.toString)
       .toSchemaRDD
     data.registerTempTable("data")
-    val result1 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect()
+    val result1 = sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect()
     assert(result1.size === 1)
     assert(result1(0).size === 1)
     assert(result1(0)(0) === false)
-    val result2 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect()
+    val result2 = sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect()
     assert(result2.size === 1)
     assert(result2(0).size === 1)
     assert(result2(0)(0) === true)
-    val result3 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect()
+    val result3 = sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect()
     assert(result3.size === 1)
     assert(result3(0).size === 1)
     assert(result3(0)(0) === false)
@@ -796,11 +782,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("map with struct values") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir4.toString)
-      .toSchemaRDD
+    val data = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
     data.registerTempTable("mapTable")
-    val result1 = nestedParserSqlContext.sql("SELECT data2 FROM mapTable").collect()
+    val result1 = sql("SELECT data2 FROM mapTable").collect()
     assert(result1.size === 1)
     val entry1 = result1(0)(0)
       .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
@@ -814,7 +798,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     assert(entry2 != null)
     assert(entry2(0) === 49)
     assert(entry2(1) === null)
-    val result2 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect()
+    val result2 = sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect()
     assert(result2.size === 1)
     assert(result2(0)(0) === 42.toLong)
     assert(result2(0)(1) === "the answer")
@@ -825,15 +809,12 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     // has no effect in this test case
    val tmpdir = Utils.createTempDir()
     Utils.deleteRecursively(tmpdir)
-    val result = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir1.toString)
-      .toSchemaRDD
+    val result = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
     result.saveAsParquetFile(tmpdir.toString)
-    nestedParserSqlContext
-      .parquetFile(tmpdir.toString)
+    parquetFile(tmpdir.toString)
       .toSchemaRDD
       .registerTempTable("tmpcopy")
-    val tmpdata = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM tmpcopy").collect()
+    val tmpdata = sql("SELECT owner, contacts[1].name FROM tmpcopy").collect()
     assert(tmpdata.size === 2)
     assert(tmpdata(0).size === 2)
     assert(tmpdata(0)(0) === "Julien Le Dem")
@@ -844,20 +825,17 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("Writing out Map and reading it back in") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir4.toString)
-      .toSchemaRDD
+    val data = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
     val tmpdir = Utils.createTempDir()
     Utils.deleteRecursively(tmpdir)
     data.saveAsParquetFile(tmpdir.toString)
-    nestedParserSqlContext
-      .parquetFile(tmpdir.toString)
+    parquetFile(tmpdir.toString)
      .toSchemaRDD
       .registerTempTable("tmpmapcopy")
-    val result1 = nestedParserSqlContext.sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect()
+    val result1 = sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect()
     assert(result1.size === 1)
     assert(result1(0)(0) === 2)
-    val result2 = nestedParserSqlContext.sql("SELECT data2 FROM tmpmapcopy").collect()
+    val result2 = sql("SELECT data2 FROM tmpmapcopy").collect()
     assert(result2.size === 1)
     val entry1 = result2(0)(0)
       .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
@@ -871,42 +849,10 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     assert(entry2 != null)
     assert(entry2(0) === 49)
     assert(entry2(1) === null)
-    val result3 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect()
+    val result3 = sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect()
     assert(result3.size === 1)
     assert(result3(0)(0) === 42.toLong)
     assert(result3(0)(1) === "the answer")
     Utils.deleteRecursively(tmpdir)
   }
 }
-
-// TODO: the code below is needed temporarily until the standard parser is able to parse
-// nested field expressions correctly
-class NestedParserSQLContext(@transient override val sparkContext: SparkContext) extends SQLContext(sparkContext) {
-  override protected[sql] val parser = new NestedSqlParser()
-}
-
-class NestedSqlLexical(override val keywords: Seq[String]) extends SqlLexical(keywords) {
-  override def identChar = letter | elem('_')
-  delimiters += (".")
-}
-
-class NestedSqlParser extends SqlParser {
-  override val lexical = new NestedSqlLexical(reservedWords)
-
-  override protected lazy val baseExpression: PackratParser[Expression] =
-    expression ~ "[" ~ expression <~ "]" ^^ {
-      case base ~ _ ~ ordinal => GetItem(base, ordinal)
-    } |
-    expression ~ "." ~ ident ^^ {
-      case base ~ _ ~ fieldName => GetField(base, fieldName)
-    } |
-    TRUE ^^^ Literal(true, BooleanType) |
-    FALSE ^^^ Literal(false, BooleanType) |
-    cast |
-    "(" ~> expression <~ ")" |
-    function |
-    "-" ~> literal ^^ UnaryMinus |
-    ident ^^ UnresolvedAttribute |
-    "*" ^^^ Star(None) |
-    literal
-}
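
Note: with the temporary NestedParserSQLContext gone, a plain SQLContext parses nested selects directly. A hypothetical end-to-end sketch against the Spark 1.x API used in these tests (jsonRDD, registerTempTable); the sample JSON document is made up:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object DotNotationDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("dot-demo").setMaster("local"))
        val sqlContext = new SQLContext(sc)  // stock context; no NestedParserSQLContext needed

        val json = sc.parallelize(
          """{"owner": "Julien Le Dem", "contacts": [{"name": "Chris"}, {"name": "Dmitriy"}]}""" :: Nil)
        sqlContext.jsonRDD(json).registerTempTable("data")

        // Dot notation into a struct inside an array, handled by the stock SqlParser.
        sqlContext.sql("SELECT owner, contacts[1].name FROM data").collect().foreach(println)
        sc.stop()
      }
    }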
