In [0]:
%fs 
rm -r dbfs:/FileStore/Delta_Data

In [0]:
%fs
rm -r dbfs:/FileStore/parquet_Data

In [0]:
%sql 
drop Table  Delta_Data

In [0]:
%sql 
drop Table  parquet_Data

In [0]:
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as f
from pyspark.sql import Row
import pyspark.sql.types as T
import random
spark=SparkSession.builder.appName("deltalakedemo").getOrCreate()

In [0]:
df_row=Row("Item_ID","Item_Name","Item_type","Price") #Creating header
row1=df_row(110,"IPhone3","Mobile",300.00)
row2=df_row(210,"Airpods","Accessories",100.00)
row3=df_row(310,"Macbook","Laptoop",100.00)
row4=df_row(120,"IPhone4","Mobile",300.00)
row_seq=[row1,row2,row3,row4]
df=spark.createDataFrame(data=row_seq)
df.printSchema()

root
 |-- Item_ID: long (nullable = true)
 |-- Item_Name: string (nullable = true)
 |-- Item_type: string (nullable = true)
 |-- Price: double (nullable = true)



In [0]:
display(df)

Item_ID,Item_Name,Item_type,Price
110,IPhone3,Mobile,300.0
210,Airpods,Accessories,100.0
310,Macbook,Laptoop,100.0
120,IPhone4,Mobile,300.0


# Writing data to parquet and Delta

In [0]:
parquetdatapath="/FileStore/parquet_Data"
deltadatapath="/FileStore/Delta_Data"
#Writing to Parquet
(df.write
    .format("parquet") #Format is Parquet
    .mode("overwrite")
    .partitionBy("Item_type")
    .save(parquetdatapath))
#Writing to Delta
(df.write
    .format("delta") #Format is delta
    .mode("overwrite")
    .partitionBy("Item_type")
    .save(deltadatapath))

# Creating tables for Parquet and delta

In [0]:
%sql
create Table if not exists delta_data
using delta 
options (path="/FileStore/Delta_Data");


In [0]:
%sql
create Table if not exists parquet_data
using parquet 
options (path="/FileStore/parquet_Data");

In [0]:
# display(spark.read.format("parquet").load(parquetdatapath))
# display(spark.read.format("delta").load(deltadatapath))

In [0]:
%sql 
select * from delta_data;

Item_ID,Item_Name,Item_type,Price
110,IPhone3,Mobile,300.0
120,IPhone4,Mobile,300.0
210,Airpods,Accessories,100.0
310,Macbook,Laptoop,100.0


In [0]:
%sql
msck repair table parquet_data

In [0]:
%sql 
select * from parquet_data;

Item_ID,Item_Name,Price,Item_type
210,Airpods,100.0,Accessories
310,Macbook,100.0,Laptoop
110,IPhone3,300.0,Mobile
120,IPhone4,300.0,Mobile


# Audit History
Delta lake consits of data files(parquest format) and a  transaction logs( <_delta_log>  .json format) where as the Parquet format will not have transactionn log

In [0]:
display(dbutils.fs.ls(parquetdatapath))

path,name,size,modificationTime
dbfs:/FileStore/parquet_Data/Item_type=Accessories/,Item_type=Accessories/,0,0
dbfs:/FileStore/parquet_Data/Item_type=Laptoop/,Item_type=Laptoop/,0,0
dbfs:/FileStore/parquet_Data/Item_type=Mobile/,Item_type=Mobile/,0,0
dbfs:/FileStore/parquet_Data/_SUCCESS,_SUCCESS,0,1649627362000


In [0]:
display(dbutils.fs.ls(deltadatapath))

path,name,size,modificationTime
dbfs:/FileStore/Delta_Data/Item_type=Accessories/,Item_type=Accessories/,0,0
dbfs:/FileStore/Delta_Data/Item_type=Laptoop/,Item_type=Laptoop/,0,0
dbfs:/FileStore/Delta_Data/Item_type=Mobile/,Item_type=Mobile/,0,0
dbfs:/FileStore/Delta_Data/_delta_log/,_delta_log/,0,0


In [0]:
%fs 
ls dbfs:/FileStore/Delta_Data/_delta_log/

