[SPARK-34897][SQL][3.1] Support reconcile schemas based on index after nested column pruning #32279

Closed
wangyum wants to merge 1 commit into apache:branch-3.1 from wangyum:SPARK-34897-3.1

Conversation

wangyum (Member) commented Apr 21, 2021

This PR backports #31993 to branch-3.1. The original PR description:

What changes were proposed in this pull request?

[Nested column pruning](https://github.com/apache/spark/blob/0f2c0b53e8fb18c86c67b5dd679c006db93f94a5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala#L28-L42) can remove a whole top-level `StructField` from the data schema. For example:

```scala
spark.sql(
  """
    |CREATE TABLE t1 (
    |  _col0 INT,
    |  _col1 STRING,
    |  _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
    |USING ORC
    |""".stripMargin)

spark.sql("INSERT INTO t1 values(1, '2', struct('a', 'b', 'c', 10L))")

spark.sql("SELECT _col0, _col2.c1 FROM t1").show
```

Before this PR, the returned schema is `_col0` INT,`_col2` STRUCT<`c1`: STRING>, and the read throws an exception:

```
java.lang.AssertionError: assertion failed: The given data schema struct<_col0:int,_col2:struct<c1:string>> has less fields than the actual ORC physical schema, no idea which columns were dropped, fail to read.
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.requestedColumnIds(OrcUtils.scala:160)
```
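The assertion comes from `OrcUtils.requestedColumnIds`: ORC files written by Hive carry only positional field names (`_col0`, `_col1`, ...), so Spark has to match the data schema to the physical schema by index, which only works if no top-level field was dropped. A condensed sketch of that logic (the real method has a different signature and handles more cases):

```scala
import org.apache.spark.sql.types.StructType

// Condensed sketch of the index-based matching in OrcUtils.requestedColumnIds.
def requestedColumnIdsSketch(orcFieldNames: Seq[String], dataSchema: StructType): Seq[Int] = {
  if (orcFieldNames.forall(_.startsWith("_col"))) {
    // Only positional names: the physical schema can be matched to the data
    // schema by index alone. If pruning dropped a top-level field, the counts
    // no longer line up and there is no way to tell which column was dropped.
    assert(orcFieldNames.length <= dataSchema.length,
      s"The given data schema ${dataSchema.catalogString} has less fields than " +
        "the actual ORC physical schema, no idea which columns were dropped, fail to read.")
    dataSchema.indices // field i of the data schema reads physical column i
  } else {
    // Real field names are present: resolve each field by name instead.
    dataSchema.fieldNames.toSeq.map(name => orcFieldNames.indexOf(name))
  }
}
```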

After this PR, the returned schema is `_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING>: every top-level field is kept, so the data schema still lines up with the ORC physical schema by index.
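With the fix, the `show` call from the example then completes; its expected output (a sketch, exact `DataFrame.show` formatting aside) is:

```
+-----+---+
|_col0| c1|
+-----+---+
|    1|  a|
+-----+---+
```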

The final schema is `_col0` INT,`_col2` STRUCT<`c1`: STRING> after the remaining column pruning:

In `FileSourceStrategy` ([source](https://github.com/apache/spark/blob/7a5647a93aaea9d1d78d9262e24fc8c010db04d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L208-L213)):

```scala
val readDataColumns =
  dataColumns
    .filter(requiredAttributes.contains)
    .filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
```

And in `PushDownUtils` ([source](https://github.com/apache/spark/blob/e64eb75aede71a5403a4d4436e63b1fcfdeca14d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala#L96-L97)):

```scala
val neededFieldNames = neededOutput.map(_.name).toSet
r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))
```
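Putting the two steps together: nested pruning must keep every top-level field (possibly with a narrowed struct type) so the ORC reader can reconcile by index, and the later output pruning still drops unused top-level columns. A hypothetical helper, not the patch's actual code, illustrating the index-preserving reconciliation the title refers to:

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical helper (for illustration only): walk the full table schema in
// order, substituting the narrowed type where a field survived nested pruning
// and keeping the original field otherwise, so top-level positions still line
// up with the physical file schema. Pruning preserves relative field order,
// so a single cursor over the pruned schema suffices.
def reconcileByIndex(fullSchema: StructType, prunedSchema: StructType): StructType = {
  var cursor = 0
  val fields: Array[StructField] = fullSchema.fields.map { original =>
    if (cursor < prunedSchema.length && prunedSchema(cursor).name == original.name) {
      val reconciled = prunedSchema(cursor)
      cursor += 1
      reconciled // e.g. _col2 keeps only STRUCT<c1: STRING>
    } else {
      original // e.g. _col1 is kept untouched to preserve indices
    }
  }
  StructType(fields)
}
```

Applied to the example, this yields `_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING> for the reader, while the `readDataColumns` filter above still removes `_col1` from the query output.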

Why are the changes needed?

Fix a bug: nested column pruning could drop top-level fields, breaking the index-based schema reconciliation in the ORC reader.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.
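The failure is also easy to reproduce outside the test suite. A minimal standalone sketch (the object name is made up; it assumes Spark 3.1 defaults, where nested schema pruning and the native ORC reader are enabled):

```scala
import org.apache.spark.sql.SparkSession

object Spark34897Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-34897 repro")
      .getOrCreate()

    spark.sql(
      """CREATE TABLE t1 (
        |  _col0 INT,
        |  _col1 STRING,
        |  _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
        |USING ORC""".stripMargin)
    spark.sql("INSERT INTO t1 VALUES (1, '2', struct('a', 'b', 'c', 10L))")

    // Without the fix this query fails with the AssertionError from
    // OrcUtils.requestedColumnIds shown above; with it, the read succeeds.
    val rows = spark.sql("SELECT _col0, _col2.c1 FROM t1").collect()
    assert(rows.length == 1 && rows(0).getInt(0) == 1 && rows(0).getString(1) == "a")

    spark.stop()
  }
}
```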

github-actions bot added the SQL label Apr 21, 2021
SparkQA commented Apr 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42281/

SparkQA commented Apr 22, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42281/

SparkQA commented Apr 22, 2021

Test build #137754 has finished for PR 32279 at commit 040de22.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum (Member, Author) commented Apr 23, 2021

retest this please.

SparkQA commented Apr 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42370/

SparkQA commented Apr 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42370/

SparkQA commented Apr 23, 2021

Test build #137840 has finished for PR 32279 at commit 040de22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum requested review from cloud-fan and viirya and removed the request for viirya April 23, 2021 06:34
wangyum added a commit that referenced this pull request Apr 23, 2021

[SPARK-34897][SQL][3.1] Support reconcile schemas based on index after nested column pruning

Closes #32279 from wangyum/SPARK-34897-3.1.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum (Member, Author) commented Apr 23, 2021

Merged to branch-3.1.

wangyum closed this Apr 23, 2021
wangyum deleted the SPARK-34897-3.1 branch April 23, 2021 07:23
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021

[SPARK-34897][SQL][3.1] Support reconcile schemas based on index after nested column pruning

Closes apache#32279 from wangyum/SPARK-34897-3.1.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022

[SPARK-34897][SQL][3.1] Support reconcile schemas based on index after nested column pruning

Closes apache#32279 from wangyum/SPARK-34897-3.1.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>