@@ -484,12 +484,50 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
serde = None,
external = false,
constraints = Seq.empty)
val writeOptions = if (source == "delta") {
dongjoon-hyun (Member) commented:
Does the Apache Spark source code already have this kind of Delta-specific logic, @juliuszsompolski?

This looks like the first proposal to have a third-party company's data source in the Apache Spark source code. At first glance, this string match looks a little fragile to me.

juliuszsompolski (Contributor, Author) replied:
@dongjoon-hyun I am currently researching cleaner solutions. I have raised this PR as a straw man: given the Spark 4.1 timeline, it tries to propose the most narrowly scoped change possible that would prevent a behaviour change leading to table corruption in Delta (unintentionally overwriting the table's metadata in operations that previously required an explicit overwriteSchema option). In any case, if this is allowed to get in as a stop-gap fix, I would like to replace it with a proper solution.

It could also be done by always adding this option and not mentioning Delta here, as @HyukjinKwon suggested. It would still be a "strange" piece of code attaching this kind of option just for this particular case, and somewhat "leaky". Some people have suggested to me that having Spark attach options that indicate which API a command originated from could be useful in other cases as well, but that is a much bigger change to design and make.
Another option: since the semantics of saveAsTable in DFWV1 can be interpreted differently than createOrReplace / replace in DFWV2, maybe it could get a new plan node SaveAsV2TableCommand, just like it has its own node SaveAsV1TableCommand. But again, that is a lot of changes.
Or, the existing CreateTableAsSelect / ReplaceTableAsSelect could have a flag parameter indicating that it is actually a saveAsTable command? But again, this is not really clean...

I am researching options.
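For illustration, a minimal sketch (not Delta's actual code; the helper name shouldReplaceMetadata is hypothetical) of how a datasource could consume such an option on its write path, assuming the write options arrive as a plain Map[String, String]:

    // Hypothetical helper on the datasource side.
    // "isDataFrameWriterV1" would be injected by Spark for V1 saveAsTable calls (this proposal);
    // "overwriteSchema" is the existing Delta-style user-facing option.
    def shouldReplaceMetadata(writeOptions: Map[String, String]): Boolean = {
      val fromDfwV1 = writeOptions.get("isDataFrameWriterV1").exists(_.equalsIgnoreCase("true"))
      val overwriteSchema = writeOptions.get("overwriteSchema").exists(_.equalsIgnoreCase("true"))
      if (fromDfwV1) {
        // DataFrameWriter V1 saveAsTable: keep the existing table schema unless the user
        // explicitly asked to replace it.
        overwriteSchema
      } else {
        // DataFrameWriterV2 createOrReplace / SQL REPLACE TABLE: metadata is always replaced.
        true
      }
    }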

// This is a very special workaround for Delta Lake.
// Spark's SaveMode.Overwrite is documented as:
// * if data/table already exists, existing data is expected to be overwritten
// * by the contents of the DataFrame.
// It does not define the behaviour of overwriting the table metadata (schema, etc).
// The Delta datasource's interpretation of this DataFrameWriter V1 API documentation is
// to not replace the table schema, unless the Delta-specific option "overwriteSchema" is set
// to true.
//
// However, DataFrameWriter V1 creates a ReplaceTableAsSelect plan, which is the same as
// the plan of DataFrameWriterV2 createOrReplace API, which is documented as:
// * The output table's schema, partition layout, properties, and other configuration
// * will be based on the contents of the data frame and the configuration set on this
// * writer. If the table exists, its configuration and data will be replaced.
// Therefore, for calls via DataFrameWriter V2 createOrReplace, the metadata always needs
// to be replaced, and the Delta datasource doesn't use the overwriteSchema option.
//
// Since the created plan is exactly the same, Delta used a very ugly hack to detect
// which API the call came from, based on the stack trace of the call.
//
// In Spark 4.1 in connect mode, this stopped working because planning and execution of
// the commands became decoupled, and the stack trace no longer contains the point where the
// plan was created.
//
// To retain compatibility of the Delta datasource with Spark 4.1 in connect mode, Spark
// provides this explicit storage option to indicate to the Delta datasource that this call
// is coming from DataFrameWriter V1.
//
Member commented:
Per:

          // FIXME: Since the details of the documented semantics of Spark's DataFrameWriter V1
          //  saveAsTable API differs from that of CREATE/REPLACE TABLE AS SELECT, Spark should
          //  not be reusing the exact same logical plan for these APIs.
          //  Existing Datasources which have been implemented following Spark's documentation of
          //  these APIs should have a way to differentiate between these APIs.

Why don't we just always append the option? The downstream datasources that care about this behaviour will make the change accordingly.
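A minimal sketch of that unconditional variant, assuming the same option key is kept (extraOptions is the writer's accumulated options map):

    // Hypothetical unconditional form: tag every DataFrameWriter V1 saveAsTable write,
    // regardless of the source; datasources that care about the distinction read the flag.
    val writeOptions = extraOptions + ("isDataFrameWriterV1" -> "true")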

// FIXME: Since the details of the documented semantics of Spark's DataFrameWriter V1
// saveAsTable API differs from that of CREATE/REPLACE TABLE AS SELECT, Spark should
// not be reusing the exact same logical plan for these APIs.
// Existing Datasources which have been implemented following Spark's documentation of
// these APIs should have a way to differentiate between these APIs.
extraOptions + ("isDataFrameWriterV1" -> "true")
} else {
extraOptions
}
ReplaceTableAsSelect(
UnresolvedIdentifier(nameParts),
partitioningAsV2,
df.queryExecution.analyzed,
tableSpec,
- writeOptions = extraOptions.toMap,
+ writeOptions = writeOptions.toMap,
orCreate = true) // Create the table if it doesn't exist

case (other, _) =>
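For reference, a minimal sketch of the two user-facing call paths discussed above, assuming df is an existing DataFrame and "target" is a Delta table; with the Delta datasource the first call keeps the existing schema unless overwriteSchema is set, while the second is documented to replace the table's schema, partitioning, and properties:

    // DataFrameWriter V1: SaveMode.Overwrite only promises to overwrite the data.
    df.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true") // Delta-specific opt-in to also replace the schema
      .saveAsTable("target")

    // DataFrameWriterV2: createOrReplace replaces configuration and data, so no extra option is needed.
    df.writeTo("target")
      .using("delta")
      .createOrReplace()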