path,name,size,modificationTime
dbfs:/FileStore/Delta_Data/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1649707468000
dbfs:/FileStore/Delta_Data/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1649707468000
dbfs:/FileStore/Delta_Data/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1649707469000
dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2048,1649707468000
dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000000.json,00000000000000000000.json,3227,1649707460000


In [0]:
%fs 
head dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000000.json

In [0]:
log_df=spark.read.json("dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000000.json")
display(log_df)

add,commitInfo,metaData,protocol
,,,"List(1, 2)"
,,"List(1649707451330, List(parquet), 8191ef87-7e8b-4be0-b3db-7d71c6104ff0, List(Item_type), {""type"":""struct"",""fields"":[{""name"":""Item_ID"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""Item_Name"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Item_type"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Price"",""type"":""double"",""nullable"":true,""metadata"":{}}]})",
"List(true, 1649707455000, List(Mobile), Item_type=Mobile/part-00001-602481ed-c077-4983-be5a-9825fa65b380.c000.snappy.parquet, 1139, {""numRecords"":1,""minValues"":{""Item_ID"":110,""Item_Name"":""IPhone3"",""Price"":300.0},""maxValues"":{""Item_ID"":110,""Item_Name"":""IPhone3"",""Price"":300.0},""nullCount"":{""Item_ID"":0,""Item_Name"":0,""Price"":0}}, List(1649707455000000, 268435456))",,,
"List(true, 1649707455000, List(Accessories), Item_type=Accessories/part-00003-946945ea-4e81-485b-a14b-5af232cd84ec.c000.snappy.parquet, 1139, {""numRecords"":1,""minValues"":{""Item_ID"":210,""Item_Name"":""Airpods"",""Price"":100.0},""maxValues"":{""Item_ID"":210,""Item_Name"":""Airpods"",""Price"":100.0},""nullCount"":{""Item_ID"":0,""Item_Name"":0,""Price"":0}}, List(1649707455000001, 268435456))",,,
"List(true, 1649707455000, List(Laptoop), Item_type=Laptoop/part-00005-e04d084b-462e-485b-93d2-355ae65c5443.c000.snappy.parquet, 1139, {""numRecords"":1,""minValues"":{""Item_ID"":310,""Item_Name"":""Macbook"",""Price"":100.0},""maxValues"":{""Item_ID"":310,""Item_Name"":""Macbook"",""Price"":100.0},""nullCount"":{""Item_ID"":0,""Item_Name"":0,""Price"":0}}, List(1649707455000002, 268435456))",,,
"List(true, 1649707455000, List(Mobile), Item_type=Mobile/part-00007-6f9e5b06-3d77-47d3-b76b-ab41012f591a.c000.snappy.parquet, 1139, {""numRecords"":1,""minValues"":{""Item_ID"":120,""Item_Name"":""IPhone4"",""Price"":300.0},""maxValues"":{""Item_ID"":120,""Item_Name"":""IPhone4"",""Price"":300.0},""nullCount"":{""Item_ID"":0,""Item_Name"":0,""Price"":0}}, List(1649707455000003, 268435456))",,,
,"List(0411-194010-pos01v9h, Databricks-Runtime/10.4.x-scala2.12, false, WriteSerializable, List(2174247527423951), WRITE, List(4, 4556, 4), List(Overwrite, [""Item_type""]), 1649707458818, 2d14fbd2-69d2-4ce7-bb2b-fa4f17773cee, 4640847102564132, vkanamanthareddy1@gsu.edu)",,


# Schema Enforcement
Delta lake will not allow the schema's other than pre-defined as it is schema on write  where as parquet will allow adjust the schema as its schema on read which leads to currupt data

In [0]:
df_row1=Row("Item_ID","Item_Name","Item_type","Price") #Creating header
row1=df_row1("110","IPhone","Mobile",300.00)
row_seq1=[row1]
df1=spark.createDataFrame(row_seq1)
df1.printSchema()

root
 |-- Item_ID: string (nullable = true)
 |-- Item_Name: string (nullable = true)
 |-- Item_type: string (nullable = true)
 |-- Price: double (nullable = true)



In [0]:
(df1.write
    .format("parquet")
    .mode("append")
    .partitionBy("Item_type")
    .save(parquetdatapath))


