# Delta Lake - Part 1
This notebook covers reading and writing Delta tables, adding columns and rows, conditional updates and deletes, upserts, time travel, and table history.

## Adding the Delta jar in Jupyter

In [55]:
%AddJar https://repo1.maven.org/maven2/io/delta/delta-core_2.11/0.4.0/delta-core_2.11-0.4.0.jar


Using cached version of delta-core_2.11-0.4.0.jar


In [56]:
val deltaPath = "/home/jovyan/Ezhil/data/deltalake/"

deltaPath = /home/jovyan/Ezhil/data/deltalake/


/home/jovyan/Ezhil/data/deltalake/

## Reading CSV file

In [57]:
val salesRecordDf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("100SalesRecords.csv")
println("Count is "+ salesRecordDf.count())
salesRecordDf.printSchema

Count is 100
root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Sales_Channel: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_ID: integer (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Unit_Cost: double (nullable = true)
 |-- Total_Revenue: double (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Total_Profit: double (nullable = true)



salesRecordDf = [Region: string, Country: string ... 12 more fields]


[Region: string, Country: string ... 12 more fields]
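
The schema above shows `Order_Date` and `Ship_Date` inferred as plain strings. As a hedged sketch (not part of the original notebook), they could be converted to real dates with `to_date`, assuming every value follows the `M/d/yyyy` layout seen in the sample rows. The names `session`, `rawDates` and `typedDates` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_date}

// Reuses the notebook's session if one exists; the master setting only matters standalone
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Two sample rows using the same M/d/yyyy layout as the CSV (assumed format)
val rawDates = Seq(("5/28/2010", "6/27/2010"), ("8/22/2012", "9/15/2012"))
  .toDF("Order_Date", "Ship_Date")

// Replace each string column with a DateType column parsed from that layout
val typedDates = rawDates
  .withColumn("Order_Date", to_date(col("Order_Date"), "M/d/yyyy"))
  .withColumn("Ship_Date", to_date(col("Ship_Date"), "M/d/yyyy"))

typedDates.printSchema()
typedDates.show()
```

Values that do not match the pattern would come back as null, so a null count after the conversion is a cheap sanity check.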

## Reading and Writing Delta table

In [107]:
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{SaveMode, SparkSession, DataFrame}

def writeDeltaTable(df: DataFrame, tableName: String,saveMode: SaveMode): Unit = {
    df
      .write
      .format("delta")
      .mode(saveMode)
      .option("mergeSchema", "true")
      .save(deltaPath + tableName)
  }

def readDeltaTable(tableName: String): DataFrame = {
   val deltaDf = spark
      .read
      .format("delta")
      .load(deltaPath + tableName)
     deltaDf
  }

writeDeltaTable: (df: org.apache.spark.sql.DataFrame, tableName: String, saveMode: org.apache.spark.sql.SaveMode)Unit
readDeltaTable: (tableName: String)org.apache.spark.sql.DataFrame


In [60]:
// Write the DataFrame loaded from the CSV file as a Delta table
writeDeltaTable(salesRecordDf,"sales_order",SaveMode.Overwrite)

In [61]:
val salesOrderDf = readDeltaTable("sales_order")
println("Delta count for sales order " + salesOrderDf.count())
salesOrderDf.show(5)

Delta count for sales order 100
+--------------------+--------------------+---------------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|      Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID|Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+--------------------+---------------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|              Tuvalu|      Baby Food|      Offline|             H| 5/28/2010|669165933|6/27/2010|      9925|    255.28|   159.42|    2533654.0| 1582243.5|    951410.5|
|Central America a...|             Grenada|         Cereal|       Online|             C| 8/22/2012|963881480|9/15/2012|      2804|     205.7|   117.11|     576782.8| 328376.44|   248406.36|
|              Euro

salesOrderDf = [Region: string, Country: string ... 12 more fields]


[Region: string, Country: string ... 12 more fields]
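
To see what a Delta write leaves on disk, here is a small sketch that writes a throwaway table and lists its transaction log. It assumes Spark's default filesystem is the local one (as the local `deltaPath` in this notebook suggests); `session`, `demoPath`, `logFiles` and `roundTrip` are illustrative names, not part of the notebook.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; the master setting only matters standalone
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Throwaway location so the real sales_order table is not touched
val demoPath = Files.createTempDirectory("delta_demo").resolve("mini_sales").toString

Seq(("Asia", "India", 100)).toDF("Region", "Country", "Units_Sold")
  .write.format("delta").mode("overwrite").save(demoPath)

// Commits are recorded as files under the _delta_log directory next to the data files
val logFiles = new File(demoPath, "_delta_log").listFiles().map(_.getName).sorted
logFiles.foreach(println)

// Reading the path back through the delta format returns the written row
val roundTrip = session.read.format("delta").load(demoPath)
roundTrip.show()
```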

## Adding a new column to an existing Delta table

In [64]:
def addColumn(data: DataFrame,tableName: String): Unit = {
  data
      .write
      .format("delta")
      .mode("append")
      .option("mergeSchema", "true")
      .save(deltaPath + tableName)
  }

addColumn: (data: org.apache.spark.sql.DataFrame, tableName: String)Unit


In [69]:
import org.apache.spark.sql.functions._
// current_timestamp() already returns a Column, so wrapping it in lit() is unnecessary
val newSalesOrderDf = salesOrderDf.withColumn("create_time", current_timestamp())

newSalesOrderDf = [Region: string, Country: string ... 13 more fields]


[Region: string, Country: string ... 13 more fields]

In [72]:
println(newSalesOrderDf.count())
newSalesOrderDf.show(5,false)

100
+---------------------------------+---------------------+---------------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+-----------------------+
|Region                           |Country              |Item_Type      |Sales_Channel|Order_Priority|Order_Date|Order_ID |Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|create_time            |
+---------------------------------+---------------------+---------------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+-----------------------+
|Australia and Oceania            |Tuvalu               |Baby Food      |Offline      |H             |5/28/2010 |669165933|6/27/2010|9925      |255.28    |159.42   |2533654.0    |1582243.5 |951410.5    |2019-11-15 10:43:58.421|
|Central America and the Caribbean|Grenada              |Cereal         |Online     
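
The notebook does not show the cell that actually calls `addColumn`, so the following is only a sketch of the append-with-`mergeSchema` pattern on a throwaway table. It assumes a writable temp directory; `session`, `demoPath`, `withTime` and `merged` are illustrative names.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp

// Reuses the notebook's session if one exists; the master setting only matters standalone
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Throwaway location so the real sales_order table is not touched
val demoPath = Files.createTempDirectory("delta_demo").resolve("mini_sales").toString

// Version 0: table without create_time
Seq(("Asia", "India", 100)).toDF("Region", "Country", "Units_Sold")
  .write.format("delta").mode("overwrite").save(demoPath)

// New batch carries an extra column; mergeSchema lets the append widen the table schema
val withTime = Seq(("Europe", "France", 50)).toDF("Region", "Country", "Units_Sold")
  .withColumn("create_time", current_timestamp())
withTime.write.format("delta").mode("append").option("mergeSchema", "true").save(demoPath)

// Rows written before the schema change read back with null in the new column
val merged = session.read.format("delta").load(demoPath)
merged.printSchema()
merged.show(false)
```

Note that append mode adds the new rows to the existing ones, so appending a copy of the whole table this way would double its row count.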

## Adding a new row to an existing Delta table

In [112]:
import java.text.SimpleDateFormat
val sdf = new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss")
val sdf1 = new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss")
val create_time = sdf1.format(sdf.parse("2019-11-15 10:43:58"))

val newRowDf = Seq(("Asia","India","Alcohol","Offline","H","11/14/2019",779165933,"11/14/2019",100,200.10,150.10,20010.0,15010.0,5000.0,create_time))
.toDF("Region","Country","Item_Type","Sales_Channel","Order_Priority","Order_Date","Order_ID","Ship_Date","Units_Sold","Unit_Price","Unit_Cost","Total_Revenue","Total_Cost","Total_Profit","create_time")

newRowDf.show()
writeDeltaTable(newRowDf,"sales_order",SaveMode.Append)
var updatedSalesOrderDf = readDeltaTable("sales_order") 
println(updatedSalesOrderDf.count())
updatedSalesOrderDf.show(5)


+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|        create_time|
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       100|     200.1|    150.1|      20010.0|   15010.0|      5000.0|2019-11-15 10:43:58|
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+

103
+--------------------+--------------------+---------------+-------------+--------------+----------+------

sdf = java.text.SimpleDateFormat@cc43d9ee
sdf1 = java.text.SimpleDateFormat@cc43d9ee
create_time = 2019-11-15 10:43:58
newRowDf = [Region: string, Country: string ... 13 more fields]
updatedSalesOrderDf = [Region: string, Country: string ... 13 more fields]


[Region: string, Country: string ... 13 more fields]

In [115]:
/* The same record was appended three times, so the count became 103 */
updatedSalesOrderDf.filter("Region = 'Asia' and Country = 'India'").show()
updatedSalesOrderDf.filter("Country = 'India'").show()

+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|        create_time|
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       100|     200.1|    150.1|      20010.0|   15010.0|      5000.0|2019-11-15 10:43:58|
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       100|     200.1|    150.1|      20010.0|   15010.0|      5000.0|2019-11-15 10:43:58|
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       100|     200.1| 
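
Because a plain append adds the row every time the cell runs, one way to make it repeatable is to drop incoming rows whose key already exists before writing. The sketch below does this with a `left_anti` join on `Order_ID`; treating `Order_ID` as a unique key is an assumption, and `session`, `demoPath`, `appendIfAbsent` and `total` are hypothetical names.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reuses the notebook's session if one exists; the master setting only matters standalone
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Throwaway location so the real sales_order table is not touched
val demoPath = Files.createTempDirectory("delta_demo").resolve("mini_sales").toString

// Seed the table with one existing order
Seq((669165933, "Australia and Oceania", "Tuvalu")).toDF("Order_ID", "Region", "Country")
  .write.format("delta").mode("overwrite").save(demoPath)

// Append only rows whose Order_ID is not already present in the table
def appendIfAbsent(rows: DataFrame): Unit = {
  val existingIds = session.read.format("delta").load(demoPath).select("Order_ID")
  rows.join(existingIds, Seq("Order_ID"), "left_anti")
    .write.format("delta").mode("append").save(demoPath)
}

val incoming = Seq((779165933, "Asia", "India")).toDF("Order_ID", "Region", "Country")

// Running the append three times still leaves a single copy of the new row
appendIfAbsent(incoming)
appendIfAbsent(incoming)
appendIfAbsent(incoming)

val total = session.read.format("delta").load(demoPath).count()
println(total)
```

A merge keyed on `Order_ID` would be another option for the same goal.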

## Conditional update in Delta table (in place, without overwriting the table)

In [2]:
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaPath + "sales_order")

// Update Country India -> Pakistan
// predicate and update expressions using SQL formatted string
deltaTable.updateExpr(
  "Country = 'India'",
  Map("Country" -> "'Pakistan'"))

deltaTable = io.delta.tables.DeltaTable@7085a50f


io.delta.tables.DeltaTable@7085a50f

In [5]:
updatedSalesOrderDf = readDeltaTable("sales_order") 
println(updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'Pakistan'").count())
updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'Pakistan'").show()

3
+------+--------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+
|Region| Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|        create_time|
+------+--------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+
|  Asia|Pakistan|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       100|     200.1|    150.1|      20010.0|   15010.0|      5000.0|2019-11-15 10:43:58|
|  Asia|Pakistan|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       100|     200.1|    150.1|      20010.0|   15010.0|      5000.0|2019-11-15 10:43:58|
|  Asia|Pakistan|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       100|    

updatedSalesOrderDf = [Region: string, Country: string ... 13 more fields]


[Region: string, Country: string ... 13 more fields]

In [6]:
// Update Country Pakistan -> India
// predicate using Spark SQL functions and implicits
deltaTable.update(                
  expr(" Region= 'Asia' and Country = 'Pakistan' "),
  Map("Country" -> lit("India")))

In [7]:
println(updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'Pakistan'").count())
updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'Pakistan'").show()

0
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date|Order_ID|Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|create_time|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+



In [14]:
println(updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").count())
updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").show()

3
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+------------------+------------+-------------------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|        Total_Cost|Total_Profit|        create_time|
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+------------------+------------+-------------------+
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       101|     200.1|    150.1|      20210.1|15160.099999999999|      5000.0|2019-11-15 10:43:58|
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       101|     200.1|    150.1|      20210.1|15160.099999999999|      5000.0|2019-11-15 10:43:58|
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|77

In [18]:
/* Set Units_Sold to 101 where Country is India and Region is Asia */
deltaTable.update(                
  expr(" Region= 'Asia' and Country = 'India' "),
  Map("Units_Sold" -> lit(101)))

In [19]:
/* Increment Units_Sold by 4 where Country is India and Region is Asia */
deltaTable.update(                
  expr(" Region= 'Asia' and Country = 'India' "),
  Map("Units_Sold" -> expr("Units_Sold + 4")))

In [20]:
println(updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").count())
updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").show()

3
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+------------------+------------+-------------------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|        Total_Cost|Total_Profit|        create_time|
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+------------------+------------+-------------------+
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       105|     200.1|    150.1|      20210.1|15160.099999999999|      5000.0|2019-11-15 10:43:58|
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       105|     200.1|    150.1|      20210.1|15160.099999999999|      5000.0|2019-11-15 10:43:58|
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|77

In [21]:
/* Complex conditional update */
/**
Within a single update call, every expression is evaluated against the row's old values,
so columns that depend on each other cannot simply be updated together.
Example: Total_Revenue and Total_Cost depend on Units_Sold. If Units_Sold, Total_Revenue and Total_Cost
are all set in one update, the two totals are computed from the old Units_Sold value.
To get correct totals, update Units_Sold first, then recalculate Total_Revenue and Total_Cost in a second update.
**/
deltaTable.update(                
  expr(" Region= 'Asia' and Country = 'India' "),
  Map("Total_Revenue" -> expr("Units_Sold * Unit_Price")
     ,"Total_Cost" -> expr("Units_Sold * Unit_Cost")))

In [22]:
println(updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").count())
updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").show()

3
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|        create_time|
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+-------------------+
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       105|     200.1|    150.1|      21010.5|   15760.5|      5000.0|2019-11-15 10:43:58|
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       105|     200.1|    150.1|      21010.5|   15760.5|      5000.0|2019-11-15 10:43:58|
|  Asia|  India|  Alcohol|      Offline|             H|11/14/2019|779165933|11/14/2019|       105|     200.1
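
If the two-step approach is inconvenient, a possible alternative is to repeat the new value's expression inside the dependent columns so one call stays consistent. This sketch relies on the behaviour described in the comment above (right-hand sides see the pre-update values) and runs on a throwaway table; `session`, `demoPath`, `demoTable` and `updatedRow` are illustrative names.

```scala
import java.nio.file.Files
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Reuses the notebook's session if one exists; the master setting only matters standalone
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Throwaway location so the real sales_order table is not touched
val demoPath = Files.createTempDirectory("delta_demo").resolve("mini_sales").toString

Seq((100, 2.0, 200.0)).toDF("Units_Sold", "Unit_Price", "Total_Revenue")
  .write.format("delta").mode("overwrite").save(demoPath)

val demoTable = DeltaTable.forPath(session, demoPath)

// Units_Sold on the right-hand side is the old value (100) in both expressions,
// so the revenue expression repeats "+ 4" instead of reading the updated column
demoTable.update(
  expr("Units_Sold = 100"),
  Map(
    "Units_Sold" -> expr("Units_Sold + 4"),
    "Total_Revenue" -> expr("(Units_Sold + 4) * Unit_Price")))

// Re-read the table to inspect the result
val updatedRow = session.read.format("delta").load(demoPath).head
println(updatedRow)
```

The trade-off is duplicated logic: the increment appears in two places and has to be kept in sync by hand.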

## Conditional delete in Delta table

In [23]:
/*Delete if the country is India and Region is Asia*/
deltaTable.delete(condition = expr(" Region= 'Asia' and Country = 'India' "))

In [25]:
println(updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").count())
println("Total count is "+updatedSalesOrderDf.count())
updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").show()

0
Total count is 100
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date|Order_ID|Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|create_time|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+
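
A delete creates a new table version rather than erasing history, so earlier versions can still be read back. The following sketch shows this on a throwaway table, using the SQL-string overload of `delete`; `session`, `demoPath`, `demoTable`, `currentCount` and `beforeDeleteCount` are illustrative names.

```scala
import java.nio.file.Files
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; the master setting only matters standalone
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Throwaway location so the real sales_order table is not touched
val demoPath = Files.createTempDirectory("delta_demo").resolve("mini_sales").toString

// Version 0: two rows
Seq(("Asia", "India"), ("Europe", "France")).toDF("Region", "Country")
  .write.format("delta").mode("overwrite").save(demoPath)

// Version 1: delete with a SQL-formatted predicate string
val demoTable = DeltaTable.forPath(session, demoPath)
demoTable.delete("Country = 'India'")

// The latest version no longer contains the row ...
val currentCount = session.read.format("delta").load(demoPath).count()

// ... but version 0 is still readable through time travel
val beforeDeleteCount = session.read.format("delta")
  .option("versionAsOf", 0)
  .load(demoPath)
  .count()

println(s"current = $currentCount, before delete = $beforeDeleteCount")
```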



## Upsert in Delta table (Insert + Update)

In [28]:
// Upsert (merge) new data
/* All columns of the target table need to be specified in the insert map */
val newData = Seq(("Asia","Sri Lanka","H"),("Asia","India","H")).toDF("Region","Country","Order_Priority")

deltaTable.as("oldData")
  .merge(
    newData.as("newData"),
    "oldData.Region = newData.Region and oldData.Country = newData.Country")
  .whenMatched
  .update(Map("Order_Priority" -> col("newData.Order_Priority")))
  .whenNotMatched
  .insert(Map("Region" -> col("newData.Region"), 
              "Country" -> col("newData.Country"), 
              "Order_Priority" -> col("newData.Order_Priority"),
              "Item_Type" -> lit("") , 
              "Sales_Channel" -> lit("") , 
              "Order_Date" -> lit("") , 
              "Order_ID" -> lit(0) , 
              "Ship_Date" -> lit("") , 
              "Units_Sold" -> lit(0) , 
              "Unit_Price" -> lit(0) , 
              "Unit_Cost" -> lit(0),
              "Total_Revenue" -> lit(0),
              "Total_Cost" -> lit(0),
              "Total_Profit" -> lit(0),
              "create_time" -> lit(""))).execute()


newData = [Region: string, Country: string ... 1 more field]



[Region: string, Country: string ... 1 more field]

In [29]:
println(updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").count())
println("Total count is "+updatedSalesOrderDf.count())
updatedSalesOrderDf.filter("Region= 'Asia' and Country = 'India'").show()

1
Total count is 101
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date|Order_ID|Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|create_time|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+
|  Asia|  India|         |             |             H|          |       0|         |         0|       0.0|      0.0|          0.0|       0.0|         0.0|           |
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+-----------+



## Read older versions of data using Time Travel

In [31]:
val timestamp_string = "2019-11-18"
val version = 2
val timeTravelByTimestamp = spark.read.format("delta").option("timestampAsOf", timestamp_string).load(deltaPath + "sales_order")
val timeTravelByVersion = spark.read.format("delta").option("versionAsOf", version).load(deltaPath + "sales_order")
println(timeTravelByTimestamp.count())
println(timeTravelByVersion.count())

103
100


timestamp_string = 2019-11-18
version = 2
timeTravelByTimestamp = [Region: string, Country: string ... 13 more fields]
timeTravelByVersion = [Region: string, Country: string ... 12 more fields]


[Region: string, Country: string ... 12 more fields]
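
To make the version numbering concrete, here is a sketch that builds a throwaway table with three commits and reads each version back with `versionAsOf`. The names `session`, `demoPath` and `countsByVersion` are illustrative, not from the notebook.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; the master setting only matters standalone
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Throwaway location so the real sales_order table is not touched
val demoPath = Files.createTempDirectory("delta_demo").resolve("mini_sales").toString

// Three single-row commits produce versions 0, 1 and 2
Seq(("Asia", "India")).toDF("Region", "Country")
  .write.format("delta").mode("overwrite").save(demoPath)
Seq(("Asia", "Japan")).toDF("Region", "Country")
  .write.format("delta").mode("append").save(demoPath)
Seq(("Europe", "France")).toDF("Region", "Country")
  .write.format("delta").mode("append").save(demoPath)

// Read each version and record its row count
val countsByVersion = (0 to 2).map { v =>
  session.read.format("delta").option("versionAsOf", v).load(demoPath).count()
}

countsByVersion.zipWithIndex.foreach { case (n, v) => println(s"version $v -> $n rows") }
```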

In [38]:
val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(5) // get the last 5 operations

fullHistoryDF.select("version","timestamp","operation","operationParameters","readVersion").show(20)

+-------+-------------------+---------+--------------------+-----------+
|version|          timestamp|operation| operationParameters|readVersion|
+-------+-------------------+---------+--------------------+-----------+
|     18|2019-11-18 11:56:11|    MERGE|[predicate -> ((o...|         17|
|     17|2019-11-18 11:19:29|   DELETE|[predicate -> ["(...|         16|
|     16|2019-11-18 10:57:21|   UPDATE|[predicate -> ((R...|         15|
|     15|2019-11-18 10:56:59|   UPDATE|[predicate -> ((R...|         14|
|     14|2019-11-18 10:56:47|   UPDATE|[predicate -> ((R...|         13|
|     13|2019-11-18 10:54:25|   UPDATE|[predicate -> ((R...|         12|
|     12|2019-11-18 10:50:23|   UPDATE|[predicate -> ((R...|         11|
|     11|2019-11-18 10:48:12|   UPDATE|[predicate -> ((R...|         10|
|     10|2019-11-18 10:05:52|   UPDATE|[predicate -> ((R...|          9|
|      9|2019-11-18 10:02:43|   UPDATE|[predicate -> (Co...|          8|
|      8|2019-11-18 03:11:55|   UPDATE|[predicate -

fullHistoryDF = [version: bigint, timestamp: timestamp ... 10 more fields]
lastOperationDF = [version: bigint, timestamp: timestamp ... 10 more fields]


[version: bigint, timestamp: timestamp ... 10 more fields]

In [39]:
lastOperationDF.select("version","timestamp","operation","operationParameters","readVersion").show(20)

+-------+-------------------+---------+--------------------+-----------+
|version|          timestamp|operation| operationParameters|readVersion|
+-------+-------------------+---------+--------------------+-----------+
|     18|2019-11-18 11:56:11|    MERGE|[predicate -> ((o...|         17|
|     17|2019-11-18 11:19:29|   DELETE|[predicate -> ["(...|         16|
|     16|2019-11-18 10:57:21|   UPDATE|[predicate -> ((R...|         15|
|     15|2019-11-18 10:56:59|   UPDATE|[predicate -> ((R...|         14|
|     14|2019-11-18 10:56:47|   UPDATE|[predicate -> ((R...|         13|
+-------+-------------------+---------+--------------------+-----------+
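
The history DataFrame can also feed time travel, for example to find the latest version number and read the one before it. This is a sketch on a throwaway table, assuming history rows come back newest first as in the output above; `session`, `demoPath`, `demoTable`, `latestVersion`, `latestOperation` and `previousCount` are illustrative names.

```scala
import java.nio.file.Files
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; the master setting only matters standalone
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Throwaway location so the real sales_order table is not touched
val demoPath = Files.createTempDirectory("delta_demo").resolve("mini_sales").toString

// Version 0: initial write; version 1: a delete
Seq(("Asia", "India"), ("Europe", "France")).toDF("Region", "Country")
  .write.format("delta").mode("overwrite").save(demoPath)
val demoTable = DeltaTable.forPath(session, demoPath)
demoTable.delete("Country = 'India'")

// history(1) returns only the most recent commit
val latest = demoTable.history(1).select("version", "operation").head
val latestVersion = latest.getLong(0)
val latestOperation = latest.getString(1)
println(s"latest version = $latestVersion, operation = $latestOperation")

// Read the table as it was one version earlier
val previousCount = session.read.format("delta")
  .option("versionAsOf", latestVersion - 1)
  .load(demoPath)
  .count()
println(previousCount)
```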

