In [0]:
# Build a three-row sample DataFrame with columns id, name and location, then print it.
df = spark.createDataFrame(
    [
        (1, "John", "Cambridge"),
        (2, "Smith", "Oxford"),
        (3, "Asher", "Liverpool"),
    ],
    schema=["id", "name", "location"],
)
df.show()

In [0]:
# Persist df as Parquet files. No save mode is given, so Spark's default applies
# and the write errors out if the target path already exists.
df.write.save("/FileStore/tables/parquetdata/", format='parquet')

In [0]:
# Persist df as a Delta table. The format is named explicitly; as with the
# Parquet write, no save mode is given, so an existing path causes an error.
df.write.save("/FileStore/tables/deltadata/", format='delta')

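If the number of columns or the schema changes, Parquet (and other plain file formats) will still accept the write, but Delta will reject it because it enforces the table's existing schema. This protects the table from accidental schema changes, which is one reason to prefer Delta. Note that you must explicitly state `delta` as the format in the write command.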

In [0]:
# Build a second sample DataFrame: same rows as df plus an extra "contact" column.
df1 = spark.createDataFrame(
    [
        (1, "John", "Cambridge", 123456789),
        (2, "Smith", "Oxford", 888888888),
        (3, "Asher", "Liverpool", 999999999),
    ],
    schema=["id", "name", "location", "contact"],
)
df1.show()

In [0]:
# Replace the Parquet data with df1 by passing the "overwrite" save mode.
# Parquet accepts the four-column frame even though the earlier data had three columns.
df1.write.save("/FileStore/tables/parquetdata/", format='parquet', mode="overwrite")

In [0]:
# Schema enforcement demo: try to overwrite the Delta table with df1, which has an
# extra "contact" column that the existing table schema does not contain.
# Delta is expected to reject the write with a schema mismatch error. The error is
# caught and printed so that the notebook can still be run top to bottom.
from pyspark.sql.utils import AnalysisException

try:
    df1.write.mode("overwrite").format('delta').save("/FileStore/tables/deltadata/")
except AnalysisException as schema_error:
    # This is the demonstration: Delta detected the mismatch between df1 and the table.
    print(f"Delta rejected the write (schema enforcement): {schema_error}")

In [0]:
# To overwrite the Delta data without hitting the schema mismatch error, pass the
# mergeSchema option. It lets the table schema evolve to include df1's extra column
# for this write instead of rejecting it.
# Only do this when the schema change is really required - confirm the requirements first.
df1.write.save(
    "/FileStore/tables/deltadata/",
    format='delta',
    mode="overwrite",
    mergeSchema=True,
)

In [0]:
# Load the Delta table from its storage location and print its rows.
spark.read.load("/FileStore/tables/deltadata/", format="delta").show()

In [0]:
# Append the rows of df to the Delta table.
# Every run of this cell adds df's rows to the table again. df has no "contact"
# column, so "contact" is null for the appended rows.
df.write.save(
    "/FileStore/tables/deltadata/",
    format='delta',
    mode="append",
    mergeSchema=True,
)

In [0]:
# Re-read the Delta table to see the appended rows.
spark.read.load("/FileStore/tables/deltadata/", format="delta").show()

In [0]:
# Time travel by timestamp: read the table as it was at an earlier point in time,
# e.g. before corrupt or unwanted data was written.
# The timestamp must fall within the table's own commit history, so it is taken from
# that history rather than hard-coded: here, the commit just before the latest one
# (or the only commit, if the table has a single version).
from delta.tables import DeltaTable

delta_path = "/FileStore/tables/deltadata/"
previous_commit_ts = (
    DeltaTable.forPath(spark, delta_path)
    .history(2)  # the two most recent commits
    .orderBy("version")  # older of the two first
    # Cast inside Spark so the string uses the same session time zone the reader parses with.
    .selectExpr("CAST(timestamp AS STRING) AS commit_ts")
    .first()["commit_ts"]
)
spark.read.format("delta").load(delta_path, timestampAsOf=previous_commit_ts).show()
# This only reads the older snapshot; the table itself is not changed by this cell.

In [0]:
# Time travel by version: version 0 is the table as first written.
# Useful for version control, e.g. version 0 has 3 columns and version 1 has 4 columns.
# Be sure which version you want before restoring from it.
(
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("/FileStore/tables/deltadata/")
    .show()
)

In [0]:
# Updating rows in place (here: replacing null contacts with a dummy value) goes
# through the DeltaTable API, so load the storage path as a DeltaTable first.
from delta.tables import DeltaTable

dtable = DeltaTable.forPath(spark, "/FileStore/tables/deltadata/")
dtable.update(
    condition="contact is null",
    set={"contact": "111111"},
)

In [0]:
# Convert the DeltaTable to a DataFrame and print it (show's defaults written out).
dtable.toDF().show(n=20, truncate=True)

In [0]:
# Update one specific value: set the name of the row with id 2.
from delta.tables import DeltaTable

dtable = DeltaTable.forPath(spark, "/FileStore/tables/deltadata/")
dtable.update(
    condition="id=2",
    # A literal string value must be wrapped in single quotes inside the Python string.
    set={"name": "'Sean'"},
)

In [0]:
# Print the table contents after the update (show's defaults written out).
dtable.toDF().show(n=20, truncate=True)

In [0]:
# Delete every row that matches a condition.
from delta.tables import DeltaTable

dtable = DeltaTable.forPath(spark, "/FileStore/tables/deltadata/")
dtable.delete(condition="id=2")
# Several ids can be removed at once, e.g. dtable.delete("id in (1,2)")

In [0]:
# Print the table contents after the delete (show's defaults written out).
dtable.toDF().show(n=20, truncate=True)

In [0]:
# Read the same data straight from the storage path.
spark.read.load("/FileStore/tables/deltadata/", format='delta').show()

# Create a Delta table using SQL

In [0]:
%sql
-- Register a table named mydeltadata over the existing Delta files at this location.
CREATE TABLE IF NOT EXISTS mydeltadata
USING DELTA
LOCATION "/FileStore/tables/deltadata/"

In [0]:
%sql
-- Query every row and column of the registered table.
SELECT * FROM mydeltadata

id,name,location,contact
3,Asher,Liverpool,111111
3,Asher,Liverpool,111111
1,John,Cambridge,111111
1,John,Cambridge,111111
3,Asher,Liverpool,999999999
1,John,Cambridge,123456789
3,Asher,Liverpool,111111
1,John,Cambridge,111111


To rename the table:

In [0]:
%sql
-- Rename the registered table from mydeltadata to my_delta_data_1.
ALTER TABLE mydeltadata RENAME TO my_delta_data_1

In [0]:
# Inspect one JSON commit file from the table's _delta_log directory.
spark.read.json("/FileStore/tables/deltadata/_delta_log/00000000000000000001.json").show()

In [0]:
# Build the starting data for the slowly changing dimension (SCD) type 1 example.
# Note: this rebinds the name df used earlier in the notebook.
df = spark.createDataFrame(
    [
        (1, "John", "Cambridge"),
        (2, "Smith", "Oxford"),
        (3, "Asher", "Liverpool"),
    ],
    schema=["id", "name", "location"],
)
df.show()

In [0]:
# Initial load: write the starting data to the SCD target location as a Delta table.
df.write.save("/FileStore/tables/scdtarget/", format='delta')

In [0]:
# The import below is repeated from earlier cells; re-importing is harmless and is
# not required once the library has already been imported in this session.
from delta.tables import DeltaTable

# Handle to the SCD target Delta table.
targetdata = DeltaTable.forPath(
    spark,
    "/FileStore/tables/scdtarget/",
)

In [0]:
# Print the SCD target table as initially loaded (show's defaults written out).
targetdata.toDF().show(n=20, truncate=True)

In [0]:
# Example of data arriving the next day: John has moved, and a new record
# for Jane has been added.
df11 = spark.createDataFrame(
    [
        (1, "John", "Manchester"),
        (4, "Jane", "London"),
    ],
    schema=["id", "name", "location"],
)

In [0]:
# Stage the incoming rows as a Delta table in the source location; they are the
# input to the SCD type 1 merge.
df11.write.save("/FileStore/tables/sourcedata/", format='delta')

In [0]:
# Load the staged source rows back from the source folder and print them.
updateddata = spark.read.load("/FileStore/tables/sourcedata/", format='delta')
updateddata.show()

In [0]:
# SCD type 1 upsert: merge the source rows (alias "b") into the target table
# (alias "a"), matching on id.
#   - whenMatchedUpdate(set={...}): for ids already in the target, overwrite the
#     location with the source value (in this example John's address has changed).
#   - whenNotMatchedInsert(values={...}): for ids missing from the target, insert
#     the source row (here, Jane).
# Only `col` is imported: a wildcard import from pyspark.sql.functions would shadow
# Python builtins such as sum, min, max and round for the rest of the notebook.
from pyspark.sql.functions import col
from delta.tables import DeltaTable

targetdata = DeltaTable.forPath(spark, "/FileStore/tables/scdtarget/")
(
    targetdata.alias("a")
    .merge(updateddata.alias("b"), col("a.id") == col("b.id"))
    .whenMatchedUpdate(set={"location": "b.location"})
    .whenNotMatchedInsert(values={"id": "b.id", "name": "b.name", "location": "b.location"})
    .execute()
)


In [0]:
# Use vacuum to limit how much history is kept; the argument is a retention period in hours.
# 200 hours is a little over 8 days. The default retention is 168 hours (24*7 = 7 days),
# so pass 168 to keep exactly seven days.
targetdata.vacuum(retentionHours=200)

In [0]:
# Print the SCD target table after the merge and vacuum (show's defaults written out).
targetdata.toDF().show(n=20, truncate=True)