Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17636][SPARK-25557][SQL] Parquet and ORC predicate pushdown in nested fields #27155

Closed
wants to merge 7 commits into from

Conversation

emaynardigs
Copy link

@emaynardigs emaynardigs commented Jan 9, 2020

What changes were proposed in this pull request?

Firstly, much of this PR is a rebase of #22535, much thanks to @dbtsai for his work.

Spark can now push down predicates on struct columns when reading Parquet and ORC tables.

Why are the changes needed?

There are significant performance gains to be had from pushing down predicates.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UT were extended to cover the new functionality.

Sanity check tests:

//// Setup ////
spark.range(1000 * 1000).toDF("id").selectExpr("id", "STRUCT(id x, STRUCT(CAST(id AS STRING) z) y) nested").write.mode("overwrite").parquet("/tmp/data")
spark.range(1000 * 1000).toDF("id").selectExpr("id", "STRUCT(id x, STRUCT(CAST(id AS STRING) z) y) nested").write.mode("overwrite").orc("/tmp/data_orc")
def hack_benchmark(f: (() => Any)): Double = {
	(0 to 100).map(i => {
		val start = System.currentTimeMillis
		f()
		(System.currentTimeMillis - start)
	}).sum / 100.0
}


//// Without patch ////
scala> spark.read.parquet("/tmp/data").filter("nested.x = 100").explain
== Physical Plan ==
*(1) Project [id#0L, nested#1]
+- *(1) Filter (isnotnull(nested#1) && (nested#1.x = 100))
   +- *(1) FileScan parquet [id#0L,nested#1] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/data], PartitionFilters: [], PushedFilters: [IsNotNull(nested)], ReadSchema: struct<id:bigint,nested:struct<x:bigint,y:string>>

scala> spark.read.orc("/tmp/data_orc").filter("nested.x = 100").explain
== Physical Plan ==
*(1) Project [id#9253L, nested#9254]
+- *(1) Filter (isnotnull(nested#9254) && (nested#9254.x = 100))
   +- *(1) FileScan orc [id#9253L,nested#9254] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/tmp/data_orc], PartitionFilters: [], PushedFilters: [IsNotNull(nested)], ReadSchema: struct<id:bigint,nested:struct<x:bigint,y:string>>

scala> hack_benchmark(spark.read.parquet("/tmp/data").filter("nested.x < 100").count _)
res0: Double = 419.82 

scala> hack_benchmark(spark.read.orc("/tmp/data_orc").filter("nested.x < 100").count _)
res5: Double = 1525.83  

//// With patch ////
scala> spark.read.parquet("/tmp/data").filter("nested.x = 100").explain
== Physical Plan ==
*(1) Project [id#54L, nested#55]
+- *(1) Filter (isnotnull(nested#55) AND (nested#55.x = 100))
   +- BatchScan[id#54L, nested#55] ParquetScan Location: InMemoryFileIndex[file:/tmp/data], ReadSchema: struct<id:bigint,nested:struct<x:bigint,y:string>>, PushedFilters: [EqualTo(nested.x,100)]

scala> spark.read.orc("/tmp/data_orc").filter("nested.x = 100").explain
== Physical Plan ==
*(1) Project [id#0L, nested#1]
+- *(1) Filter (isnotnull(nested#1) AND (nested#1.x = 100))
   +- BatchScan[id#0L, nested#1] OrcScan Location: InMemoryFileIndex[file:/tmp/data_orc], ReadSchema: struct<id:bigint,nested:struct<x:bigint,y:struct<z:string>>>, PushedFilters: [EqualTo(nested.x,100)]
            
scala> hack_benchmark(spark.read.parquet("/tmp/data").filter("nested.x < 100").count _)
res0: Double = 192.15                                                           

scala> hack_benchmark(spark.read.orc("/tmp/data_orc").filter("nested.x < 100").count _)
res1: Double = 1029.57                                                   

Note the significant performance improvement and the inclusion of the filter in PushedFilters in both cases.

@dongjoon-hyun
Copy link
Member

Thank you for making a PR, @emaynardigs
However, could you build and test locally first?

@emaynardigs
Copy link
Author

Yep, failure related, I was mistakenly only testing locally against the v2 ORC code path before but Jenkins failed on the v1 OrcFilters. Updating the v1 code path and re-testing.

@dongjoon-hyun
Copy link
Member

ok to test

@SparkQA
Copy link

SparkQA commented Jan 10, 2020

Test build #116456 has finished for PR 27155 at commit 3e78632.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Retest this please.

@SparkQA
Copy link

SparkQA commented Jan 10, 2020

Test build #116479 has finished for PR 27155 at commit 3e78632.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 10, 2020

Test build #116501 has finished for PR 27155 at commit f612321.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, you must add @dbtsai 's authorship by add a commit with his authorship.

The following is not a standard way to keep the authorship.

Firstly, much of this PR is a rebase of #22535, much thanks to @dbtsai for his work.

Second, you need to address all the existing comment in the original PR. In the PR description, could you explain what is the improvement here from the original PR? If there is nothing new here, we had better close this one and asking @dbtsai to update his original PR.

@emaynardigs
Copy link
Author

emaynardigs commented Jan 11, 2020

First of all, you must add @dbtsai 's authorship by add a commit with his authorship.

The following is not a standard way to keep the authorship.

Firstly, much of this PR is a rebase of #22535, much thanks to @dbtsai for his work.

Second, you need to address all the existing comment in the original PR. In the PR description, could you explain what is the improvement here from the original PR? If there is nothing new here, we had better close this one and asking @dbtsai to update his original PR.

Hi @dongjoon-hyun, thanks for the feedback! Actually, when viewing the original PR I was unaware that @dbtsai is a member and, in fact, your colleague. Your concern definitely makes sense.

Firstly, I should say that there are actually no commits here under the original author's ownership; the code has diverged to the point now where, while I took some ideas & code from the original PR, it was easier to do all of this manually. I called this a "rebase", but it is a rebase only in the abstract sense that a lot of code was copied and updated for the latest master and not in the sense of any source control.

Secondly, I'll try to elaborate on why I opened a new PR...

  1. The original PR was abandoned with no activity or reply from the author in a year. The original author was asked to update his PR and did not respond.
  2. The original PR was flawed, failed several tests, and included some strange choices (like always splitting on a field that contained '.') that I did not agree with.
  3. The original PR did not extend to ORC and only worked for Parquet. My company uses yet another binary format that the original PR would have been incompatible with, but the new one can work with.

As you pointed out, I have addressed the comments in the original PR. I've extended the functionality to ORC as well as Parquet, tested the functionality myself, and have written more unit tests (largely copied from the original PR) and am currently writing more pending the approval of the basic code here. I would not say there is nothing new here.

@dongjoon-hyun
Copy link
Member

@emaynardigs . In this case, usually, we close the second PR (yours) because the original is still alive. You can retry this after the first one is closed.

Firstly, I should say that there are actually no commits here under the original author's ownership; the code has diverged to the point now where,

@dongjoon-hyun
Copy link
Member

I'll leave this PR to @dbtsai .

@emaynardigs
Copy link
Author

emaynardigs commented Jan 12, 2020

Yes, surely we should close one PR. The other one is inactive, fails tests, and doesn’t merge cleanly. This one has none of those issues and has more functionality.

I don’t mind closing this one out if the other PR can get us to the same place just as quickly, but that seems like it would take more work at this point? Either way, let’s make sure there is an active PR for this issue which can be merged in. As I have no control over any other PR this is my submission towards that end.

@emaynardigs
Copy link
Author

@dongjoon-hyun I see this has changes requested, do I need to make any changes here?

Or is this just pending review?

@dbtsai
Copy link
Member

dbtsai commented Jan 15, 2020

Hello @emaynardigs ,

Thank you for your contribution, and I do value your work a lot. In fact, at Apple, we are still using an updated version of #22535 which is critical to our production job. As far as I know, Databirkcs's runtime also has an implementation with similar approach to tackle this issue.

The reason why I am inactive on my previous PR is that I feel adding nested support to the current filter api is a short term solution since the design doesn't consider this complex use-cases. For a better long term solution, I would like to create a new set of FilterV2 apis in DSv2 framework that makes nested columns as first class support. + @cloud-fan @rdblue @viirya for feedback on this.

I already started to work on FilterV2 api, and here is WIP code https://github.com/dbtsai/spark/pull/10/files . The change is bigger than I thought, and now, I am debating do we actually need a new FilterV2 framework?

Feedback and idea are welcome.

Thanks.

@emaynardigs
Copy link
Author

Hello @emaynardigs ,

Thank you for your contribution, and I do value your work a lot. In fact, at Apple, we are still using an updated version of #22535 which is critical to our production job. As far as I know, Databirkcs's runtime also has an implementation with similar approach to tackle this issue.

The reason why I am inactive on my previous PR is that I feel adding nested support to the current filter api is a short term solution since the design doesn't consider this complex use-cases. For a better long term solution, I would like to create a new set of FilterV2 apis in DSv2 framework that makes nested columns as first class support. + @cloud-fan @rdblue @viirya for feedback on this.

I already started to work on FilterV2 api, and here is WIP code https://github.com/dbtsai/spark/pull/10/files . The change is bigger than I thought, and now, I am debating do we actually need a new FilterV2 framework?

Feedback and idea are welcome.

Thanks.

Hey @dbtsai no worries, actually I suspected the silence was because you had moved this into a fork and were running with it :)

Actually I think the core approach you took here is sufficient for most cases, right? The crux of my change was only porting it to the new APIs and looking at the schema itself to unpack nested columns instead of looking at the column name (needed this for ORC anyway). Then it was pretty easy to add ORC support as we use a fork of ORC internally while you guys use Parquet.

What complex cases do you think break under this PR?

@emaynardigs
Copy link
Author

@dbtsai checking in again -- is there an edge case that you think doesn't work here? It would be nice to have updated filters, but seeing as you yourself are running code very much like this in a fork, wouldn't the right thing to do be to merge it upstream?

@emaynardigs
Copy link
Author

@dongjoon-hyun @dbtsai pinging again for review; it doesn't seem there is any progress on another PR and as @dbtsai pointed out these performance improvements can be very helpful for production workloads in their current state.

@dbtsai
Copy link
Member

dbtsai commented Feb 14, 2020

@emaynardigs I have been distracted by other work, and finally I found some time to continue this work. The other approach mentioned above will take longer time, so I'm thinking to submit a PR based on our internal version (a modified version of #22535) which is proven to be stable and in production for awhile.

I need some time to do some cleanup, and I'll submit a PR so we can collaborate. I'll add you as an author for the collaboration. WDYT?

BTW, are you using this internally at your company? How does it perform?

Thanks.

@emaynardigs
Copy link
Author

emaynardigs commented Feb 14, 2020

@dbtsai

so I'm thinking to submit a PR based on our internal version (a modified version of #22535) which is proven to be stable and in production for awhile.

My main reservation is that #22535 relies on a dot in the name of the field, and so cannot support ORC. The key difference in this PR is that it actually inspects the type of the field and extends the same functionality to Parquet and ORC. It's also rebased for the current master already and so merges cleanly. I also added more tests. If we merge #22535 we'll need another PR to un-do this logic and implement the same for ORC again.

No, I intended to cherry pick this back after it merges, but if it doesn't get merge we'll probably end up using it on our fork much like you've done.

@dbtsai
Copy link
Member

dbtsai commented Feb 28, 2020

A new PR is submitted #27728 can you take a look? We can add ORC implementation on top of that once the PR is merged.

@dbtsai
Copy link
Member

dbtsai commented Mar 28, 2020

@emaynardigs #27728 is merged. Are you interested in rebase this PR based on that? Should not be hard to support ORC now as we have a proper framework to support nested predicate pushdown.

@emaynardigs
Copy link
Author

@dbtsai That PR still relies on a dot in the column name, as I called out above. Not sure why you don't just parse the schema, like I was already doing in this PR?

I may rebase, but as we'renon a 2.x fork any dependency on the v2 filters shipping in 3.x isn't really useful.

@dbtsai
Copy link
Member

dbtsai commented Mar 31, 2020

In this PR, you also use dots to create the source filter api. This doesn't handle column name containing dots by quoting it properly. As we have proper parser to parse mutipart identifier that is proven and used everywhere, it's much more easy to use dots in source filter apis.

The implementation of each data source can be different. I choose to use key as a string containing dots in parquet for simplicity. But you can always do the schema stuff.

  private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = {
    // Recursively try to find an attribute name from the top level that can be pushed down.
    def attrName(e: Expression): Option[String] = e match {
      case a: Attribute if a.dataType != StructType =>
        Some(a.name)
      case s: GetStructField if s.childSchema(s.ordinal).dataType != StructType =>
        attrName(s.child).map(_ + s".${s.childSchema(s.ordinal).name}")
      case _ =>
        None
    }

@emaynardigs emaynardigs closed this Apr 1, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants