# Delta Lake: An Introduction to a High-Performance Data Management System

1. Create Delta tables by reading data stored in CSV file formats 
2. Create Delta tables by reading data stored in SQL Server (we will use PostgreSQL)
3. Create Delta tables by reading data stored in the NoSQL database (We will use MongoDB)
4. CRUD operations with Delta tables
    <ul><li>Read Data from Delta table</li>
    <li>Update rows in Delta table </li>
    <li>Delete rows in Delta table </li></ul>
5. Data and Metadata understanding for Delta tables
6. Select queries on Delta tables and SQL functions on Delta tables (or Lakehouse)
7. Versioning and time travel with delta lake 
8. Optimization features associated with Delta Lake
9. Create Lakehouse in Azure Data Lake Services or AWS S3 bucket

In [1]:
# First Load all the required library and also Start Spark Session
# Load all the required library
from pyspark.sql import SparkSession

In [2]:
#Start Spark Session
spark = SparkSession.builder.appName("DeltaLake") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")  \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.postgresql:postgresql:42.5.4")  \
        .getOrCreate()
sqlContext = SparkSession(spark)
#Dont Show warning only error
spark.sparkContext.setLogLevel("ERROR")



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-057ce3d7-b6b2-434d-848b-1b8c29674bec;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.0.0 in central
	found io.delta#delta-storage;2.0.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found org.postgresql#postgresql;42.5.4 in central
	found org.checkerframework#checker-qual;3.5.0 in central
:: resolution report :: resolve 1951ms :: artifacts dl 158ms
	:: modules in us

1. Create Delta tables by reading data stored in CSV file formats 

In [3]:
csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("food.csv")

In [4]:
#Checking dataframe schema
csvdf.printSchema()

root
 |-- FOODNAME: string (nullable = true)
 |-- SCIENTIFICNAME: string (nullable = true)
 |-- GROUP: string (nullable = true)
 |-- SUBGROUP: string (nullable = true)



In [5]:
csvdf.write.format("delta").save("Lakehouse/food",mode='append')

                                                                                

2. Create Delta tables by reading data stored in SQL Server (we will use PostgreSQL)

In [8]:
postgredf = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://192.168.1.100:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "public.employee_salary") \
    .option("user", "postgres") \
    .option("password", "postgres")\
    .load()

In [9]:
postgredf.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- department_id: long (nullable = true)



In [10]:
postgredf.write.format("delta").save("Lakehouse/employeeSalary",mode='append')

                                                                                

3. Create Delta tables by reading data stored in the NoSQL database (We will use MongoDB)

In [11]:
mongodf = spark.read.format("mongo") \
    .option("uri", "mongodb://root:mongodb@192.168.1.100:27017/") \
    .option("database", "dataengineering") \
    .option("collection", "employee") \
    .load()

In [12]:
mongodf.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- salary: string (nullable = true)



In [13]:
mongodf.write.format("delta").save("Lakehouse/employee",mode='append')

                                                                                

4. CRUD operations with Delta tables

In [None]:
#Load delta file into DataFrame
deltafile = "Lakehouse/food"
deltadf = spark.read.format("delta").load(deltafile)

In [None]:
deltadf.show(n=10)

+--------------+--------------------+----------------+--------------------+
|      FOODNAME|      SCIENTIFICNAME|           GROUP|            SUBGROUP|
+--------------+--------------------+----------------+--------------------+
|      Angelica|    Angelica keiskei|Herbs and Spices|               Herbs|
| Savoy cabbage|Brassica oleracea...|      Vegetables|            Cabbages|
| Silver linden|      Tilia argentea|Herbs and Spices|               Herbs|
|          Kiwi| Actinidia chinensis|          Fruits|     Tropical fruits|
|Allium (Onion)|              Allium|      Vegetables|Onion-family vege...|
|  Garden onion|         Allium cepa|      Vegetables|Onion-family vege...|
|          Leek|       Allium porrum|      Vegetables|Onion-family vege...|
|        Garlic|      Allium sativum|Herbs and Spices|               Herbs|
|        Chives|Allium schoenoprasum|Herbs and Spices|               Herbs|
| Lemon verbena|   Aloysia triphylla|Herbs and Spices|               Herbs|
+-----------

