[SPARK-30323][SQL] Support filters pushdown in CSV datasource #26973
This reverts commit 11bcbc6.
Looks pretty good, but I will take a final look after the comments are addressed.
jenkins, retest this, please
Merged to master.
What changes were proposed in this pull request?
In this PR, I propose supporting pushed-down filters in the CSV datasource. The reason for pushing a filter all the way down to `UnivocityParser` is to apply the filter as soon as all its attributes become available, i.e. converted from CSV fields to the desired values according to the schema. This allows skipping the conversion of the remaining values if the filter returns `false`. This can improve performance when the pushed filters are highly selective and the conversion of CSV string fields to the desired values is comparatively expensive (for example, conversion to `TIMESTAMP` values).

Here are the details of the implementation:
- `UnivocityParser.convert()` converts parsed CSV tokens one by one, sequentially, from index 0 up to `parsedSchema.length - 1`. At the current index `i`, it applies the filters that refer to attributes at row field indexes `0..i`. If any filter returns `false`, it skips the conversion of the remaining input tokens.
- Pushed filters are converted to expressions bound to row positions according to `requiredSchema`. The expressions are compiled to predicates via generating Java code.
- The predicates are grouped and combined via the `And` expression. The final predicate at index `N` can refer to row fields at positions `0..N`, and can be applied to a row even if the fields at positions `N+1..requiredSchema.length - 1` are not set.

Why are the changes needed?
The changes improve performance on synthetic benchmarks by more than 9 times (on JDK 8 and 11).
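Where the saving comes from can be illustrated with a small standalone sketch. It is not Spark's code and not the benchmark itself: `EarlyFilterSketch`, `FieldFilter`, `groupByMaxIndex`, and this `convert` are hypothetical names, and plain Scala closures stand in for the generated predicates. It assumes every filter references at least one field. Filters are grouped by the highest field index they read, and conversion stops at the first predicate that returns `false`, so the more expensive conversions to the right are never run for rejected rows.

```scala
// Standalone illustration; none of these names are Spark APIs.
object EarlyFilterSketch {
  type RowPredicate = Array[Any] => Boolean

  // Hypothetical filter: the field indexes it reads, plus a test over a
  // partially filled row. Assumption: `refs` is never empty.
  final case class FieldFilter(refs: Set[Int], test: RowPredicate)

  // Group filters by the highest field index they reference and AND each
  // group, so the predicate stored at index i only needs fields 0..i.
  def groupByMaxIndex(filters: Seq[FieldFilter], numFields: Int): Array[RowPredicate] = {
    val grouped = filters.groupBy(_.refs.max)
    Array.tabulate[RowPredicate](numFields) { i =>
      val group = grouped.getOrElse(i, Seq.empty)
      (row: Array[Any]) => group.forall(_.test(row))
    }
  }

  // Convert tokens left to right and stop at the first failing predicate.
  // Returns the row (None if filtered out) and the number of conversions done.
  def convert(
      tokens: Array[String],
      converters: Array[String => Any],
      predicates: Array[RowPredicate]): (Option[Array[Any]], Int) = {
    val row = new Array[Any](tokens.length)
    var i = 0
    while (i < tokens.length) {
      row(i) = converters(i)(tokens(i))
      if (!predicates(i)(row)) return (None, i + 1)
      i += 1
    }
    (Some(row), tokens.length)
  }
}
```

With an integer column followed by a timestamp column and a filter on the integer only, a rejected row costs one conversion instead of two, because the timestamp is never parsed.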
Does this PR introduce any user-facing change?
No
How was this patch tested?
By `CSVFiltersSuite`, `CSVSuite`, and `UnivocityParserSuite`.