In [0]:
# display(spark.read.format("parquet").load(parquetdatapath))

In [0]:
(df1.write
    .format("delta")
    .mode("append")
    .partitionBy("Item_type")
    .save(deltadatapath))

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-2174247527423969>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m (df1.write
[0m[1;32m      2[0m     [0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"delta"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m     [0;34m.[0m[0mmode[0m[0;34m([0m[0;34m"append"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m     [0;34m.[0m[0mpartitionBy[0m[0;34m([0m[0;34m"Item_type"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m     .save(deltadatapath))

[0;32m/databricks/spark/python/pyspark/sql/readwriter.py[0m in [0;36msave[0;34m(self, path, format, mode, partitionBy, **options)[0m
[1;32m    738[0m             [0mself[0m[0;34m.[0m[0m_jwrite[0m[0;34m.[0m[0msave[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m    739[0m         [0

# Schema Evolution

In [0]:
df_row2=Row("Item_ID","Item_Name","Item_type","Price","Made_In") #Creating header
row1=df_row2(130,"IPhone4","Mobile",300.00,"Chaina")
row_seq2=[row1]
df2=spark.createDataFrame(row_seq2)
print(df2.printSchema())
display(df2)

root
 |-- Item_ID: long (nullable = true)
 |-- Item_Name: string (nullable = true)
 |-- Item_type: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Made_In: string (nullable = true)

None


Item_ID,Item_Name,Item_type,Price,Made_In
130,IPhone4,Mobile,300.0,Chaina


In [0]:
(df2.write
    .format("parquet")
    .mode("append")
    .partitionBy("Item_type")
    .save(parquetdatapath))

In [0]:
(df2.write
    .format("delta")
    .mode("append")
    .option("mergeSchema",True)
    .partitionBy("Item_type")
    .save(deltadatapath))

In [0]:
display(spark.read.format("delta").load(deltadatapath))


Item_ID,Item_Name,Item_type,Price,Made_In
130,IPhone4,Mobile,300.0,Chaina
110,IPhone3,Mobile,300.0,
120,IPhone4,Mobile,300.0,
210,Airpods,Accessories,100.0,
310,Macbook,Laptoop,100.0,


In [0]:
display(spark.read.format("parquet").load(parquetdatapath))

# DML Operations-Upsert-SCD-(Slowly changing dimensions)-Type 1&2

In [0]:
df_row3=Row("Item_ID","Item_Name","Item_type","Price","Made_In") #Creating header
row1=df_row3(110,"IPhone4","Mobile",250.00,"China")
row2=df_row3(320,"Mac Book Pro","Mobile",1000.00,"Japan")
row_seq3=[row1,row2]
df3=spark.createDataFrame(row_seq3)
print(df3.printSchema())
display(df3)

root
 |-- Item_ID: long (nullable = true)
 |-- Item_Name: string (nullable = true)
 |-- Item_type: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Made_In: string (nullable = true)

None


Item_ID,Item_Name,Item_type,Price,Made_In
110,IPhone4,Mobile,250.0,China
320,Mac Book Pro,Mobile,1000.0,Japan


In [0]:
df3.createOrReplaceTempView("Delta_data_upsert")

In [0]:
%sql
select * from Delta_data_upsert

Item_ID,Item_Name,Item_type,Price,Made_In
110,IPhone4,Mobile,250.0,China
320,Mac Book Pro,Mobile,1000.0,Japan


In [0]:
%sql
merge into Delta_data as a
using Delta_data_upsert as b
on a.Item_id=b.Item_id
when matched then
update set *
when not matched then
insert *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


# Delta table after merging

In [0]:
%sql
select * from Delta_data

Item_ID,Item_Name,Item_type,Price,Made_In
110,IPhone4,Mobile,250.0,China
320,Mac Book Pro,Mobile,1000.0,Japan
130,IPhone4,Mobile,300.0,Chaina
210,Airpods,Accessories,100.0,
310,Macbook,Laptoop,100.0,
120,IPhone4,Mobile,300.0,


In [0]:
%fs 
ls dbfs:/FileStore/Delta_Data/_delta_log/

path,name,size,modificationTime
dbfs:/FileStore/Delta_Data/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1649707468000
dbfs:/FileStore/Delta_Data/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1649707468000
dbfs:/FileStore/Delta_Data/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1649707469000
dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2048,1649707468000
dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000000.json,00000000000000000000.json,3227,1649707460000
dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000001.crc,00000000000000000001.crc,2125,1649707956000
dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000001.json,00000000000000000001.json,1738,1649707952000
dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000002.crc,00000000000000000002.crc,2125,1649708138000
dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000002.json,00000000000000000002.json,1782,1649708134000


In [0]:
log_df=spark.read.json("dbfs:/FileStore/Delta_Data/_delta_log/00000000000000000002.json")
display(log_df)

add,commitInfo,remove
,,"List(true, 1649708133319, true, List(Mobile), Item_type=Mobile/part-00001-602481ed-c077-4983-be5a-9825fa65b380.c000.snappy.parquet, 1139, List(1649707455000000, 268435456))"
"List(true, 1649708133000, List(Mobile), Item_type=Mobile/part-00000-a1711aba-f690-4be5-9f97-a8b7cb7f75c5.c000.snappy.parquet, 1407, {""numRecords"":2,""minValues"":{""Item_ID"":110,""Item_Name"":""IPhone4"",""Price"":250.0,""Made_In"":""China""},""maxValues"":{""Item_ID"":320,""Item_Name"":""Mac Book Pro"",""Price"":1000.0,""Made_In"":""Japan""},""nullCount"":{""Item_ID"":0,""Item_Name"":0,""Price"":0,""Made_In"":0}}, List(1649708133000000, 268435456))",,
,"List(0411-194010-pos01v9h, Databricks-Runtime/10.4.x-scala2.12, false, WriteSerializable, List(2174247527423951), MERGE, List(8598, 2, 2, 0, 1, 1, 0, 0, 1, 1, 3499, 4873), List([{""actionType"":""update""}], [{""actionType"":""insert""}], (a.Item_id = b.Item_id)), 1, 1649708133506, 71ee0ec6-aec2-497d-b897-1acff72761d8, 4640847102564132, vkanamanthareddy1@gsu.edu)",


#History Table

In [0]:
%sql
describe history delta_data

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2022-04-11T20:15:34.000+0000,4640847102564132,vkanamanthareddy1@gsu.edu,MERGE,"Map(predicate -> (a.Item_id = b.Item_id), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(2174247527423951),0411-194010-pos01v9h,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, executionTimeMs -> 8598, numTargetRowsInserted -> 1, scanTimeMs -> 4873, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 3499)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-04-11T20:12:32.000+0000,4640847102564132,vkanamanthareddy1@gsu.edu,WRITE,"Map(mode -> Append, partitionBy -> [""Item_type""])",,List(2174247527423951),0411-194010-pos01v9h,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1394)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-04-11T20:04:20.000+0000,4640847102564132,vkanamanthareddy1@gsu.edu,WRITE,"Map(mode -> Overwrite, partitionBy -> [""Item_type""])",,List(2174247527423951),0411-194010-pos01v9h,,WriteSerializable,False,"Map(numFiles -> 4, numOutputRows -> 4, numOutputBytes -> 4556)",,Databricks-Runtime/10.4.x-scala2.12


In [0]:
%sql 
select 
version,
operationMetrics.numOutputRows,
operationMetrics.numTargetRowsCopied, 
operationMetrics.numTargetRowsDeleted,
operationMetrics.numTargetRowsInserted,
operationMetrics.numTargetRowsUpdated
from (describe history delta_data)

version,numOutputRows,numTargetRowsCopied,numTargetRowsDeleted,numTargetRowsInserted,numTargetRowsUpdated
2,2,0.0,0.0,1.0,1.0
1,1,,,,
0,4,,,,


# Time Travel

In [0]:
display(spark.read.format("delta").option("versionAsof",2).load(deltadatapath))

Item_ID,Item_Name,Item_type,Price,Made_In
110,IPhone4,Mobile,250.0,China
320,Mac Book Pro,Mobile,1000.0,Japan
130,IPhone4,Mobile,300.0,Chaina
210,Airpods,Accessories,100.0,
310,Macbook,Laptoop,100.0,
120,IPhone4,Mobile,300.0,
