### **DAY 5 – Delta Lake Advanced**

### Learn:

- Time travel (version history)
- MERGE operations (upserts)
- OPTIMIZE & ZORDER
- VACUUM for cleanup

### 🛠️ Tasks:

1. Implement incremental MERGE
2. Query historical versions
3. Optimize tables
4. Clean old files

# Time Travel

In [0]:
%sql
-- Show the version history of the 2019_nov Delta table
-- (the name starts with a digit, so it must be wrapped in backticks)
DESCRIBE HISTORY workspace.default.`2019_nov`;

So far there is only one version, which corresponds to the table creation.
Let us insert a new record and verify the history again.

In [0]:
df = spark.read.table("2019_nov") 

In [0]:
df.limit(5).display()

In [0]:
%sql

INSERT INTO workspace.default.`2019_nov`
VALUES ('2019-11-01T00:00:00.000+00:00', 'view', 1, 1, 'electronics', 'brand1', 100, 1, '123456789');


Record inserted successfully

In [0]:
%sql
-- Let's check the history again
DESCRIBE HISTORY workspace.default.`2019_nov`;

A new version is created after the insert.

## Method 1 : PySpark - Version + Table

In [0]:
df1 = spark.read\
    .format("delta")\
    .option("versionAsOf", 0)\
    .table("2019_nov")
display(df1.count())

In [0]:
df1 = spark.read\
    .format("delta")\
    .option("versionAsOf", 1)\
    .table("2019_nov")
display(df1.count())

### Verifying path details to access table

In [0]:
%sql
-- to check the location
DESCRIBE DETAIL default.`2019_nov`;

- Hive Metastore managed tables:
Normally you'd see something like /user/hive/warehouse/default.db/<table_name>/. If it's blank, it usually means:
- The table is registered in the metastore but hasn't been written to yet.
- Or it's a view, not a physical table. Views don't have a storage location.

In [0]:
%sql
-- Check if it's a view:
SHOW TABLES IN default;

In [0]:
%sql 
-- Check catalog type
DESCRIBE EXTENDED default.`2019_nov`;

Managed tables are stored under the workspace's default warehouse directory:

/user/hive/warehouse/`<schema>`.db/`<table_name>`/

## Method 2 : SQL - version + Table

In [0]:
%sql
select count(*) from default.`2019_nov` version as of 0;

In [0]:
%sql
select count(*) from default.`2019_nov` version as of 1;
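
Both methods rely on the same idea: every write commits a new table version and leaves earlier versions readable. The following is a minimal pure-Python sketch of that idea, not of how Delta Lake is implemented (Delta records file-level add/remove actions in a transaction log rather than copying full snapshots). `VersionedTable` and its methods are hypothetical names used only for illustration, and the sample rows are made up.

```python
class VersionedTable:
    """Toy append-only table that keeps one snapshot per commit (illustration only)."""

    def __init__(self, initial_rows):
        # Version 0 corresponds to table creation.
        self._snapshots = [list(initial_rows)]

    def insert(self, rows):
        # Each write commits a new version; older snapshots are left untouched.
        self._snapshots.append(self._snapshots[-1] + list(rows))
        return len(self._snapshots) - 1  # number of the version just created

    def read(self, version_as_of=None):
        # Without an explicit version, read the latest one (like a plain SELECT).
        if version_as_of is None:
            version_as_of = len(self._snapshots) - 1
        return list(self._snapshots[version_as_of])


# Hypothetical sample rows: (event_type, price)
table = VersionedTable([("view", 100), ("cart", 250)])
new_version = table.insert([("view", 100)])

print(new_version, len(table.read(version_as_of=0)), len(table.read(version_as_of=1)))
```

This prints `1 2 3`: the insert created version 1, version 0 still holds the original two rows, and version 1 holds three. The row counts returned for `version as of 0` and `version as of 1` differ for the same reason.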

# MERGE operations (upserts)

MERGE operations (also called upserts) are one of the most powerful features of Delta Lake. 

They let you update existing rows and insert new rows in a single atomic operation, ensuring ACID guarantees.


🔹 Why MERGE?
- Prevents duplicate inserts.
- Ensures updates happen safely even with concurrent jobs.
- Ideal for slowly changing dimensions (SCD), CDC (change data capture), and deduplication.


🔹 Syntax (SQL)
```
MERGE INTO target_table AS t
USING source_table AS s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET t.value = s.value, t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (id, value, updated_at)
  VALUES (s.id, s.value, s.updated_at);
```
- ON → defines the match condition (usually the primary key).
- WHEN MATCHED → update existing rows.
- WHEN NOT MATCHED → insert new rows.
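
To make the three clauses concrete, here is a minimal pure-Python sketch of what an upsert does logically. It models each table as a list of dicts keyed by `id`. `merge_upsert` is a hypothetical helper written for this illustration, not part of Delta Lake, and it ignores the transaction log, concurrency, and atomicity that the real MERGE provides.

```python
def merge_upsert(target, source, key="id"):
    """Return the rows of `target` after upserting `source` into it (illustration only)."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        if row[key] in merged:
            # ON condition matched -> WHEN MATCHED: update the existing row
            merged[row[key]].update(row)
        else:
            # No match -> WHEN NOT MATCHED: insert the new row
            merged[row[key]] = dict(row)
    return [merged[k] for k in sorted(merged)]


target = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
source = [{"id": 2, "value": "B"}, {"id": 3, "value": "c"}]

result = merge_upsert(target, source)
print(result)
```

Row 1 is left alone because no source row matches it, row 2 is updated, and row 3 is inserted.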

🔹 Syntax (PySpark)

With a table name:
```
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "default.customers")

deltaTable.alias("t").merge(
    updates_df.alias("s"),
    "t.customer_id = s.customer_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
```
With a path:

```
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "path")

deltaTable.alias("t").merge(
    updates_df.alias("s"),
    "t.customer_id = s.customer_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
```


In [0]:
# Import only the types used to build the schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

data = [(1, 'John', 20), (2, 'Mary', 24), (3, 'Bob', 22)]

df = spark.createDataFrame(data=data, schema=schema)
display(df) 

In [0]:
%sql
-- Creating a dummy Table 
create or replace table dummy_table (
  id int,
  name string, 
  age int
) using Delta

In [0]:
%sql
Describe Detail dummy_table

In [0]:
%sql
Describe history dummy_table

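## Method 1 - Spark SQL

- To use the Spark SQL approach for MERGE, both the source and the target must be addressable by name: the target is a Delta table, and the source can be a table or a view (here, a temporary view created from the DataFrame).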

In [0]:
df.createOrReplaceTempView("source_view")

In [0]:
%sql
select * from source_view;

In [0]:
%sql
select * from dummy_table;

### Insert

In [0]:
%sql

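-- dummy_table is still empty, so every source row takes the NOT MATCHED branch and is inserted
MERGE INTO dummy_table AS t
USING source_view AS s
  ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET
    t.name = s.name,
    t.age = s.age
WHEN NOT MATCHED THEN
  INSERT (id, name, age) VALUES (s.id, s.name, s.age);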


In [0]:
%sql
select * from dummy_table;

### Update

In [0]:
data = [(1, 'John', 20), (2, 'Mary', 21), (3, 'Bob', 22),(4,'Ash',23)]

df = spark.createDataFrame(data=data, schema=schema)
display(df) 

In [0]:
df.createOrReplaceTempView("source_view")

In [0]:
%sql
select * from source_view;

In [0]:
%sql
select * from dummy_table;

In [0]:
%sql

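-- ids 1-3 already exist and are updated (Mary's age becomes 21); id 4 (Ash) is new and is inserted
MERGE INTO dummy_table AS t
USING source_view AS s
  ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET
    t.name = s.name,
    t.age = s.age
WHEN NOT MATCHED THEN
  INSERT (id, name, age) VALUES (s.id, s.name, s.age);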



In [0]:
%sql
select * from dummy_table;

## Method 2 - PySpark

In [0]:
data = [(1, 'John', 20), (2, 'Mary', 21), (3, 'Bob', 22),(4,'Ash',23),(5,'Ani',24),(4,'Ash',23)]

df = spark.createDataFrame(data=data, schema=schema)
df = df.dropDuplicates()
display(df) 
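
The source is de-duplicated first because Delta Lake's MERGE fails when several source rows match and try to update the same target row. Note that `dropDuplicates()` with no arguments only removes rows that are identical in every column, so two rows that share an `id` but differ elsewhere would still conflict. A minimal pure-Python sketch of that check follows; `duplicate_keys` is a hypothetical helper (not a Spark or Delta API), and the extra conflicting row is made up for illustration.

```python
from collections import Counter


def duplicate_keys(rows, key_index=0):
    """Return keys that still appear in more than one row after exact duplicates are removed."""
    distinct_rows = set(rows)  # mirrors dropping rows that are identical in every column
    counts = Counter(row[key_index] for row in distinct_rows)
    return sorted(key for key, n in counts.items() if n > 1)


data = [(1, 'John', 20), (2, 'Mary', 21), (3, 'Bob', 22),
        (4, 'Ash', 23), (5, 'Ani', 24), (4, 'Ash', 23)]

# The two id=4 rows are exact duplicates, so nothing conflicts after de-duplication.
clean_result = duplicate_keys(data)

# Hypothetical extra row: same id, different age. This would still conflict in a MERGE.
conflict_result = duplicate_keys(data + [(4, 'Ash', 30)])

print(clean_result, conflict_result)
```

In the second case, passing the key column to the de-duplication step (for example `dropDuplicates(["id"])`) would be needed, along with a rule for which row to keep.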

In [0]:
from delta.tables import DeltaTable

# Look up the target Delta table by name (DeltaTable.forPath works with a storage path instead)
deltaTable = DeltaTable.forName(spark, "workspace.default.dummy_table")

In [0]:
deltaTable.alias("target").merge(
    source=df.alias("source"),
    condition = "target.id = source.id"
).whenMatchedUpdateAll()\
.whenNotMatchedInsertAll()\
.execute()

In [0]:
%sql
select * from dummy_table;

# OPTIMIZE & ZORDER

In [0]:
spark.sql("OPTIMIZE `2019_nov` ZORDER BY (event_type, user_id)")

# VACUUM for cleanup

In [0]:
spark.sql("VACUUM `2019_nov` RETAIN 168 HOURS")
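
`RETAIN 168 HOURS` equals 7 days: VACUUM deletes only data files that the current table version no longer references and that are older than the retention window. Once those files are gone, time travel to versions that depended on them stops working. The retention check can be sketched in plain Python as follows. `vacuum_candidates`, the file names, and the timestamps are hypothetical, and this is a conceptual model rather than Delta's implementation.

```python
from datetime import datetime, timedelta


def vacuum_candidates(unreferenced_files, retain_hours, now):
    """Return names of unreferenced files that are older than the retention window."""
    cutoff = now - timedelta(hours=retain_hours)
    return [name for name, modified_at in unreferenced_files if modified_at < cutoff]


now = datetime(2024, 1, 15, 12, 0)
files = [
    ("part-0001.parquet", datetime(2024, 1, 1, 9, 0)),   # 14 days old: past the window
    ("part-0002.parquet", datetime(2024, 1, 10, 9, 0)),  # about 5 days old: still retained
]

to_delete = vacuum_candidates(files, retain_hours=168, now=now)
print(to_delete)
```

Only `part-0001.parquet` falls before the cutoff (8 January 2024, 12:00), so it is the only file the sketch marks for deletion.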