In [3]:
from delta.tables import *
from pyspark.sql.functions import *

deltafile = "Lakehouse/food"

In [4]:
DeltaTable.isDeltaTable(spark, deltafile)

                                                                                

True

Update Foods with subgroup vegetables_updated where "Group = Vegetables"

In [21]:
deltaTable = DeltaTable.forPath(spark, deltafile)

In [22]:
deltaTable.update(
  condition = "GROUP = 'Vegetables'",
  set = { "SUBGROUP": "'vegetables_updated'" }
)

                                                                                

In [24]:
deltadf.filter("GROUP = 'Vegetables'").show(n=10)

                                                                                

+---------------+--------------------+----------+------------------+
|       FOODNAME|      SCIENTIFICNAME|     GROUP|          SUBGROUP|
+---------------+--------------------+----------+------------------+
|  Savoy cabbage|Brassica oleracea...|Vegetables|vegetables_updated|
| Allium (Onion)|              Allium|Vegetables|vegetables_updated|
|   Garden onion|         Allium cepa|Vegetables|vegetables_updated|
|           Leek|       Allium porrum|Vegetables|vegetables_updated|
|        Burdock|       Arctium lappa|Vegetables|vegetables_updated|
|      Asparagus|Asparagus officin...|Vegetables|vegetables_updated|
|    Common beet|       Beta vulgaris|Vegetables|vegetables_updated|
|Chinese mustard|     Brassica juncea|Vegetables|vegetables_updated|
|          Swede|      Brassica napus|Vegetables|vegetables_updated|
|           Rape|Brassica napus va...|Vegetables|vegetables_updated|
+---------------+--------------------+----------+------------------+
only showing top 10 rows



Delete records whose subgroup is Herbs

In [25]:
deltaTable.delete("SUBGROUP = 'Herbs'")

                                                                                

In [26]:
deltadf.filter("SUBGROUP = 'Herbs'").show(n=10)

                                                                                

+--------+--------------+-----+--------+
|FOODNAME|SCIENTIFICNAME|GROUP|SUBGROUP|
+--------+--------------+-----+--------+
+--------+--------------+-----+--------+



5. Data and Metadata understanding for Delta tables

Data Files 

![image.png](attachment:5c2d1f73-b5d0-4a01-9a92-922731aa5d8b.png)

Metadata files (inside _delta_log folder)

![image.png](attachment:76fe72e0-878a-4953-8335-10073bc685ea.png)

6. Select queries on Delta tables and SQL functions on Delta tables (or Lakehouse)

In [6]:
#Load delta file into DataFrame
deltafile = "Lakehouse/food"
deltadf = spark.read.format("delta").load(deltafile)

In [5]:
deltadf.createOrReplaceTempView("tempFood")

In [6]:
sqlContext.sql("SELECT count(*) FROM tempFood").show(n=5)

