
[SPARK-23203][SQL]: DataSourceV2: Use immutable logical plans. #20387

Closed

Conversation

rdblue
Contributor

@rdblue rdblue commented Jan 24, 2018

What changes were proposed in this pull request?

SPARK-23203: DataSourceV2 should use immutable catalyst trees instead of wrapping a mutable DataSourceV2Reader. This commit updates DataSourceV2Relation and consolidates much of the DataSourceV2 API requirements for the read path in it. Instead of wrapping a reader that changes, the relation lazily produces a reader from its configuration.

This commit also updates the predicate and projection push-down. Instead of the implementation from SPARK-22197, this reuses the rule matching from the Hive and DataSource read paths (using PhysicalOperation) and copies most of the implementation of SparkPlanner.pruneFilterProject, with updates for DataSourceV2. By reusing the implementation from other read paths, this should have fewer regressions from other read paths and is less code to maintain.
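A minimal Scala sketch of that shape (hedged: this is not the exact code in the PR, it uses the DataSourceReader name from the released 2.3 API, and the reader construction is elided behind a placeholder):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader.DataSourceReader

case class DataSourceV2Relation(
    source: DataSourceV2,
    options: Map[String, String],
    path: Option[String] = None,
    table: Option[TableIdentifier] = None,
    projection: Option[Seq[AttributeReference]] = None,
    filters: Option[Seq[Expression]] = None) extends LeafNode {

  // hypothetical helper: builds a fresh reader from `source` and pushes the
  // current projection and filters into it (details elided in this sketch)
  private def createReader(): DataSourceReader = ???

  // the reader is derived lazily from the relation's configuration; push-down
  // produces a new relation via copy() instead of mutating this one
  private lazy val reader: DataSourceReader = createReader()

  // the relation reports exactly what the reader returns, which may be only a
  // partial match of the requested projection
  override lazy val output: Seq[AttributeReference] = reader.readSchema().toAttributes
}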

The new push-down rules also support the following edge cases:

  • The output of DataSourceV2Relation should be what is returned by the reader, in case the reader can only partially satisfy the requested schema projection
  • The requested projection passed to the DataSourceV2Reader should include filter columns
  • The push-down rule may be run more than once if filters are not pushed through projections

How was this patch tested?

Existing push-down and read tests.

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86598 has finished for PR 20387 at commit d3233e1.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StreamingDataSourceV2Relation(

@rdblue rdblue changed the title SPARK-22386: DataSourceV2: Use immutable logical plans. [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immutable logical plans. Jan 24, 2018
@rdblue rdblue force-pushed the SPARK-22386-push-down-immutable-trees branch from d3233e1 to 9c4dcb5 on January 24, 2018 19:19
@rdblue
Contributor Author

rdblue commented Jan 24, 2018

@cloud-fan, please have a look at these changes. This will require follow-up for the Streaming side. I have yet to review the streaming interfaces for DataSourceV2, so I haven't made any changes there.

In our Spark build, I've also moved the write path to use DataSourceV2Relation, which I intend to do in a follow-up to this issue.

@rxin FYI.

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86600 has finished for PR 20387 at commit 9c4dcb5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StreamingDataSourceV2Relation(

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86601 has finished for PR 20387 at commit ac58844.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86602 has finished for PR 20387 at commit 83203a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

source: DataSourceV2,
options: Map[String, String],
path: Option[String] = None,
table: Option[TableIdentifier] = None,
Contributor

why do we need these 2 parameters? Can't we get them from options when needed?

Contributor Author

We could keep these in options, but because they are the main two ways to identify tables, they should be easier to work with. I'd even suggest adding them to the DataSourceV2 read and write APIs.

Another benefit of adding these is that it is easier to use DataSourceV2Relation elsewhere. In our Spark build, I've added a rule to convert Hive relations to DataSourceV2Relation based on a table property. That's cleaner because we can pass the TableIdentifier instead of adding options to the map.

Contributor Author

I guess another way to say this is that it's better to set reliable path, database, and table parameters by passing them explicitly than to require all the places where DataSourceV2Relations are created to do the same thing. Better to standardize passing these options in v2Options, and it would be even better to pass these directly to the readers and writers.

Contributor

But not all data sources have a path and table name. If you feel strongly about it, we can add two methods that extract the path and table from options.

Contributor Author

That's why these are options. Passing either path or table name is the most common case, which we should have good support for. If tables are identified in other ways, that's supported.

path: Option[String] = None,
table: Option[TableIdentifier] = None,
projection: Option[Seq[AttributeReference]] = None,
filters: Option[Seq[Expression]] = None,
Contributor

so every time we add a new push down interface, we need to add parameters here too?

Contributor Author

I'm not sure I understand what you mean. When something is pushed, it creates a new immutable relation, so I think it has to be added to the relation. But I'm not sure that many things will be pushed besides the projection and filters. What are you thinking that we would need to add? Fragments of logical plan?

Assuming we add the ability to push parts of the logical plan, then this would need to have a reference to the part that was pushed down. I'm not sure that would be this relation class, a subclass, or something else, but I would be fine adding a third push-down option here. The number of things to push down isn't very large, is it?

Contributor

I like this pattern. I think it is important that the arguments to a query plan node are comprehensive so that it is easy to understand what is going on in the output of explain().

@cloud-fan
Contributor

overall I think it's a good idea to make the plan immutable.

@cloud-fan
Contributor

I dug into the commit history and recalled why I made these decisions:

  • having a mutable DataSourceV2Relation. This is mostly to avoid adding more and more constructor parameters to DataSourceV2Relation and to keep the code easy to maintain. I'm OK with making it immutable if there is a significant benefit.
  • not using PhysicalOperation. This is because we will add more push-down optimizations (e.g. limit, aggregate, join), and we have a specific push-down order for them. It's hard to extend PhysicalOperation to support more operators and specific push-down orders, so I created a new one. Eventually all data sources will be implemented as data source v2, so PhysicalOperation will go away.

The output of DataSourceV2Relation should be what is returned by the reader, in case the reader can only partially satisfy the requested schema projection

Good catch! Since DataSourceV2Reader is mutable, the output can't be fixed, as it may change when we apply data source optimizations. Using lazy val output ... can fix this.

The requested projection passed to the DataSourceV2Reader should include filter columns

I did this intentionally. If a column is only referred to by pushed filters, Spark doesn't need this column. Even if we require this column from the data source, we just read it out and wait for it to be pruned by the next operator.

The push-down rule may be run more than once if filters are not pushed through projections

This looks weird, do you have a query to reproduce this issue?

This updates DataFrameReader to parse locations that do not look like paths as table names and pass the result as "database" and "table" keys in v2 options.

Personally I'd suggest using spark.read.format("iceberg").option("table", "db.table").load(), since load is defined as def load(paths: String*), but I think your usage looks better. The communication protocol between Spark and a data source is options, so I'd suggest that we just propagate the paths parameter to options, and data source implementations are free to interpret the path option however they want, e.g. as table and database names.

@rdblue
Contributor Author

rdblue commented Jan 29, 2018

I'm ok to make it immutable if there is an significant benefit.

Mutable nodes violate a basic assumption of catalyst, that trees are immutable. Here's a good quote from the SIGMOD paper (by @rxin, @yhuai, and @marmbrus et al.):

In our experience, functional transformations on immutable trees make the whole optimizer very easy to reason about and debug. They also enable parallelization in the optimizer, although we do not yet exploit this.

Mixing mutable nodes into supposedly immutable trees is a bad idea. Other nodes in the tree assume that children do not change.

@rdblue
Contributor Author

rdblue commented Jan 29, 2018

I'd suggest that we just propagate the paths parameter to options, and data source implementations are free to interpret the path option however they want, e.g. as table and database names.

What about code paths that expect table names? In our branch, we've added support for converting Hive relations (which have a TableIdentifier, not a path) and using insertInto. Table names and paths are the two main ways to identify tables, and I think both should be supported.

This is a new API, so it doesn't matter that load and save currently use paths. We can easily update that to support tables. If we don't, then there will be no common way to refer to tables: some implementations will use table, some will pass db separately, and some might use database. Standardizing this and adding support in Spark will produce more consistent behavior across data sources.

@rdblue
Contributor Author

rdblue commented Jan 29, 2018

[The push-down rule may be run more than once if filters are not pushed through projections] looks weird, do you have a query to reproduce this issue?

One of the DataSourceV2 tests hit this. I thought it was a good thing to push a single node down at a time and not depend on order.

@rdblue
Contributor Author

rdblue commented Jan 29, 2018

It's hard to extend PhysicalOperation to support more operators and specific push-down orders, so I created a new one

I'm concerned about the new one. The projection support seems really brittle because it calls out specific logical nodes and scans the entire plan. If we are doing push-down wrong on the current v1 and Hive code paths, then I'd like to see a proposal for fixing that without these drawbacks.

I like that this PR pushes projections and filters just like the other paths. We should start there and add additional push-down as necessary.

@cloud-fan
Contributor

This is a new API...

Are you saying you want to add a new method in DataFrameReader that is different from load? In Scala, the parameter name is part of the method signature, so for def load(path: String) we can't change its semantics: the parameter is a path. It's fine if a data source implementation teaches its users that the path will be interpreted as database/table names, but this should not be a contract in Spark.

I do agree that Spark should set a standard for specifying database and table, as it's very common. We can even argue that path is not a general concept for data sources, but we still provide special APIs for path.

My proposal: how about we add a new method table in DataFrameReader? The usage would look like spark.read.format("iceberg").table("db.table").load(). What do you think? We should not specify database separately, as we may have catalog federation and a table name may have 3 parts: catalog.db.table. Let's keep it general and let the data source interpret it.

@cloud-fan
Contributor

I thought it was a good thing to push a single node down at a time and not depend on order.

The order must be taken care of. For example, we can't push down a limit through a Filter unless the entire filter is pushed into the data source. Generally, if we push down multiple operators into a data source, we should clearly define the order in which the data source applies them.
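A small illustration of that ordering constraint (hedged: assumes a SparkSession and a table t with an integer column a; not code from this PR):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-order").getOrCreate()
import spark.implicits._

// filter every row, then take 10: the result depends on all matching rows
val filtered = spark.table("t").filter($"a" > 5).limit(10)

// take 10 arbitrary rows, then filter: may return fewer rows, and different ones,
// which is why a limit cannot be reordered below a filter unless the filter is
// fully handled by the data source
val reordered = spark.table("t").limit(10).filter($"a" > 5)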

pushedFilters: Seq[Expression]) = {
val newReader = userSchema match {
case Some(s) =>
asReadSupportWithSchema.createReader(s, v2Options)
Contributor

I like this idea. Although DataSourceReader is mutable, we can create a new one every time we want to apply the operator pushdowns.
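A rough sketch of that pattern (hedged: names are simplified and this is not the PR's code; it uses the DataSourceV2 read interfaces as they ship in Spark 2.3):

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.{DataSourceOptions, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Build a brand-new reader whenever the pushed projection or filters change,
// so no reader is mutated after it has been asked for its schema.
def newReader(
    source: AnyRef,                        // a DataSourceV2 implementing a ReadSupport mixin
    v2Options: DataSourceOptions,
    userSchema: Option[StructType],
    requiredSchema: Option[StructType],
    filters: Array[Filter]): DataSourceReader = {
  val reader = (source, userSchema) match {
    case (s: ReadSupportWithSchema, Some(schema)) => s.createReader(schema, v2Options)
    case (s: ReadSupport, _)                      => s.createReader(v2Options)
    case _ => throw new IllegalArgumentException("source does not support reads")
  }
  reader match {
    case r: SupportsPushDownFilters =>
      val unsupported = r.pushFilters(filters)   // filters the source could not handle;
                                                 // Spark would re-apply these after the scan
    case _ =>
  }
  reader match {
    case r: SupportsPushDownRequiredColumns =>
      requiredSchema.foreach(schema => r.pruneColumns(schema))
    case _ =>
  }
  reader
}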

@rdblue
Contributor Author

rdblue commented Jan 31, 2018

@felixcheung, yes, we do already have a table option. That creates an UnresolvedRelation with the parsed table name as a TableIdentifier, which is not currently compatible with DataSourceV2 because there is no standard way to pass the identifier's db and table name.

Part of the intent here is to add support in DataSourceV2Relation for cases where we have a TableIdentifier, so that we can add a resolver rule that replaces UnresolvedRelation with DataSourceV2Relation. This is what we do in our Spark branch.
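A hypothetical sketch of such a resolver rule, using the relation constructor shown in the diff above (the rule name, isV2Table, and lookupSource are illustrative only, not code from this PR or from that branch):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.sources.v2.DataSourceV2

object ResolveV2Tables extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case u: UnresolvedRelation if isV2Table(u.tableIdentifier) =>
      DataSourceV2Relation(
        source = lookupSource(u.tableIdentifier),   // find the DataSourceV2 implementation
        options = Map.empty,
        table = Some(u.tableIdentifier))            // pass the identifier directly, not through options
  }

  // placeholders: e.g. check a table property, and look up the source by name
  private def isV2Table(ident: TableIdentifier): Boolean = ???
  private def lookupSource(ident: TableIdentifier): DataSourceV2 = ???
}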

@cloud-fan, what is your objection to support like this?

@rdblue
Contributor Author

rdblue commented Jan 31, 2018

spark.read.format("iceberg").table("db.table").load()

I'm fine with this if you think it is confusing to parse the path as a table name in load. I think it is reasonable.

I'd still like to keep the Option[TableIdentifier] parameter on the relation, so that we can support table or insertInto on the write path.

@rdblue
Contributor Author

rdblue commented Jan 31, 2018

@cloud-fan, to your point about push-down order, I'm not saying that order doesn't matter at all, I'm saying that the push-down can run more than once and it should push the closest operators. That way, if you have a situation where operators can't be reordered but they can all be pushed, they all get pushed through multiple runs of the rule, each one further refining the relation.

If we do it this way, then we don't need to traverse the logical plan to find out what to push down. We continue pushing projections until the plan stops changing. This is how the rest of the optimizer works, so I think it is a better approach from a design standpoint.

My implementation also reuses more existing code that we have higher confidence in, which is a good thing. We can add things like limit pushdown later, by adding it properly to the existing code. I don't see a compelling reason to toss out the existing implementation, especially without the same level of testing.

@rdblue
Contributor Author

rdblue commented Jan 31, 2018

Let's keep it general and let the data source interpret it.

I think this is the wrong approach. The reason why we are using a special DataSourceOptions object is to ensure that data sources consistently ignore case when reading their own options. Consistency across data sources matters and we should be pushing for more consistency, not less.
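For reference, a quick illustration of that case-insensitivity rule, using the DataSourceOptions API as it ships in Spark 2.3:

import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

val opts = new DataSourceOptions(Map("Table" -> "db.t", "paTh" -> "/data/t").asJava)
assert(opts.get("table").get == "db.t")    // key lookup ignores case
assert(opts.get("PATH").get == "/data/t")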

@rdblue
Contributor Author

rdblue commented Jan 31, 2018

@dongjoon-hyun, @gatorsmile, could you guys weigh in on this discussion? I'd like to get additional perspectives on the changes I'm proposing.

@cloud-fan
Contributor

Currently DataSourceOptions is the major way for Spark and users to pass information to the data source. It's very flexible and only defines one rule: the option key lookup should be case-insensitive.

I agree with your point that more consistency is better. It's annoying if every data source needs to define their own option keys for table and database, and tell users about it. It's good if Spark can define some rules about what option keys should be used for some common information.

My proposal:

class DataSourceOptions {
  ...
  
  def getPath(): String = get("path")

  def getTimeZone(): String = get("timeZone")

  def getTableName(): String = get("table")
}

We can keep adding these options since this won't break binary compatibility.

And then we just need to document it and tell both users and data source developers about how to specify and retrieve these common options.

Then I think we don't need to add table and database parameters to DataSourceV2Relation, because we can easily do relation.options.getTable.

BTW this doesn't change the API so I think it's fine to do it after 2.3.

@cloud-fan
Contributor

We can add things like limit pushdown later, by adding it properly to the existing code.

I tried and couldn't figure out how to do it with PhysicalOperation; that's why I built something new for data source v2 pushdown. I'm OK with reusing it if you can convince me PhysicalOperation is extensible, e.g. to support limit push-down.

@cloud-fan
Contributor

Hi @rdblue, I think we all agree that the plan should be immutable, but other parts are still under discussion. Can you send a new PR that focuses on making the plan immutable, so that we can merge that one first and continue to discuss the other parts in this PR?

@dongjoon-hyun
Member

+1 for @cloud-fan 's suggestion.

@rdblue rdblue force-pushed the SPARK-22386-push-down-immutable-trees branch from 3b55609 to 1a603db on February 17, 2018 21:18
@rdblue
Contributor Author

rdblue commented Feb 17, 2018

Thanks for the update! Enjoy your vacation, and thanks for letting me know.

@SparkQA

SparkQA commented Feb 18, 2018

Test build #87532 has finished for PR 20387 at commit 1a603db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val canonicalOutput: Seq[AttributeReference] = this.output
.map(a => QueryPlan.normalizeExprId(a, projection))

new DataSourceV2Relation(c.source, c.options, c.projection) {
Contributor

This is hacky but I don't have a better idea now, let's revisit it later.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in aadf953 Feb 20, 2018
@rdblue
Contributor Author

rdblue commented Feb 20, 2018

Thanks for all your help getting this committed, @cloud-fan!

asfgit pushed a commit that referenced this pull request Feb 21, 2018
…, but not supported.

## What changes were proposed in this pull request?

DataSourceV2 initially allowed user-supplied schemas when a source doesn't implement `ReadSupportWithSchema`, as long as the schema was identical to the source's schema. This is confusing behavior because changes to an underlying table can cause a previously working job to fail with an exception that user-supplied schemas are not allowed.

This reverts commit adcb25a0624, which was added to #20387 so that it could be removed in a separate JIRA issue and PR.

## How was this patch tested?

Existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #20603 from rdblue/SPARK-23418-revert-adcb25a0624.
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Mar 7, 2019

Author: Ryan Blue <blue@apache.org>

Closes apache#20387 from rdblue/SPARK-22386-push-down-immutable-trees.

(cherry picked from commit aadf953)

Conflicts:
	external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Oct 15, 2019

Author: Ryan Blue <blue@apache.org>

Closes apache#20387 from rdblue/SPARK-22386-push-down-immutable-trees.

(cherry picked from commit aadf953)

Conflicts:
	external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023

Author: Ryan Blue <blue@apache.org>

Closes apache#20387 from rdblue/SPARK-22386-push-down-immutable-trees.

Ref: LIHADOOP-48531
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…, but not supported.


Author: Ryan Blue <blue@apache.org>

Closes apache#20603 from rdblue/SPARK-23418-revert-adcb25a0624.

Ref: LIHADOOP-48531