
[SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning #44939

Closed
wants to merge 9 commits

Conversation

@dtenedor (Contributor) commented Jan 29, 2024

What changes were proposed in this pull request?

This PR fixes a CSV parsing bug with existence default values and column pruning (https://issues.apache.org/jira/browse/SPARK-46890).

The bug fix includes disabling column pruning specifically when checking the CSV header schema against the required schema expected by Catalyst. This makes the expected schema match what the CSV parser provides, since later we also instruct the CSV parser to disable column pruning and instead read each entire row, in order to correctly assign the default value(s) during execution.
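To illustrate the shape of the fix, here is a minimal sketch (not the actual Spark source; schemaForHeaderCheck and its parameter names are illustrative) of how the header-check schema should track the pruning decision:

import org.apache.spark.sql.types.StructType

// Sketch only: choose the schema that the CSV header checker compares against
// the file header. When column pruning is off (e.g. because a required column
// carries an existence default), the parser reads whole rows, so the header
// must be checked against the full data schema rather than the pruned one.
def schemaForHeaderCheck(
    columnPruningEnabled: Boolean,
    fullDataSchema: StructType,
    requiredSchema: StructType): StructType = {
  if (columnPruningEnabled) requiredSchema else fullDataSchema
}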

Why are the changes needed?

Before this change, queries selecting a subset of the columns in a CSV table whose CREATE TABLE statement contained default values would fail with an internal exception. For example:

CREATE TABLE IF NOT EXISTS products (
  product_id INT,
  name STRING,
  price FLOAT default 0.0,
  quantity INT default 0
)
USING CSV
OPTIONS (
  header 'true',
  inferSchema 'false',
  enforceSchema 'false',
  path '/Users/maximgekk/tmp/products.csv'
);

The CSV file products.csv:

product_id,name,price,quantity
1,Apple,0.50,100
2,Banana,0.25,200
3,Orange,0.75,50

The query fails:

spark-sql (default)> SELECT price FROM products;
24/01/28 11:43:09 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6)
java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 4, schema size: 1
CSV file: file:///Users/Daniel.Tenedorio/tmp/products.csv

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This PR adds test coverage.

Was this patch authored or co-authored using generative AI tooling?

No.

github-actions bot added the SQL label · Jan 29, 2024
@dtenedor (Contributor, Author) commented Jan 29, 2024

@MaxGekk here is the fix! cc @cloud-fan

@MaxGekk (Member) commented Jan 30, 2024

@dtenedor Thanks for the ping. I will review it today.

@MaxGekk (Member) left a review comment

@dtenedor Could you explain, please, how your test passed for CSV V2 datasource if you haven't fixed it?

I haven't found any changes at spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala:

val schema = if (options.isColumnPruningEnabled) actualReadDataSchema else actualDataSchema
val isStartOfFile = file.start == 0
val headerChecker = new CSVHeaderChecker(
  schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)

commit: respond to code review comments
@dtenedor (Contributor, Author) commented Jan 30, 2024

@dtenedor Could you explain, please, how your test passed for CSV V2 datasource if you haven't fixed it?
I haven't found any changes at:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala

@MaxGekk I looked into this. The new unit test test("SPARK-46862: column pruning in the multi-line mode") runs with both DSV1 and DSV2 scans (via the CSVv1Suite and CSVv2Suite subclasses), so we have coverage for both cases.

To help answer your question: apparently the DSV2 scan sets the required columns to scan differently. For example, with the following test [1], I find the physical DataSourceScanExec node gets built with the expected output schema val outputDataSchema = (readDataColumns ++ generatedMetadataColumns).toStructType equal to just two columns: year and comment. Column pruning has already been applied to the schema provided to the DSV2 CSV scan.

[1]

      spark.sql(
        s"""
           |CREATE TABLE CarsTable(
           |  year INT,
           |  make STRING,
           |  model STRING,
           |  comment STRING DEFAULT '',
           |  blank STRING DEFAULT '')
           |USING csv
           |OPTIONS (
           |  header "true",
           |  inferSchema "false",
           |  enforceSchema "false",
           |  path "${testFile(carsFile)}"
           |)
       """.stripMargin)
      val expected = Seq(
        Row("No comment"),
        Row("Go get one now they are going fast"))
      checkAnswer(
        sql("SELECT comment FROM CarsTable WHERE year < 2014"),
        expected)
      checkAnswer(
        spark.read.format("csv")
          .options(
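
A hedged way to observe this pruning without a debugger (an illustration, not part of the PR, assuming the CarsTable from the quoted test above):

// Illustrative only: print the physical plan. For a file-based scan, the
// ReadSchema entry in the plan reflects the pruned columns,
// e.g. struct<year:int,comment:string>.
sql("SELECT comment FROM CarsTable WHERE year < 2014").explain()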

@MaxGekk (Member) left a review comment

@dtenedor Could you write or modify your new test to check the V2 DS implementation? I put a breakpoint in CSVPartitionReaderFactory:
[screenshot: breakpoint in CSVPartitionReaderFactory]

And your test didn't hit the breakpoint.

@dtenedor (Contributor, Author) commented Jan 31, 2024

@dtenedor Could you write or modify your new test to check V2 DS implementation. I put a breakpoint in CSVPartitionReaderFactory.
And your test didn't hit the breakpoint.

@MaxGekk Good question. Reproducing it now: requesting to run this new test("SPARK-46890: CSV fails on a column with default and without enforcing schema") in IntelliJ brings up a drop-down for CSVv1Suite or CSVv2Suite.

[screenshot: IntelliJ run-configuration drop-down]

Both the V1 and V2 cases pass.

I am able to hit that breakpoint on the latter but not the former.

[screenshot: debugger paused at the breakpoint]

Interestingly, I copied the unit test to a new PR without the fix, and it fails for both CSV V1 and CSV V2. So this PR fixes it for both versions, but only V2 hits the breakpoint you suggested.

dtenedor requested a review from MaxGekk · January 31, 2024 17:04
@MaxGekk (Member) commented Jan 31, 2024

@dtenedor Could you remove the first part of the test:

  test("SPARK-46890: CSV fails on a column with default and without enforcing schema") {
    withTable("Products") {
      spark.sql(
        s"""
           |CREATE TABLE IF NOT EXISTS Products (
           |  product_id INT,
           |  name STRING,
           |  price FLOAT default 0.0,
           |  quantity INT default 0
           |)
           |USING CSV
           |OPTIONS (
           |  header 'true',
           |  inferSchema 'false',
           |  enforceSchema 'false',
           |  path "${testFile(productsFile)}"
           |)
       """.stripMargin)
      checkAnswer(
        sql("SELECT price FROM Products"),
        Seq(
          Row(0.50),
          Row(0.25),
          Row(0.75)))
    }
  }

Set a breakpoint inside the main constructor of CSVHeaderChecker:
[screenshot: breakpoint in the CSVHeaderChecker constructor]
You should see where we hit the breakpoint from:

[screenshot: debugger call stack showing CSVFileFormat]

The CSVFileFormat is the v1 implementation. It seems we fall back from the V2 to the V1 datasource implementation for some reason.
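
One plausible explanation for the fallback (an assumption, not verified in this thread): "csv" is in the default value of spark.sql.sources.useV1SourceList, so table scans take the V1 CSVFileFormat path unless that config is overridden, which is presumably how CSVv2Suite forces the V2 implementation. A sketch:

// Assumption: clearing spark.sql.sources.useV1SourceList (whose default
// includes "csv") makes Spark prefer the V2 CSV implementation for scans.
spark.conf.set("spark.sql.sources.useV1SourceList", "")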

commit: respond to code review comments
@dtenedor (Contributor, Author) commented Jan 31, 2024

@MaxGekk I tried several different ways of testing this bug with DSV2 CSV scans, but was unable to both use a schema with column defaults and hit that breakpoint in the DSV2 CSVPartitionReaderFactory. I can hit the breakpoint, but not when loading the table schema from Hive, which is necessary to get the column default metadata in there.

At any rate, I updated CSVOptions.isColumnPruningEnabled to have only one overload that takes the required schema and checks for column defaults, which enforces that all V1 and V2 callers now use the same code to check whether column pruning is enabled. So this particular bug should be fixed in all cases now.
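
A minimal sketch of what such a unified check might look like (illustrative, not the exact code merged here; columnPruning stands in for the existing CSVOptions flag, and "EXISTS_DEFAULT" is assumed to be the metadata key marking existence defaults):

import org.apache.spark.sql.types.StructType

val columnPruning: Boolean = true  // stand-in for the existing CSVOptions flag

// Pruning is only safe when no required column carries an existence default,
// since assigning defaults requires parsing the entire row.
def isColumnPruningEnabled(requiredSchema: StructType): Boolean = {
  columnPruning && !requiredSchema.exists(_.metadata.contains("EXISTS_DEFAULT"))
}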

@dtenedor (Contributor, Author) left a review comment

Thanks @MaxGekk for your thorough reviews!!

dtenedor requested a review from MaxGekk · February 1, 2024 21:32
@MaxGekk (Member) left a review comment

LGTM except for minor comments.

dtenedor requested a review from MaxGekk · February 2, 2024 19:48
@dtenedor (Contributor, Author) commented Feb 2, 2024

Thanks again @MaxGekk for your reviews!

@MaxGekk (Member) commented Feb 3, 2024

+1, LGTM. Merging to master.
Thank you, @dtenedor and @cloud-fan, for the review.

MaxGekk closed this in 16ac820 · Feb 3, 2024
@MaxGekk (Member) commented Feb 3, 2024

@dtenedor Should we backport it to branch-3.5 and branch-3.4? If so, please open separate PRs.
