### Operations on a Delta file

In [0]:
from pyspark.sql import Row

# Seed data for the demo: two employee rows. No schema is passed, so Spark
# infers the column types from the Row fields.
data = [
    Row(id=1, name="Arun"),
    Row(id=2, name="Vinod"),
]
df = spark.createDataFrame(data)

# Render the DataFrame in the notebook output.
df.display()

In [0]:
# Persist the DataFrame in Delta format; "overwrite" replaces whatever is
# already stored at this path.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/Volumes/inceptez_catalog/inputdb/employee/employee_delta")
)
print("Delta file created")

In [0]:
# Show the commit history recorded for the Delta dataset at this path.
spark.sql(
    "DESCRIBE HISTORY delta.`/Volumes/inceptez_catalog/inputdb/employee/employee_delta`"
).display()

In [0]:

# Insert by appending with the PySpark DataFrame API

# Rows to add to the existing dataset
new_data = [Row(id=3, name="Kumar"), Row(id=4, name="Anita")]
new_df = spark.createDataFrame(new_data)

# "append" mode adds these rows to the Delta dataset in the volume without
# replacing what is already stored there.
(
    new_df.write
    .format("delta")
    .mode("append")
    .save("dbfs:/Volumes/inceptez_catalog/inputdb/employee/employee_delta")
)
print("Data appended")

In [0]:
### Update a record
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Handle to the Delta table stored at the volume path
delta_table = DeltaTable.forPath(
    spark,
    "/Volumes/inceptez_catalog/inputdb/employee/employee_delta",
)

# Set a new name on the row(s) matching id = 2
delta_table.update(
    condition="id = 2",
    set={"name": F.lit("Vinod Kumar")},
)
print("Record updated")

# Read the table back to confirm the change
df = spark.sql(
    "select * from delta.`/Volumes/inceptez_catalog/inputdb/employee/employee_delta`"
)
df.display()



In [0]:
# Inspect the commit history of the Delta dataset again.
spark.sql(
    "DESCRIBE HISTORY delta.`/Volumes/inceptez_catalog/inputdb/employee/employee_delta`"
).display()

### Delete record

In [0]:
# Remove the row(s) matching id = 3.
# `delta_table` is the DeltaTable handle created in the update cell.
delta_table.delete(condition="id = 3")
print("Deleted")

In [0]:
# Read the current contents of the Delta dataset and display them.
df = spark.sql(
    "select * from delta.`/Volumes/inceptez_catalog/inputdb/employee/employee_delta`"
)
df.display()

In [0]:
# Inspect the commit history of the Delta dataset.
spark.sql(
    "DESCRIBE HISTORY delta.`/Volumes/inceptez_catalog/inputdb/employee/employee_delta`"
).display()

In [0]:
### Insert with upsert (MERGE)

from delta.tables import DeltaTable

# Incoming batch: one row meant to update an existing id, one meant to be inserted
updates_df = spark.createDataFrame(
    [
        Row(id=2, name="Vinod Roy"),  # update existing row
        Row(id=7, name="Divya"),  # insert new row
    ]
)

# Handle to the existing Delta table in the volume
delta_table = DeltaTable.forPath(
    spark,
    "/Volumes/inceptez_catalog/inputdb/employee/employee_delta",
)

# Upsert: target rows ("t") whose id matches a source row ("s") take all of the
# source columns; source rows with no match are inserted as new rows.
(
    delta_table.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
print("Upserted")

In [0]:
# Show the commit history of the Delta dataset once more.
spark.sql(
    "DESCRIBE HISTORY delta.`/Volumes/inceptez_catalog/inputdb/employee/employee_delta`"
).display()