Skip to content

Commit

Permalink
[SPARK-27241][SQL] Support map_keys and map_values in SelectedField
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

`SelectedField` doesn't support map_keys and map_values for now. When map key or value is complex struct, we should be able to prune unnecessary fields from keys/values. This proposes to add map_keys and map_values support to `SelectedField`.

## How was this patch tested?

Added tests.

Closes #24179 from viirya/SPARK-27241.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
viirya authored and dongjoon-hyun committed Mar 24, 2019
1 parent 01e6305 commit 6f18ac9
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,28 @@ object SelectedField {
val MapType(keyType, _, valueContainsNull) = child.dataType
val opt = dataTypeOpt.map(dt => MapType(keyType, dt, valueContainsNull))
selectField(child, opt)
case MapValues(child) =>
val MapType(keyType, _, valueContainsNull) = child.dataType
// MapValues does not select a field from a struct (i.e. prune the struct) so it can't be
// the top-level extractor. However it can be part of an extractor chain.
val opt = dataTypeOpt.map {
case ArrayType(dataType, _) => MapType(keyType, dataType, valueContainsNull)
case x =>
// This should not happen.
throw new AnalysisException(s"DataType '$x' is not supported by MapValues.")
}
selectField(child, opt)
case MapKeys(child) =>
val MapType(_, valueType, valueContainsNull) = child.dataType
// MapKeys does not select a field from a struct (i.e. prune the struct) so it can't be
// the top-level extractor. However it can be part of an extractor chain.
val opt = dataTypeOpt.map {
case ArrayType(dataType, _) => MapType(dataType, valueType, valueContainsNull)
case x =>
// This should not happen.
throw new AnalysisException(s"DataType '$x' is not supported by MapKeys.")
}
selectField(child, opt)
case GetArrayItem(child, _) =>
// GetArrayItem does not select a field from a struct (i.e. prune the struct) so it can't be
// the top-level extractor. However it can be part of an extractor chain.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@

package org.apache.spark.sql.catalyst.expressions

import org.scalatest.BeforeAndAfterAll
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types._

class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
class SelectedFieldSuite extends AnalysisTest {
private val ignoredField = StructField("col1", StringType, nullable = false)

// The test schema as a tree string, i.e. `schema.treeString`
Expand Down Expand Up @@ -317,6 +316,18 @@ class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
StructField("subfield1", IntegerType) :: Nil)) :: Nil), valueContainsNull = false)))
}

testSelect(arrayOfStruct, "map_values(col5[0]).field1.subfield1 as foo") {
StructField("col5", ArrayType(MapType(StringType, StructType(
StructField("field1", StructType(
StructField("subfield1", IntegerType) :: Nil)) :: Nil), valueContainsNull = false)))
}

testSelect(arrayOfStruct, "map_values(col5[0]).field1.subfield2 as foo") {
StructField("col5", ArrayType(MapType(StringType, StructType(
StructField("field1", StructType(
StructField("subfield2", IntegerType) :: Nil)) :: Nil), valueContainsNull = false)))
}

// |-- col1: string (nullable = false)
// |-- col6: map (nullable = true)
// | |-- key: string
Expand Down Expand Up @@ -394,6 +405,90 @@ class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
:: Nil)))
}

// |-- col1: string (nullable = false)
// |-- col2: map (nullable = true)
// | |-- key: struct (containsNull = false)
// | | |-- field1: string (nullable = true)
// | | |-- field2: integer (nullable = true)
// | |-- value: array (valueContainsNull = true)
// | | |-- element: struct (containsNull = false)
// | | | |-- field3: struct (nullable = true)
// | | | | |-- subfield1: integer (nullable = true)
// | | | | |-- subfield2: integer (nullable = true)
private val mapWithStructKey = StructType(Array(ignoredField,
StructField("col2", MapType(
StructType(
StructField("field1", StringType) ::
StructField("field2", IntegerType) :: Nil),
ArrayType(StructType(
StructField("field3", StructType(
StructField("subfield1", IntegerType) ::
StructField("subfield2", IntegerType) :: Nil)) :: Nil), containsNull = false)))))

testSelect(mapWithStructKey, "map_keys(col2).field1 as foo") {
StructField("col2", MapType(
StructType(StructField("field1", StringType) :: Nil),
ArrayType(StructType(
StructField("field3", StructType(
StructField("subfield1", IntegerType) ::
StructField("subfield2", IntegerType) :: Nil)) :: Nil), containsNull = false)))
}

testSelect(mapWithStructKey, "map_keys(col2).field2 as foo") {
StructField("col2", MapType(
StructType(StructField("field2", IntegerType) :: Nil),
ArrayType(StructType(
StructField("field3", StructType(
StructField("subfield1", IntegerType) ::
StructField("subfield2", IntegerType) :: Nil)) :: Nil), containsNull = false)))
}

// |-- col1: string (nullable = false)
// |-- col2: map (nullable = true)
// | |-- key: array (valueContainsNull = true)
// | | |-- element: struct (containsNull = false)
// | | | |-- field1: string (nullable = true)
// | | | |-- field2: struct (containsNull = false)
// | | | | |-- subfield1: integer (nullable = true)
// | | | | |-- subfield2: long (nullable = true)
// | |-- value: array (valueContainsNull = true)
// | | |-- element: struct (containsNull = false)
// | | | |-- field3: struct (nullable = true)
// | | | | |-- subfield3: integer (nullable = true)
// | | | | |-- subfield4: integer (nullable = true)
private val mapWithArrayOfStructKey = StructType(Array(ignoredField,
StructField("col2", MapType(
ArrayType(StructType(
StructField("field1", StringType) ::
StructField("field2", StructType(
StructField("subfield1", IntegerType) ::
StructField("subfield2", LongType) :: Nil)) :: Nil), containsNull = false),
ArrayType(StructType(
StructField("field3", StructType(
StructField("subfield3", IntegerType) ::
StructField("subfield4", IntegerType) :: Nil)) :: Nil), containsNull = false)))))

testSelect(mapWithArrayOfStructKey, "map_keys(col2)[0].field1 as foo") {
StructField("col2", MapType(
ArrayType(StructType(
StructField("field1", StringType) :: Nil), containsNull = false),
ArrayType(StructType(
StructField("field3", StructType(
StructField("subfield3", IntegerType) ::
StructField("subfield4", IntegerType) :: Nil)) :: Nil), containsNull = false)))
}

testSelect(mapWithArrayOfStructKey, "map_keys(col2)[0].field2.subfield1 as foo") {
StructField("col2", MapType(
ArrayType(StructType(
StructField("field2", StructType(
StructField("subfield1", IntegerType) :: Nil)) :: Nil), containsNull = false),
ArrayType(StructType(
StructField("field3", StructType(
StructField("subfield3", IntegerType) ::
StructField("subfield4", IntegerType) :: Nil)) :: Nil), containsNull = false)))
}

def assertResult(expected: StructField)(actual: StructField)(selectExpr: String): Unit = {
try {
super.assertResult(expected)(actual)
Expand Down Expand Up @@ -439,7 +534,7 @@ class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
private def unapplySelect(expr: String, relation: LocalRelation) = {
val parsedExpr = parseAsCatalystExpression(Seq(expr)).head
val select = relation.select(parsedExpr)
val analyzed = select.analyze
val analyzed = caseSensitiveAnalyzer.execute(select)
SelectedField.unapply(analyzed.expressions.head)
}

Expand Down

0 comments on commit 6f18ac9

Please sign in to comment.