In [None]:
import warnings
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Window
from delta.tables import DeltaTable

# Suppress FutureWarning messages for the rest of the notebook.
warnings.filterwarnings(action="ignore", category=FutureWarning)

# Build (or reuse) a session named "DeltaSession" configured with the Delta
# package, the Delta SQL extension and the Delta catalog implementation.
spark = (
    SparkSession.builder
    .appName("DeltaSession")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

## [whenMatchedUpdate](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdate)

whenMatchedUpdate(condition: Union[pyspark.sql.column.Column, str, None] = None, set: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder

Update a matched table row based on the rules defined by set. If a condition is specified, then it must evaluate to true for the row to be updated.

Parameters:
- condition (str or pyspark.sql.Column) – Optional condition of the update
- set (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.

## [whenMatchedUpdateAll](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll)

whenMatchedUpdateAll(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder

Update all the columns of the matched table row with the values of the corresponding columns in the source row. If a condition is specified, then it must evaluate to true for the matched row to be updated.

See DeltaMergeBuilder for complete usage details.

Parameters:
- condition (str or pyspark.sql.Column) – Optional condition of the update


## [whenMatchedDelete](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedDelete)

whenMatchedDelete(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder

Delete a matched row from the table only if the given condition (if specified) is true for the matched row.

See DeltaMergeBuilder for complete usage details.

Parameters:
- condition (str or pyspark.sql.Column) – Optional condition of the delete

## [whenNotMatchedInsert](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedInsert)

whenNotMatchedInsert(condition: Union[pyspark.sql.column.Column, str, None] = None, values: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder

Insert a new row to the target table based on the rules defined by values. If a condition is specified, then it must evaluate to true for the new row to be inserted.

Parameters:
- condition (str or pyspark.sql.Column) – Optional condition of the insert
- values (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be inserted. Note: This param is required. Default value None is present to allow positional args in same order across languages.

## [whenNotMatchedInsertAll](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedInsertAll)

whenNotMatchedInsertAll(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder

Insert a new target Delta table row by assigning the target columns to the values of the corresponding columns in the source row. If a condition is specified, then it must evaluate to true for the new row to be inserted.

Parameters:
- condition (str or pyspark.sql.Column) – Optional condition of the insert

## whenNotMatchedInsert

In [None]:
# Seed rows laid out as (id, name, age).
data = [
    (0, "Bob", 23),
    (1, "Sue", 25),
    (2, "Jim", 27),
]

# Column names are supplied at creation time; types are still inferred.
df = spark.createDataFrame(data, schema=["id", "name", "age"])

# Render the DataFrame as the cell output.
df.toPandas()

In [None]:
# Write the seed rows from a single partition as a Delta table.
# Overwrite mode keeps this cell re-runnable: the default save mode raises an
# error when "extract/04merge" already exists from a previous run.
df.repartition(1).write.format("delta").mode("overwrite").save("extract/04merge")

In [None]:
# Incoming batch: one id that is already in the table plus two unseen ids.
new_data = [
    (0, "Bob", 23),  # exists in our original dataset above
    (3, "Sally", 30),  # new data
    (4, "Henry", 33),  # new data
]

# Name the columns and collapse to one partition.
new_df = (
    spark.createDataFrame(new_data)
    .toDF("id", "name", "age")
    .repartition(1)
)

# Render the batch as the cell output.
new_df.toPandas()

In [None]:
# Handle to the Delta table stored at extract/04merge.
people_table = DeltaTable.forPath(
    spark,
    "extract/04merge",
)

# Show the table contents before any merge is applied.
(
    people_table
    .toDF()
    .toPandas()
)

In [None]:
# Insert source rows whose id has no match in the target.
# Only id and name are mapped here; age is not assigned by this merge.
(
    people_table.alias("target")
    .merge(new_df.alias("source"), "target.id = source.id")
    .whenNotMatchedInsert(
        values={
            "id": "source.id",
            "name": "source.name",
        },
    )
    .execute()
)

In [None]:
# Order by id so the displayed rows come back in a stable order, matching the
# other display cells in this notebook.
people_table.toDF().orderBy('id').limit(10).toPandas()

## whenNotMatchedInsertAll

In [None]:
# Second batch: id 0 is already in the table, ids 5 and 6 are not.
new_data = [
    (0, "Bob", 23),  # exists in our original dataset above
    (5, "Thamires", 30),  # new data
    (6, "Cristian", 33),  # new data
]

# Name the columns and collapse to one partition.
new_df = (
    spark.createDataFrame(new_data)
    .toDF("id", "name", "age")
    .repartition(1)
)

# Render the batch as the cell output.
new_df.toPandas()

In [None]:
# Insert every unmatched source row, copying all of its columns.
(
    people_table.alias("target")
    .merge(new_df.alias("source"), "target.id = source.id")
    .whenNotMatchedInsertAll()
    .execute()
)

In [None]:
# Show up to ten rows, sorted by id.
(
    people_table.toDF()
    .orderBy("id")
    .limit(10)
    .toPandas()
)

## whenMatchedUpdate

In [None]:
# Batch for the update demo: id 4 with a changed age, plus id 10.
new_data = [(4, "Henry", 34), (10, "Allie", 22)]

# Name the columns and collapse to one partition.
new_df = (
    spark.createDataFrame(new_data)
    .toDF("id", "name", "age")
    .repartition(1)
)

# Render the batch as the cell output.
new_df.toPandas()

In [None]:
# Upsert: matched rows take only the source age; unmatched source rows are
# inserted with all of their columns.
(
    people_table.alias("target")
    .merge(new_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdate(set={"age": "source.age"})
    .whenNotMatchedInsertAll()
    .execute()
)

In [None]:
# Sort before truncating so the ten rows shown are the ones with the lowest
# ids; limiting first would keep an arbitrary ten rows and only then sort them.
people_table.toDF().orderBy('id').limit(10).toPandas()

## Apply change data with merge

In [None]:
# Change-data batch: each row carries the operation to apply in its last field.
new_data = [
    (9, "Richard", 75, "INSERT"),
    (3, "Sally", 31, "UPDATE"),
    (0, "Bob", 23, "DELETE"),
]

# The operation flag lands in the "_op" column.
new_df = (
    spark.createDataFrame(new_data)
    .toDF("id", "name", "age", "_op")
    .repartition(1)
)

# Render the batch sorted by id.
new_df.orderBy("id").toPandas()

In [None]:
# Route each source row by its _op flag:
#   - unmatched rows flagged INSERT are inserted,
#   - matched rows flagged UPDATE overwrite id, name and age,
#   - matched rows flagged DELETE are removed from the target.
(
    people_table.alias("target")
    .merge(new_df.alias("source"), "target.id = source.id")
    .whenNotMatchedInsert(
        condition='source._op = "INSERT"',
        values={
            "id": "source.id",
            "name": "source.name",
            "age": "source.age",
        },
    )
    .whenMatchedUpdate(
        condition='source._op = "UPDATE"',
        set={
            "id": "source.id",
            "name": "source.name",
            "age": "source.age",
        },
    )
    .whenMatchedDelete(condition='source._op = "DELETE"')
    .execute()
)

In [None]:
# Sort before truncating so the ten rows shown are the ones with the lowest
# ids; limiting first would keep an arbitrary ten rows and only then sort them.
people_table.toDF().orderBy('id').limit(10).toPandas()

## Delta Lake merge for partial Change Data

In [None]:
# Partial change data: None marks a field that the change does not carry.
new_data = [(1, "SueNew", None, "UPDATE"), (3, None, 32, "UPDATE")]

# Name the columns and collapse to one partition.
new_df = (
    spark.createDataFrame(new_data)
    .toDF("id", "name", "age", "_op")
    .repartition(1)
)

# Render the batch as the cell output.
new_df.toPandas()

In [None]:
# Partial update for matched rows flagged UPDATE: a NULL source name or age
# keeps the value the target already holds.
(
    people_table.alias("target")
    .merge(new_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdate(
        condition='source._op = "UPDATE"',
        set={
            "id": "source.id",
            "name": "CASE WHEN source.name IS NOT NULL THEN source.name ELSE target.name END",
            "age": "CASE WHEN source.age IS NOT NULL THEN source.age ELSE target.age END",
        },
    )
    .execute()
)

In [None]:
# Show up to ten rows, sorted by id.
(
    people_table.toDF()
    .orderBy("id")
    .limit(10)
    .toPandas()
)

## [whenNotMatchedBySourceUpdate](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedBySourceUpdate)

whenNotMatchedBySourceUpdate(condition: Union[pyspark.sql.column.Column, str, None] = None, set: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder

Update a target row that has no matches in the source based on the rules defined by set. If a condition is specified, then it must evaluate to true for the row to be updated.

See DeltaMergeBuilder for complete usage details.

Parameters:
- condition (str or pyspark.sql.Column) – Optional condition of the update
- set (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.

In [None]:
# Customer rows laid out as (id, name, age, last_seen, status).
data = [
    (0, "Bob", 23, datetime.date(2022, 1, 2), "inactive"),  # inactive
    (1, "Sue", 25, datetime.date(2023, 4, 5), "active"),  # active
    # marked as active, but should not be active anymore
    (2, "Jim", 27, datetime.date(2023, 2, 7), "active"),
]

# Column names are supplied at creation time; types are still inferred.
df = spark.createDataFrame(
    data,
    schema=["id", "name", "age", "last_seen", "status"],
)

# Render the DataFrame as the cell output.
df.toPandas()

In [None]:
# Write the customer rows from a single partition as a Delta table.
# Overwrite mode keeps this cell re-runnable: the default save mode raises an
# error when "extract/04bysource" already exists from a previous run.
df.repartition(1).write.format("delta").mode("overwrite").save("extract/04bysource")

In [None]:
# Handle to the Delta table stored at extract/04bysource.
customers_table = DeltaTable.forPath(
    spark,
    "extract/04bysource",
)

# Show the table contents before any merge is applied.
(
    customers_table
    .toDF()
    .toPandas()
)

In [None]:
# Customers seen today: id 0 is in the seed data, id 3 is not.
new_data = [
    (0, "Bob", 23, datetime.date.today()),  # existing customer
    (3, "Sally", 30, datetime.date.today()),  # new customer
]

# The fourth field is exposed as the "current_date" column.
new_df = (
    spark.createDataFrame(new_data)
    .toDF("id", "name", "age", "current_date")
)

# Render the batch as the cell output.
new_df.toPandas()

In [None]:
# Three-way merge keyed on id:
#   - matched customers get last_seen refreshed from the source and become active,
#   - source customers missing from the target are inserted as active,
#   - target customers missing from the source, whose last_seen is at least
#     30 days before the current date, are flagged inactive.
(
    customers_table.alias("target")
    .merge(new_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdate(
        set={
            "target.last_seen": "source.current_date",
            "target.status": "'active'",
        },
    )
    .whenNotMatchedInsert(
        values={
            "target.id": "source.id",
            "target.name": "source.name",
            "target.age": "source.age",
            "target.last_seen": "source.current_date",
            "target.status": "'active'",
        },
    )
    .whenNotMatchedBySourceUpdate(
        condition="target.last_seen <= (current_date() - INTERVAL '30' DAY)",
        set={"target.status": "'inactive'"},
    )
    .execute()
)

In [None]:
# Reload the table from its path and show the result of the merge.
(
    DeltaTable.forPath(spark, "extract/04bysource")
    .toDF()
    .toPandas()
)

## [whenNotMatchedBySourceDelete](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedBySourceDelete)

whenNotMatchedBySourceDelete(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder

Delete a target row that has no matches in the source from the table only if the given condition (if specified) is true for the target row.

Parameters:
- condition (str or pyspark.sql.Column) – Optional condition of the delete

In [None]:
# Same merge as the update variant, except that target customers missing from
# the source, whose last_seen is at least 30 days before the current date,
# are deleted rather than flagged.
(
    customers_table.alias("target")
    .merge(new_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdate(
        set={
            "target.last_seen": "source.current_date",
            "target.status": "'active'",
        },
    )
    .whenNotMatchedInsert(
        values={
            "target.id": "source.id",
            "target.name": "source.name",
            "target.age": "source.age",
            "target.last_seen": "source.current_date",
            "target.status": "'active'",
        },
    )
    .whenNotMatchedBySourceDelete(
        condition="target.last_seen <= (current_date() - INTERVAL '30' DAY)",
    )
    .execute()
)

In [None]:
# Reload the table from its path and show the result of the merge.
(
    DeltaTable.forPath(spark, "extract/04bysource")
    .toDF()
    .toPandas()
)