## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Diving into Delta Lake

<!-- You can run this notebook in a Databricks environment. Specifically, this notebook has been designed to run in [Databricks Community Edition](http://community.cloud.databricks.com/) as well. -->
To run this notebook, you have to [create a cluster](https://docs.databricks.com/clusters/create.html) with version **Databricks Runtime 7.4 or later** and [attach this notebook](https://docs.databricks.com/notebooks/notebooks-manage.html#attach-a-notebook-to-a-cluster) to that cluster. <br/>

### Source Data for this notebook
The data used is a modified version of the public data from [Lending Club](https://www.kaggle.com/wendykan/lending-club-loan-data). It includes all funded loans from 2012 to 2017. Each loan includes applicant information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and latest payment information. For a full view of the data please view the data dictionary available [here](https://resources.lendingclub.com/LCDataDictionary.xlsx).

[link to tutorial](https://databricks.com/discover/demos/delta-lake)

In [0]:
db = "deltadb"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"USE {db}")

spark.sql("SET spark.databricks.delta.formatCheck.enabled = false")
spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")

In [0]:
import random
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *


def my_checkpoint_dir(): 
  return "/tmp/delta_demo/chkpt/%s" % str(random.randint(0, 10000))

# User-defined function to generate random state
@udf(returnType=StringType())
def random_state():
  return str(random.choice(["CA", "TX", "NY", "WA"]))


# Function to start a streaming query with a stream of randomly generated load data and append to the parquet table
def generate_and_append_data_stream(table_format, table_name, schema_ok=False, type="batch"):
  
  stream_data = (spark.readStream.format("rate").option("rowsPerSecond", 500).load()
    .withColumn("loan_id", 10000 + col("value"))
    .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer"))
    .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000))
    .withColumn("addr_state", random_state())
    .withColumn("type", lit(type)))
    
  if schema_ok:
    stream_data = stream_data.select("loan_id", "funded_amnt", "paid_amnt", "addr_state", "type", "timestamp")
      
  query = (stream_data.writeStream
    .format(table_format)
    .option("checkpointLocation", my_checkpoint_dir())
    .trigger(processingTime = "5 seconds")
    .table(table_name))

  return query

In [0]:
# Function to stop all streaming queries 
def stop_all_streams():
    print("Stopping all streams")
    for s in spark.streams.active:
        try:
            s.stop()
        except:
            pass
    print("Stopped all streams")
    dbutils.fs.rm("/tmp/delta_demo/chkpt/", True)


def cleanup_paths_and_tables():
    dbutils.fs.rm("/tmp/delta_demo/", True)
    dbutils.fs.rm("file:/dbfs/tmp/delta_demo/loans_parquet/", True)
        
    for table in ["deltadb.loans_parquet", "deltadb.loans_delta", "deltadb.loans_delta2"]:
        spark.sql(f"DROP TABLE IF EXISTS {table}")
    
cleanup_paths_and_tables()

In [0]:
%sh mkdir -p /dbfs/tmp/delta_demo/loans_parquet/; wget -O /dbfs/tmp/delta_demo/loans_parquet/loans.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet

# Getting started with <img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>

An open-source storage layer for data lakes that brings ACID transactions to Apache Spark™ and big data workloads.

* **ACID Transactions**: Ensures data integrity and read consistency with complex, concurrent data pipelines.
* **Unified Batch and Streaming Source and Sink**: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box. 
* **Schema Enforcement and Evolution**: Ensures data cleanliness by blocking writes with unexpected.
* **Time Travel**: Query previous versions of the table by time or version number.
* **Deletes and upserts**: Supports deleting and upserting into tables with programmatic APIs.
* **Open Format**: Stored as Parquet format in blob storage.
* **Audit History**: History of all the operations that happened in the table.
* **Scalable Metadata management**: Able to handle millions of files are scaling the metadata operations with Spark.

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Convert to Delta Lake format

Delta Lake is 100% compatible with Apache Spark&trade;, which makes it easy to get started with if you already use Spark for your big data workflows.
Delta Lake features APIs for **SQL**, **Python**, and **Scala**, so that you can use it in whatever language you feel most comfortable in.

<img src="https://databricks.com/wp-content/uploads/2020/12/simplysaydelta.png" width=600/>

In **Python**: Read your data into a Spark DataFrame, then write it out in Delta Lake format directly, with no upfront schema definition needed.

In [0]:
parquet_path = "file:/dbfs/tmp/delta_demo/loans_parquet/"

df = (spark.read.format("parquet").load(parquet_path)
      .withColumn("type", lit("batch"))
      .withColumn("timestamp", current_timestamp()))

df.write.format("delta").mode("overwrite").saveAsTable("loans_delta3")

**SQL:** Use `CREATE TABLE` statement with SQL (no upfront schema definition needed)

In [0]:
%sql
CREATE TABLE loans_delta4
USING delta
AS SELECT * FROM parquet.`/tmp/delta_demo/loans_parquet`

**SQL**: Use `CONVERT TO DELTA` to convert Parquet files to Delta Lake format in place

In [0]:
%sql CONVERT TO DELTA parquet.`/tmp/delta_demo/loans_parquet`

### View the data in the Delta Lake table
**How many records are there, and what does the data look like?**

In [0]:
spark.sql("select count(*) from loans_delta3").show()
spark.sql("select * from loans_delta3").show(3)

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Unified batch + streaming data processing with multiple concurrent readers and writers

### Write 2 different data streams into our Delta Lake table at the same time.

In [0]:
# Set up 2 streaming writes to our table
stream_query_A = generate_and_append_data_stream(table_format="delta", table_name="loans_delta", schema_ok=True, type='stream A')
stream_query_B = generate_and_append_data_stream(table_format="delta", table_name="loans_delta", schema_ok=True, type='stream B')

### Create 2 continuous streaming readers of our Delta Lake table to illustrate streaming progress.

In [0]:
# Streaming read #1
display(spark.readStream.format("delta").table("loans_delta").groupBy("type").count().orderBy("type"))

In [0]:
# Streaming read #2
display(spark.readStream.format("delta").table("loans_delta").groupBy("type", window("timestamp", "10 seconds")).count().orderBy("window"))

### Add a batch query, just for good measure

In [0]:
%sql
SELECT addr_state, COUNT(*)
FROM loans_delta
GROUP BY addr_state

In [0]:
dbutils.notebook.exit("stop")

In [0]:
stop_all_streams()

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) ACID Transactions

