[SPARK-42359][SQL] Support row skipping when reading CSV files #39907
ted-jenks wants to merge 36 commits into apache:master from
Conversation
@HyukjinKwon I can see you have touched a lot of this code. What do you think about these changes?
    nonEmptyLines.as[String]
  }
}
commentFilteredLines.rdd.zipWithIndex().toDF("value", "order")
zipWithIndex is actually expensive; it requires another job to execute.
@HyukjinKwon I reworked it to try to avoid this, how does it look now?
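The rework avoids indexing the whole dataset: rather than pairing every row with a global index via zipWithIndex (which triggers an extra job), the reader can simply advance past the first n lines as it parses. A minimal sketch of that idea in plain Scala (no Spark; the object and method names are illustrative, not the PR's actual code):

```scala
object SkipSketch {
  // Skip the first `n` lines of a streaming input without materializing
  // an index over the whole file, analogous to advancing the tokenizer
  // before parsing begins.
  def skipLines(lines: Iterator[String], n: Int): Iterator[String] =
    lines.drop(n)

  def main(args: Array[String]): Unit = {
    val data = Iterator("junk line 1", "junk line 2", "a,b", "1,2")
    // Only the rows after the skipped prefix remain.
    println(skipLines(data, 2).toList)
  }
}
```

Unlike the zipWithIndex approach, this never touches rows beyond reading them once, so no second job is needed.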
@HyukjinKwon This is ready for a re-review 🤞
@HyukjinKwon it would be great to get an update on your thoughts on this issue.
Resolved review threads (outdated):
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@HyukjinKwon I have done more work on this, please let me know what you think!
holdenk left a comment
Some questions (mostly just things I think we should document better for the new behaviour).
<tr>
  <td><code>skipLines</code></td>
  <td>0</td>
  <td>Sets the number of non-empty, uncommented lines to skip before parsing CSV files. If the <code>header</code> option is set to <code>true</code>, the first line after skipping <code>skipLines</code> lines will be taken as the header.</td>
  <td>read</td>
</tr>
Does skipLines apply before or after the filtering? (e.g. if we have 10 empty lines at the top of partition 1, what is the behaviour)?
When does a CSV file have multiple header rows?
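Per the option text above, the skip count applies to non-empty, uncommented lines, i.e. filtering happens before skipping, so leading blank lines do not consume the skip budget. A hedged sketch of that ordering in plain Scala (no Spark; `commentChar` and `readLines` are illustrative names, not Spark's implementation):

```scala
object SkipOrderSketch {
  val commentChar = '#'

  // Filter blank and commented lines first, then skip `n` of what remains.
  // Ten empty lines at the top of a partition would therefore be dropped
  // by the filter and would not count towards the skip.
  def readLines(lines: Seq[String], n: Int): Seq[String] =
    lines
      .filter(l => l.trim.nonEmpty && !l.startsWith(commentChar.toString))
      .drop(n)

  def main(args: Array[String]): Unit = {
    val file = Seq("", "", "# a comment", "preamble", "h1,h2", "1,2")
    // Empties and the comment are filtered out, then one line is skipped:
    // "preamble" goes, leaving the header row and the data row.
    println(readLines(file, 1))
  }
}
```

This matches the documented wording; whether the real code path orders the two steps exactly like this is what the review question asks to have documented.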
val handleSkipLines: () => Unit =
  () => 1.to(skipLines).foreach(_ => tokenizer.parseNext())
What's the behaviour when skipLines is greater than the length of the input file?
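One plausible answer, assuming the tokenizer signals exhaustion by returning null from parseNext() (an assumption about pull-parser behaviour, not a statement about Univocity's actual contract): the skip loop just hits null repeatedly and nothing fails. A toy stand-in for the tokenizer illustrating that:

```scala
object OverSkipSketch {
  // Toy tokenizer: parseNext() returns each row once, then null forever.
  // This mirrors how many pull parsers signal end-of-input; it is an
  // assumption here, not the real Univocity tokenizer.
  final class FakeTokenizer(private var rows: List[String]) {
    def parseNext(): String = rows match {
      case head :: tail => rows = tail; head
      case Nil          => null
    }
  }

  def main(args: Array[String]): Unit = {
    val tokenizer = new FakeTokenizer(List("a,b"))
    // Skip far more lines than the input has, as handleSkipLines would.
    1.to(5).foreach(_ => tokenizer.parseNext())
    // No exception in this sketch; subsequent reads keep returning null.
    println(tokenizer.parseNext())
  }
}
```

Under this assumption the reader would produce an empty result rather than an error; the real behaviour deserves an explicit test and a line in the docs.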
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
MaxGekk left a comment
@ted-jenks The feature is useful from my point of view. May I ask you to rebase on the recent master?
What changes were proposed in this pull request?
Added support for row skipping when reading CSV files via a skipLines option.

Why are the changes needed?
In SPARK-42359 we highlight the need for row-skipping functionality in CSV reads. In summary, there is no way for users to read CSV files that do not have the header/data in the first row with the DataFrame API. Advanced users can use RDDs and zipWithIndex to remove the first n rows, though this relies on ordering, which RDDs do not guarantee.

Does this PR introduce any user-facing change?
Users now have access to a new option when reading CSVs. This option defaults to 0, so legacy code is unaffected. The skipLines option causes the parser to skip a specified number of lines before parsing the data. It respects multiline values. If the header option is set to true, the first line after the skipped lines is taken as the header.

The option is used as follows: spark.read.option("skipLines", 2).csv("/path/to/file.csv"). This change has been reflected in the user docs.
How was this patch tested?
Tests added in CSVSuite for:
- multiLine enabled and disabled.
- multiLine enabled and disabled (these take different code paths).
- header set as true and as false (ensure the schema is correctly inferred).
- skipLines option throws exception.