
[SPARK-26915][SQL] DataFrameWriter.save() should write without schema validation #23836

Closed
wants to merge 1 commit

Conversation

@gengliangwang (Member) commented Feb 19, 2019

What changes were proposed in this pull request?

Spark supports writing to file data sources without fetching the existing table schema or validating the output against it.
For example,

spark.range(10).write.orc(path)
val newDF = spark.range(20).map(id => (id.toDouble, id.toString)).toDF("double", "string")
newDF.write.mode("overwrite").orc(path)
  1. There is no need to get/infer the schema from the existing table/path.
  2. The schema of newDF can be different from the original table schema.

However, this behavior changed with #23606. Currently, data source V2 always validates the output query against the table schema. Even after catalog support for DS V2 is implemented, I think it will be hard to support both behaviors with the current API/framework.

To me, DataFrameWriter.save is more like a simple IO API (e.g. file IO). It shouldn't have to involve the table concept; otherwise:

  1. Overwrite: can be Drop table + CTAS, can be Insert overwrite, can be CTAS...
  2. Append: can be Insert, can be Alter table add column + Insert, can be CTAS..

Things become too complex if we try to support both with and without schema validation in the single API DataFrameWriter.save. That is to say, let's remove the expressions AppendData and OverwriteByExpression from DataFrameWriter.save, since their behavior differs from this API's. The expressions are still useful: we can use AppendData and OverwriteByExpression in DataFrameWriter.saveAsTable, which is more appropriate (see the sketch below).
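For illustration, a minimal sketch of the two public entry points being distinguished here (`df` is any existing DataFrame; the path and table name are placeholders):

```scala
// Path-based write: behaves like simple file IO, so (per this proposal) it is
// not validated against any schema previously written to the path.
df.write.mode("overwrite").format("orc").save("/tmp/example/path")

// Catalog-based write: targets a named table, where table-oriented plans such as
// AppendData / OverwriteByExpression (with schema validation) are appropriate.
df.write.mode("append").format("orc").saveAsTable("example_table")
```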

This PR proposes to remove the new expressions from DataFrameWriter.save and re-enable ORC V2. I am aware that the interface SupportsSaveMode might be removed in the future. But at the current stage, we should prevent the regression and make sure the behavior stays unchanged and predictable as development continues.

How was this patch tested?

Unit test

@gengliangwang (Member Author) commented:

Before this one, there were two attempts: #23829 and #23824.
To me, the proposal in this PR is the best one so far.

gengliangwang changed the title from [SPARK-26915][SQL] File source should write without schema validation in DataFrameWriter.save() to [SPARK-26915][SQL] DataFrameWriter.save() should write without schema validation on Feb 19, 2019
@rdblue (Contributor) commented Feb 19, 2019

@gengliangwang: The community agreed to remove the v2 write paths that use SaveMode before the next release. The problem is that SaveMode is ambiguous and doesn't have reliable behavior. That's why we are introducing the new logical plans: to set clear expectations for behavior.

Part of the challenge in standardizing behavior across sources is supporting in v2 what Spark already does. In this case, we need to define how a table can opt out of schema validation, or at least have relaxed validation rules that allow things like adding new columns by writing a DataFrame with a new column.
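As a purely hypothetical illustration of such a relaxed rule (this is not an existing Spark API; the function and its policy are assumptions for the sake of discussion), validation could require only that every existing table column appears in the query output with the same type, while tolerating extra columns:

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical relaxed check: every column of the existing table must appear in
// the query output with the same data type; new columns in the query are allowed.
def relaxedSchemaCheck(tableSchema: StructType, querySchema: StructType): Boolean =
  tableSchema.fields.forall { tableField =>
    querySchema.fields.exists { queryField =>
      queryField.name == tableField.name && queryField.dataType == tableField.dataType
    }
  }
```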

I don't think that the right way to do that is to use a write path that has no validation (the WriteToDataSourceV2 plan), especially when that write path is set to be removed.

As I've said before, the right way to do this is to:

  1. Define what the behavior should be in v2 for these tables
  2. Propose an API that allows sources to request that behavior

I think number 1 is the most important. What you have here removes validation for all v2 writes, but I think the v1 behavior you are trying to mimic applies only when writing to path-based tables. That's a big unintended consequence, and it's why it is important to state what you're trying to accomplish and have a design for how you're going to do it.

Please consider this a -1.

@SparkQA commented Feb 19, 2019

Test build #102493 has finished for PR 23836 at commit 2b6694d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres (Contributor) commented:

I would argue that putting back SaveMode should be on the table, if that's the best way to ensure that queries which work today will work with a v2 file source implementation.

I agree that this PR isn't the right way to go about it. If the v2 ORC implementation doesn't work because of problems in the API, we need to go back to the drawing board and fix the API, not make ad-hoc changes to work around the problem.

@rdblue (Contributor) commented Feb 19, 2019

@jose-torres, SaveMode is used by v1. That is the best way to ensure that queries don't break.

SaveMode should be translated to concrete plans for v2. Otherwise, v2 is just as unpredictable as v1 and we don't gain anything.
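For illustration only, a rough sketch of what such a translation might look like; the plan names are the v2 logical plans under discussion in this thread, but the exact mapping (and the CTAS plan itself) had not been finalized at this point:

```scala
import org.apache.spark.sql.SaveMode

// Hypothetical sketch: map each SaveMode to the v2 logical plan it could translate to,
// depending on whether the target table already exists. Descriptive strings are used
// here instead of constructing real plan nodes.
def planFor(mode: SaveMode, tableExists: Boolean): String = mode match {
  case SaveMode.Append        => if (tableExists) "AppendData" else "CreateTableAsSelect"
  case SaveMode.Overwrite     => if (tableExists) "OverwriteByExpression" else "CreateTableAsSelect"
  case SaveMode.ErrorIfExists => if (tableExists) "fail: table already exists" else "CreateTableAsSelect"
  case SaveMode.Ignore        => if (tableExists) "no-op" else "CreateTableAsSelect"
}
```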

@jose-torres (Contributor) commented:

We eventually have to ensure that queries don't break even with v2, unless your proposal is to have format("orc") and such invoke v1 forever.

@rdblue (Contributor) commented Feb 19, 2019

@jose-torres, I'm not saying that the default should be v1 forever.

The right way to move over is to develop them in parallel and switch over when we can validate that the behavior is the same. Right now, v2 can't run CTAS plans so we clearly can't switch. But when v2 has all of the necessary logical plans, then we can start running the existing behavior tests on v2 to see what changes remain, like changing validation for path-based tables.

Continuing to use SaveMode actually inhibits the move to v2. If write paths use SaveMode, then they can pass behavior tests and appear to work when they actually don't.

Also, let me clarify my comment on using v1. I think we need to keep v1 around until the process of moving to v2 is complete because there are code paths that we know can't be changed to v2 without altering behavior. For example, we've agreed to standardize behavior on what file sources do. Users will have to choose between existing behavior and using v2 for other sources.

I'm not confident that all v1 behaviors will be available in v2. In v1, a CTAS plan can be validated against an existing table. In some cases, that CTAS should fail because the table exists (SQL), and in some cases the plan that is created should be AppendData instead of CTAS (DataFrameWriter). Does the validation for AppendData work exactly the same way as validating a CTAS that is actually an append? My guess is that it doesn't, and that we might not want it to.

I think the final solution is to introduce a new write API that always uses v2 and makes it obvious what plan will be used. I've proposed such an API in the logical plans SPIP. Moving users to that API and eventually deprecating the DataFrameWriter API will take care of migrating the last few cases (which should be minor) from v1 to v2.

@cloud-fan (Contributor) commented Feb 20, 2019

I definitely agree with the direction: translate SaveMode to operators with clear semantics, and remove SaveMode from DS v2 but keep it in the public API for a while.

However, I think the current translation is not precise: append mode doesn't mean append; it actually means "create table if not exists, or append to the table". At least this is the case for the file and JDBC sources, and I believe it's true for most v1 sources.

The next problem is how to implement "create table if not exists, or append" with the DS v2 APIs. I have two proposals:

  1. Keep the "catalog -> table -> write builder -> write" abstraction; the implementation has two steps: a) create the table if it doesn't exist; b) do a normal append.
  2. Slightly change the abstraction to "catalog -> staged table -> write builder -> write", so that we can write data to a non-existent table and make the entire process atomic.

With proposal 1, the file source doesn't work because it can't create an empty table (it has no metastore). I guess other data sources will face the same issue. It also requires the catalog API, which is not done yet.

I think proposal 2 is better. It's useful even after we have the catalog API, for implementing atomic CTAS (see the sketch below).
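A rough sketch of what proposal 2 could look like from the write path's point of view; every type and method below is a hypothetical stand-in, since the staging/catalog API did not exist yet at the time of this discussion:

```scala
// Hypothetical sketch of "catalog -> staged table -> write builder -> write".
// All interfaces are illustrative stand-ins, not real Spark APIs.
trait StagedTable {
  def write(rows: Seq[Map[String, Any]]): Unit  // stand-in for "write builder -> write"
  def commitStagedChanges(): Unit               // atomically makes the table and its data visible
  def abortStagedChanges(): Unit                // rolls back the table creation and the data
}

trait StagingCatalog {
  // Stages a table that either does not exist yet (create) or already exists (append).
  def stageCreateOrAppend(name: String): StagedTable
}

def createOrAppend(catalog: StagingCatalog, name: String, rows: Seq[Map[String, Any]]): Unit = {
  val staged = catalog.stageCreateOrAppend(name)
  try {
    staged.write(rows)
    staged.commitStagedChanges()
  } catch {
    case e: Throwable =>
      staged.abortStagedChanges()
      throw e
  }
}
```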

@gengliangwang (Member Author) commented Feb 20, 2019

@rdblue Sorry if I caused some misunderstanding.
Let's focus on the API DataFrameWriter.save here:

  1. The expressions AppendData and OverwriteByExpression always fetch the table schema and validate the output schema against it. This is a serious behavior change. Until we have a final agreement on the file source V2 behavior (most likely we will keep the V1 behavior in this API), I would suggest reverting them for now.

  2. Keeping ORC V2 working helps us find problems, while disabling it might lead to a problematic design that we won't notice until file source V2 is enabled again. ORC V2 is implemented per the API we have been working on for a long time: table -> write builder -> write. The only problem is how we deal with save modes. This PR is not proposing to keep SupportsSaveMode, nor claiming this is the final solution. I am suggesting that we keep the original behavior in this API unless there is a clear plan showing that the partial development that follows will eventually work.

The proposals from @cloud-fan above are good directions to go; we can discuss them at tomorrow's meetup.
For the code changes in DataFrameWriter.save, I think we should remove the new expressions for now.

@rdblue (Contributor) commented Feb 21, 2019

the current translation is not precise: append mode doesn't mean append; it actually means "create table if not exists, or append to the table"

Agreed. This is why we need to get CTAS finished.

The next problem is how to implement "create table if not exists, or append" with the DS v2 APIs.

My understanding is that the plan is to do both. If a catalog supports staged tables, then Spark uses them to perform an atomic operation. If it doesn't, then Spark uses the create/append/drop-on-error strategy.

I agree that option 2 is "better" in that the operation is atomic. But sources are not required to support atomic CTAS. We need both options, so they are not mutually exclusive (a rough sketch of the fallback follows).
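To make the non-atomic fallback concrete, a minimal sketch under the same caveat as above (the catalog interface and its methods are hypothetical placeholders, not real Spark APIs):

```scala
// Hypothetical fallback when staging isn't supported: create the table if missing,
// append, and drop the newly created table if the append fails.
trait SimpleCatalog {
  def tableExists(name: String): Boolean
  def createTable(name: String): Unit
  def dropTable(name: String): Unit
  def append(name: String, rows: Seq[Map[String, Any]]): Unit
}

def createAppendOrDropOnError(catalog: SimpleCatalog, name: String, rows: Seq[Map[String, Any]]): Unit = {
  val createdHere = !catalog.tableExists(name)
  if (createdHere) catalog.createTable(name)
  try {
    catalog.append(name, rows)
  } catch {
    case e: Throwable =>
      if (createdHere) catalog.dropTable(name)  // only drop what this write created
      throw e
  }
}
```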

@gengliangwang (Member Author) commented:

Closing this one. #24233 is a better solution and resolves the problem.

mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
…t path before delete it

## What changes were proposed in this pull request?
This is a followup PR to resolve comment: apache#23601 (review)

When Spark writes a DataFrame with "overwrite" mode, it deletes the output path before the actual write. To safely handle the case where the output path doesn't exist, it is suggested to follow the V1 code and check for existence first.

## How was this patch tested?

Apply apache#23836 and run unit tests

Closes apache#23889 from gengliangwang/checkFileBeforeOverwrite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>