# Simplify Data Lake Reliability with Delta Lake and Python, SQL Utilities, and In-Place Migration

We are excited to announce the release of Delta Lake 0.4.0, which introduces Python APIs for manipulating and managing data in Delta tables. The key features in this release are:

* **Python APIs for DML and utility operations** ([#89](https://github.com/delta-io/delta/issues/89)) - You can now use Python APIs to update, delete, and merge data in Delta Lake tables and to run utility operations (e.g., vacuum and history) on them. These are great for building complex workloads in Python, e.g., [Slowly Changing Dimension (SCD)](https://docs.delta.io/0.4.0/delta-update.html#slowly-changing-data-scd-type-2-operation-into-delta-tables) operations, merging [change data](https://docs.delta.io/0.4.0/delta-update.html#write-change-data-into-a-delta-table) for replication, and [upserts from streaming queries](https://docs.delta.io/0.4.0/delta-update.html#upsert-from-streaming-queries-using-foreachbatch). See the [documentation](https://docs.delta.io/0.4.0/delta-update.html) for more details.

* **Convert-to-Delta** ([#78](https://github.com/delta-io/delta/issues/78)) - You can now convert a Parquet table in place to a Delta Lake table without rewriting any of the data. This is great for converting very large Parquet tables that would be costly to rewrite as a Delta table. Furthermore, this process is reversible - you can convert a Parquet table to a Delta Lake table, operate on it (e.g., delete or merge), and easily convert it back to a Parquet table. See the [documentation](https://docs.delta.io/0.4.0/delta-utility.html#convert-to-delta) for more details.

* **SQL for utility operations** - You can now use SQL to run the vacuum and history utility operations. See the [documentation](https://docs.delta.io/0.4.0/delta-utility.html#enable-sql-commands-within-apache-spark) for more details on how to configure Spark to execute these Delta-specific SQL commands.
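The in-place conversion is cheap because the existing Parquet files only need to be listed in the table's first transaction-log commit, so none of them is rewritten. Going back, as described in the Delta Lake documentation, means running vacuum with a retention of 0 hours (so only files of the latest version remain) and then deleting the `_delta_log` directory. The sketch below is a toy, standard-library model of those two steps under that assumption, not Delta Lake's implementation: the helpers `toy_convert_to_delta` and `toy_convert_to_parquet` are hypothetical, the files are fake, and real commits also record protocol and schema metadata.

```python
# Toy model (hypothetical helpers, not Delta Lake's code) of converting a Parquet
# directory to Delta in place and then back to plain Parquet.
import json
import os
import shutil
import tempfile


def toy_convert_to_delta(table_dir):
    """Write commit 0 listing the existing Parquet files; no data file is rewritten."""
    parquet_files = sorted(n for n in os.listdir(table_dir) if n.endswith(".parquet"))
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir)
    # Commit files are named by version number, zero-padded to 20 digits.
    with open(os.path.join(log_dir, "%020d.json" % 0), "w") as log:
        for name in parquet_files:
            log.write(json.dumps({"add": {"path": name}}) + "\n")
    return parquet_files


def toy_convert_to_parquet(table_dir, live_files):
    """Reverse: drop files outside the latest version (like vacuum(0)), then the log."""
    for name in os.listdir(table_dir):
        if name.endswith(".parquet") and name not in live_files:
            os.remove(os.path.join(table_dir, name))
    shutil.rmtree(os.path.join(table_dir, "_delta_log"))


with tempfile.TemporaryDirectory() as table_dir:
    for name in ("part-00000.parquet", "part-00001.parquet"):
        with open(os.path.join(table_dir, name), "wb") as f:
            f.write(b"original bytes")
    converted = toy_convert_to_delta(table_dir)
    log_files = os.listdir(os.path.join(table_dir, "_delta_log"))
    # Pretend a later delete replaced part-00001 with a rewritten file; the toy
    # passes the latest version's files explicitly instead of reading the log.
    with open(os.path.join(table_dir, "part-00001-v1.parquet"), "wb") as f:
        f.write(b"rewritten bytes")
    toy_convert_to_parquet(table_dir, {"part-00000.parquet", "part-00001-v1.parquet"})
    remaining = sorted(os.listdir(table_dir))
    with open(os.path.join(table_dir, "part-00000.parquet"), "rb") as f:
        untouched_bytes = f.read()

print(converted)  # ['part-00000.parquet', 'part-00001.parquet']
print(remaining)  # ['part-00000.parquet', 'part-00001-v1.parquet']
```

The untouched file keeps its original bytes through both conversions, which is the point of doing the conversion in place.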



### Data Preparation
Configure the location of the source CSV file and the location where the Delta Lake table will be stored.

```python
import pandas as pd  # pandas must be installed for the .toPandas() calls below

tripdelaysFilePath = "/usr/local/Cellar/spark/data/departuredelays.csv"
pathToEventsTable = "/usr/local/Cellar/spark/spark-2.4.3-bin-hadoop2.7/departureDelays.delta"
```

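Create the `departureDelays` DataFrame from the CSV file.

```python
# `spark` is the active SparkSession (predefined in the pyspark shell)
departureDelays = spark.read.option("header", "true").option("inferSchema", "true").csv(tripdelaysFilePath)
```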

Save the DataFrame as a Delta Lake table at `pathToEventsTable` (adjust the path above to a location of your choice).

```python
departureDelays.write.format("delta").mode("overwrite").save(pathToEventsTable)
```

Load the Delta Lake table and register it as the temporary view `delays_delta`.

```python
delays_delta = spark.read.format("delta").load(pathToEventsTable)
delays_delta.createOrReplaceTempView("delays_delta")
```

Get the number of flights from Seattle (SEA) to San Francisco (SFO).

```python
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").toPandas()
```

| | count(1) |
|---|---|
| 0 | 1698 |


**Review File System**: Note that four Parquet data files, plus the `_delta_log` transaction log directory, were created when the table was written.

```python
%ls $pathToEventsTable
```

```
_delta_log/
part-00000-091cff02-ca22-473d-bb22-13164be0846b-c000.snappy.parquet
part-00001-e61a8edb-210a-4692-a45e-388caf74dd62-c000.snappy.parquet
part-00002-9c17ccd7-7c13-429e-b26c-eabad1e1d58a-c000.snappy.parquet
part-00003-bf58ae04-4900-425c-bdd4-c29aa8c454e4-c000.snappy.parquet
```
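`%ls` is an IPython magic, so it only works inside a notebook or an IPython shell. Outside of one, a plain-Python helper such as the hypothetical `list_data_files` below can count the data files instead. It is a sketch that only looks at top-level `*.parquet` names (hidden `.crc` checksum files, which Spark may write on a local file system, are skipped) and checks whether a `_delta_log` directory exists.

```python
import os
import tempfile


def list_data_files(table_path):
    """Return (sorted Parquet data file names, whether _delta_log exists)."""
    names = os.listdir(table_path)
    # Checksum files such as .part-...snappy.parquet.crc end in .crc, so they are skipped.
    data_files = sorted(n for n in names if n.endswith(".parquet"))
    has_log = os.path.isdir(os.path.join(table_path, "_delta_log"))
    return data_files, has_log


# Demo on a fake table directory shaped like the listing above.
with tempfile.TemporaryDirectory() as table_path:
    os.mkdir(os.path.join(table_path, "_delta_log"))
    for i in range(4):
        name = "part-%05d-example-c000.snappy.parquet" % i
        open(os.path.join(table_path, name), "wb").close()
        # A hidden checksum file that must not be counted.
        open(os.path.join(table_path, "." + name + ".crc"), "wb").close()
    data_files, has_log = list_data_files(table_path)

print(len(data_files), has_log)  # 4 True
```

On the real table, `list_data_files(pathToEventsTable)` would return the same four file names that `%ls` shows.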


### Deletes
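With Delta Lake, you can delete data using the Python API. The following removes all early flights, i.e., flights with a negative `delay`.

```python
from delta.tables import DeltaTable

# Get a handle to the Delta table stored at pathToEventsTable
deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

# Delete every row that matches the SQL predicate
deltaTable.delete("delay < 0")
```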
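Delta Lake does not edit Parquet files in place. As a simplified model, a delete rewrites every data file that contains at least one matching row, reuses files without matching rows as they are, and only marks the replaced files as removed in the log. The hypothetical `toy_delete` below illustrates that copy-on-write behavior in pure Python, with each file represented as a list of row dictionaries; it does not reflect how Delta Lake actually scans or writes files.

```python
def toy_delete(files, predicate, version):
    """Copy-on-write delete over {file name: [rows]}.

    Returns (files in the new version, files added, files removed).
    """
    new_version, added, removed = {}, [], []
    for name, rows in files.items():
        kept = [row for row in rows if not predicate(row)]
        if len(kept) == len(rows):
            new_version[name] = rows  # no matching row: reuse the file as-is
            continue
        removed.append(name)  # logically removed, but still on disk
        if kept:  # surviving rows go into a brand-new file
            new_name = "%s-v%d" % (name, version)
            new_version[new_name] = kept
            added.append(new_name)
    return new_version, added, removed


files_v0 = {
    "part-00000": [{"delay": -3}, {"delay": 0}],
    "part-00001": [{"delay": 12}],
    "part-00002": [{"delay": -1}, {"delay": 7}],
}
files_v1, added, removed = toy_delete(files_v0, lambda row: row["delay"] < 0, version=1)
# Until a vacuum runs, old and new files sit side by side on disk.
on_disk = sorted(set(files_v0) | set(files_v1))
print(on_disk)  # ['part-00000', 'part-00000-v1', 'part-00001', 'part-00002', 'part-00002-v1']
```

In this notebook all four original files were replaced, which suggests that each of them contained at least one early flight.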

```python
# Get the SEA -> SFO row count again
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").toPandas()
```

| | count(1) |
|---|---|
| 0 | 837 |


**Review File System**: Note that although we deleted the early flights, there are now eight files instead of the four created with the table. Delta Lake never modifies data files in place: the delete wrote new files without the deleted rows and marked the old files as removed in the transaction log, but the old files stay on disk so that earlier versions of the table can still be read.

```python
%ls $pathToEventsTable
```

```
_delta_log/
part-00000-091cff02-ca22-473d-bb22-13164be0846b-c000.snappy.parquet
part-00000-fb071d3b-9d25-4faf-a04f-26d35716315b-c000.snappy.parquet
part-00001-8db427ad-edd5-4c99-8578-01da4b4ea921-c000.snappy.parquet
part-00001-e61a8edb-210a-4692-a45e-388caf74dd62-c000.snappy.parquet
part-00002-8c19b041-8e4b-4c38-be54-3d32bce76f83-c000.snappy.parquet
part-00002-9c17ccd7-7c13-429e-b26c-eabad1e1d58a-c000.snappy.parquet
part-00003-bf58ae04-4900-425c-bdd4-c29aa8c454e4-c000.snappy.parquet
part-00003-c08cd37e-659b-49c8-9ab1-5a39862dddc9-c000.snappy.parquet
```


### Updates
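Update all flights originating from Detroit (DTW) so that they originate from Seattle (SEA).

```python
# The values in `set` are SQL expressions, so the string literal 'SEA' needs its own quotes
deltaTable.update("origin = 'DTW'", {"origin": "'SEA'"})
```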

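```python
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").toPandas()
```

| | count(1) |
|---|---|
| 0 | 986 |

The count rises from 837 to 986 because the 149 remaining DTW -> SFO flights are now counted as SEA -> SFO flights.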


### View History
View the table history (note the write that created the table, followed by the delete and update operations).

```python
deltaTable.history().toPandas()
```

| | version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 2019-10-02 17:56:46 | | | UPDATE | `{'predicate': '(origin#767 = DTW)'}` | | | | 1.0 | | False |
| 1 | 1 | 2019-10-02 17:56:38 | | | DELETE | ``{'predicate': '["(`delay` < 0)"]'}`` | | | | 0.0 | | False |
| 2 | 0 | 2019-10-02 17:56:25 | | | WRITE | `{'mode': 'Overwrite', 'partitionBy': '[]'}` | | | | | | False |
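The history comes from the transaction log: each version `N` is stored as a newline-delimited JSON file in `_delta_log`, named after `N` zero-padded to 20 digits, and each commit includes a `commitInfo` action with fields such as `operation` and `timestamp`. The hypothetical helper `read_operations` below is a minimal standard-library sketch that recovers version/operation pairs from those files. It assumes the JSON commit files are still present and ignores checkpoint files, so it is an illustration rather than a substitute for `DeltaTable.history()`.

```python
import json
import os
import tempfile


def read_operations(table_path):
    """Return [(version, operation)] from the JSON commit files, newest first."""
    log_dir = os.path.join(table_path, "_delta_log")
    result = []
    for name in os.listdir(log_dir):
        stem, ext = os.path.splitext(name)
        if ext != ".json" or not stem.isdigit():
            continue  # skip checkpoints, .crc files, _last_checkpoint, ...
        with open(os.path.join(log_dir, name)) as commit:
            for line in commit:
                if not line.strip():
                    continue
                action = json.loads(line)
                if "commitInfo" in action:
                    result.append((int(stem), action["commitInfo"]["operation"]))
    return sorted(result, reverse=True)


# Demo: a fake log with the same three operations as the history above.
with tempfile.TemporaryDirectory() as table_path:
    log_dir = os.path.join(table_path, "_delta_log")
    os.mkdir(log_dir)
    for version, operation in enumerate(["WRITE", "DELETE", "UPDATE"]):
        with open(os.path.join(log_dir, "%020d.json" % version), "w") as commit:
            commit.write(json.dumps({"commitInfo": {"operation": operation}}) + "\n")
    operations = read_operations(table_path)

print(operations)  # [(2, 'UPDATE'), (1, 'DELETE'), (0, 'WRITE')]
```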


Use time travel (the `versionAsOf` read option) to count SEA -> SFO flights in each version of the table.

```python
# Read versions 0 (create), 1 (delete), and 2 (update) of the table
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load(pathToEventsTable)
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load(pathToEventsTable)
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load(pathToEventsTable)

# Count SEA -> SFO flights in each version
cnt0 = dfv0.where("origin = 'SEA' and destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA' and destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA' and destination = 'SFO'").count()

print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))
```

```
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
```
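Time travel works because the snapshot of any version can be rebuilt by replaying the log from version 0 up to that version: `add` actions bring files into the table and `remove` actions take them out. The hypothetical `files_as_of` below is a simplified standard-library sketch of that replay; it ignores checkpoints and assumes every JSON commit up to the requested version still exists. The demo log mimics this notebook's file churn (4 files written, all 4 rewritten by the delete, 3 of those rewritten by the update) using made-up file names.

```python
import json
import os
import tempfile


def files_as_of(table_path, version):
    """Replay the add/remove actions of commits 0..version; return the live file set."""
    log_dir = os.path.join(table_path, "_delta_log")
    live = set()
    for v in range(version + 1):
        with open(os.path.join(log_dir, "%020d.json" % v)) as commit:
            for line in commit:
                if not line.strip():
                    continue
                action = json.loads(line)
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
    return live


def write_commit(log_dir, version, added=(), removed=()):
    """Write a fake commit file that contains only add/remove actions."""
    with open(os.path.join(log_dir, "%020d.json" % version), "w") as commit:
        for path in removed:
            commit.write(json.dumps({"remove": {"path": path}}) + "\n")
        for path in added:
            commit.write(json.dumps({"add": {"path": path}}) + "\n")


with tempfile.TemporaryDirectory() as table_path:
    log_dir = os.path.join(table_path, "_delta_log")
    os.mkdir(log_dir)
    v0 = ["a0", "b0", "c0", "d0"]
    v1 = ["a1", "b1", "c1", "d1"]
    write_commit(log_dir, 0, added=v0)                                   # WRITE
    write_commit(log_dir, 1, added=v1, removed=v0)                       # DELETE
    write_commit(log_dir, 2, added=["a2", "b2", "c2"], removed=v1[:3])   # UPDATE
    snapshots = [files_as_of(table_path, v) for v in range(3)]

print([sorted(s) for s in snapshots])
# [['a0', 'b0', 'c0', 'd0'], ['a1', 'b1', 'c1', 'd1'], ['a2', 'b2', 'c2', 'd1']]
```

Every snapshot has four live files, yet the union of all three is 11 files, which matches the number of data files on disk before the vacuum.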


**Review File System**: The directory now holds 11 data files: the 4 from the initial write, the 4 written by the delete, and the 3 written by the update. Files from earlier versions are kept on disk so that time travel to those versions keeps working.

```python
%ls $pathToEventsTable
```

```
_delta_log/
part-00000-091cff02-ca22-473d-bb22-13164be0846b-c000.snappy.parquet
part-00000-fa942ead-0f90-4a99-8949-510d00a77597-c000.snappy.parquet
part-00000-fb071d3b-9d25-4faf-a04f-26d35716315b-c000.snappy.parquet
part-00001-8db427ad-edd5-4c99-8578-01da4b4ea921-c000.snappy.parquet
part-00001-e2639517-7fea-4bc0-ba74-3af982025112-c000.snappy.parquet
part-00001-e61a8edb-210a-4692-a45e-388caf74dd62-c000.snappy.parquet
part-00002-8c19b041-8e4b-4c38-be54-3d32bce76f83-c000.snappy.parquet
part-00002-9c17ccd7-7c13-429e-b26c-eabad1e1d58a-c000.snappy.parquet
part-00002-c85d41c2-0ea1-451a-9bbf-52e7bf87d569-c000.snappy.parquet
part-00003-bf58ae04-4900-425c-bdd4-c29aa8c454e4-c000.snappy.parquet
part-00003-c08cd37e-659b-49c8-9ab1-5a39862dddc9-c000.snappy.parquet
```


### Vacuum
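Remove data files that are no longer referenced by the latest version of the table and are older than the retention threshold (7 days by default). The argument to `vacuum` is the retention period in hours, so `vacuum(0)` removes every unreferenced file right away; afterwards, older versions of the table can no longer be read with time travel. Because such a short retention period is unsafe while other readers or writers may still use those files, Delta Lake may reject it unless `spark.databricks.delta.retentionDurationCheck.enabled` is set to `false`.

```python
# Vacuum with a retention period of 0 hours
deltaTable.vacuum(0)
```

```
DataFrame[]
```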

```python
%ls $pathToEventsTable
```

```
_delta_log/
part-00000-fa942ead-0f90-4a99-8949-510d00a77597-c000.snappy.parquet
part-00001-e2639517-7fea-4bc0-ba74-3af982025112-c000.snappy.parquet
part-00002-c85d41c2-0ea1-451a-9bbf-52e7bf87d569-c000.snappy.parquet
part-00003-c08cd37e-659b-49c8-9ab1-5a39862dddc9-c000.snappy.parquet
```
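As a simplified model, vacuum deletes a file only if the latest version no longer references it and it was removed from the table longer ago than the retention period. The hypothetical `toy_vacuum` below applies that rule to the 11 files listed before the vacuum, using the DELETE and UPDATE commit timestamps from the history table as removal times, made-up file names, and an assumed vacuum time of 17:57:00. Real vacuum also cleans up files that were never committed (for example, leftovers from failed writes) once they are older than the retention period; this sketch leaves that out.

```python
from datetime import datetime, timedelta


def toy_vacuum(removed_at, live_files, now, retention_hours=168):
    """Return the files a simplified vacuum would delete.

    removed_at maps each file that is no longer live to the time it was removed.
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(f for f, t in removed_at.items() if f not in live_files and t <= cutoff)


deleted_at = datetime(2019, 10, 2, 17, 56, 38)  # DELETE commit (version 1)
updated_at = datetime(2019, 10, 2, 17, 56, 46)  # UPDATE commit (version 2)
removed_at = {f: deleted_at for f in ["v0-0", "v0-1", "v0-2", "v0-3"]}
removed_at.update({f: updated_at for f in ["v1-0", "v1-1", "v1-2"]})
live_files = {"v2-0", "v2-1", "v2-2", "v1-3"}
now = datetime(2019, 10, 2, 17, 57, 0)  # assumed time of the vacuum

default_run = toy_vacuum(removed_at, live_files, now)                 # 7-day retention
zero_run = toy_vacuum(removed_at, live_files, now, retention_hours=0)  # like vacuum(0)
print(len(default_run), len(zero_run))  # 0 7
```

With the default retention nothing is old enough to delete, while a 0-hour retention removes the 7 unreferenced files and leaves the 4 live ones, as in the listing after `vacuum(0)`.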


And let's not forget, Delta Lake 0.4.0 also includes `MERGE` in the Python API!

### Merge
Let's merge another table into the `departureDelays` table with [data deduplication](https://docs.delta.io/0.4.0/delta-update.html#data-deduplication-when-writing-into-delta-tables). Let's start by viewing the data that will be affected by the merge.

```python
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' order by date limit 10").toPandas()
```

| | date | delay | distance | origin | destination |
|---|---|---|---|---|---|
| 0 | 1010521 | 0 | 590 | SEA | SFO |
| 1 | 1010710 | 31 | 590 | SEA | SFO |
| 2 | 1010730 | 5 | 590 | SEA | SFO |
| 3 | 1010955 | 104 | 590 | SEA | SFO |


Next, let's create our `merge_table`, which contains three rows:
* 1010710: this row is a duplicate of an existing row
* 1010521: this row will be updated with a new delay value
* 1010832: this is a new row

```python
items = [(1010521, 10, 590, 'SEA', 'SFO'), (1010710, 31, 590, 'SEA', 'SFO'), (1010832, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()
```

| | date | delay | distance | origin | destination |
|---|---|---|---|---|---|
| 0 | 1010521 | 10 | 590 | SEA | SFO |
| 1 | 1010710 | 31 | 590 | SEA | SFO |
| 2 | 1010832 | 31 | 590 | SEA | SFO |


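Now run the merge. Rows that match an existing row have their `delay` overwritten with the value from `merge_table` (which leaves the duplicate unchanged), and rows with no match are inserted. The condition matches on date, origin, and destination: other routes can have rows with the same `date` value, so matching on `date` alone would also overwrite their delays.

```python
# Upsert merge_table into the Delta table:
#   - rows that match on date and route get the new delay value
#   - rows with no match are inserted as-is
(deltaTable.alias("flights")
    .merge(
        merge_table.alias("updates"),
        "flights.date = updates.date AND flights.origin = updates.origin "
        "AND flights.destination = updates.destination")
    .whenMatchedUpdate(set={"delay": "updates.delay"})
    .whenNotMatchedInsertAll()
    .execute())
```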

```python
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' order by date limit 10").toPandas()
```

| | date | delay | distance | origin | destination |
|---|---|---|---|---|---|
| 0 | 1010521 | 10 | 590 | SEA | SFO |
| 1 | 1010710 | 31 | 590 | SEA | SFO |
| 2 | 1010730 | 5 | 590 | SEA | SFO |
| 3 | 1010832 | 31 | 590 | SEA | SFO |
| 4 | 1010955 | 104 | 590 | SEA | SFO |


Comparing this output with the query before the merge, note the following:
* There is still only one row for the date `1010710`: the source row matched the existing row, so it was updated (with the same `delay` value) instead of being inserted again. This is how `merge` takes care of **data deduplication**.
* The row for the date `1010521` has its `delay` value **updated** from 0 to 10.
* The row for the date `1010832` has been **inserted**, as no matching row existed.
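These three outcomes can be reproduced with a small pure-Python model of the merge, run on the rows shown in the two queries above. `toy_merge` is a hypothetical sketch, not Delta Lake's implementation: matched target rows take the source `delay` (like `whenMatchedUpdate`), source rows without a match are appended (like `whenNotMatchedInsertAll`), and, as Delta Lake does, the merge fails if several source rows match the same target row. Rows are keyed on date, origin, and destination (an assumption; on this SEA -> SFO subset, keying on date alone gives the same result).

```python
from collections import defaultdict

COLS = ("date", "delay", "distance", "origin", "destination")


def toy_merge(target, source, key):
    """Update matched target rows' delay from source; insert unmatched source rows."""
    by_key = defaultdict(list)
    for row in source:
        by_key[key(row)].append(row)
    merged, matched_keys = [], set()
    for row in target:
        matches = by_key.get(key(row), [])
        if len(matches) > 1:
            raise ValueError("multiple source rows match target row %r" % (key(row),))
        if matches:
            matched_keys.add(key(row))
            row = dict(row, delay=matches[0]["delay"])  # like whenMatchedUpdate
        merged.append(row)
    for k, rows in by_key.items():
        if k not in matched_keys:
            merged.extend(rows)  # like whenNotMatchedInsertAll
    return merged


def route_key(row):
    return (row["date"], row["origin"], row["destination"])


target = [dict(zip(COLS, r)) for r in [
    (1010521, 0, 590, "SEA", "SFO"), (1010710, 31, 590, "SEA", "SFO"),
    (1010730, 5, 590, "SEA", "SFO"), (1010955, 104, 590, "SEA", "SFO")]]
source = [dict(zip(COLS, r)) for r in [
    (1010521, 10, 590, "SEA", "SFO"), (1010710, 31, 590, "SEA", "SFO"),
    (1010832, 31, 590, "SEA", "SFO")]]

merged = sorted(toy_merge(target, source, route_key), key=lambda r: r["date"])
print([(r["date"], r["delay"]) for r in merged])
# [(1010521, 10), (1010710, 31), (1010730, 5), (1010832, 31), (1010955, 104)]
```

The result matches the five rows returned after the merge. Adding a second source row for `1010521` would make `toy_merge` raise, because two source rows would then try to update the same target row.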