
@juliuszsompolski
Contributor

What changes were proposed in this pull request?

Make DataFrameWriter saveAsTable add a write option isDataFrameWriterV1 = true when using Overwrite mode with the Delta data source.
This is an emergency fix to prevent a breaking change resulting in data corruption with Delta data sources in Spark 4.1.

Why are the changes needed?

Spark's SaveMode.Overwrite is documented as:

        * if data/table already exists, existing data is expected to be overwritten
        * by the contents of the DataFrame.

It does not define the behaviour for the table metadata (schema, etc.). The Delta data source's interpretation of this DataFrameWriter V1 API documentation is to not replace the table schema unless the Delta-specific option "overwriteSchema" is set to true.

However, DataFrameWriter V1 creates a ReplaceTableAsSelect plan, which is the same plan as the one created by the DataFrameWriter V2 createOrReplace API, which is documented as:

       * The output table's schema, partition layout, properties, and other configuration
       * will be based on the contents of the data frame and the configuration set on this
       * writer. If the table exists, its configuration and data will be replaced.

Therefore, for calls via DataFrameWriter V2 createOrReplace, the metadata always needs to be replaced, and the Delta data source doesn't use the overwriteSchema option.
Since the created plan is exactly the same, Delta has been using a very ugly hack to detect which API the call came from, based on the stack trace of the call.
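For illustration, a minimal sketch of the two call sites whose documented semantics differ but which produce the same ReplaceTableAsSelect plan (table name is illustrative; this assumes a Delta-enabled session and can be pasted into spark-shell):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("sketch").getOrCreate()
    val df = spark.range(10).toDF("id")

    // DataFrameWriter V1: SaveMode.Overwrite is documented to overwrite the data only;
    // Delta keeps the existing table schema unless overwriteSchema=true is set.
    df.write.format("delta").mode("overwrite").saveAsTable("events")

    // DataFrameWriter V2: createOrReplace is documented to also replace the schema,
    // partition layout and table properties.
    df.writeTo("events").using("delta").createOrReplace()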

In Spark 4.1 in Connect mode, this stopped working because planning and execution of commands were decoupled, so the stack trace no longer contains the point where the plan was created.

To retain compatibility of the Delta data source with Spark 4.1 in Connect mode, Spark provides this explicit write option to indicate to the Delta data source that the call is coming from DataFrameWriter V1.
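As a sketch of how a connector could consume this option (the helper below is hypothetical and not part of Delta or of this PR):

    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Hypothetical connector-side helper: preserve the existing table schema only when
    // the write comes from DataFrameWriter V1 and the user did not explicitly ask for
    // overwriteSchema.
    object WriterV1Compat {
      def shouldPreserveSchema(writeOptions: CaseInsensitiveStringMap): Boolean = {
        val fromWriterV1 = writeOptions.getBoolean("isDataFrameWriterV1", false)
        val overwriteSchema = writeOptions.getBoolean("overwriteSchema", false)
        fromWriterV1 && !overwriteSchema
      }
    }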

Follow-up: since the documented semantics of Spark's DataFrameWriter V1 saveAsTable API differ from those of CREATE/REPLACE TABLE AS SELECT, Spark should not be reusing the exact same logical plan for these APIs.
Existing data sources that have been implemented following Spark's documentation of these APIs should have a way to differentiate between them.

However, at this point this is an emergency fix: releasing Spark 4.1 as-is would cause data corruption issues with Delta in DataFrameWriter saveAsTable in Overwrite mode, because Delta would no longer correctly interpret its overwriteSchema option.

Does this PR introduce any user-facing change?

No

How was this patch tested?

It has been tested with tests that are not part of this PR. Properly testing this in Connect mode requires changes on both the Spark and Delta sides; integrating those tests will be done as follow-up work.

Was this patch authored or co-authored using generative AI tooling?

Assisted by Claude Code.
Generated-by: Claude Code, model Sonnet 4.5

github-actions bot added the SQL label on Nov 22, 2025
@dongjoon-hyun left a comment


Thank you for reporting the issue, @juliuszsompolski. I have a few questions.

  1. Which Apache Spark preview version and Delta version did you test this with? Specifically, in which preview did this start to happen?
  2. Why don't we implement this in io.delta.sql.DeltaSparkSessionExtension or document it instead of changing Apache Spark source code?
  3. Do you think we can have test coverage with a dummy data source?
  4. If this is an emergency fix, what would be the non-emergency fix?

    This is an emergency fix to prevent a breaking change resulting in data corruption with Delta data sources in Spark

@dongjoon-hyun changed the title from "[SPARK-54462] Add isDataFrameWriterV1 option for Delta datasource compatibility" to "[SPARK-54462][SQL] Add isDataFrameWriterV1 option for Delta datasource compatibility" on Nov 22, 2025
    serde = None,
    external = false,
    constraints = Seq.empty)
    val writeOptions = if (source == "delta") {
Member

Has the Apache Spark source code had this kind of Delta-specific logic before, @juliuszsompolski?

This looks like the first proposal to have a 3rd-party company data source in Apache Spark source code. At first glance, this string match looks a little fragile to me.

Contributor Author

@dongjoon-hyun I am currently researching cleaner solutions. I raised this PR as a straw man, given the Spark 4.1 timeline, trying to propose the most narrowly scoped change possible that would prevent a behaviour change leading to table corruption in Delta (unintentionally overwriting a table's metadata in operations that previously required an explicit overwriteSchema option for that). In any case, if this is allowed in as a stop-gap fix, I would like to replace it with a proper solution.

It could also be done by always adding this option and not mentioning Delta here, as @HyukjinKwon suggested. It would still be a "strange" piece of code to attach this kind of option just for this particular case, and kind of "leaky". Some people have suggested to me that having Spark attach options that always indicate which API a command originated from could be useful in other cases too, but that's a much bigger change to design and make.
Another option: since the semantics of saveAsTable in DFWV1 can be interpreted differently than createOrReplace / replace of DFWV2, maybe it could get its own plan node SaveAsV2TableCommand, just like it has its own node SaveAsV1TableCommand? But again, that's a lot of changes.
Or the existing CreateTableAsSelect / ReplaceTableAsSelect could have a flag parameter indicating that it is actually a saveAsTable command? But again, this is not really clean...

I am researching options.

// To retain compatibility of the Delta datasource with Spark 4.1 in connect mode, Spark
// provides this explicit storage option to indicate to Delta datasource that this call
// is coming from DataFrameWriter V1.
//
Member

Per:

          // FIXME: Since the details of the documented semantics of Spark's DataFrameWriter V1
          //  saveAsTable API differs from that of CREATE/REPLACE TABLE AS SELECT, Spark should
          //  not be reusing the exact same logical plan for these APIs.
          //  Existing Datasources which have been implemented following Spark's documentation of
          //  these APIs should have a way to differentiate between these APIs.

Why don't we just always append the option? The downstream data sources that care about this behaviour will make the change accordingly.

@juliuszsompolski
Contributor Author

  1. Which Apache Spark preview version and Delta version did you test this with? Specifically, in which preview did this start to happen?

I have been testing it with Delta master built against Spark master.
The behaviour change was caused by 27aba95, which is present in 4.1. It separates the planning and execution of commands in Spark Connect. The very ugly hack in Delta depended on the execution of the command happening in the same call stack as the API call. Now only logical planning happens within that call stack, the plan gets returned to Spark Connect, and execution happens later from a different call stack.

  2. Why don't we implement this in io.delta.sql.DeltaSparkSessionExtension or document it instead of changing Apache Spark source code?

It would make me very happy to find a way to fix it without changing Spark code. It seems to me, however, that I currently lose all ability to distinguish DFWV1 saveAsTable with mode Overwrite vs. DFWV2 replace, because both operations create an identical logical plan.
So maybe I could move the stack trace hack into DeltaAnalysis and have it do the same kind of check there... but I would really like to get rid of any hack that depends on the stack trace, because that is bound to blow up eventually; this one was a bomb that had been ticking for 6 years...

  3. Do you think we can have test coverage with a dummy data source?

I could add a test with a dummy data source that verifies that this option is added for saveAsTable, but the end-to-end behaviour change is Delta-specific.
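Roughly, such a test could capture the analyzed plan of the command and assert on its write options. A sketch (the listener and assertion below are illustrative only; a real test would register a matching source and would have to account for the listener being asynchronous):

    import scala.collection.mutable

    import org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    // Collect every analyzed ReplaceTableAsSelect produced by executed commands.
    class CapturingListener extends QueryExecutionListener {
      val captured = mutable.Buffer.empty[ReplaceTableAsSelect]

      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        qe.analyzed.foreach {
          case r: ReplaceTableAsSelect => captured += r
          case _ => // ignore other nodes
        }
      }

      override def onFailure(funcName: String, qe: QueryExecution, ex: Exception): Unit = ()
    }

    // In the test body, roughly:
    //   spark.listenerManager.register(listener)
    //   df.write.format("delta").mode("overwrite").saveAsTable("t")
    //   assert(listener.captured.exists(_.writeOptions.get("isDataFrameWriterV1").contains("true")))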

  4. If this is an emergency fix, what would be the non-emergency fix?

    This is an emergency fix to prevent a breaking change resulting in data corruption with Delta data sources in Spark

See my other comment #53173 (comment) for some of the options I am exploring.

@juliuszsompolski
Contributor Author

It would make me very happy to find a way to fix it without changing Spark code. It seems to me, however, that I currently lose all ability to distinguish DFWV1 saveAsTable with mode Overwrite vs. DFWV2 replace, because both operations create an identical logical plan.

@dongjoon-hyun I investigated it more and raised delta-io/delta#5569 with a sketch of moving it to a Delta extension rule.
But https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L3343

        writeOperation.getTable.getSaveMethod match {
          case proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE =>
->          w.saveAsTableCommand(tableName)

is directly returning the ReplaceTableAsSelect plan created by DataFrameWriter, without running any resolution or analysis on it. At the point of returning it, we have already returned from the DataFrameWriter call.
It later gets analyzed and executed in https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L92

      case proto.Plan.OpTypeCase.COMMAND =>
        val command = request.getPlan.getCommand
        planner.transformCommand(command) match {
          case Some(transformer) =>
            val qe = new QueryExecution(
              session,
              transformer(tracker),
              tracker,
              shuffleCleanupMode = shuffleCleanupMode)
->          qe.assertCommandExecuted()
            executeHolder.eventsManager.postFinished()

at which point the plan is identical for saveAsTable and replace, and it can no longer be distinguished by a similar stack trace check, as we have already returned from there.

@dongjoon-hyun
Member

According to @juliuszsompolski's analysis, do you think we need to revise or revert the original patch (SPARK-52451) from branch-4.1, @heyihong and @cloud-fan?

@juliuszsompolski
Contributor Author

@dongjoon-hyun
With that, I don't see any option to distinguish DFWV1 saveAsTable and DFWV2 replace without a Spark-side change. This PR is an abstraction-leaky but as narrowly scoped as possible change to prevent a behaviour change in Delta with Spark 4.1 that would be very user-unfriendly: it would cause table metadata to be overwritten in workloads that did not overwrite it before.

The change is essentially two lines:

extraOptions + ("isDataFrameWriterV1" -> "true")

Adding the option.

if (source == "delta")

If the source is delta.
We could omit this and add the option always.
Then it wouldn't create a precedent on "have a 3rd-party company data source in Apache Spark source code" (note, though, that I am not referencing any company but the open source project Delta, and there are many precedents in DataFrameWriter itself, which references Hive, Hadoop, Parquet and ORC in both its code and public documentation). But then it would be less narrowly scoped, piggybacking this option onto other data sources that don't need or expect it.

Or, to not mention Delta by name, maybe a new interface RequiresDataFrameWriterV1WriteOption extends TableProvider, which would then add this option to all V2 write commands created by DataFrameWriter V1 (more generic than this one specific Overwrite problem, and this would also match the current Delta behaviour, which detects DFWV1 via the stack trace in all cases).
The justification for such an interface would be that the documentation of the public DFW V1 APIs allows for different interpretations, so some existing data sources may have adopted different behaviours and now need a way to distinguish whether a plan actually comes from DataFrameWriter V1 or from another API.
What do you think about adding a new interface like that?
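Roughly what I have in mind (a sketch only; in Spark proper this would likely be a Java interface next to the other DSv2 mixins, and the helper below merely stands in for the existing logic in DataFrameWriter):

    import org.apache.spark.sql.connector.catalog.TableProvider

    // Marker mixin: V2 providers that migrated from V1 and need to know when a write
    // originates from DataFrameWriter V1.
    trait RequiresDataFrameWriterV1WriteOption extends TableProvider

    // Hypothetical helper showing how DataFrameWriter V1 could key off the provider
    // instance instead of the provider name when deciding whether to attach the option.
    object DataFrameWriterV1Options {
      def withWriterV1Option(
          provider: TableProvider,
          extraOptions: Map[String, String]): Map[String, String] = provider match {
        case _: RequiresDataFrameWriterV1WriteOption =>
          extraOptions + ("isDataFrameWriterV1" -> "true")
        case _ => extraOptions
      }
    }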

@dongjoon-hyun
Member

dongjoon-hyun commented Nov 25, 2025

To be clear, @juliuszsompolski, the Apache Spark 4.1 release process does not allow for additional development.

If there is no reasonable solution, the release process is simply supposed to find new problems like the above and revert them to unblock the release, @juliuszsompolski.

It's a little sad because we missed this regression in Apache Spark 4.1.0-preview2 (which is the first release including the root causes, SPARK-52451 and SPARK-53097).

27aba95bdf7 [SPARK-52451][CONNECT][SQL] Make WriteOperation in SparkConnectPlanner side effect free
70c20087b70 [SPARK-53097][CONNECT][SQL] Make WriteOperationV2 in SparkConnectPlanner side effect free

@dongjoon-hyun
Member

Also, cc @aokolnychyi.

@cloud-fan
Contributor

cloud-fan commented Nov 25, 2025

I think the root cause is that Delta Lake claims to be a v2 source, which means DataFrameWriter#saveAsTable will create v2 commands for it. However, Delta Lake used to be a v1 source and still wants to retain the old behavior for DataFrameWriter#saveAsTable, which is slightly different from the v2 commands.

I understand the intention to avoid behavior changes, and Delta used to rely on a hack to identify DataFrameWriter#saveAsTable commands, but that hack no longer works after some reasonable changes in Spark 4.1.

Instead of reverting that reasonable change from Spark, I'm more in favor of adding a way to identify the v2 commands coming from the v1 DataFrameWriter#saveAsTable. We can add a new boolean flag in TableProvider, something like boolean indicateV1SaveAsTableWithOption. Data sources that migrated from v1 to v2 would then have a way to retain the old v1 behavior.
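A sketch of what that could look like. TableProvider is a Java interface in Spark, so the real change would be a Java default method; the Scala mirror below only shows the shape, and the method name follows the suggestion above rather than any existing API:

    // Scala mirror of the proposed addition to TableProvider (names hypothetical).
    trait TableProviderLike {
      // ... existing TableProvider members: inferSchema, getTable, ...

      // New flag, default false: sources that migrated from V1 to V2 return true to
      // receive the isDataFrameWriterV1 write option from DataFrameWriter V1 and keep
      // their old saveAsTable behaviour.
      def indicateV1SaveAsTableWithOption: Boolean = false
    }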

@dongjoon-hyun
Member

Thank you for all your feedback.

AFAIK, we have 5 ways to move forward so far.

  1. @juliuszsompolski's PR (this one) and the Delta PR: "if (source == "delta") { extraOptions + ("isDataFrameWriterV1" -> "true") }"
  2. @juliuszsompolski's suggestion: "interface RequiresDataFrameWriterV1WriteOption extends TableProvider"
  3. @cloud-fan's idea: "add a new bool flag in TableProvider, something like boolean indicateV1SaveAsTableWithOption"
  4. Release Spark as-is and ask the Delta community to implement V2 correctly, like the Apache Iceberg community did.
  5. Revert the Spark 4.1 improvements (SPARK-52451 and SPARK-53097).

For 1 to 4, the Delta community needs to change their code in some way. Is that okay for them?

@dongjoon-hyun
Member

Could you make alternative working PRs (which pass all CIs) for a further decision, @cloud-fan or @juliuszsompolski?

@cloud-fan
Contributor

2 and 3 are basically the same, just a different API flavor. I think a new boolean flag is simpler. @juliuszsompolski, what do you think?

@juliuszsompolski
Contributor Author

I have a slight preference for 2, which is a bit more verbose, but:

  1. From the Spark perspective, it's cleaner not to pollute TableProvider directly with something that is a specific workaround for a specific migration case. Mixins that provide extra add-ons to interfaces seem to be the usual way DSv2 interfaces are structured.
  2. From the Delta perspective, Delta will cross-compile against 4.0 and 4.1 for a while. If it's a separate mixin, it's easier to shim: I just need to add a shim for the 4.0 cross-compile and do nothing for the 4.1 cross-compile; after we drop 4.0, no change is needed other than dropping the shim. If instead a field is added to TableProvider, I need an intermediate shim subclass of TableProvider that adds this field for 4.0, have the actual DeltaDataSource extend that shim instead of TableProvider, and also provide an empty shim for 4.1; then it all needs cleanup after 4.0 is dropped. See the sketch of the two shim shapes below.
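Roughly, with hypothetical names (these shims would live in Delta's per-Spark-version shim source sets, not in Spark):

    import org.apache.spark.sql.connector.catalog.TableProvider

    // (a) Mixin variant: the Spark 4.0 shim is a one-line local declaration of the
    //     missing trait; the 4.1 build drops this file and imports the real interface.
    trait RequiresDataFrameWriterV1WriteOption extends TableProvider

    // (b) Flag-on-TableProvider variant: the 4.0 shim has to be an intermediate
    //     abstract class that adds the missing member, and DeltaDataSource has to
    //     extend it instead of TableProvider until 4.0 support is dropped.
    abstract class TableProviderShim extends TableProvider {
      def indicateV1SaveAsTableWithOption: Boolean = false
    }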

For 4, the Delta community is actively working on developing a proper V2 data source; see the already-closed PRs at https://github.com/delta-io/delta/pulls?q=is%3Apr+dsv2+is%3Aclosed. But it won't be ready in time for 4.1...

For 5, I agree with @cloud-fan that we shouldn't revert reasonable changes to Spark. Delta, and the fact that we let that horrible hack sit there for 6 years, is at fault here. But because this was detected so late in the 4.1 process, and because the behaviour change (silently overwriting metadata where it was not overwritten before) is very unfriendly to users of open source Spark with open source Delta, I'd much prefer to be allowed to move forward with a narrowly scoped fix.

I will prepare a PR for option 2.

@juliuszsompolski
Contributor Author

juliuszsompolski commented Nov 25, 2025

I raised #53215 with option 2.
Essentially, it is a cleaned-up version of 1, so 1, 2 and 3 are the same proposal; I just think variant 2 is the cleanest.

@dongjoon-hyun
Member

Thank you, @cloud-fan and @juliuszsompolski .
