### Table deletes, updates, and merges

Delta Lake is an open-source storage layer that brings ACID (Atomicity, Consistency, Isolation, Durability) transactions to Apache Spark and big data workloads. It supports several statements for deleting, updating, and merging data in Delta tables.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Build a Spark session with the Delta Lake package and SQL extensions enabled.
# Delta Lake 3.2.0 is built for Spark 3.5.x with Scala 2.12.
spark = (SparkSession.builder
         .appName('delta-lake-transformation')
         .config('spark.jars.packages', 'io.delta:delta-spark_2.12:3.2.0')
         .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
         .config('spark.sql.catalogImplementation', 'hive')  # keep table definitions in a Hive metastore
         .getOrCreate())
```

### (A) Updating data to fix errors
You can update data that matches a predicate in a Delta table. For example, in a table named people10m or a path at /tmp/delta/people-10m, to change an abbreviation in the gender column from M or F to Male or Female, you can run the following:
```SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
```

```python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
```
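To make the semantics concrete, here is a minimal plain-Python model (no Spark required) of what each `UPDATE` does: it rewrites only the rows that match the predicate and produces a new snapshot, leaving the previous snapshot untouched, much as Delta Lake commits a new table version instead of editing data files in place. The `update_rows` helper and the sample rows are hypothetical illustrations, not part of the Delta Lake API.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def update_rows(
    rows: List[Row],
    condition: Callable[[Row], bool],
    set_values: Dict[str, Callable[[Row], Any]],
) -> List[Row]:
    """Hypothetical model of UPDATE ... SET ... WHERE ...

    Returns a new snapshot; the input list and its rows are never mutated,
    mirroring how a Delta UPDATE produces a new table version.
    """
    new_rows = []
    for row in rows:
        if condition(row):
            updated = dict(row)  # copy so the old snapshot stays intact
            for column, expr in set_values.items():
                updated[column] = expr(row)  # expressions see the pre-update row
            new_rows.append(updated)
        else:
            new_rows.append(row)  # untouched rows are carried over as-is
    return new_rows


# Hypothetical sample data standing in for people10m
version_0 = [
    {"id": 1, "gender": "F"},
    {"id": 2, "gender": "M"},
    {"id": 3, "gender": "Female"},
]

# Two UPDATE statements -> two new snapshots (two new table versions in Delta)
version_1 = update_rows(version_0, lambda r: r["gender"] == "F", {"gender": lambda r: "Female"})
version_2 = update_rows(version_1, lambda r: r["gender"] == "M", {"gender": lambda r: "Male"})

print([r["gender"] for r in version_2])  # ['Female', 'Male', 'Female']
print([r["gender"] for r in version_0])  # ['F', 'M', 'Female'] (old snapshot unchanged)
```

Because each statement commits separately, running the two `UPDATE`s above produces two table versions; a single `UPDATE` whose `SET` uses a `CASE` expression would make both changes in one commit.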


### (B) Deleting user-related data
You can remove data that matches a predicate from a Delta table. For instance, in a table named people10m or a path at /tmp/delta/people-10m, to delete all rows corresponding to people with a value in the birthDate column from before 1955, you can run the following:


```SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
```

```python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1955-01-01')
```

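One subtlety worth knowing: `DELETE` removes only the rows for which the predicate evaluates to true. Under SQL's three-valued logic, a row whose `birthDate` is `NULL` makes `birthDate < '1955-01-01'` evaluate to `NULL`, so that row is kept. The plain-Python sketch below models `NULL` with `None`; the `lt` and `delete_rows` helpers and the sample rows are hypothetical.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def lt(a: Optional[str], b: Optional[str]) -> Optional[bool]:
    """SQL-style `<`: returns None (unknown) if either side is NULL."""
    if a is None or b is None:
        return None
    return a < b  # ISO-8601 date strings of the same format compare correctly as text


def delete_rows(rows: List[Row], cutoff: str) -> List[Row]:
    """Model DELETE ... WHERE birthDate < cutoff: drop a row only when the predicate is True."""
    return [row for row in rows if lt(row["birthDate"], cutoff) is not True]


people = [
    {"id": 1, "birthDate": "1950-03-01"},  # before the cutoff -> deleted
    {"id": 2, "birthDate": "1970-07-15"},  # after the cutoff -> kept
    {"id": 3, "birthDate": None},          # NULL -> predicate unknown -> kept
]

remaining = delete_rows(people, "1955-01-01")
print([row["id"] for row in remaining])  # [2, 3]
```

To remove the rows with a missing birth date as well, the predicate would need an explicit `OR birthDate IS NULL`.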

### (C) Upsert into a table using merge

You can upsert data from a source table, view, or DataFrame into a target Delta table by using the MERGE SQL operation. Delta Lake supports inserts, updates, and deletes in MERGE, and it supports extended syntax beyond the SQL standard to facilitate advanced use cases.

Suppose you have a source table named people10mupdates or a source path at /tmp/delta/people-10m-updates that contains new data for a target table named people10m or a target path at /tmp/delta/people-10m. Some of these new records may already be present in the target data. To merge the new data, you want to update rows where the person’s id is already present and insert the new rows where no matching id is present. You can run the following:

```SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )
  ```


```python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
```
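The logic of this upsert can be modelled in plain Python: each source row is looked up by `id` in the target; a match becomes an update and a miss becomes an insert. The sketch also models a rule Delta Lake enforces when the merge has an update clause, as here: if several source rows match the same target row, the merge fails because the update would be ambiguous, so the source should be deduplicated first. The `upsert` helper and the data are hypothetical; the real operation runs as a single Delta transaction.

```python
from collections import Counter
from typing import Any, Dict, List

Row = Dict[str, Any]


def upsert(target: List[Row], source: List[Row], key: str = "id") -> List[Row]:
    """Hypothetical model of MERGE ... WHEN MATCHED UPDATE / WHEN NOT MATCHED INSERT."""
    target_keys = {row[key] for row in target}

    # Like Delta, refuse to proceed if several source rows would update one target row
    matched_counts = Counter(row[key] for row in source if row[key] in target_keys)
    duplicates = [k for k, n in matched_counts.items() if n > 1]
    if duplicates:
        raise ValueError(f"multiple source rows match target keys {duplicates}")

    matched_source = {row[key]: row for row in source if row[key] in target_keys}
    # WHEN MATCHED THEN UPDATE: replace matching target rows with the source values
    merged = [dict(matched_source.get(row[key], row)) for row in target]
    # WHEN NOT MATCHED THEN INSERT: append source rows with no target match
    merged.extend(dict(row) for row in source if row[key] not in target_keys)
    return merged


people = [
    {"id": 1, "firstName": "Ana", "salary": 100},
    {"id": 2, "firstName": "Ben", "salary": 200},
]
people_updates = [
    {"id": 2, "firstName": "Ben", "salary": 250},  # existing id -> update
    {"id": 3, "firstName": "Cy", "salary": 300},   # new id -> insert
]

result = upsert(people, people_updates)
print([(r["id"], r["salary"]) for r in result])  # [(1, 100), (2, 250), (3, 300)]
```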

**Modify all unmatched rows using merge**

> **Note:** `WHEN NOT MATCHED BY SOURCE` clauses are supported by the Scala, Python, and Java Delta Lake APIs in Delta 2.3 and above, and by SQL in Delta 2.4 and above.

You can use the WHEN NOT MATCHED BY SOURCE clause to UPDATE or DELETE records in the target table that do not have corresponding records in the source table. We recommend adding an optional conditional clause to avoid fully rewriting the target table.

The following code example shows the basic syntax for using this clause for deletes: it overwrites the target table with the contents of the source table and deletes unmatched records in the target table.

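```python
# targetDeltaTable is a DeltaTable and sourceDF a DataFrame; the aliases
# "target" and "source" are the names used in the merge condition.
(targetDeltaTable.alias("target")
  .merge(sourceDF.alias("source"), "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
```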

In summary, this snippet fully synchronizes a Delta table with its source: it updates target rows whose `key` matches a source row, inserts source rows that have no match, and deletes target rows that have no counterpart in the source. This is a common pattern for keeping a Delta table in sync with a source DataFrame.

In the context of a Delta Lake merge operation using PySpark, **the source is the DataFrame that contains the new data you want to merge into the existing Delta table, while the target is the existing Delta table where the merge operation will be applied.**

Here is a summary of the roles:

- **Target:** the existing Delta table (`targetDeltaTable` in the snippet above) that you want to update, insert into, or delete from.
- **Source:** the new data (`sourceDF`), represented as a DataFrame, that you want to merge into the target Delta table.

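The three clauses in the synchronization snippet can be modelled the same way. This hypothetical `sync` function updates matches, inserts new source rows, and deletes target rows missing from the source; its optional `delete_if` argument plays the role of the condition that can be passed to `whenNotMatchedBySourceDelete`, restricting which unmatched target rows are removed. It is a plain-Python illustration only, not Delta Lake code, and the `day` column and dates are made up.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


def sync(
    target: List[Row],
    source: List[Row],
    key: str = "key",
    delete_if: Optional[Callable[[Row], bool]] = None,
) -> List[Row]:
    """Hypothetical model of whenMatchedUpdateAll + whenNotMatchedInsertAll
    + whenNotMatchedBySourceDelete(condition). Assumes keys are unique in source."""
    source_by_key = {row[key]: row for row in source}
    target_keys = {row[key] for row in target}
    result = []
    for row in target:
        if row[key] in source_by_key:
            result.append(dict(source_by_key[row[key]]))  # matched: update all columns
        elif delete_if is None or delete_if(row):
            continue  # not matched by source (and condition holds): delete
        else:
            result.append(dict(row))  # not matched by source, condition false: keep
    # not matched (by target): insert
    result.extend(dict(row) for row in source if row[key] not in target_keys)
    return result


target = [
    {"key": 1, "v": "a", "day": "2024-07-21"},
    {"key": 2, "v": "b", "day": "2024-07-22"},
    {"key": 3, "v": "c", "day": "2024-07-22"},
]
source = [
    {"key": 2, "v": "B", "day": "2024-07-22"},
    {"key": 4, "v": "d", "day": "2024-07-22"},
]

full = sync(target, source)
print([(r["key"], r["v"]) for r in full])  # [(2, 'B'), (4, 'd')]

# Only delete unmatched rows from the day being re-synced; older rows survive
scoped = sync(target, source, delete_if=lambda r: r["day"] == "2024-07-22")
print([(r["key"], r["v"]) for r in scoped])  # [(1, 'a'), (2, 'B'), (4, 'd')]
```

Scoping the delete this way is the idea behind the recommendation to add a condition: only the unmatched rows you actually intend to remove are affected.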
### (D) Auditing data changes with operation history
 
```python
from delta.tables import DeltaTable

deltapath = '/path/to/delta_table'
deltaTable = DeltaTable.forPath(spark, deltapath)  # load the Delta table

# Columns available in the history DataFrame
deltaTable.history().columns
# ['version', 'timestamp', 'userId', 'userName', 'operation', 'operationParameters',
#  'job', 'notebook', 'clusterId', 'readVersion', 'isolationLevel', 'isBlindAppend',
#  'operationMetrics', 'userMetadata', 'engineInfo']

# One row per table version, newest first
deltaTable.history().select('version', 'timestamp', 'operation', 'operationParameters').show(truncate=False)
```

```
+-------+-----------------------+---------+------------------------------------------+
|version|timestamp              |operation|operationParameters                       |
+-------+-----------------------+---------+------------------------------------------+
|3      |2024-07-24 23:34:20.892|UPDATE   |{predicate -> ["(origin#8508 = LGA)"]}    |
|2      |2024-07-24 21:38:53.541|WRITE    |{mode -> Append, partitionBy -> []}       |
|1      |2024-07-22 22:04:33.685|WRITE    |{mode -> Overwrite, partitionBy -> []}    |
|0      |2024-07-22 18:04:28.729|WRITE    |{mode -> ErrorIfExists, partitionBy -> []}|
+-------+-----------------------+---------+------------------------------------------+
```

Note that the `operation` and `operationParameters` columns are the most useful for auditing changes: they record what kind of write produced each version and with which parameters, such as the write mode or the predicate of an `UPDATE`.

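As a sketch of how those two columns support an audit, the plain-Python snippet below takes the four rows of the sample history output (hard-coded here as an assumption, standing in for something like `deltaTable.history().collect()`) and lists the versions that modified or replaced existing data rather than only appending to it. The `modifying_versions` helper is hypothetical.

```python
from typing import Dict, List, Tuple

# (version, timestamp, operation, operationParameters), copied from the sample output
History = List[Tuple[int, str, str, Dict[str, str]]]

history: History = [
    (3, "2024-07-24 23:34:20.892", "UPDATE", {"predicate": '["(origin#8508 = LGA)"]'}),
    (2, "2024-07-24 21:38:53.541", "WRITE", {"mode": "Append", "partitionBy": "[]"}),
    (1, "2024-07-22 22:04:33.685", "WRITE", {"mode": "Overwrite", "partitionBy": "[]"}),
    (0, "2024-07-22 18:04:28.729", "WRITE", {"mode": "ErrorIfExists", "partitionBy": "[]"}),
]


def modifying_versions(rows: History) -> List[int]:
    """Versions whose operation could change or remove existing rows."""
    result = []
    for version, _ts, operation, params in rows:
        if operation in ("UPDATE", "DELETE", "MERGE"):
            result.append(version)
        elif operation == "WRITE" and params.get("mode") == "Overwrite":
            result.append(version)
    return sorted(result)


for version in modifying_versions(history):
    _, ts, op, params = next(r for r in history if r[0] == version)
    print(version, ts, op, params)
```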

### (E) Querying previous snapshots of a table with time travel

We can query previous versioned snapshots of a table by using **the DataFrameReader options "versionAsOf" and "timestampAsOf".**

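```python
delta_dir = '/path/to/delta_table'  # path of the Delta table to read

# Read the table as it was at a given timestamp...
df_at_time = spark.read.format('delta').option("timestampAsOf", "2024-07-22").load(delta_dir)
# ...or at a given version number
df_at_version = spark.read.format('delta').option("versionAsOf", "2").load(delta_dir)
```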

Time travel is useful in a variety of situations, such as:

- Reproducing machine learning experiments and reports by rerunning the job on a specific table version.
- Comparing data changes between versions for auditing.
- Rolling back incorrect changes by reading a previous snapshot as a DataFrame and overwriting the table with it.
 
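The rollback use case can be sketched with a toy version log in plain Python: reading an old snapshot and overwriting the table with it appends a new version whose contents match the old one, so the incorrect version stays in the history for auditing rather than being erased. The `VersionedTable` class is a hypothetical stand-in, not the Delta Lake API.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


class VersionedTable:
    """Toy append-only version log: every write adds a new snapshot."""

    def __init__(self, rows: List[Row]) -> None:
        self.versions: List[List[Row]] = [[dict(r) for r in rows]]

    def read(self, version_as_of: int = -1) -> List[Row]:
        # -1 means the latest version; copy so callers cannot alter history
        return [dict(r) for r in self.versions[version_as_of]]

    def overwrite(self, rows: List[Row]) -> int:
        self.versions.append([dict(r) for r in rows])
        return len(self.versions) - 1  # number of the new version


table = VersionedTable([{"id": 1, "origin": "LGA"}])
table.overwrite([{"id": 1, "origin": "JFK"}])  # version 1: an incorrect change
good = table.read(version_as_of=0)             # time travel to the good snapshot
restored_version = table.overwrite(good)       # roll back by overwriting

print(restored_version, table.read())  # 2 [{'id': 1, 'origin': 'LGA'}]
```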

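`versionAsOf` selects a version directly, while `timestampAsOf` has to be resolved to a version. Below is a simplified plain-Python model of that resolution as we understand Delta Lake's behavior: pick the latest commit whose timestamp is at or before the requested one, and fail if the request predates the first commit. The commit times are taken from the sample history in section (D), assuming, hypothetically, that `delta_dir` points at that table; under that assumption a date-only string such as `"2024-07-22"` means midnight, which is earlier than version 0, so that read would fail. Other edge cases, such as timestamps later than the latest commit, are not modelled; check the Delta Lake documentation for them.

```python
import bisect
from datetime import datetime
from typing import List

# Commit timestamps of versions 0..3, taken from the sample history in section (D)
commit_times: List[datetime] = [
    datetime.fromisoformat("2024-07-22 18:04:28.729"),
    datetime.fromisoformat("2024-07-22 22:04:33.685"),
    datetime.fromisoformat("2024-07-24 21:38:53.541"),
    datetime.fromisoformat("2024-07-24 23:34:20.892"),
]


def resolve_timestamp(ts: str, commits: List[datetime]) -> int:
    """Return the latest version committed at or before `ts` (simplified model)."""
    requested = datetime.fromisoformat(ts)  # a date-only string means midnight
    index = bisect.bisect_right(commits, requested)  # commits are sorted ascending
    if index == 0:
        raise ValueError(f"{ts} is before the earliest version ({commits[0]})")
    return index - 1


print(resolve_timestamp("2024-07-23", commit_times))           # 1
print(resolve_timestamp("2024-07-24 22:00:00", commit_times))  # 2
```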