[Stage 8:>                                                          (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|     846|
+--------+



                                                                                

In [7]:
sqlContext.sql("SELECT DISTINCT(GROUP) FROM tempFood").show(n=5)



+--------------------+
|               GROUP|
+--------------------+
|Cereals and cerea...|
|        Baking goods|
|Coffee and coffee...|
|     Confectioneries|
|                Teas|
+--------------------+
only showing top 5 rows



                                                                                

In [8]:
sqlContext.sql("SELECT SUBGROUP,count(*) FROM tempFood GROUP BY SUBGROUP ORDER BY count(*) DESC").show(n=10)

                                                                                

+------------------+--------+
|          SUBGROUP|count(1)|
+------------------+--------+
|vegetables_updated|     130|
|            Fishes|     111|
|           Berries|      55|
|   Tropical fruits|      43|
|              Nuts|      27|
|            Spices|      23|
|          Mollusks|      18|
|           Cereals|      18|
|            Gourds|      17|
|     Oilseed crops|      16|
+------------------+--------+
only showing top 10 rows



7. Versioning and time travel with delta lake 

In [7]:
deltaTable = DeltaTable.forPath(spark, deltafile)
deltaTable.history().show(truncate=False)

+-------+-----------------------+------+--------+---------+-------------------------------------------+----+--------+---------+-----------+-----------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                        |job |notebook|clusterId|readVersion|isolationLevel   |isBlindAppend|operationMetrics                                                                                                                                                                                          |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+-------------------------------------------+----+--------+---------+-----------+-------------

In [8]:
deltaTable.haistory().select("version", "timestamp", "operation").show(truncate=False)

+-------+-----------------------+---------+
|version|timestamp              |operation|
+-------+-----------------------+---------+
|3      |2023-04-18 21:47:32.794|OPTIMIZE |
|2      |2023-04-16 22:07:32.114|DELETE   |
|1      |2023-04-16 22:04:55.009|UPDATE   |
|0      |2023-04-16 21:49:46.211|WRITE    |
+-------+-----------------------+---------+



In [20]:
spark.read.format("delta").option("versionAsOf", "0").load(deltafile).show()

                                                                                

+--------------+--------------------+----------------+--------------------+
|      FOODNAME|      SCIENTIFICNAME|           GROUP|            SUBGROUP|
+--------------+--------------------+----------------+--------------------+
|      Angelica|    Angelica keiskei|Herbs and Spices|               Herbs|
| Savoy cabbage|Brassica oleracea...|      Vegetables|            Cabbages|
| Silver linden|      Tilia argentea|Herbs and Spices|               Herbs|
|          Kiwi| Actinidia chinensis|          Fruits|     Tropical fruits|
|Allium (Onion)|              Allium|      Vegetables|Onion-family vege...|
|  Garden onion|         Allium cepa|      Vegetables|Onion-family vege...|
|          Leek|       Allium porrum|      Vegetables|Onion-family vege...|
|        Garlic|      Allium sativum|Herbs and Spices|               Herbs|
|        Chives|Allium schoenoprasum|Herbs and Spices|               Herbs|
| Lemon verbena|   Aloysia triphylla|Herbs and Spices|               Herbs|
|    Cashew 

In [21]:
spark.read.format("delta").option("versionAsOf", "1").load(deltafile).show()

                                                                                

+--------------+--------------------+----------------+------------------+
|      FOODNAME|      SCIENTIFICNAME|           GROUP|          SUBGROUP|
+--------------+--------------------+----------------+------------------+
|      Angelica|    Angelica keiskei|Herbs and Spices|             Herbs|
| Savoy cabbage|Brassica oleracea...|      Vegetables|vegetables_updated|
| Silver linden|      Tilia argentea|Herbs and Spices|             Herbs|
|          Kiwi| Actinidia chinensis|          Fruits|   Tropical fruits|
|Allium (Onion)|              Allium|      Vegetables|vegetables_updated|
|  Garden onion|         Allium cepa|      Vegetables|vegetables_updated|
|          Leek|       Allium porrum|      Vegetables|vegetables_updated|
|        Garlic|      Allium sativum|Herbs and Spices|             Herbs|
|        Chives|Allium schoenoprasum|Herbs and Spices|             Herbs|
| Lemon verbena|   Aloysia triphylla|Herbs and Spices|             Herbs|
|    Cashew nut|Anacardium occide...| 

In [22]:
spark.read.format("delta").option("timestampAsOf", "2023-04-18").load(deltafile).show()

                                                                                

+---------------+--------------------+--------------------+------------------+
|       FOODNAME|      SCIENTIFICNAME|               GROUP|          SUBGROUP|
+---------------+--------------------+--------------------+------------------+
|  Savoy cabbage|Brassica oleracea...|          Vegetables|vegetables_updated|
|           Kiwi| Actinidia chinensis|              Fruits|   Tropical fruits|
| Allium (Onion)|              Allium|          Vegetables|vegetables_updated|
|   Garden onion|         Allium cepa|          Vegetables|vegetables_updated|
|           Leek|       Allium porrum|          Vegetables|vegetables_updated|
|     Cashew nut|Anacardium occide...|                Nuts|              Nuts|
|      Pineapple|      Ananas comosus|              Fruits|   Tropical fruits|
|  Custard apple|   Annona reticulata|              Fruits|   Tropical fruits|
|    Wild celery|    Apium graveolens|    Herbs and Spices|            Spices|
|         Peanut|    Arachis hypogaea|              

In [29]:
spark.read.format("delta").option("timestampAsOf", "2023-04-16 22:00:00").load(deltafile).show()

                                                                                

+--------------+--------------------+----------------+--------------------+
|      FOODNAME|      SCIENTIFICNAME|           GROUP|            SUBGROUP|
+--------------+--------------------+----------------+--------------------+
|      Angelica|    Angelica keiskei|Herbs and Spices|               Herbs|
| Savoy cabbage|Brassica oleracea...|      Vegetables|            Cabbages|
| Silver linden|      Tilia argentea|Herbs and Spices|               Herbs|
|          Kiwi| Actinidia chinensis|          Fruits|     Tropical fruits|
|Allium (Onion)|              Allium|      Vegetables|Onion-family vege...|
|  Garden onion|         Allium cepa|      Vegetables|Onion-family vege...|
|          Leek|       Allium porrum|      Vegetables|Onion-family vege...|
|        Garlic|      Allium sativum|Herbs and Spices|               Herbs|
|        Chives|Allium schoenoprasum|Herbs and Spices|               Herbs|
| Lemon verbena|   Aloysia triphylla|Herbs and Spices|               Herbs|
|    Cashew 

8. Optimization features associated with Delta Lake

In [9]:
deltaTable = DeltaTable.forPath(spark, deltafile)

Optimize Compaction

In [10]:
deltaTable.optimize().executeCompaction()

                                                                                

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint>]

Optimize Z Order

In [11]:
deltaTable.optimize().executeZOrderBy('FOODNAME')

                                                                                

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint>]

