Delta merge doesn't update schema (automatic schema evolution enabled) #553

Open
louisdubaere opened this issue Nov 18, 2020 · 17 comments
Labels: acknowledged, assessment, bug

Comments

@louisdubaere

Hi,

I am having problems with the Automatic Schema Evolution for merges with delta tables.

I have a certain Delta table in my data lake with around 330 columns (the target table) and I want to upsert some new records into this delta table. The thing is that this 'source' table has some extra columns that aren't present in the target Delta table. I use the following code for the merge in Databricks:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")
from delta.tables import *
deltaTarget = DeltaTable.forPath(spark, pathInDataLake)
deltaTarget.alias('target').merge(df.alias('source'), mergeStatement).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

While the documentation on Automatic schema evolution indicates that the schema will be changed when using .whenMatchedUpdateAll() and .whenNotMatchedInsertAll(), this piece of code gives the following error:

AnalysisException: cannot resolve new_column in UPDATE clause given columns [list of columns in the target table].

I have the impression that I had this issue in the past but was able to solve it then with the spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true") setting.

Am I missing something to make the automatic schema evolution work?

@jose-torres
Contributor

Looks like you have a trailing space at the end of "spark.databricks.delta.schema.autoMerge.enabled ". Can you check if it works after removing that? (It's unfortunate that there's no Spark handle to catch this and ensure that only valid confs are specified.)
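Since Spark accepts arbitrary config keys without complaint, a small guard on the caller's side can catch this kind of typo. The following is a minimal sketch, not part of Spark or Delta: `KNOWN_DELTA_CONFS` and `check_conf_key` are hypothetical names, and the allow-list contains only the key quoted in this thread.

```python
# Hypothetical allow-list; it holds only the key discussed in this issue.
KNOWN_DELTA_CONFS = {
    "spark.databricks.delta.schema.autoMerge.enabled",
}


def check_conf_key(key: str) -> str:
    """Return the key unchanged if it looks valid, otherwise raise ValueError."""
    # A key with surrounding whitespace is stored as a different, unused key.
    if key != key.strip():
        raise ValueError(f"config key has leading/trailing whitespace: {key!r}")
    if key not in KNOWN_DELTA_CONFS:
        raise ValueError(f"unrecognised config key: {key!r}")
    return key
```

It could be used as `spark.conf.set(check_conf_key("spark.databricks.delta.schema.autoMerge.enabled"), "true")`, so that a mistyped key fails loudly instead of being silently ignored.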

@louisdubaere
Author

Thanks for your comment. You are right, but unfortunately this doesn't solve the problem. The autoMerge option was also enabled in the Spark config of the Databricks cluster, so I think it was enabled either way, and the merge still gives the same error.

@Dom-M-C

Dom-M-C commented Jan 19, 2021

I've come across the same issue when merging a dataframe with 4 new fields into a delta table with >900 fields

@gaco

gaco commented Sep 17, 2021

I had the same problem.
Use spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True), with the Python boolean True, rather than spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true"), with the string "true", and it should work.

scottsand-db added the bug, acknowledged, and assessment labels on Oct 7, 2021
@roenciso

We are having this same issue for our pipelines running on Azure Data Factory. Is there an ETA for the fix?
TIA!

@thonsinger-rseg

Schema automerge, using the same merge to the same delta table, worked in Scala, but not Python. Is there an ETA for the fix? Thanks.

@zsxwing
Member

zsxwing commented Feb 16, 2022

@thonsinger-rseg Do you have a reproduction? I tried the following code and it worked for me:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
from delta.tables import *
path = "/tmp/mergetest"
df = spark.range(20).selectExpr("id", "id as id2")
spark.range(10).write.format("delta").save(path)
target = DeltaTable.forPath(spark, path)
target.alias('target').merge(df.alias('source'), "target.id = source.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
DeltaTable.forPath(spark, path).toDF().show()

@thonsinger-rseg

@zsxwing I have a whenMatched() condition that comes before my .whenMatchedUpdateAll() and .whenNotMatchedInsertAll(). That's the only real difference. The exact same logic on the exact same Delta table worked in Scala, but not in Python.

(silver_table.alias("t")
    .merge(bronze_cdc_df.alias("s"), merge_match)
    .whenMatchedUpdate("s.DeletedDate IS NOT NULL", soft_deleted_cols_map)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

@AlexWRZ01

AlexWRZ01 commented Feb 18, 2022

I have the same issue. A table with about 20 or so columns. I am merging a data frame that has 1 additional column, and a smaller number of records. The new column contains integers and has no null values.

In Scala, this works with no problem.

In Python it fails with an error similar to:

AnalysisException: cannot resolve 'struct(ID, oldField, newField)' due to data type mismatch: cannot cast struct<ID:int, oldField:int, newField:void> to struct<ID:int, oldField:int, newField:int>;

It seems like the schema is being evolved to a void for the new field, and then merge fails due to mismatch on data types.

zsxwing's example works, but a real table in our warehouse does not.
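One way to check whether a void column is already present before the merge is to scan the schema JSON of the source DataFrame. This is a rough sketch under assumptions: `find_void_fields` is a hypothetical helper, the input is expected to be the string returned by `df.schema.json()`, and the null type is assumed to be serialised as either "void" or "null" depending on the Spark version.

```python
import json
from typing import Any, List

# Assumption: Spark serialises NullType under one of these names.
NULL_TYPE_NAMES = {"void", "null"}


def find_void_fields(schema_json: str) -> List[str]:
    """Return dotted paths of every field whose type is the null/void type."""

    def walk(node: Any, path: str) -> List[str]:
        # Primitive types are plain strings such as "integer" or "void".
        if isinstance(node, str):
            return [path] if node in NULL_TYPE_NAMES else []
        kind = node.get("type")
        if kind == "struct":
            found: List[str] = []
            for field in node["fields"]:
                child = f"{path}.{field['name']}" if path else field["name"]
                found += walk(field["type"], child)
            return found
        if kind == "array":
            return walk(node["elementType"], path + "[]")
        if kind == "map":
            return walk(node["keyType"], path + ".key") + walk(
                node["valueType"], path + ".value"
            )
        return []

    return walk(json.loads(schema_json), "")
```

Calling `find_void_fields(df.schema.json())` on the source DataFrame would show whether the void type comes from the source itself (for example a column built from a bare `None` literal) or only appears during the merge.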

@AlexWRZ01

AlexWRZ01 commented Feb 21, 2022

There is a workaround for this. Do an empty dataframe append with schema merge before doing the delta merge:

df.limit(0).write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(tableName)

Then perform the normal merge using DeltaTable, but don't enable spark.databricks.delta.schema.autoMerge.enabled.

For some reason, appending with mergeSchema works, but Delta's automatic schema merge does not.
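The two steps of this workaround can be wrapped in one function. This is only a sketch of the approach described above, not a fix for the underlying bug: `evolve_schema_then_merge` and the `load_target` callback are hypothetical names, and the merge clauses are the ones from the original report. The target is loaded through a callback so that the DeltaTable handle is created after the schema has changed.

```python
from typing import Any, Callable


def evolve_schema_then_merge(
    source_df: Any,
    load_target: Callable[[], Any],
    table_name: str,
    merge_condition: str,
) -> None:
    """Append zero rows with mergeSchema, then run an ordinary Delta merge."""
    # Step 1: an empty append with mergeSchema changes only the table schema.
    (
        source_df.limit(0)
        .write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(table_name)
    )
    # Step 2: load the target after the schema change and merge as usual.
    # The autoMerge setting is deliberately left untouched here.
    target = load_target()
    (
        target.alias("target")
        .merge(source_df.alias("source"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```

A call might look like `evolve_schema_then_merge(df, lambda: DeltaTable.forName(spark, tableName), tableName, "target.id = source.id")`, assuming `DeltaTable` is imported from `delta.tables` and the table is registered under `tableName`.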

zsxwing self-assigned this on Feb 22, 2022
@zsxwing
Member

zsxwing commented May 10, 2022

@AlexWRZ01 could you provide the table schema and the schema of the merged DataFrame, if possible, so that we can try to create a reproduction? You can call schema.json() on a DataFrame to get its schema, for example spark.range(10).schema.json().
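Once both schema JSON strings are available, they can be compared without a Spark session. The sketch below is an illustration only: `top_level_types` and `diff_schemas` are hypothetical helpers, they look at top-level columns only, and they compare names case-sensitively, which may differ from how Spark resolves columns.

```python
import json
from typing import Dict, List, Tuple


def top_level_types(schema_json: str) -> Dict[str, str]:
    """Map each top-level column name to a comparable type string."""
    fields = json.loads(schema_json)["fields"]
    return {
        f["name"]: f["type"]
        if isinstance(f["type"], str)
        # Nested types are dicts; serialise them with sorted keys to compare.
        else json.dumps(f["type"], sort_keys=True)
        for f in fields
    }


def diff_schemas(
    target_json: str, source_json: str
) -> Tuple[List[str], List[Tuple[str, str, str]]]:
    """Return (columns only in source, [(column, target type, source type)])."""
    target = top_level_types(target_json)
    source = top_level_types(source_json)
    new_columns = [name for name in source if name not in target]
    mismatched = [
        (name, target[name], source[name])
        for name in source
        if name in target and target[name] != source[name]
    ]
    return new_columns, mismatched
```

For example, `diff_schemas(target_df.schema.json(), source_df.schema.json())` lists the columns that schema evolution would have to add, plus any columns whose types already disagree.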

@sdaberdaku

I am having a similar problem when I try to use multi-threading to speed up the merge operations of different, independent delta tables with PySpark. When submitting multiple merge operations with parallel threads to Spark, the new columns are not added to the resulting delta tables. I am using the same SparkSession object across all threads (should be thread safe).

@zsxwing
Member

zsxwing commented Jan 5, 2023

@sdaberdaku did you hit the same error? Do you have a quick reproduction we can try?

@doggerz

doggerz commented May 16, 2023

@zsxwing I have encountered a similar issue while using Delta with Java Spark. I managed to find a workaround by configuring the following properties in the spark-defaults file:

"spark.databricks.delta.schema.autoMerge.enabled": "True",
"spark.databricks.delta.schema.autoMerge.enabledOnWrite": "True"

It's worth noting that I observed Spark being case-sensitive when specifying these properties. If I switch them back to lowercase or use "true" instead of "True," the problem resurfaces. It's important to maintain the correct case and use "True" to ensure the properties are recognized properly.

Furthermore, I should mention that this issue seems to be specific to AWS EMR servers, in my case emr-6.9.0 with delta 2.1.0, as I haven't encountered it in other environments. Therefore, this workaround may be applicable specifically to AWS EMR setups.

I hope this explanation helps in debugging.

@zeotuan

zeotuan commented Oct 24, 2023

I am encountering this issue using Scala on a standalone cluster in client deploy mode. I am going to test whether

"spark.databricks.delta.schema.autoMerge.enabled": "True"

instead of

"spark.databricks.delta.schema.autoMerge.enabled": "true"

fixes the issue.

@matheus-rossi

matheus-rossi commented Dec 20, 2023

> I am having a similar problem when I try to use multi-threading to speed up the merge operations of different, independent delta tables with PySpark. When submitting multiple merge operations with parallel threads to Spark, the new columns are not added to the resulting delta tables. I am using the same SparkSession object across all threads (should be thread safe).

Having the same issue here. Did you solve the problem, @sdaberdaku?

@oliver-was-here

@zsxwing I'm experiencing the same issue as those above.

I'm using PySpark on a cluster running the 12.2 LTS runtime.

I tried setting the value in the Spark config shown below, without success (tried both true and True):
[image: cluster Spark config]

Similarly, I tried setting it explicitly in the script:

  • spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True")
  • spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
  • spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)
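When several spellings of the same setting are being tried, it can help to read the value back right after setting it. The helper below is a hedged sketch: `set_and_read_back` is a hypothetical name, and `conf` is assumed to be an object with `set(key, value)` and `get(key)` methods, such as `spark.conf`. It only confirms what the session stores under the key; it does not show whether Delta honours the value during a merge.

```python
from typing import Any


def set_and_read_back(conf: Any, key: str, value: Any) -> Any:
    """Set a config value and return what the session reports for that key."""
    # Reject keys with stray whitespace, which would be stored as another key.
    if key != key.strip():
        raise ValueError(f"config key has leading/trailing whitespace: {key!r}")
    conf.set(key, value)
    return conf.get(key)
```

For instance, `set_and_read_back(spark.conf, "spark.databricks.delta.schema.autoMerge.enabled", True)` could be run once for each of the three variants above to compare what the session actually holds in each case.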
