### Deep dive into Delta Lake tables

In [0]:
from pyspark.sql import Row

# Sample employee records: one Row per (id, name) pair.
data = [
    Row(id=emp_id, name=emp_name)
    for emp_id, emp_name in ((1, "Arun"), (2, "Vinod"))
]

# No explicit schema is passed, so Spark derives it from the Row fields.
df = spark.createDataFrame(data)
df.display()


In [0]:
# Warehouse-style write: save the DataFrame as a Delta table addressed by its
# catalog.schema.table name, using "overwrite" mode.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("inceptez_catalog.inputdb.tblemployee")
)
print("Delta table created")

# Data-lake-style alternative (write to a path instead of a named table), kept for reference:
# df.write.format("delta").mode("overwrite").save("dbfs:/Volumes/empdata/empinfo")


In [0]:
# Query the saved table through Spark SQL and render the result.
# Note: this rebinds `df` to the query result rather than the in-memory sample frame.
df = spark.sql(
    "select * from inceptez_catalog.inputdb.tblemployee"
)
df.display()

In [0]:
# Run DESCRIBE EXTENDED against the table and render whatever it returns.
(
    spark
    .sql("DESCRIBE EXTENDED inceptez_catalog.inputdb.tblemployee")
    .display()
)

In [0]:
%sql
-- Create the SQL-managed Delta table and seed it with two employees.
-- CREATE OR REPLACE (rather than plain CREATE) keeps this cell re-runnable:
-- a second run resets the table instead of failing because it already exists,
-- so the seed INSERT below always starts from an empty table.
CREATE OR REPLACE TABLE inceptez_catalog.inputdb.tblemployee1 (id INT, name STRING) USING DELTA;
INSERT INTO inceptez_catalog.inputdb.tblemployee1 VALUES (1, 'Arun'), (2, 'Vinod');

In [0]:
%sql
-- Show the current contents of the SQL-created table.
SELECT *
FROM inceptez_catalog.inputdb.tblemployee1;

In [0]:
%sql
-- Append one more employee row to the SQL-created table.
INSERT INTO inceptez_catalog.inputdb.tblemployee1
VALUES (4, 'Ramesh');

In [0]:
# Insert a record through the DataFrame API: build a one-row frame and
# append it to the tblemployee table.
new_rows = spark.createDataFrame([(4, "Ramesh")], schema=["id", "name"])
(
    new_rows.write
    .format("delta")
    .mode("append")
    .saveAsTable("inceptez_catalog.inputdb.tblemployee")
)

# Display both demo tables after the append.
(
    spark
    .sql("select * from inceptez_catalog.inputdb.tblemployee")
    .display()
)
(
    spark
    .sql("select * from inceptez_catalog.inputdb.tblemployee1")
    .display()
)


### Update Operation

In [0]:
%sql
-- Rename the employee with id 2, then show the table to confirm the change.
UPDATE inceptez_catalog.inputdb.tblemployee1
SET name = 'Vinod Kumar'
WHERE id = 2;

SELECT *
FROM inceptez_catalog.inputdb.tblemployee1;

In [0]:
from delta.tables import DeltaTable

# Update a row through the DeltaTable Python API.
dt = DeltaTable.forName(spark, "inceptez_catalog.inputdb.tblemployee")

# Values in `set` are SQL expressions, not Python literals. A bare
# "Vinod Kumar" would be parsed as an expression (a column reference) and fail
# to resolve, so the string constant carries its own single quotes.
dt.update(condition="id = 2", set={"`name`": "'Vinod Kumar'"})

# Show the table to confirm the change.
spark.sql("select * from inceptez_catalog.inputdb.tblemployee").display()


### Delete Operation

In [0]:
%sql
-- Remove the row with id 4 from tblemployee, then show what remains.
DELETE FROM inceptez_catalog.inputdb.tblemployee
WHERE id = 4;

SELECT *
FROM inceptez_catalog.inputdb.tblemployee;

In [0]:
# Delete through the DeltaTable API: remove rows matching the predicate
# from tblemployee1, then display the table.
dt = DeltaTable.forName(spark, "inceptez_catalog.inputdb.tblemployee1")
dt.delete(condition="id = 4")

(
    spark
    .sql("select * from inceptez_catalog.inputdb.tblemployee1")
    .display()
)

### Merge/Upsert Operation

In [0]:
# Source rows for the merge: two (id, name) records to be applied to the target table.
updates_df = spark.createDataFrame(
    [
        Row(id=2, name="Vinod Roy"),
        Row(id=3, name="Sachin"),
    ]
)
updates_df.display()

In [0]:
# Upsert into tblemployee, matching source and target rows on id.
target = DeltaTable.forName(spark, "inceptez_catalog.inputdb.tblemployee")

# The source frame is rebuilt here so this cell does not depend on the preview cell.
updates_df = spark.createDataFrame(
    [
        Row(id=2, name="Vinod Roy"),
        Row(id=3, name="Sachin"),
    ]
)

(
    target.alias("t")
    .merge(source=updates_df.alias("s"), condition="t.id = s.id")
    .whenMatchedUpdateAll()      # ids present in both: update the target row from the source
    .whenNotMatchedInsertAll()   # ids only in the source: insert the source row
    .execute()
)
print("Merge completed")

In [0]:
# Display tblemployee after the merge.
(
    spark
    .sql("select * from inceptez_catalog.inputdb.tblemployee")
    .display()
)

In [0]:
%sql
-- Show tblemployee1 as it stands before the SQL merge.
SELECT *
FROM inceptez_catalog.inputdb.tblemployee1;

In [0]:
%sql
-- Upsert into tblemployee1 using SQL MERGE, matching rows on id.
-- The target is written as MERGE INTO <catalog>.<schema>.<table>: the INTO keyword
-- is required, and the three-part name must not be wrapped in a single pair of
-- backticks (that would turn it into one identifier containing dots).
MERGE INTO inceptez_catalog.inputdb.tblemployee1 AS t
USING (SELECT 2 AS id, 'Vinod Roy' AS name UNION ALL SELECT 3, 'Sachin') AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *      -- matching id: update the target row from the source
WHEN NOT MATCHED THEN INSERT *;     -- new id: insert the source row

-- Show the table after the merge.
SELECT * FROM inceptez_catalog.inputdb.tblemployee1;