View the Delta Lake transaction log

In [0]:
%sql DESCRIBE HISTORY loans_delta

<img src="https://databricks.com/wp-content/uploads/2020/09/delta-lake-medallion-model-scaled.jpg" width=1012/>

##  ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use Schema Enforcement to protect data quality

To show you how schema enforcement works, let's create a new table that has an extra column -- `credit_score` -- that doesn't match our existing Delta Lake table schema.

#### Write DataFrame with extra column, `credit_score`, to Delta Lake table

In [0]:
# Generate `new_data` with additional column
new_column = [StructField("credit_score", IntegerType(), True)]
new_schema = StructType(spark.table("loans_delta").schema.fields + new_column)
data = [(99997, 10000, 1338.55, "CA", "batch", datetime.now(), 649),
        (99998, 20000, 1442.55, "NY", "batch", datetime.now(), 702)]

new_data = spark.createDataFrame(data, new_schema)
new_data.printSchema()

In [0]:
# Uncommenting this cell will lead to an error because the schemas don't match.
# Attempt to write data with new column to Delta Lake table
new_data.write.format("delta").mode("append").saveAsTable("loans_delta")

**Schema enforcement helps keep our tables clean and tidy so that we can trust the data we have stored in Delta Lake.** The writes above were blocked because the schema of the new data did not match the schema of table (see the exception details). See more information about how it works [here](https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html).

##  ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use Schema Evolution to add new columns to schema

If we *want* to update our Delta Lake table to match this data source's schema, we can do so using schema evolution. Simply add the following to the Spark write command: `.option("mergeSchema", "true")`

In [0]:
new_data.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("loans_delta")

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id IN (99997, 99998)

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake Time Travel

Delta Lake’s time travel capabilities simplify building data pipelines for use cases including:

* Auditing Data Changes
* Reproducing experiments & reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

<img src="https://github.com/risan4841/img/blob/master/transactionallogs.png?raw=true" width=250/>

You can query snapshots of your tables by:
1. **Version number**, or
2. **Timestamp.**