![image.png](attachment:687b4f54-ada1-4b60-bf62-97451ca7f0e9.png)

![image.png](attachment:e9cc5525-cee0-4a31-ace4-c58526e1aae6.png)

Vaccum 

In [9]:
deltaTable.vacuum()



Deleted 3 files and directories in a total of 1 directories.


                                                                                

DataFrame[]

![image.png](attachment:c4825979-1edd-4110-8262-c79a7f14bdc9.png)

![image.png](attachment:2b0415f2-2605-4c80-ad00-dd8b4e312d46.png)

In [14]:
deltaTable.history().select("version", "timestamp", "operation").show(truncate=False)

+-------+-----------------------+---------+
|version|timestamp              |operation|
+-------+-----------------------+---------+
|3      |2023-04-18 21:47:32.794|OPTIMIZE |
|2      |2023-04-16 22:07:32.114|DELETE   |
|1      |2023-04-16 22:04:55.009|UPDATE   |
|0      |2023-04-16 21:49:46.211|WRITE    |
+-------+-----------------------+---------+



![image.png](attachment:37d8be9c-d6f6-4b22-924b-53e112264fa9.png)

9. Create Lakehouse in Azure Data Lake Services or AWS S3 bucket
<ul><li>With Azure Data Lake services </li></ul>

In [None]:
TableEndPoint = "/Food"
DeltaTableLocationInADLS = "abfss://containerName@storageName.dfs.core.windows.net/FolderName"
StorageAccountName = "storageName"
StorageAccountKey = "******************************"

deltaFile = DeltaTableLocationInADLS + TableEndPoint 

#Setting Spark Context 
spark.conf.set("fs.azure.account.key." + StorageAccountName + ".dfs.core.windows.net",StorageAccountKey)

#Writing from Postgre Dataframe to cloud Data Lake in delta format 
postgredf.write.format("delta").save(deltaFile,mode='append')