# Example of using Delta Lake 0.4 without Databricks

[Delta Lake](https://delta.io/) is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads. But it is **way more** than that!

This is a rewritten notebook example from this [blog post](https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html) by Databricks. The intention is to show why Delta Lake is a big deal and how to run Delta Lake without Databricks services.

Delta Lake examples in this notebook:
* Convert data to Delta Lake format
* Create Delta Lake table
* Spark SQL capabilities
* Delete data
* Update data
* View audit history of table
* Merge (upsert) one table into another, which removes duplicates, updates rows and adds a new row

For testing, this Docker image can be used: ```docker run -it --rm -p 8888:8888 -p 4040:4040 jupyter/pyspark-notebook```

### Author
Anders Boje Larsen - [alarsen@deloitte.dk](mailto:alarsen@deloitte.dk) - [LinkedIn](https://www.linkedin.com/in/andersboje/)

### Data Preparation
Configure the location of the source file and where the Delta Lake table will be stored.

In [10]:
tripdelaysFilePath = "departuredelays.csv" 
pathToEventsTable = "departureDelays.delta"
flightdata = "https://raw.githubusercontent.com/drabastomek/learningPySpark/master/Chapter03/flight-data/departuredelays.csv"

In [11]:
!pip install --upgrade pyspark

Requirement already up-to-date: pyspark in /Users/alarsen/miniconda3/envs/py36/lib/python3.6/site-packages (2.4.4)


In [12]:
# Remove any Delta table left over from a previous run, then download the flight dataset
!rm -fr departureDelays.delta
!wget https://raw.githubusercontent.com/drabastomek/learningPySpark/master/Chapter03/flight-data/departuredelays.csv

--2019-10-24 20:56:58--  https://raw.githubusercontent.com/drabastomek/learningPySpark/master/Chapter03/flight-data/departuredelays.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)...199.232.40.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|199.232.40.133|:443... connected.
HTTP request sent, awaiting response...200 OK
Length: 33396236 (32M) [text/plain]
Saving to: ‘departuredelays.csv.2’


2019-10-24 20:57:02 (9.79 MB/s) - ‘departuredelays.csv.2’ saved [33396236/33396236]



In [13]:
import pandas as pd
from pyspark.sql import SQLContext, SparkSession
from pyspark import SparkContext, SparkConf
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.11:0.4.0 pyspark-shell'

sc_conf = SparkConf()
sc_conf.set('spark.databricks.delta.retentionDurationCheck.enabled', 'false')
sc_conf.set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
print(sc_conf.getAll())

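# Stop any SparkContext left over from a previous run so the new configuration takes effect
try:
    sc.stop()
except NameError:
    pass  # no SparkContext exists yet in this session
sc = SparkContext(conf=sc_conf)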

spark = SparkSession(sc)

[('spark.submit.pyFiles', '/Users/alarsen/.ivy2/jars/io.delta_delta-core_2.11-0.4.0.jar,/Users/alarsen/.ivy2/jars/org.antlr_antlr4-4.7.jar,/Users/alarsen/.ivy2/jars/org.antlr_antlr4-runtime-4.7.jar,/Users/alarsen/.ivy2/jars/org.antlr_antlr-runtime-3.5.2.jar,/Users/alarsen/.ivy2/jars/org.antlr_ST4-4.0.8.jar,/Users/alarsen/.ivy2/jars/org.abego.treelayout_org.abego.treelayout.core-1.0.3.jar,/Users/alarsen/.ivy2/jars/org.glassfish_javax.json-1.0.4.jar,/Users/alarsen/.ivy2/jars/com.ibm.icu_icu4j-58.2.jar'), ('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension'), ('spark.repl.local.jars', 'file:///Users/alarsen/.ivy2/jars/io.delta_delta-core_2.11-0.4.0.jar,file:///Users/alarsen/.ivy2/jars/org.antlr_antlr4-4.7.jar,file:///Users/alarsen/.ivy2/jars/org.antlr_antlr4-runtime-4.7.jar,file:///Users/alarsen/.ivy2/jars/org.antlr_antlr-runtime-3.5.2.jar,file:///Users/alarsen/.ivy2/jars/org.antlr_ST4-4.0.8.jar,file:///Users/alarsen/.ivy2/jars/org.abego.treelayout_org.abego.treelayout.core-

Create `departureDelays` DataFrame

In [14]:
departureDelays = spark.read.option("header", "true").option("inferSchema", "true").csv(tripdelaysFilePath)

Save the table in Delta Lake format (update `pathToEventsTable` above if you want to store it in a different location).

In [15]:
departureDelays.write.format("delta").mode("overwrite").save(pathToEventsTable)

Load Delta Lake table

In [16]:
delays_delta = spark.read.format("delta").load(pathToEventsTable)
delays_delta.createOrReplaceTempView("delays_delta")

Get count of rows

In [17]:
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").toPandas()

Unnamed: 0,count(1)
0,1698


**Review File System**: Note that eight Parquet data files, plus the `_delta_log` directory, are created initially as part of the table creation.

In [18]:
%ls $pathToEventsTable

[1m[36m_delta_log[m[m/
part-00000-a7d6a844-14a1-46dc-b4cd-b77a0ad81e86-c000.snappy.parquet
part-00001-8074dff5-6d8c-401a-ae68-e96e7421fa9d-c000.snappy.parquet
part-00002-b197711b-ac23-4545-9a21-b23f75f2d967-c000.snappy.parquet
part-00003-d695c1b4-996a-4ccd-ab04-98b2379f8dbb-c000.snappy.parquet
part-00004-be481f0f-5371-4d12-9610-726e288ffc3d-c000.snappy.parquet
part-00005-c51aa017-7e57-453c-a152-e1b1402c71e2-c000.snappy.parquet
part-00006-b213286a-d1dd-4305-b91b-aaa3798cde1d-c000.snappy.parquet
part-00007-f04adaab-04cb-4a8a-a6ea-9022a31bfe43-c000.snappy.parquet
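
To check the file count without reading the `%ls` output by eye, here is a minimal sketch that uses only the standard library. The helper name `summarize_table_dir` is hypothetical, and it assumes an unpartitioned table whose data files sit directly in the table directory, as in the listing above.

```python
import os


def summarize_table_dir(table_path):
    """Count Parquet data files and JSON commit files in a Delta table directory.

    Assumes an unpartitioned table (data files directly under table_path).
    Checksum side files ending in .crc are not counted.
    """
    entries = os.listdir(table_path)
    data_files = [e for e in entries if e.endswith(".parquet")]

    log_dir = os.path.join(table_path, "_delta_log")
    commits = []
    if os.path.isdir(log_dir):
        commits = [e for e in os.listdir(log_dir) if e.endswith(".json")]

    return {"data_files": len(data_files), "commits": len(commits)}
```

In this notebook it would be called as `summarize_table_dir(pathToEventsTable)`; rerunning it after each operation below makes the growth in data files and commits easy to follow.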


### Deletes
With Delta Lake, you can delete data with the Python API

In [19]:
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
deltaTable.delete("delay < 0") 

In [20]:
# Get Row Count
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").toPandas()

Unnamed: 0,count(1)
0,837


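**Review File System**: Note that although we deleted all early flights (`delay < 0`), there are now more files, not fewer: the listing shows about twice as many data files as the eight created initially. Delta Lake writes new files without the deleted rows and keeps the old ones so that earlier versions of the table remain readable.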

In [21]:
%ls $pathToEventsTable

[1m[36m_delta_log[m[m/
part-00000-a7d6a844-14a1-46dc-b4cd-b77a0ad81e86-c000.snappy.parquet
part-00000-eb445dee-f7d4-45e1-b0fd-773120453d92-c000.snappy.parquet
part-00001-8074dff5-6d8c-401a-ae68-e96e7421fa9d-c000.snappy.parquet
part-00001-c77c66d0-a97d-4caf-b136-e79d272505fe-c000.snappy.parquet
part-00002-b197711b-ac23-4545-9a21-b23f75f2d967-c000.snappy.parquet
part-00002-e6e7e427-a510-4f8d-9bf1-43bfabbb524f-c000.snappy.parquet
part-00003-39ce1e03-9535-4243-beff-59b336223c6f-c000.snappy.parquet
part-00003-d695c1b4-996a-4ccd-ab04-98b2379f8dbb-c000.snappy.parquet
part-00004-be481f0f-5371-4d12-9610-726e288ffc3d-c000.snappy.parquet
part-00004-d497d609-0b2a-49b9-8bbc-46bc2e48ad04-c000.snappy.parquet
part-00005-9490e34c-6886-4f8b-80c3-5ae852738196-c000.snappy.parquet
part-00005-c51aa017-7e57-453c-a152-e1b1402c71e2-c000.snappy.parquet
part-00006-0fa92e12-eb12-4498-ba6e-0f26e5286f5e-c000.snappy.parquet
part-00006-b213286a-d1dd-4305-b91b-aaa3798cde1d-c000.snappy.parquet
part-00007-e6801601-2
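
The extra files make sense once you look at the transaction log: each JSON commit in `_delta_log` records `add` and `remove` actions, and the table at a given version is whatever files remain after replaying those actions. The sketch below replays the log with the standard library only. The function name `live_files_at` is hypothetical, and the code is a simplification: it ignores checkpoint files, so it assumes every JSON commit is still present.

```python
import json
import os


def live_files_at(table_path, version=None):
    """Replay add/remove actions from the JSON commits up to `version`.

    Returns the set of data file paths that make up the table at that
    version (the latest version when `version` is None).
    Simplified: checkpoint files are ignored.
    """
    log_dir = os.path.join(table_path, "_delta_log")
    commits = sorted(f for f in os.listdir(log_dir) if f.endswith(".json"))

    live = set()
    for name in commits:
        commit_version = int(name.split(".")[0])
        if version is not None and commit_version > version:
            break
        with open(os.path.join(log_dir, name)) as fh:
            for line in fh:
                if not line.strip():
                    continue
                action = json.loads(line)  # one JSON action per line
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
    return live
```

Comparing `live_files_at(pathToEventsTable, 0)` with `live_files_at(pathToEventsTable)` shows which files belong only to the older version; those are the ones still on disk but no longer part of the current table.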

### Updates
Update flights originating from Detroit (DTW) to now be from Seattle (SEA)

In [22]:
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } ) 

In [23]:
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").toPandas()

Unnamed: 0,count(1)
0,986
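
Note the nested quotes in `"'SEA'"`: the values in the update dictionary are SQL expressions, not plain Python values, so a string constant has to be written as a quoted SQL literal. The sketch below wraps that detail in a small helper. Both function names are hypothetical, and the helper deliberately accepts only alphanumeric codes rather than attempting general SQL escaping.

```python
def sql_code_literal(code):
    """Return an airport code as a quoted SQL string literal, e.g. SEA -> 'SEA'.

    Only alphanumeric codes are accepted, so no escaping is needed.
    """
    if not code.isalnum():
        raise ValueError("expected an alphanumeric code, got %r" % (code,))
    return "'%s'" % code


def relabel_origin(delta_table, old_code, new_code):
    """Set `origin` to new_code on every row whose origin is old_code.

    `delta_table` is expected to be a DeltaTable, as created earlier
    in the notebook with DeltaTable.forPath.
    """
    delta_table.update(
        "origin = " + sql_code_literal(old_code),   # condition (SQL expression)
        {"origin": sql_code_literal(new_code)},     # column -> SQL expression
    )
```

With this helper, the cell above could be written as `relabel_origin(deltaTable, "DTW", "SEA")`.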


### View History
View the table history (note the write, delete, and update operations).

In [24]:
deltaTable.history().toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend
0,2,2019-10-24 20:57:14,,,UPDATE,{'predicate': '(origin#1503 = DTW)'},,,,1.0,,False
1,1,2019-10-24 20:57:09,,,DELETE,"{'predicate': '[""(`delay` < 0)""]'}",,,,0.0,,False
2,0,2019-10-24 20:57:05,,,WRITE,"{'mode': 'Overwrite', 'partitionBy': '[]'}",,,,,,False


Calculate counts for each version of the table

In [25]:
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load(pathToEventsTable)
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load(pathToEventsTable)
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load(pathToEventsTable)

cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()

print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))

SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
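
The three near-identical reads can also be folded into a loop. The following is a sketch, not part of the original notebook: the function name `route_counts_by_version` is hypothetical, and it takes the `spark` session and table path as arguments so that it does not depend on notebook globals.

```python
def route_counts_by_version(spark, table_path, versions,
                            origin="SEA", destination="SFO"):
    """Count flights on one route for each requested table version.

    Uses the same `versionAsOf` read option as the cell above.
    """
    counts = {}
    for version in versions:
        df = (spark.read.format("delta")
              .option("versionAsOf", version)
              .load(table_path))
        counts[version] = (df.where("origin = '%s'" % origin)
                             .where("destination = '%s'" % destination)
                             .count())
    return counts
```

Calling `route_counts_by_version(spark, pathToEventsTable, [0, 1, 2])` at this point in the notebook should reproduce the three counts printed above. It has to run before the vacuum step, since older versions need their data files to still be on disk.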


**Review File System**: Note the number of files based on the preceding operations.

In [26]:
%ls $pathToEventsTable

[1m[36m_delta_log[m[m/
part-00000-a7d6a844-14a1-46dc-b4cd-b77a0ad81e86-c000.snappy.parquet
part-00000-eb445dee-f7d4-45e1-b0fd-773120453d92-c000.snappy.parquet
part-00000-ec5df012-c7fd-4c34-b3bc-030be8764c2c-c000.snappy.parquet
part-00001-8074dff5-6d8c-401a-ae68-e96e7421fa9d-c000.snappy.parquet
part-00001-a513c0f6-419a-4c5e-8c8b-6bbcc485bd62-c000.snappy.parquet
part-00001-c77c66d0-a97d-4caf-b136-e79d272505fe-c000.snappy.parquet
part-00002-3f59f723-91d5-4be6-bae4-1401130be4fc-c000.snappy.parquet
part-00002-b197711b-ac23-4545-9a21-b23f75f2d967-c000.snappy.parquet
part-00002-e6e7e427-a510-4f8d-9bf1-43bfabbb524f-c000.snappy.parquet
part-00003-39ce1e03-9535-4243-beff-59b336223c6f-c000.snappy.parquet
part-00003-d695c1b4-996a-4ccd-ab04-98b2379f8dbb-c000.snappy.parquet
part-00004-be481f0f-5371-4d12-9610-726e288ffc3d-c000.snappy.parquet
part-00004-d497d609-0b2a-49b9-8bbc-46bc2e48ad04-c000.snappy.parquet
part-00005-9490e34c-6886-4f8b-80c3-5ae852738196-c000.snappy.parquet
part-00005-c51aa017-7

### Vacuum
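Remove data files that are no longer referenced by the current table version. By default, `vacuum` keeps such files for 7 days; passing `0` removes them immediately, which is only allowed here because `spark.databricks.delta.retentionDurationCheck.enabled` was set to `false` in the Spark configuration. Once the files are removed, the older versions that depended on them can no longer be read with `versionAsOf`.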

In [27]:
deltaTable.vacuum(0)

DataFrame[]

In [28]:
%ls $pathToEventsTable

[1m[36m_delta_log[m[m/
part-00000-eb445dee-f7d4-45e1-b0fd-773120453d92-c000.snappy.parquet
part-00000-ec5df012-c7fd-4c34-b3bc-030be8764c2c-c000.snappy.parquet
part-00001-a513c0f6-419a-4c5e-8c8b-6bbcc485bd62-c000.snappy.parquet
part-00001-c77c66d0-a97d-4caf-b136-e79d272505fe-c000.snappy.parquet
part-00002-3f59f723-91d5-4be6-bae4-1401130be4fc-c000.snappy.parquet
part-00003-39ce1e03-9535-4243-beff-59b336223c6f-c000.snappy.parquet
part-00006-0fa92e12-eb12-4498-ba6e-0f26e5286f5e-c000.snappy.parquet
part-00007-e6801601-2659-4577-b51b-c9c962ee10b7-c000.snappy.parquet
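
To make the retention rule concrete, here is a plain-Python model of which files a vacuum would be expected to remove. It is a simplified illustration, not Delta Lake's implementation: the function name `vacuum_candidates` and its inputs (a mapping of file name to age in hours) are hypothetical, and the exact boundary comparison is an assumption.

```python
def vacuum_candidates(files_on_disk, live_files, age_hours, retention_hours=168.0):
    """Return the files this simplified model would delete.

    A file is a candidate when it is not part of the current table
    version and it is at least `retention_hours` old.
    The default of 168 hours corresponds to 7 days.
    """
    return sorted(
        name for name in files_on_disk
        if name not in live_files and age_hours[name] >= retention_hours
    )


# Hypothetical example: "old.parquet" was replaced a few minutes ago.
files = ["old.parquet", "new.parquet"]
live = {"new.parquet"}
ages = {"old.parquet": 0.1, "new.parquet": 0.05}

kept_by_default = vacuum_candidates(files, live, ages)        # nothing is old enough
removed_with_zero = vacuum_candidates(files, live, ages, 0)   # retention disabled
```

Under this model the default retention removes nothing from a table that was just written, which matches why the notebook has to pass `0` to see any effect.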


And let's not forget, Delta Lake 0.4.0 also includes `MERGE` in the Python API!

### Merge
Let's merge another table with the `departureDelays` table with [data deduplication](https://docs.delta.io/0.4.0/delta-update.html#data-deduplication-when-writing-into-delta-tables).  Let's start by viewing data that will be impacted by the merge.

In [29]:
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' order by date limit 10").toPandas()

Unnamed: 0,date,delay,distance,origin,destination
0,1010521,0,590,SEA,SFO
1,1010710,31,590,SEA,SFO
2,1010730,5,590,SEA,SFO
3,1010955,104,590,SEA,SFO


Next, let's create our `merge_table` which contains three rows:
* 1010710: this row is a duplicate
* 1010521: this row will be updated with a new delay value
* 1010832: this is a new row

In [30]:
items = [(1010521, 10, 590, 'SEA', 'SFO'), (1010710, 31, 590, 'SEA', 'SFO'), (1010832, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()

Unnamed: 0,date,delay,distance,origin,destination
0,1010521,10,590,SEA,SFO
1,1010710,31,590,SEA,SFO
2,1010832,31,590,SEA,SFO


Let's run our merge statement, which handles the duplicate, updates the changed row, and inserts the new row.

In [31]:
deltaTable.alias("flights") \
    .merge(merge_table.alias("updates"),"flights.date = updates.date") \
    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
    .whenNotMatchedInsertAll() \
    .execute()

In [32]:
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' order by date limit 10").toPandas()

Unnamed: 0,date,delay,distance,origin,destination
0,1010521,10,590,SEA,SFO
1,1010710,31,590,SEA,SFO
2,1010730,5,590,SEA,SFO
3,1010832,31,590,SEA,SFO
4,1010955,104,590,SEA,SFO


Comparing the query results before and after the merge shows the following:
* There is only one row for the date `1010710` as `merge` automatically takes care of **data deduplication**.
* The row for the date `1010521` has the `delay` value **updated** from 0 to 10.
* The row for the date `1010832` has been added as this date did not exist, hence it was **inserted**.
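
The three outcomes can be reproduced with a small plain-Python model of the merge rules used above (`whenMatchedUpdate` on `delay`, `whenNotMatchedInsertAll`). This is only an illustration of the semantics, not how Delta Lake executes a merge. The function name `merge_by_date` is hypothetical, and the model assumes each date appears at most once in the updates. Because the merge condition compares `date` alone, any target row with the same date matches, whatever its origin or destination.

```python
def merge_by_date(target_rows, update_rows):
    """Upsert update_rows into target_rows, matching on the `date` key.

    Matched rows get their `delay` overwritten; unmatched rows are inserted.
    Inputs are lists of dicts and are not modified.
    """
    merged = [dict(row) for row in target_rows]  # copy to avoid mutating input
    by_date = {}
    for row in merged:
        by_date.setdefault(row["date"], []).append(row)

    for update in update_rows:
        matches = by_date.get(update["date"])
        if matches:
            for row in matches:          # matched: update delay only
                row["delay"] = update["delay"]
        else:
            merged.append(dict(update))  # not matched: insert the whole row
    return sorted(merged, key=lambda row: row["date"])


cols = ["date", "delay", "distance", "origin", "destination"]
before = [dict(zip(cols, r)) for r in [
    (1010521, 0, 590, "SEA", "SFO"), (1010710, 31, 590, "SEA", "SFO"),
    (1010730, 5, 590, "SEA", "SFO"), (1010955, 104, 590, "SEA", "SFO")]]
updates = [dict(zip(cols, r)) for r in [
    (1010521, 10, 590, "SEA", "SFO"), (1010710, 31, 590, "SEA", "SFO"),
    (1010832, 31, 590, "SEA", "SFO")]]

after = merge_by_date(before, updates)
```

Here `before` and `updates` are copied from the tables shown earlier, and `after` contains the same five rows as the final query result.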