
[SPARK-25894][SQL] Add a ColumnarFileFormat type which returns the column count for a given schema #22905

Conversation

@mallman (Contributor) commented Oct 31, 2018

(link to Jira: https://issues.apache.org/jira/browse/SPARK-25894)

What changes were proposed in this pull request?

Knowing the number of physical columns Spark will read from a columnar file format (such as Parquet) is extremely helpful, if not critical, in debugging poor query performance on a Parquet table with a deeply nested schema. This PR adds a new metadata field to FileSourceScanExec which identifies the maximum number of columns Spark will read from that file source. (N.B. the actual number of columns read may be lower if the physical file Spark is reading is missing some of them. For example, this can occur in a large partitioned table with a wide schema and sparse data: some partitions may not have data for some columns.) This metadata is printed as part of a physical plan. For example, take a contacts table with the following schema:

root
 |-- id: integer (nullable = true)
 |-- name: struct (nullable = true)
 |    |-- first: string (nullable = true)
 |    |-- middle: string (nullable = true)
 |    |-- last: string (nullable = true)
 |-- address: string (nullable = true)
 |-- employer: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- company: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- address: string (nullable = true)

With schema pruning, the following explain query

explain select name.first, employer.company.address from contacts

prints

== Physical Plan ==
*(1) Project [name#3726.first AS first#3742, employer#3728.company.address AS address#3743]
+- *(1) FileScan parquet [name#3726,employer#3728,p#3729] Batched: false, ColumnCount: 2,
        DataFilters: [], Format: Parquet, Location:
        InMemoryFileIndex[file:/blah/blah/spark/target/tmp/spark-686cac53-0...,
        PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema:
        struct<name:struct<first:string>,employer:struct<company:struct<address:string>>>

Note ColumnCount: 2. This tells us that schema pruning is working at planning time. Without schema pruning, that same query prints

== Physical Plan ==
*(1) Project [name#3726.first AS first#3742, employer#3728.company.address AS address#3743]
+- *(1) FileScan parquet [name#3726,employer#3728,p#3729] Batched: false, ColumnCount: 6,
        DataFilters: [], Format: Parquet, Location:
        InMemoryFileIndex[file:/blah/blah/spark/target/tmp/spark-947d2af3-8...,
        PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema:
        struct<name:struct<first:string,middle:string,last:string>,employer:struct<id:int,
        company:struct<...

Note ColumnCount: 6. This tells us that schema pruning is either disabled or not working as expected. If we've enabled schema pruning, this query plan gives us an obvious avenue for investigation of poor query performance.
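To reproduce output like the above, here is a minimal sketch of how one might toggle nested schema pruning and inspect the plan. It assumes a SparkSession named spark, a registered contacts table, and the Spark 2.4 conf name spark.sql.optimizer.nestedSchemaPruning.enabled; it is not part of this PR.

// Enable nested schema pruning (assumed Spark 2.4 conf name) and explain the query.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
spark.sql("select name.first, employer.company.address from contacts").explain()

// Disable it to compare against the ColumnCount values shown above.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "false")
spark.sql("select name.first, employer.company.address from contacts").explain()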

How was this patch tested?

A new test was added to ParquetSchemaPruningSuite.scala.
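For illustration only, here is a hedged sketch of the kind of assertion such a test might make; df and the expected value are assumptions, not the PR's actual test code.

import org.apache.spark.sql.execution.FileSourceScanExec

// Hypothetical check: find the scan node for the pruned query and verify the
// ColumnCount metadata entry. df is assumed to be the pruned query's DataFrame.
val scan = df.queryExecution.executedPlan.collectFirst {
  case f: FileSourceScanExec => f
}.get
assert(scan.metadata("ColumnCount") == "2")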

Commit message (truncated): "… which specifies a method for returning the physical column count associated with a given StructType. We include this count as metadata in FileSourceScanExec"
@mallman (Contributor, Author) commented Oct 31, 2018

@gatorsmile @viirya @cloud-fan @dbtsai your thoughts?

cc @dongjoon-hyun for ORC file format perspective.

case columnar: ColumnarFileFormat =>
  val sqlConf = relation.sparkSession.sessionState.conf
  val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema)
  withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString)
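For context, a minimal sketch of the trait this snippet pattern-matches on, with the signature inferred from the call site above; treat it as an approximation rather than the PR's exact definition.

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

// Approximate shape of the proposed ColumnarFileFormat trait: a file format
// mixes this in to report how many physical columns it would read for a schema.
trait ColumnarFileFormat {
  def columnCountForSchema(conf: SQLConf, schema: StructType): Int
}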
Member:

Can't we get the column count from the requiredSchema metadata?

Contributor Author:

You can "guess-timate" the physical column count by counting the leaf fields in the ReadSchema metadata value, but the true answer is an implementation issue of the file format. For example, in the implementation of ColumnarFileFormat for Parquet, we convert the the Catalyst schema to the Parquet schema before counting columns. I suppose a similar approach would be required for ORC and other columnar formats.

That being said, this new metadata value isn't really meant to provide new and essential information, per se. Its purpose is to provide easy-to-read, practical information that's useful for quickly validating that schema pruning is working as expected. For example, seeing that a query is reading all 423 columns from a table instead of 15 tells us pretty quickly that schema pruning is not working (unless we really are trying to read the entire table schema). I've found the ReadSchema value to be difficult to read in practice because of its terse syntax, and because its printout is truncated.
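As an illustration of the approach described above, a hedged sketch of what the Parquet implementation might look like; the exact names in the PR may differ.

import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

// Convert the Catalyst schema to a Parquet MessageType and count its leaf
// (primitive) columns; getPaths returns one entry per primitive column.
def columnCountForSchema(conf: SQLConf, schema: StructType): Int = {
  val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
  parquetSchema.getPaths.size
}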

Contributor:

shall we only include this info when the columnar reader is on?

Member:

Is this something we really should include in the metadata? If the purpose of this is to check whether column pruning works or not, logging should be good enough. Adding a trait for it sounds like overkill at this stage. Let's not add an abstraction based on a rough guess that it can be generalised.

Contributor Author:

logging should be good enough

What's your basis for this assertion?

Also, what kind of logging are you suggesting?

Member:

Who wants that? If someone wants to put metadata somewhere in the physical plan, let them open a PR and make a case for it

No .. I don't think we should add it only because it's been requested once. They look like the same kind of case to me. If this one is added, I will have no argument against other people requesting to add others later. We should make it clear why this one specifically should be added. We're not going to add every piece of information to the metadata just because it's requested.

If the purpose of adding it is to check whether the pushdown is actually working or not, logging sounds more appropriate for that purpose.

Member:

I mean I really think it's more appropriate to check whether something works as expected via logging.

That's speaking from experience, not conjecture.

I am not dismissing your statement. Let's be very clear about why it should be put in the metadata rather than in logging. How and why would it be more useful than logging? In what cases?

For clarification, the scope of this information is narrower than just checking whether column pruning is working or not, since we already print out the requested columns from the Spark side.

Contributor Author:

I'll reiterate a sample use case:

Consider also the case of the beeline user connecting to a multiuser thriftserver. They are pretty far from the log file, whereas running an 'explain' is right there in the terminal.

This also matters to users planning and debugging queries in a Jupyter notebook, as we have at VideoAmp. The level of effort for these users to get to a driver log file is quite high compared to inspecting a query plan.

When you refer to logging, which log are you referring to? When would this information be logged? And at what log level?

Member:

That basically says logging is useless when using beeline. I don't think this info is super important to (non-advanced) users.

I mean log4j, which is Spark's logging module, and I meant the information you're including in the metadata. Maybe at info level? Or debug level.

logInfo("The number of actual columns being pruned is blah blah")

Contributor Author:

That basically says logging is useless when using beeline. I don't think this info is super important to (non-advanced) users.

My experience says otherwise, and advanced users use beeline and Jupyter, too.

@cloud-fan (Contributor):

Is there anything blocked by this? I agree this is a good feature, but it asks the data source to provide a new capability, which may become a problem when migrating file sources to data source v2.

@mallman (Contributor, Author) commented Oct 31, 2018

Is there anything blocked by this? I agree this is a good feature, but it asks the data source to provide a new capability, which may become a problem when migrating file sources to data source v2.

This isn't blocking anything. It's just a contribution that has shown itself to be very helpful to us in identifying the source of performance problems in the past. I think it would be helpful for others, too.

That being said, I don't know enough about what would be involved in migrating file sources to data source v2 to say how difficult that would be. This implementation (for Parquet) is essentially a one-liner. All the heavy lifting is done by SparkToParquetSchemaConverter.

@SparkQA commented Oct 31, 2018

Test build #98316 has finished for PR 22905 at commit 4aa8d04.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member):

Thank you for pinging me, @mallman.

@gatorsmile (Member):

@mallman Could you run the EXPLAIN with these new changes and post it in the PR description?

@SparkQA commented Nov 2, 2018

Test build #98397 has finished for PR 22905 at commit b4f0584.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman (Contributor, Author) commented Nov 2, 2018

@mallman Could you run the EXPLAIN with these new changes and post it in the PR description?

Done.

@SparkQA commented Nov 2, 2018

Test build #98399 has finished for PR 22905 at commit 7124ffa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman (Contributor, Author) commented Dec 5, 2018

I think I've made my case for this patch as best I can. It does not appear this PR has unanimous support, but I continue to believe we should merge it to master. So where do we take it from here?

@mallman (Contributor, Author) commented Jan 2, 2019

@cloud-fan @viirya @HyukjinKwon Can we make a decision on this PR? As I mentioned before, I've pretty much made my case as best I can.

@gatorsmile (Member):

I think it is useful, but the current implementation looks a bit hacky.

How about adding an API for counting the number of leaf columns in class StructType? Also cc @rxin @cloud-fan @hvanhovell

@mallman (Contributor, Author) commented Jan 4, 2019

How about adding an API for counting the number of leaf columns in class StructType? Also cc @rxin @cloud-fan @hvanhovell

That could work as a more generally-applicable alternative. I'll have to think more on whether or how that might differ from the number of format-specific columns selected for a given StructType.

Thanks for your perspective.
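For illustration of the suggestion above, a hedged sketch of what a leaf-column count helper on StructType might look like; this is not code from the PR.

import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Hypothetical helper: count the leaf (primitive) fields of a Catalyst schema,
// descending into structs nested inside arrays and maps.
def leafColumnCount(dataType: DataType): Int = dataType match {
  case s: StructType => s.fields.map(f => leafColumnCount(f.dataType)).sum
  case a: ArrayType  => leafColumnCount(a.elementType)
  case m: MapType    => leafColumnCount(m.keyType) + leafColumnCount(m.valueType)
  case _             => 1
}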

@github-actions bot commented Jan 5, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

github-actions bot added the Stale label on Jan 5, 2020
github-actions bot closed this on Jan 6, 2020