# Diving into Delta Lake: DML Internals

This notebook is a modified version of the [SAIS EU 2019 Delta Lake Tutorial](https://github.com/delta-io/delta/tree/master/examples/tutorials/saiseu19). The data used is a modified version of the public data from [Lending Club](https://www.kaggle.com/wendykan/lending-club-loan-data). It includes all funded loans from 2012 to 2017. Each loan includes applicant information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and latest payment information. For a full view of the data please view the data dictionary available [here](https://resources.lendingclub.com/LCDataDictionary.xlsx).


### Steps to run this notebook
You can run this notebook in a Databricks environment, including [Databricks Community Edition](http://community.cloud.databricks.com/).
To run this notebook, [create a cluster](https://docs.databricks.com/clusters/create.html) with **Databricks Runtime 6.5 or later** and [attach this notebook](https://docs.databricks.com/notebooks/notebooks-manage.html#attach-a-notebook-to-a-cluster) to that cluster.

<img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>

An open-source storage format that brings ACID transactions to Apache Spark™ and big data workloads.
* **Open format**: Stored as Parquet format in blob storage.
* **ACID Transactions**: Ensures data integrity and read consistency with complex, concurrent data pipelines.
* **Schema Enforcement and Evolution**: Ensures data cleanliness by blocking writes with an unexpected schema.
* **Audit History**: History of all the operations that happened in the table.
* **Time Travel**: Query previous versions of the table by time or version number.
* **Deletes and upserts**: Supports deleting and upserting into tables with programmatic APIs.
* **Scalable Metadata management**: Able to handle millions of files by scaling the metadata operations with Spark.
* **Unified Batch and Streaming Source and Sink**: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Loading data in Delta Lake table

First, let's read this data and save it as a Delta Lake table.

In [4]:
%sh rm -rf /dbfs/tmp/sais_eu_19_demo/ && mkdir -p /dbfs/tmp/sais_eu_19_demo/loans/ && wget -O /dbfs/tmp/sais_eu_19_demo/loans/SAISEU19-loan-risks.snappy.parquet  https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet && ls -al  /dbfs/tmp/sais_eu_19_demo/loans/ 

In [5]:
spark.sql("set spark.sql.shuffle.partitions = 1")

# Configure source data path (TODO: update this path after loading the data into Databricks Datasets)
# sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
sourcePath = "/tmp/sais_eu_19_demo/loans/SAISEU19-loan-risks.snappy.parquet"


# Configure Delta Lake Path
deltaPath = "/tmp/loans_delta"

# Remove folder if it exists
dbutils.fs.rm(deltaPath, recurse=True)

# Create the Delta table with the same loans data
(spark.read.format("parquet").load(sourcePath) 
  .write.format("delta").save(deltaPath))

spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")
print("Defined view 'loans_delta'")

Let's explore the data.

In [7]:
spark.sql("SELECT count(*) FROM loans_delta").show()

In [8]:
spark.sql("SELECT * FROM loans_delta LIMIT 5").show()

### Review Underlying Files
* Review the underlying `parquet` files
* Initial log

In [11]:
%sh
ls -lt /dbfs/tmp/loans_delta/

In [12]:
%sh
ls -lt /dbfs/tmp/loans_delta/_delta_log/

#### Review initial log
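
Each commit file in `_delta_log` is newline-delimited JSON with one action per line. As a rough sketch (plain Python, no Spark), the snippet below groups the actions of a commit by type. The sample lines are abbreviated stand-ins modeled on this notebook's first commit, not the real file contents, and `group_actions` is a hypothetical helper. Note that the Delta protocol spells the metadata key `metaData`.

```python
import json
from collections import defaultdict

# Abbreviated, illustrative commit content (one JSON action per line).
sample_commit = "\n".join([
    '{"commitInfo": {"operation": "WRITE", "isBlindAppend": true}}',
    '{"metaData": {"format": {"provider": "parquet"}, "partitionColumns": []}}',
    '{"add": {"path": "part-00000-a7074cd6-aa24-464d-94de-720bba7be207-c000.snappy.parquet", '
    '"size": 164673, "dataChange": true}}',
])

def group_actions(commit_text):
    """Return {action_type: [payload, ...]} for the text of one commit file."""
    grouped = defaultdict(list)
    for line in commit_text.splitlines():
        if not line.strip():
            continue
        action = json.loads(line)
        # Each line holds a single-key object such as {"add": {...}}.
        for action_type, payload in action.items():
            grouped[action_type].append(payload)
    return dict(grouped)

actions = group_actions(sample_commit)
print(sorted(actions))             # ['add', 'commitInfo', 'metaData']
print(actions["add"][0]["size"])   # 164673
```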

In [14]:
j0 = spark.read.json("/tmp/loans_delta/_delta_log/00000000000000000000.json")

In [15]:
# Commit Information
display(j0.select("commitInfo").where("commitInfo is not null"))

commitInfo
"List(0127-045215-pined152, true, WriteSerializable, List(6411057), WRITE, List(1, 164672, 14705, 0), List(ErrorIfExists, []), 1587052194069, 100599, denny.lee@databricks.com)"


In [16]:
# Add Information
display(j0.select("add").where("add is not null"))

add
"List(true, 1587052194000, part-00000-a7074cd6-aa24-464d-94de-720bba7be207-c000.snappy.parquet, 164673, {""numRecords"":14705,""minValues"":{""loan_id"":0,""funded_amnt"":1000,""paid_amnt"":0.0,""addr_state"":""AK""},""maxValues"":{""loan_id"":25769805168,""funded_amnt"":40000,""paid_amnt"":40000.0,""addr_state"":""WY""},""nullCount"":{""loan_id"":0,""funded_amnt"":0,""paid_amnt"":0,""addr_state"":0}})"

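The last field of the `add` entry above is itself a JSON string of per-file statistics. As a minimal sketch in plain Python, the snippet below parses a copy of that string and checks whether a value falls inside the file's recorded range. The helper `may_contain` is hypothetical and only illustrates why recording min/max values per file can be useful; it is not Delta Lake's implementation.

```python
import json

# Stats string copied from the `add` action shown above.
stats_json = (
    '{"numRecords":14705,'
    '"minValues":{"loan_id":0,"funded_amnt":1000,"paid_amnt":0.0,"addr_state":"AK"},'
    '"maxValues":{"loan_id":25769805168,"funded_amnt":40000,"paid_amnt":40000.0,"addr_state":"WY"},'
    '"nullCount":{"loan_id":0,"funded_amnt":0,"paid_amnt":0,"addr_state":0}}'
)
stats = json.loads(stats_json)

def may_contain(stats, column, value):
    """Hypothetical helper: True if `value` lies within the file's [min, max] for `column`."""
    return stats["minValues"][column] <= value <= stats["maxValues"][column]

print(stats["numRecords"])                        # 14705
print(may_contain(stats, "addr_state", "WA"))     # True  ("AK" <= "WA" <= "WY")
print(may_contain(stats, "funded_amnt", 50000))   # False (the maximum is 40000)
```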

In [17]:
# Metadata Information
display(j0.select("metadata").where("metadata is not null"))

metadata
"List(1587052192669, List(parquet), b988e857-ace5-49af-8b39-cb31e249ef73, List(), {""type"":""struct"",""fields"":[{""name"":""loan_id"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""funded_amnt"",""type"":""integer"",""nullable"":true,""metadata"":{}},{""name"":""paid_amnt"",""type"":""double"",""nullable"":true,""metadata"":{}},{""name"":""addr_state"",""type"":""string"",""nullable"":true,""metadata"":{}}]})"


In [18]:
jsonStr = j0.select("metadata.schemaString").where("metadata is not null").collect()[0][0]
df = spark.read.json(sc.parallelize([jsonStr]))
display(df)

fields,type
"List(List(loan_id, true, long), List(funded_amnt, true, integer), List(paid_amnt, true, double), List(addr_state, true, string))",struct


## Review Loans by State

In [20]:
%sql
select addr_state, sum(funded_amnt)/1000000 as funded_amnt from loans_delta where funded_amnt <> paid_amnt group by addr_state

addr_state,funded_amnt
CA,18.965925
WA,3.2125
TX,12.695275
PA,5.0747
OH,5.26435
CT,2.371325
NJ,5.7361
NY,13.21
MI,3.333025
AL,1.724425


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Updating data 

You can update the rows of a Delta Lake table that match a predicate. Let's say we want to mark every loan in `WA` state as fully paid by setting `paid_amnt` to `funded_amnt`. First, count the `WA` loans that are not yet fully paid.

In [22]:
%sql
SELECT COUNT(*) FROM loans_delta WHERE addr_state = 'WA' and funded_amnt <> paid_amnt

count(1)
209


In [23]:
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, deltaPath)

# Set paid_amnt to funded_amnt for every loan in WA (marks them as fully paid)
deltaTable.update("addr_state = 'WA'", {"paid_amnt": "funded_amnt"})

In [25]:
display(deltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
1,2020-04-16T15:50:03.000+0000,100599,denny.lee@databricks.com,UPDATE,Map(predicate -> (addr_state#69551 = WA)),,List(6411057),0127-045215-pined152,0.0,WriteSerializable,False,"Map(numTotalRows -> 14705, numFiles -> 1, numRemovedFiles -> 1, numCopiedRows -> 14365, numOutputRows -> 14705, numParts -> 0, numOutputBytes -> 163690, numAddedFiles -> 1, numUpdatedRows -> 340)"
0,2020-04-16T15:49:55.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> ErrorIfExists, partitionBy -> [])",,List(6411057),0127-045215-pined152,,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 164672, numOutputRows -> 14705, numParts -> 0)"
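
The operation metrics for version 1 show that the update rewrote the whole file: 340 rows were changed and the remaining 14,365 were copied unchanged into a new file. The sketch below mimics that copy-on-write behavior on a handful of made-up rows. The row values and the `copy_on_write_update` function are hypothetical; only the metric names come from the history output.

```python
# Made-up rows standing in for the contents of one Parquet file.
old_file = [
    {"loan_id": 1, "funded_amnt": 1000, "paid_amnt": 200.0, "addr_state": "WA"},
    {"loan_id": 2, "funded_amnt": 5000, "paid_amnt": 5000.0, "addr_state": "WA"},
    {"loan_id": 3, "funded_amnt": 2000, "paid_amnt": 150.0, "addr_state": "CA"},
]

def copy_on_write_update(rows, predicate, assign):
    """Write every row to a new file, changing only the rows that match `predicate`."""
    new_file, updated, copied = [], 0, 0
    for row in rows:
        if predicate(row):
            new_file.append({**row, **assign(row)})
            updated += 1
        else:
            new_file.append(dict(row))
            copied += 1
    metrics = {
        "numUpdatedRows": updated,
        "numCopiedRows": copied,
        "numOutputRows": len(new_file),
    }
    return new_file, metrics

new_file, metrics = copy_on_write_update(
    old_file,
    predicate=lambda r: r["addr_state"] == "WA",
    assign=lambda r: {"paid_amnt": float(r["funded_amnt"])},
)
print(metrics)  # {'numUpdatedRows': 2, 'numCopiedRows': 1, 'numOutputRows': 3}

# The same identity holds for the metrics recorded above.
assert 340 + 14365 == 14705
```

In this sketch a row counts as updated whenever it matches the predicate, even if its value does not change. That would explain why `numUpdatedRows` is 340 here although only 209 `WA` loans were still outstanding.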


In [26]:
%sql
select addr_state, sum(funded_amnt)/1000000 as funded_amnt from loans_delta where funded_amnt <> paid_amnt group by addr_state

addr_state,funded_amnt
CA,18.965925
TX,12.695275
PA,5.0747
OH,5.26435
CT,2.371325
NJ,5.7361
NY,13.21
MI,3.333025
AL,1.724425
NV,2.109375


##### Review Underlying Files

In [28]:
%sh
ls -lt /dbfs/tmp/loans_delta/

In [29]:
%sh
ls -lt /dbfs/tmp/loans_delta/_delta_log/

##### Review Transaction Log

In [31]:
j1 = spark.read.json("/tmp/loans_delta/_delta_log/00000000000000000001.json")

In [32]:
# Commit Information
display(j1.select("commitInfo").where("commitInfo is not null"))

commitInfo
"List(0127-045215-pined152, false, WriteSerializable, List(6411057), UPDATE, List(1, 14365, 1, 163690, 14705, 0, 1, 14705, 340), List((addr_state#69551 = WA)), 0, 1587052202612, 100599, denny.lee@databricks.com)"


In [33]:
# Remove Information
display(j1.select("remove").where("remove is not null"))

remove
"List(true, 1587052201698, part-00000-a7074cd6-aa24-464d-94de-720bba7be207-c000.snappy.parquet)"


In [34]:
# Add Information
display(j1.select("add").where("add is not null"))

add
"List(true, 1587052202000, part-00000-483cdc28-a522-46a9-a881-d348848f378a-c000.snappy.parquet, 163691, {""numRecords"":14705,""minValues"":{""loan_id"":0,""funded_amnt"":1000,""paid_amnt"":0.0,""addr_state"":""AK""},""maxValues"":{""loan_id"":25769805168,""funded_amnt"":40000,""paid_amnt"":40000.0,""addr_state"":""WY""},""nullCount"":{""loan_id"":0,""funded_amnt"":0,""paid_amnt"":0,""addr_state"":0}})"

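Taken together, the `add` and `remove` actions determine which files make up the table at a given version. A minimal sketch of that replay, using the file names from versions 0 and 1 above (the `replay` function is a hypothetical illustration, not Delta Lake's implementation):

```python
OLD_FILE = "part-00000-a7074cd6-aa24-464d-94de-720bba7be207-c000.snappy.parquet"
NEW_FILE = "part-00000-483cdc28-a522-46a9-a881-d348848f378a-c000.snappy.parquet"

# Actions from the two commits reviewed so far, in version order.
commits = [
    [("add", OLD_FILE)],                        # version 0: initial WRITE
    [("remove", OLD_FILE), ("add", NEW_FILE)],  # version 1: UPDATE rewrites the file
]

def replay(commits, up_to_version=None):
    """Return the set of live data files after applying commits 0..up_to_version."""
    live = set()
    last = len(commits) - 1 if up_to_version is None else up_to_version
    for actions in commits[: last + 1]:
        for kind, path in actions:
            if kind == "add":
                live.add(path)
            elif kind == "remove":
                live.discard(path)
    return live

print(replay(commits) == {NEW_FILE})                   # True
print(replay(commits, up_to_version=0) == {OLD_FILE})  # True
```

Replaying only up to version 0 still yields the original file, which hints at how an earlier version of the table can be read as long as the removed file remains on disk.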

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Upserting change data to a table using merge
A common use case is Change Data Capture (CDC), where you have to replicate row changes made in an OLTP table to another table for OLAP workloads. To continue with our loan data example, say we have another table of new loan information, some of which are new loans and others are updates to existing loans. In addition, let's say this changes table has the same schema as the `loans_delta` table. You can upsert these changes into the table using the `DeltaTable.merge()` operation, which is based on the `MERGE` SQL command.

#### INSERT or UPDATE parquet: 7-step process

With a legacy data pipeline, to insert or update a table, you must:
1. Identify the new rows to be inserted
2. Identify the rows that will be replaced (i.e. updated)
3. Identify all of the rows that are not impacted by the insert or update
4. Create a new temp table from all three sets of rows
5. Delete the original table (and all of those associated files)
6. "Rename" the temp table back to the original table name
7. Drop the temp table

![](https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif)
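
The seven steps above can be sketched in plain Python, with lists of dictionaries standing in for tables. The names (`legacy_upsert`, the `tables` catalog dictionary, the sample rows) are hypothetical and only mirror the step numbering.

```python
def legacy_upsert(tables, name, changes, key="loan_id"):
    """Rebuild tables[name] from scratch to apply `changes` (illustrative only)."""
    original = tables[name]
    existing_keys = {row[key] for row in original}
    change_keys = {row[key] for row in changes}

    inserts = [r for r in changes if r[key] not in existing_keys]    # step 1
    updates = [r for r in changes if r[key] in existing_keys]        # step 2
    unchanged = [r for r in original if r[key] not in change_keys]   # step 3

    tables["tmp"] = sorted(inserts + updates + unchanged,
                           key=lambda r: r[key])                     # step 4
    del tables[name]                                                 # step 5
    tables[name] = tables["tmp"]                                     # step 6
    del tables["tmp"]                                                # step 7
    return tables[name]

tables = {"loans": [
    {"loan_id": 1, "paid_amnt": 10.0},
    {"loan_id": 2, "paid_amnt": 20.0},
]}
changes = [{"loan_id": 2, "paid_amnt": 25.0}, {"loan_id": 3, "paid_amnt": 0.0}]

for row in legacy_upsert(tables, "loans", changes):
    print(row)
# {'loan_id': 1, 'paid_amnt': 10.0}
# {'loan_id': 2, 'paid_amnt': 25.0}
# {'loan_id': 3, 'paid_amnt': 0.0}
```

Between steps 5 and 6 of this sketch the table does not exist at all, so a failure or a concurrent reader at that point would find no data.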


#### INSERT or UPDATE with Delta Lake

2-step process: 
1. Identify rows to insert or update
2. Use `MERGE`

In [38]:
%sql
select * from loans_delta where addr_state = 'NY' and loan_id < 30

loan_id,funded_amnt,paid_amnt,addr_state
11,1000,400.61,NY
21,1000,66.39,NY
28,1200,84.45,NY


Let's say we have some changes to this data: one loan has been paid off, one new loan has been added, and one row duplicates an existing loan unchanged.

In [40]:
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']

items = [
  (11, 1000, 1000.0, 'NY', True),   # loan paid off
  (12, 1000, 0.0, 'NY', False),     # new loan
  (28, 1200, 84.45, 'NY', False)    # duplicate loan
]

loanUpdates = spark.createDataFrame(items, cols)

Now, let's update the table with the change data using the `merge` operation.

In [42]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, deltaPath)

(deltaTable
  .alias("t")
  .merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id") 
  .whenMatchedUpdateAll() 
  .whenNotMatchedInsertAll() 
  .execute())

Let's see whether the table has been updated.

In [44]:
%sql
select * from loans_delta where addr_state = 'NY' and loan_id < 30

loan_id,funded_amnt,paid_amnt,addr_state
11,1000,1000.0,NY
12,1000,0.0,NY
21,1000,66.39,NY
28,1200,84.45,NY
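
To make the matched and not-matched clauses concrete, the sketch below replays the merge on the three `NY` rows in plain Python. It assumes that a column present only in the source (here `closed`) is ignored while the target schema stays unchanged, which is consistent with the four-column output above. `merge_all` is a hypothetical helper, not part of the Delta Lake API.

```python
target_cols = ["loan_id", "funded_amnt", "paid_amnt", "addr_state"]
target = [
    (11, 1000, 400.61, "NY"),
    (21, 1000, 66.39, "NY"),
    (28, 1200, 84.45, "NY"),
]
source_cols = ["loan_id", "funded_amnt", "paid_amnt", "addr_state", "closed"]
source = [
    (11, 1000, 1000.0, "NY", True),   # loan paid off
    (12, 1000, 0.0, "NY", False),     # new loan
    (28, 1200, 84.45, "NY", False),   # duplicate loan
]

def merge_all(target, source, key="loan_id"):
    """Upsert `source` into `target` by `key`, keeping only the target's columns."""
    key_pos = target_cols.index(key)
    merged = {row[key_pos]: row for row in target}
    for src in source:
        as_dict = dict(zip(source_cols, src))
        projected = tuple(as_dict[c] for c in target_cols)
        # Matched keys are overwritten (update all); new keys are added (insert all).
        merged[as_dict[key]] = projected
    return [merged[k] for k in sorted(merged)]

for row in merge_all(target, source):
    print(row)
# (11, 1000, 1000.0, 'NY')
# (12, 1000, 0.0, 'NY')
# (21, 1000, 66.39, 'NY')
# (28, 1200, 84.45, 'NY')
```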


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Auditing data changes with operation history

All changes to the Delta table are recorded as commits in the table's transaction log. As you write into a Delta table or directory, every operation is automatically versioned. You can use the `DESCRIBE HISTORY` SQL command, or `DeltaTable.history()` as below, to view the table's history.

In [46]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, deltaPath)
display(deltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
2,2020-04-16T16:20:26.000+0000,100599,denny.lee@databricks.com,MERGE,Map(predicate -> (t.`loan_id` = s.`loan_id`)),,List(6411057),0127-045215-pined152,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 14702, numTargetRowsDeleted -> 0, numFiles -> 1, numTargetFilesAfterSkipping -> 1, numTargetFilesAdded -> 1, numTargetRowsInserted -> 0, numTargetRowsUpdated -> 3, numOutputRows -> 14705, numParts -> 0, numOutputBytes -> 163688, numSourceRows -> 3, numTargetFilesRemoved -> 1, numTargetFilesBeforeSkipping -> 1)"
1,2020-04-16T16:15:15.000+0000,100599,denny.lee@databricks.com,UPDATE,Map(predicate -> (addr_state#71822 = WA)),,List(6411057),0127-045215-pined152,0.0,WriteSerializable,False,"Map(numTotalRows -> 14705, numFiles -> 1, numRemovedFiles -> 1, numCopiedRows -> 14365, numOutputRows -> 14705, numParts -> 0, numOutputBytes -> 163690, numAddedFiles -> 1, numUpdatedRows -> 340)"
0,2020-04-16T16:03:36.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> ErrorIfExists, partitionBy -> [])",,List(6411057),0127-045215-pined152,,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 164672, numOutputRows -> 14705, numParts -> 0)"
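
Each version listed in the history corresponds to one JSON commit file in `_delta_log`, named with the version number zero-padded to 20 digits (as in the `00000000000000000001.json` path read earlier). A small sketch of that mapping follows; `commit_file` is a hypothetical helper.

```python
import posixpath

def commit_file(table_path, version):
    """Path of the JSON commit file for `version` (20-digit, zero-padded name)."""
    return posixpath.join(table_path, "_delta_log", f"{version:020d}.json")

print(commit_file("/tmp/loans_delta", 2))
# /tmp/loans_delta/_delta_log/00000000000000000002.json
```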


In [47]:
%sh
ls -lt /dbfs/tmp/loans_delta/

In [48]:
j2 = spark.read.json("/tmp/loans_delta/_delta_log/00000000000000000002.json")

In [49]:
# Commit Information
display(j2.select("commitInfo").where("commitInfo is not null"))

commitInfo
"List(0127-045215-pined152, false, WriteSerializable, List(6411057), MERGE, List(1, 163688, 14705, 0, 3, 1, 1, 1, 1, 14702, 0, 0, 3), List((t.`loan_id` = s.`loan_id`)), 1, 1587054025242, 100599, denny.lee@databricks.com)"


In [50]:
# Add Information
display(j2.select("add").where("add is not null"))

add
"List(true, 1587054025000, part-00000-16a43270-672b-44d7-931c-fce762005f39-c000.snappy.parquet, 163689, {""numRecords"":14705,""minValues"":{""loan_id"":0,""funded_amnt"":1000,""paid_amnt"":0.0,""addr_state"":""AK""},""maxValues"":{""loan_id"":25769805168,""funded_amnt"":40000,""paid_amnt"":40000.0,""addr_state"":""WY""},""nullCount"":{""loan_id"":0,""funded_amnt"":0,""paid_amnt"":0,""addr_state"":0}})"


In [51]:
# Remove Information
display(j2.select("remove").where("remove is not null"))

remove
"List(true, 1587054025241, part-00000-26e8ff35-1a64-4e1c-a422-7c7ff32dd73e-c000.snappy.parquet)"
