
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning">
</div>


# 9 (BONUS) - Data Ingestion with MERGE INTO

### You may not have time to complete this during class, so please review it afterward.

MERGE INTO in Databricks is a powerful tool for data ingestion. It enables efficient, atomic, and scalable upsert and delete operations. This command is useful when you have an existing Delta table and want to combine incoming data with it.


### Learning Objectives
By the end of this lesson, you should be able to:
- Utilize MERGE INTO to perform updates, inserts, and deletes on Delta tables.
- Apply MERGE INTO with schema enforcement to manage data integrity.
- Apply MERGE INTO with schema evolution to evolve the target tables.

## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default and you have a Shared SQL warehouse.

<!-- ![Select Cluster](./Includes/images/selecting_cluster_info.png) -->

Follow these steps to select the classic compute cluster:


1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

2. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

   - Click **More** in the drop-down.

   - In the **Attach to an existing compute resource** window, use the first drop-down to select your unique cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

2. Find the triangle icon to the right of your compute cluster name and click it.

3. Wait a few minutes for the cluster to start.

4. Once the cluster is running, complete the steps above to select your cluster.


## A. Classroom Setup

Run the following cell to configure your working environment for this notebook.

**NOTE:** The `DA` object is only used in Databricks Academy courses and is not available outside of these courses. It will dynamically reference the information needed to run the course in the lab environment.

In [0]:
%run ./Includes/Classroom-Setup-09

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


----------------------------------------------------------------------------------------
Directory /Volumes/dbacademy/ops/labuser10983516_1758981184@vocareum_com/csv_demo_files already exists. No action taken.
Directory /Volumes/dbacademy/ops/labuser10983516_1758981184@vocareum_com/json_demo_files already exists. No action taken.
Directory /Volumes/dbacademy/ops/labuser10983516_1758981184@vocareum_com/xml_demo_files already exists. No action taken.
----------------------------------------------------------------------------------------



Created the main_users_target table.


Created the update_users_source table.


Created the new_users_source table.


Run the cell below to view your default catalog and schema. Notice that your default catalog is **dbacademy** and your default schema is your unique **labuser** schema.

**NOTE:** The default catalog and schema are pre-configured for you to avoid the need to specify the three-level name when writing your tables (i.e., catalog.schema.table).
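As a rough illustration of how these defaults are applied, the hypothetical Python helper below completes a one- or two-part table name with the current catalog and schema. It is not a Databricks API (the engine does this resolution for you), and it assumes plain identifiers with no backtick-quoted dots.

```python
def qualify_table_name(name: str, current_catalog: str, current_schema: str) -> str:
    """Hypothetical helper: fill in missing name parts from the session defaults."""
    parts = name.split(".")
    if len(parts) == 1:  # table -> current_catalog.current_schema.table
        return f"{current_catalog}.{current_schema}.{parts[0]}"
    if len(parts) == 2:  # schema.table -> current_catalog.schema.table
        return f"{current_catalog}.{parts[0]}.{parts[1]}"
    if len(parts) == 3:  # already catalog.schema.table
        return name
    raise ValueError(f"Not a valid table name: {name!r}")


catalog, schema = "dbacademy", "labuser10983516_1758981184"
print(qualify_table_name("main_users_target", catalog, schema))
# dbacademy.labuser10983516_1758981184.main_users_target
```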

In [0]:
SELECT current_catalog(), current_schema()

current_catalog(),current_schema()
dbacademy,labuser10983516_1758981184


## B. Preview the Current Delta Table

1. Preview the **main_users_target** table (the target table to update). 

    Notice that the table contains 4 rows of user information including their **id**, **first_name**, **email**, **sign_up_date**, and **status**. Each user's status is *current*.

In [0]:
SELECT * 
FROM main_users_target
ORDER BY id;

id,first_name,email,sign_up_date,status
1,Panagiotis,panagiotis@example.com,2024-01-01,current
2,Samarth,samarth@example.com,2024-01-05,current
3,Zebi,zebi@example.com,2024-01-10,current
4,Mark,mark@leadinst.com,2024-02-10,current


2. Preview the **update_users_source** table (the table to use to update the target). You can think of this as your incoming dataset that has arrived in cloud object storage. 

    Notice that the table contains 4 rows with the same columns. The **status** column indicates the action to take for each user. We want to:
    - delete user **id** *1*
    - update the email of user **id** *2*
    - add new users with **id** values of *5* and *6*.

In [0]:
SELECT *
FROM update_users_source
ORDER BY id;

id,first_name,email,sign_up_date,status
1,Panagiotis,panagiotis@example.com,2024-01-01,delete
2,Samarth,samarth123@newemail.com,2024-01-05,update
5,Owen,owent@theemail.com,2023-01-15,new
6,Eva,ej@princessemail.com,2023-01-15,new


## C. MERGE INTO

As part of ingestion, you can use the `MERGE` SQL operation to apply inserts, updates, and deletes from a source table, view, or DataFrame to a target Delta table. Delta Lake supports inserts, updates, and deletes in `MERGE`, and supports extended syntax beyond the SQL standard to facilitate advanced use cases.
<br></br>

```
MERGE INTO target t
USING source s
ON {merge_condition}
WHEN MATCHED THEN {matched_action}
WHEN NOT MATCHED THEN {not_matched_action}
```
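To make the clause semantics concrete, here is a small in-memory sketch in Python. `toy_merge` and its clause representation are hypothetical, not a Delta Lake API. It assumes the join key is unique in both source and target, and it only mirrors the rule that, for each source row, `WHEN` clauses are evaluated in order and the first one whose condition is true is applied; a row that satisfies no clause is left alone.

```python
from typing import Callable, Dict, List, Tuple, Union

Row = Dict[str, object]
# WHEN MATCHED clause: (condition(target_row, source_row), action), where action
# is the string "delete" or a function returning the updated target row.
MatchedClause = Tuple[Callable[[Row, Row], bool], Union[str, Callable[[Row, Row], Row]]]
# WHEN NOT MATCHED clause: (condition(source_row), function building the new row).
NotMatchedClause = Tuple[Callable[[Row], bool], Callable[[Row], Row]]


def toy_merge(target: List[Row], source: List[Row], key: str,
              matched: List[MatchedClause],
              not_matched: List[NotMatchedClause]) -> List[Row]:
    """Hypothetical in-memory MERGE on an equality key (keys assumed unique)."""
    by_key = {row[key]: dict(row) for row in target}
    for s in source:
        t = by_key.get(s[key])
        if t is not None:  # WHEN MATCHED: first clause with a true condition wins
            for cond, action in matched:
                if cond(t, s):
                    if action == "delete":
                        del by_key[s[key]]
                    else:
                        by_key[s[key]] = action(t, s)
                    break
        else:  # WHEN NOT MATCHED
            for cond, build in not_matched:
                if cond(s):
                    by_key[s[key]] = build(s)
                    break
    return sorted(by_key.values(), key=lambda r: r[key])


target = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
source = [{"id": 1, "op": "delete"},
          {"id": 2, "op": "update", "v": "B"},
          {"id": 3, "op": "new", "v": "c"}]
result = toy_merge(
    target, source, "id",
    matched=[
        (lambda t, s: s["op"] == "update", lambda t, s: {**t, "v": s["v"]}),
        (lambda t, s: s["op"] == "delete", "delete"),
    ],
    not_matched=[(lambda s: True, lambda s: {"id": s["id"], "v": s["v"]})],
)
print(result)  # [{'id': 2, 'v': 'B'}, {'id': 3, 'v': 'c'}]
```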

### C1. Merge Target with the Incoming Data
In this scenario, we want to update the current **main_users_target** table with user changes from the **update_users_source** table.

1. Use the `MERGE INTO` statement to merge the **update_users_source** table into the **main_users_target** table based on the **id** column.

    The code below does the following:
    - The `MERGE INTO` clause specifies the target table **main_users_target** to be modified. The table referenced must be a Delta table.
    - The `USING` clause specifies the source table **update_users_source** to be merged into the target table.
    - The `ON` clause specifies the merge condition. Here, rows are matched on equal **id** values.
    - The `WHEN MATCHED AND source.status = 'update' THEN UPDATE SET` clause updates the target row's **email** and **status** when the condition is true.
    - The `WHEN MATCHED AND source.status = 'delete' THEN DELETE` clause deletes the target row when the condition is true.
    - The `WHEN NOT MATCHED THEN INSERT {cols} VALUES {columns to insert}` clause inserts a row from the source table into the target table when no target row has a matching **id**.

    Run the statement and view the results. Notice that:
    - the **num_affected_rows** is *4*
    - the **num_updated_rows** is *1*
    - the **num_deleted_rows** is *1*
    - the **num_inserted_rows** is *2*.


In [0]:
MERGE INTO main_users_target target
USING update_users_source source
ON target.id = source.id
WHEN MATCHED AND source.status = 'update' THEN
  UPDATE SET 
    target.email = source.email,
    target.status = source.status
WHEN MATCHED AND source.status = 'delete' THEN
  DELETE
WHEN NOT MATCHED THEN
  INSERT (id, first_name, email, sign_up_date, status)
  VALUES (source.id, source.first_name, source.email, source.sign_up_date, source.status);

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
4,1,1,2
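The reported counts follow from classifying each source row against the target. The Python sketch below does that by hand for the lab data, keeping only **id** and **status**. It assumes, as this output suggests, that **num_affected_rows** is the sum of updated, deleted, and inserted rows.

```python
# Lab data from the two preview cells: target ids and (source id, status) pairs.
target_ids = {1, 2, 3, 4}
source_rows = [(1, "delete"), (2, "update"), (5, "new"), (6, "new")]

updated = deleted = inserted = 0
for sid, status in source_rows:
    if sid in target_ids:
        if status == "update":
            updated += 1
        elif status == "delete":
            deleted += 1
        # a matched row with any other status hits no clause and is unchanged
    else:
        inserted += 1  # the unconditional WHEN NOT MATCHED clause inserts it

affected = updated + deleted + inserted
print(affected, updated, deleted, inserted)  # 4 1 1 2

# Ids remaining after the merge: drop deleted ids, add inserted ids.
deleted_ids = {sid for sid, st in source_rows if sid in target_ids and st == "delete"}
inserted_ids = {sid for sid, _ in source_rows if sid not in target_ids}
print(sorted((target_ids - deleted_ids) | inserted_ids))  # [2, 3, 4, 5, 6]
```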


2. View the updated **main_users_target** table. Notice that:
    - User **id** *1* was deleted.
    - User **id** *2* has an updated email.
    - Users with **id** *5* and *6* were added.


In [0]:
SELECT *
FROM main_users_target
ORDER BY id;

id,first_name,email,sign_up_date,status
2,Samarth,samarth123@newemail.com,2024-01-05,update
3,Zebi,zebi@example.com,2024-01-10,current
4,Mark,mark@leadinst.com,2024-02-10,current
5,Owen,owent@theemail.com,2023-01-15,new
6,Eva,ej@princessemail.com,2023-01-15,new


3. Use the `DESCRIBE HISTORY` statement to view the history of the **main_users_target** table. Notice that there are now 4 versions of the table (versions *0* through *3*).
    - Version *0* is the initial creation of the empty table.
    - Version *1* is the insertion of values into the table.
    - Version *2* is the merge (inserts, updates, deletes).
    - Version *3* is an automatic `OPTIMIZE` (note `auto -> true` in **operationParameters**) that compacted the table's files after the merge.

In [0]:
DESCRIBE HISTORY main_users_target;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2025-09-27T14:14:57Z,77561067406028,labuser10983516_1758981184@vocareum.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(1413133777274598),0927-135345-y4onvy73,2.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 3632, p25FileSize -> 1859, numDeletionVectorsRemoved -> 1, minFileSize -> 1859, numAddedFiles -> 1, maxFileSize -> 1859, p75FileSize -> 1859, p50FileSize -> 1859, numAddedBytes -> 1859)",,Databricks-Runtime/16.4.x-scala2.12
2,2025-09-27T14:14:53Z,77561067406028,labuser10983516_1758981184@vocareum.com,MERGE,"Map(predicate -> [""(id#8746 = id#8756)""], clusterBy -> [], matchedPredicates -> [{""predicate"":""(status#8760 = update)"",""actionType"":""update""},{""predicate"":""(status#8760 = delete)"",""actionType"":""delete""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(1413133777274598),0927-135345-y4onvy73,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 1, numTargetFilesAdded -> 1, numTargetBytesAdded -> 1817, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 3696, materializeSourceTimeMs -> 3, numTargetRowsInserted -> 2, numTargetRowsMatchedDeleted -> 1, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1656, numTargetRowsUpdated -> 1, numOutputRows -> 3, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 4, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1974)",,Databricks-Runtime/16.4.x-scala2.12
1,2025-09-27T14:14:20Z,77561067406028,labuser10983516_1758981184@vocareum.com,WRITE,"Map(mode -> Append, statsOnLoad -> false, partitionBy -> [])",,List(1413133777274598),0927-135345-y4onvy73,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 1815)",,Databricks-Runtime/16.4.x-scala2.12
0,2025-09-27T14:14:19Z,77561067406028,labuser10983516_1758981184@vocareum.com,CREATE OR REPLACE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(1413133777274598),0927-135345-y4onvy73,,WriteSerializable,True,Map(),,Databricks-Runtime/16.4.x-scala2.12


4. You can use `VERSION AS OF` to query a specific version of the table. Query version *1* of the table to view the original data.

In [0]:
SELECT *
FROM main_users_target VERSION AS OF 1;

id,first_name,email,sign_up_date,status
1,Panagiotis,panagiotis@example.com,2024-01-01,current
2,Samarth,samarth@example.com,2024-01-05,current
3,Zebi,zebi@example.com,2024-01-10,current
4,Mark,mark@leadinst.com,2024-02-10,current
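Conceptually, each commit produces a new numbered version that you can read back. The hypothetical `ToyVersionedTable` class below is only a mental model: it stores full snapshots, whereas Delta Lake tracks versions through file-level actions in its transaction log. It replays the four commits from the history above, tracking only the **id** values.

```python
import copy


class ToyVersionedTable:
    """Hypothetical stand-in for a Delta table's version history."""

    def __init__(self):
        self._versions = []  # list of (operation, rows) tuples

    def commit(self, operation, rows):
        """Record a new table version and return its version number."""
        self._versions.append((operation, copy.deepcopy(rows)))
        return len(self._versions) - 1

    def version_as_of(self, version):
        """Return the rows as they were at `version` (like VERSION AS OF)."""
        if not 0 <= version < len(self._versions):
            raise ValueError(f"Version {version} does not exist")
        return copy.deepcopy(self._versions[version][1])

    def history(self):
        """Return (version, operation) pairs, newest first, like DESCRIBE HISTORY."""
        return [(v, op) for v, (op, _) in reversed(list(enumerate(self._versions)))]


table = ToyVersionedTable()
table.commit("CREATE OR REPLACE TABLE", [])
table.commit("WRITE", [1, 2, 3, 4])
table.commit("MERGE", [2, 3, 4, 5, 6])
table.commit("OPTIMIZE", table.version_as_of(2))  # compaction: same rows, new version

print(table.history())
print(table.version_as_of(1))  # [1, 2, 3, 4]
```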


### C2. Schema Enforcement and Schema Evolution with MERGE INTO
What if your source data evolves and adds new columns? You can use `MERGE WITH SCHEMA EVOLUTION INTO` to update the schema of the target table as part of the merge.

1. View the updated **main_users_target** table. Confirm that it contains *5* columns with the updated values from the previous `MERGE INTO`.


In [0]:
SELECT *
FROM main_users_target
ORDER BY ID;

id,first_name,email,sign_up_date,status
2,Samarth,samarth123@newemail.com,2024-01-05,update
3,Zebi,zebi@example.com,2024-01-10,current
4,Mark,mark@leadinst.com,2024-02-10,current
5,Owen,owent@theemail.com,2023-01-15,new
6,Eva,ej@princessemail.com,2023-01-15,new


2. View the **new_users_source** table. This table contains an additional column named **country**, which captures information about our new users. This column was not captured with the original data.


In [0]:
SELECT *
FROM new_users_source

id,first_name,email,sign_up_date,status,country
7,Kristi,kristi@theemail.com,2023-01-15,new,USA
8,Mohammad,mohammad@princessemail.com,2023-01-15,new,Pakistan
9,Christos,christos@example.com,2024-01-01,new,Greece


3. Use the `MERGE INTO` statement to update the target table **main_users_target** with the **new_users_source** table.

    Compared with the previous statement, this one uses **new_users_source** as the source, and the final `WHEN NOT MATCHED AND source.status = 'new' THEN` clause now also inserts the **country** column into the target table.

    Run the query and view the error:

    *Cannot resolve country in INSERT clause given columns target.id, target.first_name, target.email, target.sign_up_date, target.status.*

    Notice that the statement cannot resolve the **country** column in the INSERT clause.
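The failure comes from resolving the `INSERT` column list against the target schema. The hypothetical `check_insert_columns` helper below is a simplified Python model of that check, not Delta Lake's implementation. It compares names case-insensitively, assuming Databricks' default of case-insensitive column resolution.

```python
def check_insert_columns(target_columns, insert_columns):
    """Hypothetical schema-enforcement check: reject unknown insert columns."""
    known = {c.lower() for c in target_columns}
    missing = [c for c in insert_columns if c.lower() not in known]
    if missing:
        raise ValueError(
            f"Cannot resolve {', '.join(missing)} in INSERT clause given columns "
            + ", ".join(f"target.{c}" for c in target_columns)
        )


target_cols = ["id", "first_name", "email", "sign_up_date", "status"]
insert_cols = target_cols + ["country"]
try:
    check_insert_columns(target_cols, insert_cols)
except ValueError as err:
    print(err)
```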


In [0]:
--------------------------------------------
-- This query will return an ERROR
--------------------------------------------

MERGE INTO main_users_target target
USING new_users_source source
ON target.id = source.id
WHEN MATCHED AND source.status = 'update' THEN
  UPDATE SET 
    target.email = source.email,
    target.status = source.status
WHEN MATCHED AND source.status = 'delete' THEN
  DELETE
WHEN NOT MATCHED AND source.status = 'new' THEN
  INSERT (id, first_name, email, sign_up_date, status, country)
  VALUES (source.id, source.first_name, source.email, source.sign_up_date, source.status, source.country);

com.databricks.sql.transaction.tahoe.DeltaAnalysisException: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve country in INSERT clause given columns target.id, target.first_name, target.email, target.sign_up_date, target.status.; line 5 pos 0

4. You must explicitly enable schema evolution to evolve the schema of the target table. In Databricks Runtime 15.2 and above, you can enable it for a single merge in SQL with the `MERGE WITH SCHEMA EVOLUTION INTO` statement.

    [Schema evolution syntax for merge](https://docs.databricks.com/en/delta/update-schema.html#schema-evolution-syntax-for-merge)

**NOTE:** Alternatively, you can set the Spark configuration `spark.databricks.delta.schema.autoMerge.enabled` to *true* for the current SparkSession. For more information, see the [Enable schema evolution](https://docs.databricks.com/en/delta/update-schema.html#enable-schema-evolution) documentation page.

In [0]:
MERGE WITH SCHEMA EVOLUTION INTO main_users_target target  -- Use the MERGE WITH SCHEMA EVOLUTION INTO statement
USING new_users_source source
ON target.id = source.id
WHEN MATCHED AND source.status = 'update' THEN
  UPDATE SET 
    target.email = source.email,
    target.status = source.status
WHEN MATCHED AND source.status = 'delete' THEN
  DELETE
WHEN NOT MATCHED AND source.status = 'new' THEN
  INSERT (id, first_name, email, sign_up_date, status, country)
  VALUES (source.id, source.first_name, source.email, source.sign_up_date, source.status, source.country);

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


5. Preview the **main_users_target** table. Notice the following:
    - The **country** column was added to the table, evolving the table schema.
    - The three new users were inserted into the table.


In [0]:
SELECT * 
FROM main_users_target;

id,first_name,email,sign_up_date,status,country
7,Kristi,kristi@theemail.com,2023-01-15,new,USA
8,Mohammad,mohammad@princessemail.com,2023-01-15,new,Pakistan
9,Christos,christos@example.com,2024-01-01,new,Greece
3,Zebi,zebi@example.com,2024-01-10,current,
4,Mark,mark@leadinst.com,2024-02-10,current,
2,Samarth,samarth123@newemail.com,2024-01-05,update,
5,Owen,owent@theemail.com,2023-01-15,new,
6,Eva,ej@princessemail.com,2023-01-15,new,
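The outcome above can be modeled in a few lines of Python. The hypothetical `evolve_and_insert` helper below covers only the insert path of this merge: columns that appear in the incoming rows but not in the target are appended to the schema, and existing rows receive `None` (SQL `NULL`) for them. It is a sketch under those assumptions, not how Delta Lake rewrites the table.

```python
def evolve_and_insert(target_rows, target_columns, new_rows):
    """Hypothetical sketch: add unseen columns to the schema, then append rows."""
    columns = list(target_columns)
    for row in new_rows:
        for col in row:
            if col not in columns:
                columns.append(col)  # schema evolution: new column at the end
    # Every row gets every column; missing values become None (NULL).
    evolved = [{c: r.get(c) for c in columns} for r in target_rows]
    evolved += [{c: r.get(c) for c in columns} for r in new_rows]
    return columns, evolved


cols = ["id", "first_name", "status"]
existing = [{"id": 3, "first_name": "Zebi", "status": "current"}]
incoming = [{"id": 7, "first_name": "Kristi", "status": "new", "country": "USA"}]
cols, rows = evolve_and_insert(existing, cols, incoming)
print(cols)  # ['id', 'first_name', 'status', 'country']
print(rows[0]["country"], rows[1]["country"])  # None USA
```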


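6. View the history of the **main_users_target** table. Notice there is now a version *4* reflecting the merge with schema evolution, which inserted 3 rows (**numTargetRowsInserted** -> 3). In the captured output below, the merge cell was run a second time, which created version *5* with 0 affected rows; this is also why the merge output above reports *0* for every count.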


In [0]:
DESCRIBE HISTORY main_users_target;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2025-09-27T14:18:31Z,77561067406028,labuser10983516_1758981184@vocareum.com,MERGE,"Map(predicate -> [""(id#11302 = id#11314)""], clusterBy -> [], matchedPredicates -> [{""predicate"":""(status#11318 = update)"",""actionType"":""update""},{""predicate"":""(status#11318 = delete)"",""actionType"":""delete""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""predicate"":""(status#11318 = new)"",""actionType"":""insert""}])",,List(1413133777274598),0927-135345-y4onvy73,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 0, numTargetBytesAdded -> 0, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 1866, materializeSourceTimeMs -> 1, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 976, numTargetRowsUpdated -> 0, numOutputRows -> 0, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 850)",,Databricks-Runtime/16.4.x-scala2.12
4,2025-09-27T14:17:01Z,77561067406028,labuser10983516_1758981184@vocareum.com,MERGE,"Map(predicate -> [""(id#10864 = id#10869)""], clusterBy -> [], matchedPredicates -> [{""predicate"":""(status#10873 = update)"",""actionType"":""update""},{""predicate"":""(status#10873 = delete)"",""actionType"":""delete""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""predicate"":""(status#10873 = new)"",""actionType"":""insert""}])",,List(1413133777274598),0927-135345-y4onvy73,3.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 2093, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 1145, materializeSourceTimeMs -> 1, numTargetRowsInserted -> 3, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 488, numTargetRowsUpdated -> 0, numOutputRows -> 3, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 599)",,Databricks-Runtime/16.4.x-scala2.12
3,2025-09-27T14:14:57Z,77561067406028,labuser10983516_1758981184@vocareum.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(1413133777274598),0927-135345-y4onvy73,2.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 3632, p25FileSize -> 1859, numDeletionVectorsRemoved -> 1, minFileSize -> 1859, numAddedFiles -> 1, maxFileSize -> 1859, p75FileSize -> 1859, p50FileSize -> 1859, numAddedBytes -> 1859)",,Databricks-Runtime/16.4.x-scala2.12
2,2025-09-27T14:14:53Z,77561067406028,labuser10983516_1758981184@vocareum.com,MERGE,"Map(predicate -> [""(id#8746 = id#8756)""], clusterBy -> [], matchedPredicates -> [{""predicate"":""(status#8760 = update)"",""actionType"":""update""},{""predicate"":""(status#8760 = delete)"",""actionType"":""delete""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(1413133777274598),0927-135345-y4onvy73,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 1, numTargetFilesAdded -> 1, numTargetBytesAdded -> 1817, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 3696, materializeSourceTimeMs -> 3, numTargetRowsInserted -> 2, numTargetRowsMatchedDeleted -> 1, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1656, numTargetRowsUpdated -> 1, numOutputRows -> 3, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 4, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1974)",,Databricks-Runtime/16.4.x-scala2.12
1,2025-09-27T14:14:20Z,77561067406028,labuser10983516_1758981184@vocareum.com,WRITE,"Map(mode -> Append, statsOnLoad -> false, partitionBy -> [])",,List(1413133777274598),0927-135345-y4onvy73,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 1815)",,Databricks-Runtime/16.4.x-scala2.12
0,2025-09-27T14:14:19Z,77561067406028,labuser10983516_1758981184@vocareum.com,CREATE OR REPLACE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(1413133777274598),0927-135345-y4onvy73,,WriteSerializable,True,Map(),,Databricks-Runtime/16.4.x-scala2.12



&copy; 2025 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/" target="blank">Apache Software Foundation</a>.<br/>
<br/><a href="https://databricks.com/privacy-policy" target="blank">Privacy Policy</a> | 
<a href="https://databricks.com/terms-of-use" target="blank">Terms of Use</a> | 
<a href="https://help.databricks.com/" target="blank">Support</a>