Support schema evolution / schema overwrite in DeltaLake MERGE #170

Closed
kentore82 opened this issue Sep 12, 2019 · 34 comments
Labels
enhancement New feature or request

@kentore82

As far as I can tell, schema evolution / schema overwrite in DeltaLake MERGE is not currently supported. The below pyspark code illustrates my issue (Spark 2.4.4, Scala 2.11, DeltaLake 0.3.0):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema1 = StructType([ StructField("id", IntegerType()),
                       StructField("col1", StringType()),
                       StructField("col2", IntegerType())
                      ])
# Define DFs
DF1 = spark.createDataFrame([ [1, "a", 1],
                              [2, "a", 5], 
                              [3, "b", 5]], schema=schema1)

schema2 = StructType([
  StructField("id", IntegerType()),
  StructField("col1", StringType()),  
  StructField("col2", DoubleType()),
  StructField("col3", StringType())  
])

DF2 = spark.createDataFrame([ [1, "b", 4.0, "a"],
                              [2, "c", 5.0, "b"],
                              [4, "b", 5.0, "c"]], schema=schema2)

# Write first DF as a DeltaTable
DF1.write.format("delta") \
          .mode("overwrite") \
          .option("overwriteSchema", "true") \
          .save("/tmp/delta/testtable")

# DeltaLake merge via py4j
jvm_for_table = spark._jvm.io.delta.tables.DeltaTable \
                             .forPath(spark._jsparkSession, "/tmp/delta/testtable")

# workaround to avoid using Python keyword 'as' in JVM method call 
# (Python interpreter will throw an error.)
jvm_for_table_as=getattr(jvm_for_table, 'as')("testtable")

# update existing rows and insert new
jvm_for_table_as.merge(getattr(DF2._jdf, 'as')("updates"), \
"testtable.id = updates.id").whenMatched().updateAll().execute()

# Read merged table and show
spark.read.format("delta").load("/tmp/delta/testtable").show()

This quietly outputs:

+---+----+----+
| id|col1|col2|
+---+----+----+
|  1|   a|   1|
|  2|   a|   5|
|  3|   b|   5|
+---+----+----+

It would be great if, via an option (.option("overwriteSchema", "true") or .option("mergeSchema", "true")), schema evolution were supported, so that the result would instead be:

+---+----+----+----+
| id|col1|col2|col3|
+---+----+----+----+
|  1|   b| 4.0|   a|
|  2|   c| 5.0|   b|
|  3|   b| 5.0|null|
|  4|   b| 5.0|   c|
+---+----+----+----+

and a new schema

StructType([
  StructField("id", IntegerType()),
  StructField("col1", StringType()),  
  StructField("col2", DoubleType()),
  StructField("col3", StringType())  
])
@tdas tdas added the enhancement New feature or request label Sep 19, 2019
@tdas tdas pinned this issue Nov 5, 2019
@subashsivaji

Any update on this issue?

@mukulmurthy
Collaborator

Not yet. It is something we want to do eventually, but we're not sure we're going to get to it this quarter. It is on our roadmap though.

@mukulmurthy mukulmurthy added this to the Future Roadmap milestone Nov 13, 2019
@gerardwolf

I have implemented a solution to do schema evolution in the interim. In my use case I wanted to have the sink represent a superset of columns, i.e. everything that existed at any given stage in the source.

[screenshots of the notebook code]
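In rough outline the approach is something like the sketch below; the table path, the source dataframe (source_df) and the column handling are illustrative, not the exact code from the screenshots.

from pyspark.sql.functions import lit

target_path = "/mnt/delta/my_table"                    # placeholder sink path
target_df = spark.read.format("delta").load(target_path)

# Columns that exist in the source but not yet in the sink
missing = [f for f in source_df.schema.fields          # source_df: placeholder source
           if f.name not in target_df.columns]

if missing:
    widened = target_df
    for f in missing:
        widened = widened.withColumn(f.name, lit(None).cast(f.dataType))
    # Append zero rows with mergeSchema so the sink becomes a superset
    # of every column seen so far, without touching existing data.
    widened.limit(0).write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save(target_path)

# The regular MERGE then runs against the widened sink schema.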

Hope that helps in the interim!

P.s. Don't judge my Python coding skills, I'm self-taught!

@tdas
Contributor

tdas commented Dec 11, 2019

@gerardwolf this is great, thank you! It would be super helpful if you pasted the code directly in the comments in markdown format rather than screenshots; that would make it much easier for others to copy and use the code.

@gerardwolf

I was a little lazy, source code zipped and attached.

DeltaSchemaMerge.zip

@kkr78

kkr78 commented Feb 21, 2020

will this be supported in the next version? we have an issue with this at the moment.

@tdas
Contributor

tdas commented Feb 24, 2020

we are hoping to add this for 0.6.0

@tdas tdas modified the milestones: Future Roadmap, 0.6.0 Feb 24, 2020
@Lytiker

Lytiker commented Mar 30, 2020

I am trying to use this workaround of yours, @gerardwolf, and I am wondering what the FILTER=reduce does in the SQL query. Could you please explain it a bit for me? :)

@gerardwolf

@Lytiker The "reduce" is a variable I defined earlier in the notebook which can contain a WHERE clause. I use the same code to load all my source objects, so this allows me to populate the "reduce" variable with a clause relevant to a specific object's filter criteria. It lets me MERGE INTO the target delta table using the same filter condition I have on the source query, basically so I compare apples with apples across the source and sink objects. Hope that makes sense.
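Roughly, it just gets substituted into the MERGE statement the notebook builds, something like this (the table names, the key and the filter value are placeholders, not my actual code):

# Per-object filter; the same condition prunes both the source query
# and the sink side of the MERGE so we compare like with like.
reduce = "load_date >= '2020-03-01'"

merge_sql = f"""
MERGE INTO target_table t
USING (SELECT * FROM source_view WHERE {reduce}) s
ON t.id = s.id AND t.{reduce}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(merge_sql)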

@Lytiker

Lytiker commented Mar 31, 2020

Ah, thank you @gerardwolf. Do I understand you correctly that this is where you would handle incorporating the rows for ids 1, 2 and 4 into DF1 from the example at the top of this thread?

@gerardwolf

Pretty much just a way for me to 'reduce' the dataset returned from the sink side for the dataframe comparison; it prunes data I don't need to compare, which speeds up the process.

@JassAbidi
Contributor

To get a clear picture of this issue, I want to share the different cases related to this feature and their expected results:

Given

target :

+----+------+
|key1|value1|
+----+------+
|   1|    10|
|   2|    20|
+----+------+

source :

+----+------+------+------+
|key1|value1|value3|value4|
+----+------+------+------+
|   1|   100|  1000| 10000|
|   3|    30|  3000| 30000|
+----+------+------+------+

merge command with :
1- insert clause only

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenNotMatched()
          .insertExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .option("mergeSchema", "true")
          .execute()

expected result:

            +----+------+------+
            |key1|value1|value3|
            +----+------+------+
            |   3|    30|  3000|
            |   1|    10|  null|
            |   2|    20|  null|
            +----+------+------+

2- update clause only

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenMatched()
          .updateExpr(Map(
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .option("mergeSchema", "true")
          .execute()

expected result:

            +----+------+------+
            |key1|value1|value3|
            +----+------+------+
            |   1|   100|  1000|
            |   2|    20|  null|
            +----+------+------+

3- insert and update clause :
3.1- new columns in insert actions only

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenMatched()
          .updateExpr(Map(
            "value1" -> "source.value1"
          ))
          .whenNotMatched()
          .insertExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .option("mergeSchema", "true")
          .execute()

expected result:

            +----+------+------+
            |key1|value1|value3|
            +----+------+------+
            |   3|    30|  3000|
            |   1|   100|  null|
            |   2|    20|  null|
            +----+------+------+

3.2- new columns in update actions only

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenMatched()
          .updateExpr(Map(
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .whenNotMatched()
          .insertExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1"
          ))
          .option("mergeSchema", "true")
          .execute()

expected result:

            +----+------+------+
            |key1|value1|value3|
            +----+------+------+
            |   3|    30|  null|
            |   1|   100|  1000|
            |   2|    20|  null|
            +----+------+------+

3.3 new columns in insert and update actions :
3.3.1 insertAll, updateAll

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenMatched()
          .updateAll()
          .whenNotMatched()
          .insertAll()
          .option("mergeSchema", "true")
          .execute()

expected result:

                +----+------+------+------+
                |key1|value1|value3|value4|
                +----+------+------+------+
                |   3|    30|  3000| 30000|
                |   1|   100|  1000| 10000|
                |   2|    20|  null|  null|
                +----+------+------+------+

3.3.2 same subset of columns in insert and update actions

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenMatched()
          .updateExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .whenNotMatched()
          .insertExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .option("mergeSchema", "true")
          .execute()

expected result:

                +----+------+------+
                |key1|value1|value3|
                +----+------+------+
                |   3|    30|  3000|
                |   1|   100|  1000|
                |   2|    20|  null|
                +----+------+------+

3.3.3 subset of columns in insert action, updateAll

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenMatched()
          .updateAll()
          .whenNotMatched()
          .insertExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .option("mergeSchema", "true")
          .execute()

expected result:

                    +----+------+------+------+
                    |key1|value1|value3|value4|
                    +----+------+------+------+
                    |   3|    30|  3000|  null|
                    |   1|   100|  1000| 10000|
                    |   2|    20|  null|  null|
                    +----+------+------+------+

3.3.4 insertAll, subset of columns in update action

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenMatched()
          .updateExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .whenNotMatched()
          .insertAll()
          .option("mergeSchema", "true")
          .execute()

expected result:

                    +----+------+------+------+
                    |key1|value1|value3|value4|
                    +----+------+------+------+
                    |   3|    30|  3000| 30000|
                    |   1|   100|  1000|  null|
                    |   2|    20|  null|  null|
                    +----+------+------+------+

3.3.5 subset of columns in insert action, subset of columns in update action (different)

        io.delta.tables.DeltaTable.forPath(spark, tempPath)
          .as("target")
          .merge(source.as("source"), "target.key1 = source.key1")
          .whenMatched()
          .updateExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1",
            "value3" -> "source.value3"
          ))
          .whenNotMatched()
          .insertExpr(Map(
            "key1" -> "source.key1",
            "value1" -> "source.value1",
            "value4" -> "source.value4"
          ))
          .option("mergeSchema", "true")
          .execute()

expected result:

                    +----+------+------+------+
                    |key1|value1|value3|value4|
                    +----+------+------+------+
                    |   3|    30|  null| 30000|
                    |   1|   100|  1000|  null|
                    |   2|    20|  null|  null|
                    +----+------+------+------+
  • Are these all the cases impacted by schema evolution? Are there other cases that I'm missing?
  • Are these the expected results?

@tdas
Contributor

tdas commented Apr 23, 2020

@JassAbidi These are good scenarios. At first glance, they seem to make sense. However, it is a little complex to correctly figure out and implement the cases where different subsets of the columns are explicitly referred to in different clauses. In 0.6.0, we implemented a simpler solution: schema evolution is supported only for updateAll() and insertAll() operations, where all the source columns are added to the target table. This probably addresses a pretty large fraction of use cases, and it is consistent with DataFrame.write.option("mergeSchema", "true")..., where all of the DataFrame's columns are added to the table.

We just released 0.6.0 a few minutes ago - https://github.com/delta-io/delta/releases/tag/v0.6.0. See the docs linked in the release notes for more information on schema evolution in this release.

I am going to close this ticket to mark the initial implementation of schema evolution as done. There are obviously improvements possible on top of this; please open more specific tickets for them.
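For reference, in PySpark the new behavior looks roughly like this (the path, join key and source DataFrame name updates_df are just illustrative):

from delta.tables import DeltaTable

# Must be enabled explicitly; schema evolution in MERGE is off by default.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

target = DeltaTable.forPath(spark, "/tmp/delta/testtable")
(target.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")   # updates_df: the wider source DataFrame
    .whenMatchedUpdateAll()                        # only the *All() actions evolve the schema
    .whenNotMatchedInsertAll()
    .execute())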

@tdas tdas closed this as completed Apr 23, 2020
@tdas tdas unpinned this issue Apr 29, 2020
@kkr78

kkr78 commented May 18, 2020

How do I specify the option? Is there an example? I see the following error with PySpark and Delta 0.6.0:

AttributeError: 'DeltaMergeBuilder' object has no attribute 'option'

d = (DeltaTable.forPath(spark, targetFolder).alias("base")
     .merge(cdc.alias("cdc"), "base.x=cdc.x")
     .whenMatchedUpdate(set = prepareMap(cdc))
     .whenNotMatchedInsert(values = prepareMap(cdc))
     .option("mergeSchema", "true")
     .execute())

@kkr78

kkr78 commented May 18, 2020

Never mind, I found the documentation: https://docs.delta.io/0.6.0/delta-update.html#automatic-schema-evolution. I was able to make it work with whenMatchedUpdateAll and whenNotMatchedInsertAll. This may work in some cases, but in other cases we update the value of an existing record using whenMatchedUpdate. Is there any plan to support whenMatchedUpdate and whenNotMatchedInsert in the future?

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
d = (DeltaTable.forPath(spark, targetFolder).alias("base")
     .merge(cdc.alias("cdc"), "base.x=cdc.x")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

@XBeg9

XBeg9 commented Jun 19, 2020

@kkr78 could you please post your prepareMap method? :) thanks

@kkr78

kkr78 commented Jul 27, 2020

@XBeg9 sorry, I forgot to reply back. Not sure if it helps now. The mapping logic is in a separate method:

def prepareMap(self, cdc):
    rec = {col: "cdc." + str(col) for col in cdc.columns}
    if "start_date" in cdc.columns:
        rec["start_date"] = "base.start_date"
    return rec

@XBeg9

XBeg9 commented Jul 27, 2020

This is my solution

import scala.collection.mutable
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

val objectDf = kafkaDf.select(col("value.after").alias("object"))
val index = objectDf.schema.fieldIndex("object")
val propSchema = objectDf.schema(index).dataType.asInstanceOf[StructType]
val columns = mutable.HashMap[String, String]()
propSchema.fields.foreach(field => {
  columns += ("t." + field.name -> "s.object.".concat(field.name))
})

and then you can do this:

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  sinkTable.as("t")
    .merge(microBatchOutputDF.sort(desc("eventTime")).dropDuplicates("key").as("s"), "s.key = t.id")
    .whenMatched("s.op == 'd'").delete()
    .whenMatched().updateExpr(columns)
    .whenNotMatched().insertExpr(columns)
    .execute()
}

@junmuz

junmuz commented Jul 29, 2020

Was this feature removed in version 0.7.0? I am not able to update the schema with the new version and Spark 3.0.0. The documentation, however, still says that it is supported.

I carried out one of the tests, but when I check the schema with table.toDF().schema(), it is still not updated. I am using Java 11, Delta Lake 0.7.0 and Spark 3.0.0.

@samkutty94

Is this supported in SQL MERGE?

@tdas
Contributor

tdas commented Oct 1, 2020

@samkutty94 Yes, it is. All APIs have the same functionality and semantics.

@gibran-akram

@tdas
You mentioned that updateAll and insertAll cover most use cases; does that also cover scenarios where SCD Type 2 needs to be implemented on the same table that needs schema evolution?

For SCD Type 2 I want to be able to 'retire' an existing / matching row and add another one with appropriate flags / dates, but I can't do that if I'm forced to use updateAll and insertAll.

It seems I'm going to have to choose which feature I can have in my ETL process using Delta Lake. Or is there an alternative approach which isn't documented? Any pointers?

Thanks

@Rohit25negi

@tdas to what nesting depth does schema evolution work while merging?

Automatic schema evolution does not work while merging in the following case.

import json
d1 = {'a':'b','b':{'c':{'1':1}}}
d2 = {'a':'s','b':{'c':{'1':2,'2':2}}}
d3 = {'a':'v','b':{'c':{'1':4}}}

df1 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d1)]))

#passes
df1.write.saveAsTable('test_table4',format='delta',mode='overwrite', path=f"hdfs://hdmaster:9000/dest/test_table4")


df2 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d2)]))
df2.createOrReplaceTempView('updates')

query = """
MERGE INTO test_table4 existing_records 
        USING updates updates 
        ON existing_records.a=updates.a
        WHEN MATCHED THEN UPDATE SET * 
        WHEN NOT MATCHED THEN INSERT *
"""
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql(query) #passes



df3 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d3)]))

df3.createOrReplaceTempView('updates')
query = """
MERGE INTO test_table4 existing_records 
        USING updates updates 
        ON existing_records.a=updates.a
        WHEN MATCHED THEN UPDATE SET * 
        WHEN NOT MATCHED THEN INSERT *
"""
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql(query) #FAILS #FAILS

This seems to fail when the nesting depth is more than 2 and the incoming df has columns missing.
Is this intentional?
This case is handled perfectly with option("mergeSchema", "true") when appending, but I want to UPSERT the data, and MERGE is not able to handle this schema change.
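For comparison, this is the append path that does handle the same df3 (a sketch against the same table path as above):

# Appending the narrower df3 with mergeSchema works; only MERGE trips over
# the missing nested column.
df3.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("hdfs://hdmaster:9000/dest/test_table4")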

Using Delta Lake version 0.8.0

@tdas
Contributor

tdas commented Feb 23, 2021

what is the error?

@Rohit25negi

@tdas thanks for the quick response.
This is the error I get when running the above code: Cannot cast struct<1:bigint> to struct<1:bigint,2:bigint>. All nested columns must match.;

Note that if a similar schema change (a missing column in the incoming data) happens at the upper levels of the df (I think up to depth 2), it works fine.

@tdas
Contributor

tdas commented Feb 24, 2021

can you give the full stack trace?

@Rohit25negi

@tdas sure, Below is the full stack trace of the error:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-1-a5bf96348d93> in <module>
     36 """
     37 spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
---> 38 spark.sql(query) #FAILS #FAILS

/usr/local/spark/python/pyspark/sql/session.py in sql(self, sqlQuery)
    647         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    648         """
--> 649         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    650 
    651     @since(2.0)

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Cannot cast struct<1:bigint> to struct<1:bigint,2:bigint>. All nested columns must match.;

@tdas
Contributor

tdas commented Feb 25, 2021

is there any more of the stack trace? the java part of the stack trace after the last line you have shown?

@Rohit25negi

@tdas no. that is all it gives.

@divasgupta

I am facing an issue while merging the schema with an existing Delta table; please find the scenario below.
First iteration, processing the JSON file:
the Delta table is created by inferring the schema of the JSON file, so it got a column (say Col1) with data type StringType, because its value was initially NULL.

Second iteration, appending another JSON file to this Delta table:
Col1 now contains StructType data in the JSON file.

While appending with the option mergeSchema = true, it throws an error like the following:
Failed to merge fields Col1 and Col1. Failed to merge incompatible data types StringType and StructType
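Reduced to a minimal sketch, this is roughly the situation (the path and field names are simplified placeholders, not my actual job):

import json

# Iteration 1: Col1 is always null, so JSON inference types it as StringType.
first = spark.read.json(spark.sparkContext.parallelize([json.dumps({"id": 1, "Col1": None})]))
first.write.format("delta").mode("overwrite").save("/tmp/delta/json_table")

# Iteration 2: Col1 now carries a struct; mergeSchema cannot reconcile
# StringType with StructType, so the append fails with the error above.
second = spark.read.json(spark.sparkContext.parallelize([json.dumps({"id": 2, "Col1": {"x": 1}})]))
second.write.format("delta").mode("append") \
    .option("mergeSchema", "true").save("/tmp/delta/json_table")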

@Rohit25negi

Rohit25negi commented Mar 5, 2021

@divasgupta could you please share the code and the data you are merging? I might be able to help.

@francescomucio

@Rohit25negi I am facing the same problem, how did you solve it?

@rachiitgupta

I am facing a similar issue to @divasgupta's:
I have a column c1 in my first iteration, while c1 is dropped in the second iteration.

The error I get is: The specified schema does not match the existing schema

tdas pushed a commit to tdas/delta that referenced this issue May 31, 2023
@TheKnightCoder

Does this work now? I can't seem to get it working; it just drops the new columns.
