
[SPARK-29908][SQL] Support partitioning and bucketing through DataFrameWriter.save for V2 Tables #25822

Closed
wants to merge 3 commits

Conversation

@brkyvz (Contributor) commented Sep 17, 2019

What changes were proposed in this pull request?

We add a new interface, SupportsCreateTable, so that partitioning transforms and table properties can be passed for tables created without a catalog. Traditionally, data sources were given all the information necessary to define a table through the options of DataFrameWriter in conjunction with save.

Through this new interface, we can continue to perform the necessary checks for SaveMode.ErrorIfExists and SaveMode.Ignore through save for V2 tables. For example, a file-based data source such as Parquet can check whether the target directory is empty as part of SupportsCreateTable.canCreateTable to support these save modes. In addition, if metadata is available for a table (e.g. the schema of a JDBC data source), the data source can check in SupportsCreateTable.buildTable whether the correct schema and partitioning transforms have been provided when a table already exists for the given options. The buildTable method also takes table properties; while these cannot be supplied through save, they can be provided through the DataFrameWriterV2 API.
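As a rough sketch only (method names come from the description above; signatures and package names borrow the Spark 3.0 connector classes and may not match the actual diff):

```scala
// Illustrative sketch; signatures and packages are assumptions, not this PR's actual code.
import java.util.{Map => JMap}

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait SupportsCreateTable {
  /** Whether a table can be created for these options, e.g. whether the target directory is empty. */
  def canCreateTable(options: CaseInsensitiveStringMap): Boolean

  /** Build a Table from the user-supplied schema, partitioning transforms, and table properties. */
  def buildTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: JMap[String, String]): Table
}
```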

Thoughts about DataFrameWriterV2:
I'm also thinking that there could be a separate API that maps between data source options and an Identifier. That would let these data sources leverage the DataFrameWriterV2 API without requiring a catalog.

Why are the changes needed?

Currently, partitioning and bucketing information cannot be passed to data sources that migrate to DataSource V2 when writing through DataFrameWriter.save, which is one of the most commonly used write paths in Apache Spark.
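Roughly, the user-facing call this targets looks like the following; the format name and path are placeholders, and today the partitionBy/bucketBy information below is not delivered to a V2 source through save:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val df = spark.range(100).selectExpr("id", "cast(id % 7 as string) as part")

// "org.example.myv2source" is a placeholder for a DataSource V2 TableProvider implementation.
df.write
  .format("org.example.myv2source")
  .partitionBy("part")           // partitioning info that should reach the V2 table definition
  .bucketBy(8, "id")             // bucketing info, likewise
  .option("path", "/tmp/v2_target")
  .mode("append")
  .save()
```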

Does this PR introduce any user-facing change?

This adds a new interface, SupportsCreateTable, which data source developers can implement alongside their TableProvider implementation to support creating tables when a catalog is not available.

How was this patch tested?

Tests in DataSourceV2DataFrameSuite

@brkyvz (Contributor, Author) commented Sep 17, 2019

cc @jose-torres @cloud-fan

@dongjoon-hyun (Member) commented:

Hi, @brkyvz. Could you file a JIRA issue and use it in the PR title, please?

@brkyvz (Contributor, Author) commented Sep 17, 2019 via email

@dongjoon-hyun (Member) commented:

Oh, I see. Thanks, @brkyvz!

@brkyvz changed the title from "[SQL] Support partitioning and bucketing through DataFrameWriter.save for V2 Tables" to "[SPARK-29127][SQL] Support partitioning and bucketing through DataFrameWriter.save for V2 Tables" on Sep 18, 2019
@SparkQA commented Sep 18, 2019

Test build #110832 has finished for PR 25822 at commit 9735728.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 18, 2019

Test build #110833 has finished for PR 25822 at commit 5f1aaba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented:

This PR is related to #25651 but targets a different use case: DataFrameWriter.save with save modes.

Both PRs need to update/extend TableProvider, so it's better to think through our requirements for TableProvider together.

In #25651, what we need is:

  1. Report schema/partitioning (usually by inference). For example, for CREATE TABLE t USING format, Spark needs to ask the TableProvider to report the table's schema/partitioning first and then store them in the metastore.
  2. Read/write data with a given schema/partitioning. It's too expensive to do schema/partitioning inference every time, so Spark gets the schema/partitioning from the metastore and passes them to the TableProvider when reading/writing data.

This use case is very similar to Hive's EXTERNAL TABLE. The table metadata is stored in Spark's metastore and the table data is stored outside of Spark (i.e. external data). So in this case, TableProvider only needs to expose the external data as tables, and we don't need to ask the TableProvider to create/drop tables.

However, people may ask about Hive's MANAGED TABLE: what's the corresponding concept in Spark? In Hive, what gets managed is the file directories, so it only applies to file sources (we can also call them path-based data sources). Note that this doesn't mean file sources can only be used as MANAGED TABLEs; Hive can still create an EXTERNAL TABLE pointing to a file directory.

To support a use case like Hive's MANAGED TABLE, we need a variant of TableProvider that indicates it's a file source. For CREATE TABLE t(...) USING file_source, Spark creates the directory for the table. When reading/writing the table, Spark passes the directory path to the underlying file source. When the table is dropped, Spark removes the directory.
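Roughly, requirements 1 and 2 above imply a TableProvider shape like this (names and signatures are just a sketch, not the actual proposal in #25651):

```scala
import java.util.{Map => JMap}

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative only; not the proposed API itself.
trait TableProviderSketch {
  // Requirement 1: report schema/partitioning, usually by inference over the options
  // (e.g. a path), so CREATE TABLE t USING format can store them in the metastore.
  def inferSchema(options: CaseInsensitiveStringMap): StructType
  def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = Array.empty

  // Requirement 2: hand back a Table for a schema/partitioning Spark already knows
  // (loaded from the metastore), so inference doesn't run on every read/write.
  def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: JMap[String, String]): Table
}
```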

Back to DataFrameReader, some requirements are the same:

  1. DataFrameReader.load() needs TableProvider to report schema/partitioning.
  2. DataFrameReader.schema(...).load() needs TableProvider to get a table with given schema.

However, when it comes to SaveMode, it becomes complicated. It needs to check table existence and create table. IIUC in a previous discussion @rdblue was against this idea.

I think we can still support SaveMode for file sources (path-based data sources); see the sketch after this list:

  1. ErrorIfExists: fail if the path exists.
  2. Append: create the path if it does not exist.
  3. Overwrite: create the path if it does not exist, truncate it if it does.
  4. Ignore: skip the write if the path exists.
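A minimal sketch of that mapping for a path-based source, assuming a Hadoop FileSystem handles the path checks (the helper below is hypothetical, not code from this PR):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// Hypothetical helper: maps the four save modes onto checks/actions against the target path.
// Returns true if the write should proceed, false if it should be skipped.
object PathSaveModeSketch {
  def prepare(pathStr: String, mode: SaveMode, hadoopConf: Configuration): Boolean = {
    val path = new Path(pathStr)
    val fs: FileSystem = path.getFileSystem(hadoopConf)
    val exists = fs.exists(path)
    mode match {
      case SaveMode.ErrorIfExists if exists =>
        throw new IllegalStateException(s"Path $pathStr already exists")
      case SaveMode.Ignore if exists =>
        false                              // skip the write entirely
      case SaveMode.Overwrite =>
        if (exists) fs.delete(path, true)  // truncate existing data
        fs.mkdirs(path)
        true
      case _ =>                            // Append, or the path does not exist yet
        if (!exists) fs.mkdirs(path)
        true
    }
  }
}
```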

That said, with the change in #25651, we need to add one mixin trait of TableProvider to indicate that a source is a file source.

cc @rdblue @gengliangwang

@brkyvz (Contributor, Author) commented Sep 18, 2019

@cloud-fan
I guess my main confusion comes from the limited usefulness of TableProvider thus far. When a catalog is defined, its only use is signaling whether a data source has a V2 implementation. It seems to be a combination of RelationProvider and SchemaRelationProvider from the V1 world.

Let's look at the interfaces we have thus far:

  1. Table: a pretty great interface that describes all the properties and capabilities of a table.
  2. TableCatalog: an interface that handles existence checks for Tables, as well as creating/altering them.
  3. TableProvider: an interface that creates a Table from data source options without a catalog, but still doesn't have the complete set of APIs to fully define a Table.

TableProvider is currently missing the ability to pass partitioning info. It can be passed as part of DataFrameWriter, but unfortunately not as part of DataFrameReader. This means that for file-based sources, where there is no catalog to store the partitioning info, Spark cannot initialize a complete and correct Table definition from user input.

I had a more radical idea, and I've started working on it here: #25833

Why don't we make TableProvider also extend TableCatalog? On top of that, it will need a layer to go between data source options and an Identifier. This way, a lot of the V2 code paths can be reused (you even get partition column normalization!), you get path-based table support out of the box in DataFrameWriterV2, and you don't need to fill in incorrect information in DataFrameReader.load for file-based data sources.
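A hypothetical sketch of that shape (names are illustrative, using the Spark 3.0 connector classes; this is not the actual patch in #25833):

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableProvider}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative only: a provider that is also a catalog, plus a two-way mapping between
// data source options and Identifier so path-based tables can reuse TableCatalog code paths.
trait CatalogBackedProvider extends TableProvider with TableCatalog {
  /** e.g. encode the "path" option as Identifier.of(Array.empty[String], path). */
  def toIdentifier(options: CaseInsensitiveStringMap): Identifier

  /** The inverse mapping, so DataFrameWriterV2 code paths can hand the Identifier back to the source. */
  def toOptions(ident: Identifier): CaseInsensitiveStringMap
}
```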

@cloud-fan (Contributor) commented Sep 18, 2019

It's worthwhile to discuss the usefulness of TableProvider. So far I see 2 use cases:

  1. DataFrameReader.load()/DataFrameWriter.save() with only the append/overwrite save modes. This was a previous decision; if we revisit it and want to support all save modes, TableProvider can't be used here.
  2. CREATE TABLE ... USING with the session catalog (similar to Hive EXTERNAL/MANAGED TABLE). The core idea is to keep metadata in Spark and keep the data externally. TableProvider is a good fit because we don't need to create/alter/drop tables in the external systems, only register external data as tables in Spark. This is the major use case of DS V1 and many users are familiar with it, so I think it's better to support it with DS v2.

@cloud-fan (Contributor) commented Sep 18, 2019

BTW, I also thought about the save mode problem before. If we want to support all save modes, TableCatalog can be a good fit, but we need a variant of TableCatalog which can (a rough sketch follows the list):

  1. translate options to an identifier
  2. be created by reflection
  3. accept a user-specified schema when loading a table
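A sketch of such a variant, with illustrative names (not an actual Spark interface); being "created by reflection" essentially means a public no-arg constructor plus the usual CatalogPlugin.initialize(name, options) call:

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative only; covers the three requirements listed above.
trait SaveModeCapableCatalog extends TableCatalog {
  // 1. translate data source options to an identifier
  def identifierFor(options: CaseInsensitiveStringMap): Identifier

  // 2. "created by reflection": no-arg constructor + initialize(), inherited from CatalogPlugin

  // 3. load a table while honoring a user-specified schema
  def loadTable(ident: Identifier, userSpecifiedSchema: StructType): Table
}
```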

@dongjoon-hyun (Member) commented:

cc @dbtsai

@brkyvz closed this on Nov 11, 2019
@dongjoon-hyun changed the title from "[SPARK-29127][SQL] Support partitioning and bucketing through DataFrameWriter.save for V2 Tables" to "[SPARK-29908][SQL] Support partitioning and bucketing through DataFrameWriter.save for V2 Tables" on Nov 16, 2019