Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-27701][SQL] Extend NestedColumnAliasing to general nested field cases including GetArrayStructField #24599

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
object NestedColumnAliasing {

def unapply(plan: LogicalPlan)
: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
: Option[(Map[ExtractValue, Alias], Map[ExprId, Seq[Alias]])] = plan match {
case Project(projectList, child)
if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
dongjoon-hyun marked this conversation as resolved.
Show resolved Hide resolved
getAliasSubMap(projectList)
Expand All @@ -43,7 +43,7 @@ object NestedColumnAliasing {
*/
def replaceToAliases(
plan: LogicalPlan,
nestedFieldToAlias: Map[GetStructField, Alias],
nestedFieldToAlias: Map[ExtractValue, Alias],
attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
case Project(projectList, child) =>
Project(
Expand All @@ -56,9 +56,9 @@ object NestedColumnAliasing {
*/
private def getNewProjectList(
projectList: Seq[NamedExpression],
nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
nestedFieldToAlias: Map[ExtractValue, Alias]): Seq[NamedExpression] = {
projectList.map(_.transform {
case f: GetStructField if nestedFieldToAlias.contains(f) =>
case f: ExtractValue if nestedFieldToAlias.contains(f) =>
nestedFieldToAlias(f).toAttribute
}.asInstanceOf[NamedExpression])
}
Expand Down Expand Up @@ -86,32 +86,39 @@ object NestedColumnAliasing {
}

/**
* Return root references that are individually accessed as a whole, and `GetStructField`s.
* Return root references that are individually accessed as a whole, and `GetStructField`s
* or `GetArrayStructField`s which on top of other `ExtractValue`s or special expressions.
* Check `SelectedField` to see which expressions should be listed here.
*/
private def collectRootReferenceAndGetStructField(e: Expression): Seq[Expression] = e match {
case _: AttributeReference | _: GetStructField => Seq(e)
case es if es.children.nonEmpty => es.children.flatMap(collectRootReferenceAndGetStructField)
private def collectRootReferenceAndExtractValue(e: Expression): Seq[Expression] = e match {
case _: AttributeReference => Seq(e)
case GetStructField(_: ExtractValue | _: AttributeReference, _, _) => Seq(e)
case GetArrayStructFields(_: MapValues |
_: MapKeys |
_: ExtractValue |
_: AttributeReference, _, _, _, _) => Seq(e)
dongjoon-hyun marked this conversation as resolved.
Show resolved Hide resolved
case es if es.children.nonEmpty => es.children.flatMap(collectRootReferenceAndExtractValue)
case _ => Seq.empty
}

/**
* Return two maps in order to replace nested fields to aliases.
*
* 1. GetStructField -> Alias: A new alias is created for each nested field.
* 1. ExtractValue -> Alias: A new alias is created for each nested field.
* 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
*/
private def getAliasSubMap(projectList: Seq[NamedExpression])
: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
: Option[(Map[ExtractValue, Alias], Map[ExprId, Seq[Alias]])] = {
val (nestedFieldReferences, otherRootReferences) =
projectList.flatMap(collectRootReferenceAndGetStructField).partition {
case _: GetStructField => true
projectList.flatMap(collectRootReferenceAndExtractValue).partition {
case _: ExtractValue => true
case _ => false
}

val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
val aliasSub = nestedFieldReferences.asInstanceOf[Seq[ExtractValue]]
.filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
.groupBy(_.references.head)
.flatMap { case (attr, nestedFields: Seq[GetStructField]) =>
.flatMap { case (attr, nestedFields: Seq[ExtractValue]) =>
// Each expression can contain multiple nested fields.
// Note that we keep the original names to deliver to parquet in a case-sensitive way.
val nestedFieldToAlias = nestedFields.distinct.map { f =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class NestedColumnAliasingSuite extends SchemaPruningTest {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There still are many usage of GetStructField in this test suite. Maybe make a minor PR to rewrite them.


Expand Down Expand Up @@ -221,6 +221,51 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
comparePlans(optimized, expected)
}

test("nested field pruning for getting struct field in array of struct") {
val field1 = GetArrayStructFields(child = 'friends,
field = StructField("first", StringType),
ordinal = 0,
numFields = 3,
containsNull = true)
val field2 = GetStructField('employer, 0, Some("id"))

val query = contact
.limit(5)
.select(field1, field2)
.analyze

val optimized = Optimize.execute(query)

val expected = contact
.select(field1, field2)
.limit(5)
.analyze
comparePlans(optimized, expected)
}

test("nested field pruning for getting struct field in map") {
val field1 = GetStructField(GetMapValue('relatives, Literal("key")), 0, Some("first"))
val field2 = GetArrayStructFields(child = MapValues('relatives),
field = StructField("middle", StringType),
ordinal = 1,
numFields = 3,
containsNull = true)

val query = contact
.limit(5)
.select(field1, field2)
.analyze

val optimized = Optimize.execute(query)

val expected = contact
.select(field1, field2)
.limit(5)
.analyze
comparePlans(optimized, expected)
}


private def collectGeneratedAliases(query: LogicalPlan): ArrayBuffer[String] = {
val aliases = ArrayBuffer[String]()
query.transformAllExpressions {
Expand Down
42 changes: 24 additions & 18 deletions sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,52 @@
Nested Schema Pruning Benchmark For ORC v1
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Selection: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 131 150 25 7.7 130.6 1.0X
Nested column 922 954 21 1.1 922.2 0.1X
Top-level column 127 163 24 7.9 127.1 1.0X
Nested column 974 1023 39 1.0 974.2 0.1X
Nested column in array 4834 4857 23 0.2 4834.1 0.0X

OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Limiting: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 446 477 50 2.2 445.5 1.0X
Nested column 1328 1366 44 0.8 1328.4 0.3X
Top-level column 454 488 45 2.2 454.3 1.0X
Nested column 1539 1602 80 0.6 1539.3 0.3X
Nested column in array 5765 5848 69 0.2 5764.7 0.1X

OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 357 386 33 2.8 356.8 1.0X
Nested column 1266 1274 7 0.8 1266.3 0.3X
Top-level column 365 395 58 2.7 364.9 1.0X
Nested column 1456 1477 23 0.7 1456.0 0.3X
Nested column in array 5734 5842 91 0.2 5734.4 0.1X

OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning by exprs: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 368 394 54 2.7 367.6 1.0X
Nested column 3890 3954 80 0.3 3890.3 0.1X
Top-level column 373 387 15 2.7 372.8 1.0X
Nested column 4349 4397 59 0.2 4348.8 0.1X
Nested column in array 8893 8971 73 0.1 8893.2 0.0X

OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Sample: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 129 140 10 7.7 129.1 1.0X
Nested column 966 999 26 1.0 966.2 0.1X
Top-level column 130 159 24 7.7 129.9 1.0X
Nested column 1160 1216 50 0.9 1159.8 0.1X
Nested column in array 5297 5420 176 0.2 5296.8 0.0X

OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Sorting: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 573 601 61 1.7 573.2 1.0X
Nested column 4417 4598 149 0.2 4417.1 0.1X
Top-level column 585 615 60 1.7 585.5 1.0X
Nested column 4972 5213 156 0.2 4972.2 0.1X
Nested column in array 10095 10156 32 0.1 10095.4 0.1X


42 changes: 24 additions & 18 deletions sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,52 @@
Nested Schema Pruning Benchmark For ORC v2
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12 on Linux 4.15.0-1021-aws
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Selection: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 117 144 21 8.5 117.0 1.0X
Nested column 1114 1138 23 0.9 1114.2 0.1X
Top-level column 122 161 29 8.2 121.9 1.0X
Nested column 1255 1279 23 0.8 1255.4 0.1X
Nested column in array 5352 5393 37 0.2 5352.3 0.0X

OpenJDK 64-Bit Server VM 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12 on Linux 4.15.0-1021-aws
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Limiting: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 134 154 17 7.5 134.0 1.0X
Nested column 1156 1220 51 0.9 1156.2 0.1X
Top-level column 132 162 32 7.6 131.8 1.0X
Nested column 1246 1286 32 0.8 1245.6 0.1X
Nested column in array 5395 5542 143 0.2 5394.9 0.0X

OpenJDK 64-Bit Server VM 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12 on Linux 4.15.0-1021-aws
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 350 359 6 2.9 350.5 1.0X
Nested column 1426 1443 13 0.7 1426.5 0.2X
Top-level column 385 403 20 2.6 385.4 1.0X
Nested column 1663 1691 52 0.6 1663.2 0.2X
Nested column in array 6264 6335 73 0.2 6264.4 0.1X

OpenJDK 64-Bit Server VM 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12 on Linux 4.15.0-1021-aws
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning by exprs: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 357 363 5 2.8 356.9 1.0X
Nested column 4186 4245 71 0.2 4186.3 0.1X
Top-level column 392 422 58 2.5 392.2 1.0X
Nested column 4104 4153 57 0.2 4104.0 0.1X
Nested column in array 8668 8748 55 0.1 8668.3 0.0X

OpenJDK 64-Bit Server VM 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12 on Linux 4.15.0-1021-aws
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Sample: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 115 134 13 8.7 114.6 1.0X
Nested column 1148 1199 59 0.9 1148.3 0.1X
Top-level column 130 146 22 7.7 130.1 1.0X
Nested column 1127 1166 53 0.9 1127.3 0.1X
Nested column in array 4906 4968 40 0.2 4905.8 0.0X

OpenJDK 64-Bit Server VM 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12 on Linux 4.15.0-1021-aws
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Sorting: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 274 279 3 3.6 274.1 1.0X
Nested column 3133 3254 145 0.3 3132.7 0.1X
Top-level column 291 308 25 3.4 290.5 1.0X
Nested column 3016 3091 58 0.3 3016.0 0.1X
Nested column in array 7730 7821 140 0.1 7729.5 0.0X


Loading