using Python, Scala, and/or SQL syntax; for these examples we will use the SQL syntax.  

For more information, refer to the [docs](https://docs.delta.io/latest/delta-utility.html#history), or [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html)

#### Review Delta Lake Table History for  Auditing & Governance
All the transactions for this table are stored within this table including the initial set of insertions, update, delete, merge, and inserts with schema modification

In [0]:
%sql
DESCRIBE HISTORY loans_delta

#### Use time travel to select and view the original version of our table (Version 0).
As you can see, this version contains the original 14,705 records in it.

In [0]:
spark.sql("SELECT * FROM loans_delta VERSION AS OF 0").show(3)
spark.sql("SELECT COUNT(*) FROM loans_delta VERSION AS OF 0").show()

In [0]:
%sql SELECT COUNT(*) FROM loans_delta

#### Rollback a table to a specific version using `RESTORE`

In [0]:
%sql RESTORE loans_delta VERSION AS OF 0

In [0]:
%sql SELECT COUNT(*) FROM loans_delta

##![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Full DML Support: `DELETE`, `UPDATE`, `MERGE INTO`

Delta Lake brings ACID transactions and full DML support to data lakes.

>Parquet does **not** support these commands - they are unique to Delta Lake.

###![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) `DELETE`: Handle GDPR or CCPA Requests on your Data Lake

Imagine that we are responding to a GDPR data deletion request. The user with loan ID #4420 wants us to delete their data. Here's how easy it is.

**View the user's data**

In [0]:
%sql
SELECT * FROM loans_delta WHERE loan_id=4420

**Delete the individual user's data with a single `DELETE` command using Delta Lake.**

Note: The `DELETE` command isn't supported in Parquet.

In [0]:
%sql
DELETE FROM loans_delta WHERE loan_id=4420;
-- Confirm the user's data was deleted
SELECT * FROM loans_delta WHERE loan_id=4420

###![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use time travel and `INSERT INTO` to add the user back into our table

In [0]:
%sql
INSERT INTO loans_delta
SELECT * FROM loans_delta VERSION AS OF 0
WHERE loan_id=4420

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id=4420

### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) `UPDATE`: Modify the existing records in a table in one command

In [0]:
%sql UPDATE loans_delta SET funded_amnt = 22000 WHERE loan_id = 4420

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id = 4420

###![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Support Change Data Capture Workflows & Other Ingest Use Cases via `MERGE INTO`

With a legacy data pipeline, to insert or update a table, you must:
1. Identify the new rows to be inserted
2. Identify the rows that will be replaced (i.e. updated)
3. Identify all of the rows that are not impacted by the insert or update
4. Create a new temp based on all three insert statements
5. Delete the original table (and all of those associated files)
6. "Rename" the temp table back to the original table name
7. Drop the temp table

<img src="https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif" alt='Merge process' width=600/>


#### INSERT or UPDATE with Delta Lake

2-step process: 
1. Identify rows to insert or update
2. Use `MERGE`

In [0]:
# Create merge table with 1 row update, 1 insertion
data = [(4420, 22000, 21500.00, "NY", "update", datetime.now()),  # record to update
        (99999, 10000, 1338.55, "CA", "insert", datetime.now())]  # record to insert
schema = spark.table("loans_delta").schema
spark.createDataFrame(data, schema).createOrReplaceTempView("merge_table")
spark.sql("SELECT * FROM merge_table").show()

In [0]:
%sql
MERGE INTO loans_delta AS l
USING merge_table AS m
ON l.loan_id = m.loan_id
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *;

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id IN (4420, 99999)

## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) File compaction and performance optimizations = faster queries

### Vacuum

In [0]:
%sql
-- Vacuum deletes all files no longer needed by the current version of the table.
VACUUM loans_delta

### <img src="https://pages.databricks.com/rs/094-YMS-629/images/dbsquare.png" width=30/> Cache table in memory (Databricks Delta Lake only)

In [0]:
%sql CACHE SELECT * FROM loans_delta

### <img src="https://pages.databricks.com/rs/094-YMS-629/images/dbsquare.png" width=30/> Z-Order Optimize (Databricks Delta Lake only)

In [0]:
%sql OPTIMIZE loans_delta ZORDER BY addr_state

In [0]:
cleanup_paths_and_tables()