# Simplify and Scale Data Engineering Pipelines with Delta Lake

Delta Lake: An open-source storage format that brings ACID transactions to Apache Spark™ and big data workloads.

<img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>


* **Open format**: Stored as Parquet format in blob storage.
* **ACID Transactions**: Ensures data integrity and read consistency with complex, concurrent data pipelines.
* **Schema Enforcement and Evolution**: Ensures data cleanliness by blocking writes whose schema does not match the table's schema.
* **Audit History**: History of all the operations that happened in the table.
* **Time Travel**: Query previous versions of the table by time or version number.
* **Deletes and upserts**: Supports deleting and upserting into tables with programmatic APIs.
* **Scalable Metadata Management**: Handles millions of files by scaling the metadata operations with Spark.
* **Unified Batch and Streaming Source and Sink**: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box. 

### Source:
This notebook is a modified version of the [SAIS EU 2019 Delta Lake Tutorial](https://github.com/delta-io/delta/tree/master/examples/tutorials/saiseu19).

### Steps to run this notebook

You can run this notebook in a Databricks environment. Specifically, this notebook has been designed to run in [Databricks Community Edition](http://community.cloud.databricks.com/) as well.
To run this notebook, you have to [create a cluster](https://docs.databricks.com/clusters/create.html) with version **Databricks Runtime 6.1 or later** and [attach this notebook](https://docs.databricks.com/notebooks/notebooks-manage.html#attach-a-notebook-to-a-cluster) to that cluster.

### Source Data for this notebook

The data used is a modified version of the public data from [Lending Club](https://www.kaggle.com/wendykan/lending-club-loan-data). It includes all funded loans from 2012 to 2017. Each loan includes applicant information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and latest payment information. For a full view of the data please view the data dictionary available [here](https://resources.lendingclub.com/LCDataDictionary.xlsx).

## Explore data as a Parquet table
* Initially, let's start by exploring the data as a Parquet table.  
* As we progress, we will showcase how Delta Lake improves upon Parquet.

#### Download the sampled Lending Club data

In [5]:
%sh rm -rf /dbfs/tmp/sais_eu_19_demo/ && mkdir -p /dbfs/tmp/sais_eu_19_demo/loans/ && wget -O /dbfs/tmp/sais_eu_19_demo/loans/SAISEU19-loan-risks.snappy.parquet  https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet && ls -al  /dbfs/tmp/sais_eu_19_demo/loans/ 

#### Create the parquet table "loans_parquet"

In [7]:
from pyspark.sql.functions import * 

parquet_path = "/tmp/sais_eu_19_demo/loans/"

# Create a view on the table called loans_parquet
spark.read.format("parquet").load(parquet_path).createOrReplaceTempView("loans_parquet")
print("Defined view 'loans_parquet'")

#### Let's explore this parquet table.

*Schema of the table*

| Column Name | Description |
| ----------- | ----------- | 
| loan_id | unique id for each loan |
| funded_amnt | principal amount of the loan funded to the loanee |
| paid_amnt | amount of the principal that has been paid back (ignoring interest) |
| addr_state | state where this loan was funded |

In [9]:
spark.sql("select * from loans_parquet").show(20)

#### How many records does it have?

In [11]:
spark.sql("select count(*) from loans_parquet").show()

In [12]:
dbutils.notebook.exit("stop") # Stop the notebook before the streaming cell, in case of a "run all" 

stop

#### Let's start appending some new data to it using Structured Streaming

We will generate a stream of data with randomly generated loan ids and amounts. 
In addition, we are going to define a few more useful utility functions.

In [14]:
import random
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *


def random_checkpoint_dir(): 
  return "/tmp/sais_eu_19_demo/chkpt/%s" % str(random.randint(0, 10000))

# User-defined function to generate random state

states = ["CA", "TX", "NY", "WA"]

@udf(returnType=StringType())
def random_state():
  return str(random.choice(states))

# Function to start a streaming query with a stream of randomly generated data and append to the parquet table
def generate_and_append_data_stream(table_format, table_path):

  stream_data = (spark.readStream.format("rate").option("rowsPerSecond", 5).load() 
    .withColumn("loan_id", 10000 + col("value")) 
    .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) 
    .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) 
    .withColumn("addr_state", random_state()))

  query = (stream_data.writeStream 
    .format(table_format) 
    .option("checkpointLocation", random_checkpoint_dir()) 
    .trigger(processingTime = "10 seconds") 
    .start(table_path))

  return query

# Function to stop all streaming queries 
def stop_all_streams():
  # Stop all the streams
  print("Stopping all streams")
  for s in spark.streams.active:
    s.stop()
  print("Stopped all streams")
  print("Deleting checkpoints")  
  dbutils.fs.rm("/tmp/sais_eu_19_demo/chkpt/", True)
  print("Deleted checkpoints")

#### Let's start a new stream to append data to the Parquet table

In [16]:
stream_query = generate_and_append_data_stream(
    table_format = "parquet", 
    table_path = parquet_path)

#### Let's see if the data is being added to the table or not

In [18]:
spark.read.format("parquet").load(parquet_path).count()

#### What happens if we try to add a second writeStream?

In [20]:
stream_query2 = generate_and_append_data_stream(
    table_format = "parquet", 
    table_path = parquet_path)

#### Where did our existing 14705 rows go? Let's see the data once again

In [22]:
spark.read.format("parquet").load(parquet_path).show() # wrong schema!

#### Where did the two new columns `timestamp` and `value` come from? What happened here?

What really happened is that when the streaming query started adding new data to the Parquet table, it did not properly account for the existing data in the table. Furthermore, the new data files that were written out accidentally had two extra columns in the schema. Hence, when reading the table, the two different schemas from different files were merged together, thus unexpectedly modifying the schema of the table.
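To make the schema drift concrete, here is a minimal pure-Python sketch (no Spark required) of what merging the schemas of different files looks like. The two extra columns exist because Spark's `rate` source emits `timestamp` and `value`, and `withColumn` only adds columns on top of them. The row values and the helper `read_with_merged_schema` are hypothetical, and this is a conceptual model rather than a description of how Spark's Parquet reader is implemented.

```python
# Conceptual model only: each "file" is a list of row dicts with its own columns.
# All values below are made up for illustration.
original_file = [
    {"loan_id": 0, "funded_amnt": 1000, "paid_amnt": 182.22, "addr_state": "CA"},
]
streamed_file = [
    # Rows written by the stream still carry the rate source's own columns.
    {"timestamp": "2020-03-12T17:40:00", "value": 7, "loan_id": 10007,
     "funded_amnt": 7200, "paid_amnt": 6100.5, "addr_state": "TX"},
]

def read_with_merged_schema(files):
    """Union the columns of all files; fill columns a file lacks with None."""
    columns = []
    for rows in files:
        for row in rows:
            for name in row:
                if name not in columns:
                    columns.append(name)
    merged = [{name: row.get(name) for name in columns}
              for rows in files for row in rows]
    return columns, merged

columns, rows = read_with_merged_schema([original_file, streamed_file])
print(columns)
for row in rows:
    print(row)
```

In this model nothing rejects the unexpected columns at write time, so they only surface later, when a reader combines the files.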


Before we move on, **if you are running on Databricks Community Edition, definitely stop the streaming queries.** 

Your free account in Databricks Community Edition has quota limits on the number of files, and we do not want to hit that quota limit by running the streaming queries for too long.

In [24]:
stop_all_streams()

### Problems with Parquet format

Parquet is only a data layout format within a single file; it does not provide any guarantees across an entire table made up of many Parquet files.

#### 1. No schema enforcement 
Schema is not enforced when writing, leading to dirty and often corrupted data.

#### 2. No interoperability between batch and streaming workloads
Apache Spark's Parquet streaming sink does not maintain enough metadata for batch workloads to seamlessly interact with streaming workloads.

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Batch + stream processing and schema enforcement with Delta Lake
Let's understand how Delta Lake solves these particular problems (among many others). We will start by creating a Delta table from the original data.

In [27]:
%sh rm -rf /dbfs/tmp/sais_eu_19_demo/ && mkdir -p /dbfs/tmp/sais_eu_19_demo/loans/ && wget -O /dbfs/tmp/sais_eu_19_demo/loans/SAISEU19-loan-risks.snappy.parquet  https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet && ls -al  /dbfs/tmp/sais_eu_19_demo/loans/ && ls -al  /dbfs/tmp/sais_eu_19_demo/

In [28]:
# Configure Delta Lake Silver Path
delta_path = "/tmp/sais_eu_19_demo/loans_delta"

# Configurations necessary for running on Databricks Community Edition
spark.sql("set spark.sql.shuffle.partitions = 1")
spark.sql("set spark.databricks.delta.snapshotPartitions = 1")

# Remove folder if it exists
print("Deleting directory " + delta_path)
dbutils.fs.rm(delta_path, recurse=True)

# Create the Delta table with the same loans data
spark.read.format("parquet").load(parquet_path) \
  .write.format("delta").save(delta_path)
print("Created a Delta table at " + delta_path)

spark.read.format("delta").load(delta_path).createOrReplaceTempView("loans_delta")
print("Defined view 'loans_delta'")


#### Let's see the data once again

In [30]:
spark.sql("select count(*) from loans_delta").show()

In [31]:
spark.sql("select * from loans_delta").show()

#### Let's run a streaming count(*) on the table so that the count updates automatically

In [33]:
spark.readStream.format("delta").load(delta_path).createOrReplaceTempView("loans_delta_stream")
display(spark.sql("select count(*) from loans_delta_stream"))

count(1)
27455


#### Now let's try writing the streaming appends once again

In [35]:
stream_query_2 = generate_and_append_data_stream(table_format = "delta", table_path = delta_path)

The writes were blocked because the schema of the new data did not match the schema of the table (see the exception details). See more information about how it works [here](https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html).
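The kind of check that blocked the write can be sketched in a few lines of plain Python. This is a simplified illustration under stated assumptions, not Delta Lake's actual implementation: the type names in `TABLE_SCHEMA`, the `SchemaMismatchError` class, and the `enforce_schema` helper are all hypothetical, and the real rules are more detailed than the two checks shown here.

```python
# Conceptual sketch of schema enforcement on write (not Delta Lake's own code).
# Column types are assumed for illustration.
TABLE_SCHEMA = {
    "loan_id": "bigint",
    "funded_amnt": "int",
    "paid_amnt": "double",
    "addr_state": "string",
}

class SchemaMismatchError(Exception):
    """Raised when incoming data does not fit the table schema."""

def enforce_schema(table_schema, incoming_schema):
    """Reject writes that carry unknown columns or different column types."""
    extra = [c for c in incoming_schema if c not in table_schema]
    if extra:
        raise SchemaMismatchError(f"unexpected columns: {extra}")
    mismatched = [c for c, t in incoming_schema.items() if table_schema[c] != t]
    if mismatched:
        raise SchemaMismatchError(f"type mismatch in columns: {mismatched}")

# The unfixed stream still carries the rate source's `timestamp` and `value`.
stream_schema = {"timestamp": "timestamp", "value": "bigint", **TABLE_SCHEMA}

try:
    enforce_schema(TABLE_SCHEMA, stream_schema)
    blocked = False
except SchemaMismatchError as err:
    blocked = True
    print("Write blocked:", err)

# Selecting only the table's columns makes the same check pass.
fixed_schema = {name: stream_schema[name] for name in TABLE_SCHEMA}
enforce_schema(TABLE_SCHEMA, fixed_schema)
```

The important design point is that the check runs before any data is committed, so a bad write fails loudly instead of silently changing what readers see.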

**Now, let's fix the streaming query by selecting the columns we want to write.**

In [37]:
from pyspark.sql.functions import *

# Generate a stream of randomly generated loan data and append it to the table
def generate_and_append_data_stream_fixed(table_format, table_path):
    
  stream_data = (spark.readStream.format("rate").option("rowsPerSecond", 50).load() 
    .withColumn("loan_id", 10000 + col("value")) 
    .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) 
    .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) 
    .withColumn("addr_state", random_state()) 
    .select("loan_id", "funded_amnt", "paid_amnt", "addr_state")   # *********** FIXED THE SCHEMA OF THE GENERATED DATA *************
    )

  query = (stream_data.writeStream 
    .format(table_format) 
    .option("checkpointLocation", random_checkpoint_dir()) 
    .trigger(processingTime="10 seconds") 
    .start(table_path))

  return query

#### Now we can successfully write to the table. Note the count in the above streaming query increasing as we write to this table.

In [39]:
stream_query_2 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)

**Scroll back up to see the numbers change in the `readStream` as more data is being appended by the `writeStream`.** 

**In fact, we can run multiple concurrent streams writing to that table, and they will all work together.**

In [41]:
stream_query_3 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)

Just as a sanity check, let's query the table as a batch.

Note, you can run a read stream, two write streams, and read in batch - concurrently!

In [43]:
spark.sql("select count(*) from loans_delta").show()

#### Again, remember to stop all the streaming queries.

In [45]:
stop_all_streams()

#### Let's take a look at the file system

In [47]:
%fs ls /tmp/sais_eu_19_demo/loans_delta

path,name,size
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/,_delta_log/,0
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-03fa53af-ea62-4990-883c-6806f29bcff1-c000.snappy.parquet,part-00000-03fa53af-ea62-4990-883c-6806f29bcff1-c000.snappy.parquet,164673
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-1661aaca-96df-4655-97b0-ce2926578941-c000.snappy.parquet,part-00000-1661aaca-96df-4655-97b0-ce2926578941-c000.snappy.parquet,1728
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-29d2cacc-a450-4004-beb5-a253ee37623f-c000.snappy.parquet,part-00000-29d2cacc-a450-4004-beb5-a253ee37623f-c000.snappy.parquet,1726
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-390eb5c1-711d-4435-b3d8-3f765721fd3e-c000.snappy.parquet,part-00000-390eb5c1-711d-4435-b3d8-3f765721fd3e-c000.snappy.parquet,1719
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-3e842c21-b524-459e-8489-277378550029-c000.snappy.parquet,part-00000-3e842c21-b524-459e-8489-277378550029-c000.snappy.parquet,1724
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-4722b3f4-3bd3-4991-807a-cb66a843a474-c000.snappy.parquet,part-00000-4722b3f4-3bd3-4991-807a-cb66a843a474-c000.snappy.parquet,554
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-4e783ae9-5ffc-4c88-85bd-368d65223588-c000.snappy.parquet,part-00000-4e783ae9-5ffc-4c88-85bd-368d65223588-c000.snappy.parquet,1725
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-4ed53315-4a76-4870-ac85-1fa47ce1cf35-c000.snappy.parquet,part-00000-4ed53315-4a76-4870-ac85-1fa47ce1cf35-c000.snappy.parquet,1724
dbfs:/tmp/sais_eu_19_demo/loans_delta/part-00000-4fd1a90c-0f9f-4f5a-9574-fcd71c0be639-c000.snappy.parquet,part-00000-4fd1a90c-0f9f-4f5a-9574-fcd71c0be639-c000.snappy.parquet,1729


In [48]:
%fs ls /tmp/sais_eu_19_demo/loans_delta/_delta_log/

path,name,size
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/.s3-optimization-0,.s3-optimization-0,0
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/.s3-optimization-1,.s3-optimization-1,0
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/.s3-optimization-2,.s3-optimization-2,0
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/00000000000000000000.crc,00000000000000000000.crc,91
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/00000000000000000000.json,00000000000000000000.json,1396
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/00000000000000000001.crc,00000000000000000001.crc,91
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/00000000000000000001.json,00000000000000000001.json,731
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/00000000000000000002.crc,00000000000000000002.crc,92
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/00000000000000000002.json,00000000000000000002.json,8480
dbfs:/tmp/sais_eu_19_demo/loans_delta/_delta_log/00000000000000000003.crc,00000000000000000003.crc,92


In [49]:
%sh
head /dbfs/tmp/sais_eu_19_demo/loans_delta/_delta_log/00000000000000000026.json

In [50]:
%sql
describe history delta.`/tmp/sais_eu_19_demo/loans_delta`

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
31,2020-03-12T17:41:11.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 05928668-07f5-4bde-bd41-b7fe48662d5f, epochId -> 17)",,List(5918501),0127-045215-pined152,30.0,WriteSerializable,True,
30,2020-03-12T17:41:03.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 05928668-07f5-4bde-bd41-b7fe48662d5f, epochId -> 16)",,List(5918501),0127-045215-pined152,28.0,WriteSerializable,True,
29,2020-03-12T17:41:02.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7dd716af-b214-4990-a725-a2a5ba37f230, epochId -> 12)",,List(5918501),0127-045215-pined152,28.0,WriteSerializable,True,
28,2020-03-12T17:40:54.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 05928668-07f5-4bde-bd41-b7fe48662d5f, epochId -> 15)",,List(5918501),0127-045215-pined152,26.0,WriteSerializable,True,
27,2020-03-12T17:40:51.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7dd716af-b214-4990-a725-a2a5ba37f230, epochId -> 11)",,List(5918501),0127-045215-pined152,26.0,WriteSerializable,True,
26,2020-03-12T17:40:43.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 05928668-07f5-4bde-bd41-b7fe48662d5f, epochId -> 14)",,List(5918501),0127-045215-pined152,24.0,WriteSerializable,True,
25,2020-03-12T17:40:41.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7dd716af-b214-4990-a725-a2a5ba37f230, epochId -> 10)",,List(5918501),0127-045215-pined152,24.0,WriteSerializable,True,
24,2020-03-12T17:40:33.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 05928668-07f5-4bde-bd41-b7fe48662d5f, epochId -> 13)",,List(5918501),0127-045215-pined152,22.0,WriteSerializable,True,
23,2020-03-12T17:40:31.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7dd716af-b214-4990-a725-a2a5ba37f230, epochId -> 9)",,List(5918501),0127-045215-pined152,22.0,WriteSerializable,True,
22,2020-03-12T17:40:24.000+0000,100599,denny.lee@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 7dd716af-b214-4990-a725-a2a5ba37f230, epochId -> 8)",,List(5918501),0127-045215-pined152,20.0,WriteSerializable,True,


In [51]:
%python
(spark.read.format("delta") \
  .option("versionAsOf", 0) \
  .load(delta_path)
  .count())

**We see the same number of fully paid loans that we had seen before delete.**

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Rollback
Delta Lake's time travel feature allows you to roll back to a previous version of the table.

You can query by using either a timestamp or a version number using Python, Scala, and/or SQL syntax. For this example we will query a specific version using the Python syntax. For more information, refer to [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html) and the [docs](https://docs.delta.io/latest/delta-batch.html#deltatimetravel).
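One way to picture querying by version number is as a replay of the transaction log up to that version. The sketch below is a toy model in plain Python: the commit contents, the file names, and the helper `files_at_version` are hypothetical, and real log entries carry considerably more metadata than the two lists shown here.

```python
# Conceptual model: an ordered list of commits, where the list index is the
# table version. Each commit says which data files it adds and removes.
log = [
    {"add": ["part-a.parquet"], "remove": []},                  # version 0: initial write
    {"add": ["part-b.parquet"], "remove": []},                  # version 1: append
    {"add": ["part-c.parquet"], "remove": ["part-a.parquet"]},  # version 2: rewrite of part-a
]

def files_at_version(log, version):
    """Replay commits 0..version and return the data files that are live."""
    live = set()
    for commit in log[: version + 1]:
        live |= set(commit["add"])
        live -= set(commit["remove"])
    return sorted(live)

for version in range(len(log)):
    print(version, files_at_version(log, version))
```

Note that in this model `part-a.parquet` is dropped from the live set at version 2 but is still needed to answer a query against version 0 or 1, which is why old data files have to stay in storage for time travel to work.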

**Let's query the table and get the fully paid loans back**

In [55]:
# Count before rollback
spark.sql("SELECT COUNT(1) FROM loans_delta").show()

In [56]:
# Count of previous Version
spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete").show()

In [57]:
%scala
// Rollback
spark.read.format("delta")
  .option("versionAsOf", previousVersion) 
  .load(delta_path) 
  .write.format("delta") 
  .mode("overwrite") 
  .save(delta_path)

In [58]:
%python
# Rollback
spark.read.format("delta") \
  .option("versionAsOf", previousVersion) \
  .load(delta_path) \
  .write.format("delta") \
  .mode("overwrite") \
  .save(delta_path)

In [59]:
%python
(spark.read.format("delta") \
  .option("versionAsOf", 19) \
  .load(delta_path)
  .count())

In [60]:
# Count after rollback
spark.sql("SELECT COUNT(1) FROM loans_delta").show()

In [61]:
# Deleted data is back
spark.sql("SELECT COUNT(1) FROM loans_delta WHERE funded_amnt = paid_amnt").show()

In [62]:
deltaTable.history().show()

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Vacuum old versions of Delta Lake tables

While it's nice to be able to time travel to any previous version, sometimes you want to actually delete the data from storage completely, either to reduce storage costs or for compliance reasons (for example, GDPR).
The Vacuum operation deletes data files that have been removed from the table for a certain amount of time. For more information, check out the [docs](https://docs.delta.io/latest/delta-utility.html#vacuum).

By default, `vacuum()` retains all the data needed for the last 7 days. For this example, since this table does not have 7 days worth of history, we will retain 0 hours, which means to only keep the latest state of the table.
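The retention rule can be illustrated with a small pure-Python sketch. It is a simplified model, not the actual `vacuum()` implementation: the helper `files_to_vacuum`, the file names, and the timestamps are hypothetical, and the model only considers files that are no longer part of the current table state.

```python
from datetime import datetime, timedelta

def files_to_vacuum(removed_files, now, retention_hours=168):
    """Pick files that left the table at or before the retention cutoff.

    removed_files maps a file name to the time it was removed from the table.
    The default of 168 hours corresponds to the 7-day default retention.
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(name for name, removed_at in removed_files.items()
                  if removed_at <= cutoff)

now = datetime(2020, 3, 12, 18, 0)
removed = {
    "part-old.parquet": now - timedelta(days=10),      # removed long ago
    "part-recent.parquet": now - timedelta(hours=1),   # removed an hour ago
}

print(files_to_vacuum(removed, now))                     # default 7-day retention
print(files_to_vacuum(removed, now, retention_hours=0))  # keep only the latest state
```

With a retention of 0 hours, every file that is not part of the latest table state becomes eligible for deletion, which is why time travel to older versions stops working afterwards.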

In [65]:
%sh 
ls /dbfs/tmp/sais_eu_19_demo/loans_delta/*.parquet | wc -l

In [66]:
spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = false")
deltaTable.vacuum(retentionHours = 0)

In [67]:
%sh 
ls /dbfs/tmp/sais_eu_19_demo/loans_delta/*.parquet | wc -l

**Same query as before, but it now fails**

In [69]:
spark.read.format("delta").option("versionAsOf", previousVersion).load(delta_path).createOrReplaceTempView("loans_delta_pre_delete")
spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete WHERE funded_amnt = paid_amnt").show()

##  ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Upsert into Delta Lake table using Merge
You can upsert data from an Apache Spark DataFrame into a Delta Lake table using the merge operation. This operation is similar to the SQL MERGE command but has additional support for deletes and extra conditions in updates, inserts, and deletes. For more information, check out the [docs](https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge).

#### Upsert with Parquet: 7-step process

With a legacy data pipeline, to insert or update a table, you must:
1. Identify the new rows to be inserted
2. Identify the rows that will be replaced (i.e. updated)
3. Identify all of the rows that are not impacted by the insert or update
4. Create a new temp table based on all three insert statements
5. Delete the original table (and all of those associated files)
6. "Rename" the temp table back to the original table name
7. Drop the temp table

![](https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif)


#### Upsert with Delta Lake

1-step process: 
1. [Use `Merge` operation](https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge)

In [72]:
%fs ls /tmp/sais_eu_19_demo/

In [73]:
spark.read.format("parquet").load(parquet_path).where("loan_id < 3").show()

In [74]:
%scala
spark.sql("set spark.sql.shuffle.partitions = 1")
spark.sql("set spark.databricks.delta.snapshotPartitions = 1")

// Configure Delta Lake Silver Path
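val deltaSmallPath = "/tmp/sais_eu_19_demo/loans_delta_small"

// Source Parquet data (same location as `parquet_path` in the Python cells)
val parquetPath = "/tmp/sais_eu_19_demo/loans/"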

// Remove folder if it exists
println("Deleting directory " + deltaSmallPath)
dbutils.fs.rm(deltaSmallPath, recurse=true)

// Create the Delta table with the same loans data
spark.read.format("parquet").load(parquetPath)
  .where("loan_id < 3")
  .write.format("delta").save(deltaSmallPath)
println("Created a Delta table at " + deltaSmallPath)

spark.read.format("delta").load(deltaSmallPath).createOrReplaceTempView("loans_delta_small")
println("Defined view 'loans_delta_small'")

In [75]:
%python
spark.sql("set spark.sql.shuffle.partitions = 1")
spark.sql("set spark.databricks.delta.snapshotPartitions = 1")

# Configure Delta Lake Silver Path
delta_small_path = "/tmp/sais_eu_19_demo/loans_delta_small"

# Remove folder if it exists
print("Deleting directory " + delta_small_path)
dbutils.fs.rm(delta_small_path, recurse=True)

# Create the Delta table with the same loans data
spark.read.format("parquet").load(parquet_path) \
  .where("loan_id < 3") \
  .write.format("delta").save(delta_small_path)
print("Created a Delta table at " + delta_small_path)

spark.read.format("delta").load(delta_small_path).createOrReplaceTempView("loans_delta_small")
print("Defined view 'loans_delta_small'")

#### Let's focus only on a part of the loans_delta table

In [77]:
spark.sql("select * from loans_delta_small order by loan_id").show()

**Now, let's say we got some new loan information**
1. Duplicate loan_id = 1 was added to the change table due to a delay in processing
1. Existing loan_id = 2 has been fully repaid. The corresponding row needs to be updated.
1. New loan_id = 3 has been funded in CA. This needs to be inserted as a new row.

In [79]:
%scala
val loanUpdates = Seq(
  (1, 1000, 361.19, "WA"), // duplicate information    
  (2, 1000, 1000.0, "TX"), // existing loan's paid_amnt updated, loan paid in full
  (3, 2000, 0.0, "CA"))    // new loan details
  .toDF("loan_id", "funded_amnt", "paid_amnt", "addr_state")

loanUpdates.show()

In [80]:
%python
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state']
items = [
  (1, 1000, 361.19, 'WA'), # duplicate information  
  (2, 1000, 1000.0, 'TX'), # existing loan's paid_amnt updated, loan paid in full
  (3, 2000, 0.0, 'CA')     # new loan details
]

loan_updates = spark.createDataFrame(items, cols)

loan_updates.show()

**Merge can upsert this in a single atomic operation.**

The SQL `MERGE` command can do both `UPDATE` and `INSERT`.

```

MERGE INTO target t
USING source s
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ....
```

Since Apache Spark's SQL parser does not support parsing the `MERGE` SQL command, we have provided programmatic APIs in Python to perform the same operation with the same semantics as the SQL command.
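The matched/not-matched semantics can be illustrated without Spark. The sketch below is a plain-Python stand-in, not the Delta Lake API: `merge_upsert` is a hypothetical helper, and the existing rows in `target` are made-up values (the update rows mirror the loan updates defined earlier).

```python
def merge_upsert(target, updates, key="loan_id"):
    """Return new rows: matched keys are replaced, unmatched keys are inserted."""
    by_key = {row[key]: dict(row) for row in target}
    for row in updates:
        # Same effect as "WHEN MATCHED THEN UPDATE all columns,
        # WHEN NOT MATCHED THEN INSERT all columns".
        by_key[row[key]] = dict(row)
    return [by_key[k] for k in sorted(by_key)]

# Hypothetical current contents of the small table.
target = [
    {"loan_id": 1, "funded_amnt": 1000, "paid_amnt": 361.19, "addr_state": "WA"},
    {"loan_id": 2, "funded_amnt": 1000, "paid_amnt": 395.0, "addr_state": "TX"},
]

updates = [
    {"loan_id": 1, "funded_amnt": 1000, "paid_amnt": 361.19, "addr_state": "WA"},  # duplicate
    {"loan_id": 2, "funded_amnt": 1000, "paid_amnt": 1000.0, "addr_state": "TX"},  # paid in full
    {"loan_id": 3, "funded_amnt": 2000, "paid_amnt": 0.0, "addr_state": "CA"},     # new loan
]

merged = merge_upsert(target, updates)
for row in merged:
    print(row)
```

The duplicate row for loan 1 leaves the table unchanged, loan 2 is updated in place, and loan 3 is appended, all in one pass over the updates. The Python list here is only a stand-in; the atomicity of the real operation comes from the table's transaction log.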

In [82]:
%scala
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forPath(spark, deltaSmallPath)

deltaTable.alias("t").merge(
  loanUpdates.alias("s"), 
  "t.loan_id = s.loan_id")
  .whenMatched.updateAll()
  .whenNotMatched.insertAll()
  .execute()

In [83]:
%python
from delta.tables import *

delta_table = DeltaTable.forPath(spark, delta_small_path)

(delta_table.alias("t").merge(
    loan_updates.alias("s"), 
    "t.loan_id = s.loan_id") 
  .whenMatchedUpdateAll() 
  .whenNotMatchedInsertAll() 
  .execute())

In [84]:
spark.sql("select * from loans_delta_small order by loan_id").show()

**Note the changes in the table**
- Existing loan_id = 2 should have been updated with paid_amnt set to 1000. 
- New loan_id = 3 has been inserted.

<img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>
<br/>
## Tutorial Summary

#### Full support for batch and streaming workloads
* Delta Lake allows batch and streaming workloads to concurrently read and write to Delta Lake tables with full ACID transactional guarantees.

#### Schema enforcement and schema evolution
* Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption.

#### Table History and Time Travel
* The Delta Lake transaction log records details about every change made to the data, providing a full audit trail of the changes. 
* You can query previous snapshots of data enabling developers to access and revert to earlier versions of data for audits, rollbacks or to reproduce experiments.

#### Delete data and Vacuum old versions
* Delete data from tables using a predicate.
* Fully remove data from previous versions using Vacuum to save storage and satisfy compliance requirements.

#### Upsert data using Merge
* Upsert data into tables from batch and streaming workloads
* Use extended merge syntax for advanced use cases like data deduplication, change data capture, SCD type 2 operations, etc.

## Join the community!


* [Delta Lake on GitHub](https://github.com/delta-io/delta)
* [Delta Lake Slack Channel](https://delta-users.slack.com/) ([Registration Link](https://join.slack.com/t/delta-users/shared_invite/enQtNTY1NDg0ODcxOTI1LWJkZGU3ZmQ3MjkzNmY2ZDM0NjNlYjE4MWIzYjg2OWM1OTBmMWIxZTllMjg3ZmJkNjIwZmE1ZTZkMmQ0OTk5ZjA))
* [Public Mailing List](https://groups.google.com/forum/#!forum/delta-users)