# Welcome to Databricks
</br>
<img src="https://cdn.sanity.io/images/92ui5egz/production/7c1c60e9afaaaa3cce61e5101717796d21b7f914-1426x1080.svg?auto=format" width=200/>
</br>

### We will begin by discussing the foundation of the Databricks Lakehouse, Delta Lake!

# What can you do with Delta Lake?
</br>
<img src="https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-logo-whitebackground.png" width=200/>

Delta Lake is an <a href="https://delta.io/" target="_blank">open-source</a> storage layer that brings reliability and improved performance to Apache Spark™ and big data workloads.

## In this notebook, we will create a simple multi-step data pipeline by demonstrating the following features:

0. Reliable batch and streaming capability.
0. Full DML support for Appends, Deletes, Updates and Merge statements.
0. A <a href="https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" target="_blank">schema evolution</a> and enforcement scenario.
0. A review of <a href="https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html" target="_blank">Time Travel</a>, which automatically versions the data that you store in your data lake.
0. A look at some of Delta Lake's key <a href="https://docs.databricks.com/delta/optimizations/index.html" target="_blank">optimization features</a>.
0. ...and more!



* This notebook was last tested with *Databricks Runtime: 10.4 LTS, Python 3*

# Build a Reliable Data Lake at Scale

<img src="https://pages.databricks.com/rs/094-YMS-629/images/Delta Lake Light BG.png" alt='Make all your data ready for BI and ML' width=1000/> 

## With ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake Optimizations

<img src="https://pages.databricks.com/rs/094-YMS-629/images/Delta medallion icon.png" alt='Make all your data ready for BI and ML' width=1000/>

## The Data

The data used is **retail** data. Let's see how we can get access to this!

### Mount an Azure Blob Storage container

In [0]:
BLOB_CONTAINER = ""
BLOB_ACCOUNT = ""
ACCOUNT_KEY = ""

In [0]:
DIRECTORY = "/"
MOUNT_PATH = "/mnt/demo"

dbutils.fs.mount(
  source = f"wasbs://{BLOB_CONTAINER}@{BLOB_ACCOUNT}.blob.core.windows.net/",
  mount_point = MOUNT_PATH,
  extra_configs = {
    f"fs.azure.account.key.{BLOB_ACCOUNT}.blob.core.windows.net":ACCOUNT_KEY
  }
)
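
`dbutils.fs.mount` raises an error when the mount point is already in use, which is easy to hit when this notebook is re-run. A minimal sketch of a guard, assuming the same `wasbs://` source and account-key config as the cell above; the helper name `mount_if_needed` is ours, not part of `dbutils`:

```python
def mount_if_needed(dbutils, container, account, account_key, mount_path="/mnt/demo"):
    """Mount a Blob Storage container unless mount_path is already mounted.

    Returns True if a new mount was created, False if one already existed.
    """
    # dbutils.fs.mounts() lists the existing mounts; each entry exposes .mountPoint
    if any(m.mountPoint == mount_path for m in dbutils.fs.mounts()):
        return False

    host = f"{account}.blob.core.windows.net"
    dbutils.fs.mount(
        source=f"wasbs://{container}@{host}/",
        mount_point=mount_path,
        extra_configs={f"fs.azure.account.key.{host}": account_key},
    )
    return True
```

In the notebook this would be called as `mount_if_needed(dbutils, BLOB_CONTAINER, BLOB_ACCOUNT, ACCOUNT_KEY, MOUNT_PATH)`.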

##### Use the below to clean up resources/data from previous runs of this notebook.

In [0]:
%sql
DROP TABLE IF EXISTS demo.bronze_retail_delta

In [0]:
%sql
DROP TABLE IF EXISTS demo.silver_retail_delta

In [0]:
%sql
DROP TABLE IF EXISTS demo.gold_retail_delta

In [0]:
%fs rm -r dbfs:/mnt/demo/bronze_retail_delta/

In [0]:
%fs rm -r dbfs:/mnt/demo/silver_retail_delta/

In [0]:
%fs rm -r dbfs:/mnt/demo/gold_retail_delta/
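
The six cleanup cells above follow one pattern per medallion layer, so they could also be expressed as a single loop. A sketch under that assumption; `cleanup_targets` and `cleanup` are illustrative names, and the table names and paths are the ones used in this notebook:

```python
LAYERS = ("bronze", "silver", "gold")

def cleanup_targets(database="demo", base_path="dbfs:/mnt/demo/"):
    """Return (table_name, storage_path) pairs for the three demo tables."""
    return [
        (f"{database}.{layer}_retail_delta", f"{base_path}{layer}_retail_delta/")
        for layer in LAYERS
    ]

def cleanup(spark, dbutils):
    """Drop each demo table and remove its underlying files."""
    for table, path in cleanup_targets():
        spark.sql(f"DROP TABLE IF EXISTS {table}")
        dbutils.fs.rm(path, True)  # True = delete recursively

for table, path in cleanup_targets():
    print(table, path)
```

Calling `cleanup(spark, dbutils)` in a notebook cell would then replace the individual `%sql` and `%fs` cells.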

##### Cleanup code ends here.

In [0]:
base_data_path = "dbfs:/mnt/demo/"
ingest_data_path = base_data_path + "online_retail_data.csv"
retail_df = spark.read.option("header", "true").csv(ingest_data_path)
display(retail_df)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/10 8:26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/10 8:26,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/10 8:26,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/10 8:26,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/10 8:26,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/10 8:26,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/10 8:26,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/10 8:28,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/10 8:28,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/10 8:34,1.69,13047.0,United Kingdom


## A few housekeeping tasks

In [0]:
%sql
show databases;

databaseName
default
demo


In [0]:
%sql
CREATE DATABASE IF NOT EXISTS demo;
USE demo;

In [0]:
# Register the DataFrame as a temporary view so it can be queried from SQL
retail_df.createOrReplaceTempView("retail_tmp_table")

### ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Creating a Delta Lake table is as simple as ...


<img src="https://databricks.com/wp-content/uploads/2020/02/simplysaydelta.png" width=600/>

In [0]:
%sql
DROP TABLE IF EXISTS bronze_retail_delta;

CREATE TABLE bronze_retail_delta
USING delta
LOCATION "dbfs:/mnt/demo/bronze_retail_delta/"
AS SELECT * FROM retail_tmp_table;

-- View bronze Delta Lake table
SELECT * FROM bronze_retail_delta

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/10 8:26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/10 8:26,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/10 8:26,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/10 8:26,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/10 8:26,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/10 8:26,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/10 8:26,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/10 8:28,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/10 8:28,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/10 8:34,1.69,13047.0,United Kingdom


In [0]:
%sql 
DESCRIBE DETAIL bronze_retail_delta

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,61fbae11-cd1f-432b-bbab-c38d09181463,demo.bronze_retail_delta,,dbfs:/mnt/demo/bronze_retail_delta,2022-03-30T15:31:39.171+0000,2022-03-30T15:31:46.000+0000,List(),4,3631992,Map(),1,2


In [0]:
bronze_delta_path = "dbfs:/mnt/demo/bronze_retail_delta/"

dbutils.fs.ls(bronze_delta_path)

In [0]:
dbutils.fs.ls(bronze_delta_path + "_delta_log/")

In [0]:
display(spark.read.json(bronze_delta_path +"_delta_log/00000000000000000000.json"))

add,commitInfo,metaData,protocol
,,,"List(1, 2)"
,,"List(1648654299171, List(parquet), 61fbae11-cd1f-432b-bbab-c38d09181463, List(), {""type"":""struct"",""fields"":[{""name"":""InvoiceNo"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""StockCode"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Description"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Quantity"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""InvoiceDate"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""UnitPrice"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""CustomerID"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Country"",""type"":""string"",""nullable"":true,""metadata"":{}}]})",
"List(true, 1648654304000, part-00000-db9d4392-db0a-42a3-aea0-8064f487f3fd-c000.snappy.parquet, 996759, {""numRecords"":149091,""minValues"":{""InvoiceNo"":""536365"",""StockCode"":""10002"",""Description"":"" 4 PURPLE FLOCK DINNER CANDLES"",""Quantity"":""-1"",""InvoiceDate"":""1/10/11 10:04"",""UnitPrice"":""0"",""CustomerID"":""12346"",""Country"":""Australia""},""maxValues"":{""InvoiceNo"":""C549269"",""StockCode"":""m"",""Description"":""wrongly sold sets"",""Quantity"":""99"",""InvoiceDate"":""4/7/11 9:39"",""UnitPrice"":""966.92"",""CustomerID"":""18283"",""Country"":""United Kingdom""},""nullCount"":{""InvoiceNo"":0,""StockCode"":0,""Description"":573,""Quantity"":0,""InvoiceDate"":0,""UnitPrice"":0,""CustomerID"":46285,""Country"":0}}, List(1648654304000000, 268435456))",,,
"List(true, 1648654304000, part-00001-b56e4e1d-c468-49a3-8d1d-6f5db40d323c-c000.snappy.parquet, 1017721, {""numRecords"":149548,""minValues"":{""InvoiceNo"":""549275"",""StockCode"":""10002"",""Description"":"" 4 PURPLE FLOCK DINNER CANDLES"",""Quantity"":""-1"",""InvoiceDate"":""4/10/11 10:10"",""UnitPrice"":""0"",""CustomerID"":""12347"",""Country"":""Australia""},""maxValues"":{""InvoiceNo"":""C562992"",""StockCode"":""gift_0001_50"",""Description"":""wrongly marked. 23343 in box"",""Quantity"":""98"",""InvoiceDate"":""8/9/11 9:41"",""UnitPrice"":""99.96"",""CustomerID"":""18287"",""Country"":""Unspecified""},""nullCount"":{""InvoiceNo"":0,""StockCode"":0,""Description"":530,""Quantity"":0,""InvoiceDate"":0,""UnitPrice"":0,""CustomerID"":37642,""Country"":0}}, List(1648654304000001, 268435456))",,,
"List(true, 1648654304000, part-00002-19aa48e3-06c4-4018-88f6-18da5d413a39-c000.snappy.parquet, 978137, {""numRecords"":147143,""minValues"":{""InvoiceNo"":""563031"",""StockCode"":""10080"",""Description"":"" 4 PURPLE FLOCK DINNER CANDLES"",""Quantity"":""-1"",""InvoiceDate"":""10/10/11 10:00"",""UnitPrice"":""-11062.06"",""CustomerID"":""12347"",""Country"":""Australia""},""maxValues"":{""InvoiceNo"":""C574850"",""StockCode"":""gift_0001_30"",""Description"":""wrongly marked 23343"",""Quantity"":""992"",""InvoiceDate"":""9/9/11 9:52"",""UnitPrice"":""98.79"",""CustomerID"":""18287"",""Country"":""Unspecified""},""nullCount"":{""InvoiceNo"":0,""StockCode"":0,""Description"":268,""Quantity"":0,""InvoiceDate"":0,""UnitPrice"":0,""CustomerID"":26092,""Country"":0}}, List(1648654304000002, 268435456))",,,
"List(true, 1648654304000, part-00003-7eae1417-d283-46a8-be55-ef60f08617c5-c000.snappy.parquet, 639375, {""numRecords"":96127,""minValues"":{""InvoiceNo"":""574862"",""StockCode"":""10080"",""Description"":"" 4 PURPLE FLOCK DINNER CANDLES"",""Quantity"":""-1"",""InvoiceDate"":""11/10/11 10:10"",""UnitPrice"":""0"",""CustomerID"":""12347"",""Country"":""Australia""},""maxValues"":{""InvoiceNo"":""C581569"",""StockCode"":""gift_0001_10"",""Description"":""wrongly marked carton 22804"",""Quantity"":""99"",""InvoiceDate"":""12/9/11 9:57"",""UnitPrice"":""988"",""CustomerID"":""18283"",""Country"":""Unspecified""},""nullCount"":{""InvoiceNo"":0,""StockCode"":0,""Description"":83,""Quantity"":0,""InvoiceDate"":0,""UnitPrice"":0,""CustomerID"":25061,""Country"":0}}, List(1648654304000003, 268435456))",,,
,"List(0330-150634-qdjwo0t5, Databricks-Runtime/10.4.x-scala2.12, true, WriteSerializable, List(3720359278619703), CREATE TABLE AS SELECT, List(4, 3631992, 541909), List(null, false, [], {}), 1648654305931, 0ed78314-75f6-46ee-9e2f-50bf7edc0211, 4066625165483276, odl_user_577628@databrickslabs.com)",,
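
The commit file displayed above holds one JSON action per line (`protocol`, `metaData`, `add`, `commitInfo`). As a rough sketch, the `add` actions can be totalled with plain Python to cross-check the `numFiles` and `sizeInBytes` values reported by `DESCRIBE DETAIL`. The helper name `summarize_commit` is ours, and the sample lines are cut down to the `size` and `numRecords` values shown in the output above, with shortened file names:

```python
import json

def summarize_commit(lines):
    """Total the `add` actions found in one Delta commit file."""
    files, total_bytes, total_rows = 0, 0, 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        add = json.loads(line).get("add")
        if add is None:
            continue  # protocol, metaData, commitInfo, ...
        files += 1
        total_bytes += add["size"]
        # `stats` is itself a JSON-encoded string of per-file statistics
        stats = json.loads(add["stats"]) if add.get("stats") else {}
        total_rows += stats.get("numRecords", 0)
    return {"numFiles": files, "sizeInBytes": total_bytes, "numRecords": total_rows}

# (size, numRecords) pairs copied from the four `add` rows above
file_stats = [(996759, 149091), (1017721, 149548), (978137, 147143), (639375, 96127)]
sample_commit = [json.dumps({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}})]
sample_commit += [
    json.dumps({"add": {"path": f"part-{i:05d}", "size": size,
                        "stats": json.dumps({"numRecords": rows})}})
    for i, (size, rows) in enumerate(file_stats)
]

print(summarize_commit(sample_commit))
# {'numFiles': 4, 'sizeInBytes': 3631992, 'numRecords': 541909}
```

Against the real file, the lines could be collected with `[r.value for r in spark.read.text(bronze_delta_path + "_delta_log/00000000000000000000.json").collect()]`.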


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Unified Batch and Streaming Source and Sink

These cells showcase streaming from a Delta table
* This notebook will read from the Bronze Delta table and write to a Silver Delta table

In [0]:
# Read the bronze Delta table as a stream so that newly inserted rows are picked up
retail_readStream = spark.readStream.format("delta").load(bronze_delta_path)
retail_readStream.createOrReplaceTempView("retail_readStream")

In [0]:
%sql
select * from retail_readStream where quantity > 10

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
549276,21213,PACK OF 72 SKULL CAKE CASES,24,4/7/11 13:11,0.55,16745.0,United Kingdom
549276,22558,CLOTHES PEGS RETROSPOT PACK 24,20,4/7/11 13:11,1.65,16745.0,United Kingdom
549276,22900,SET 2 TEA TOWELS I LOVE LONDON,15,4/7/11 13:11,3.25,16745.0,United Kingdom
549276,20723,STRAWBERRY CHARLOTTE BAG,30,4/7/11 13:11,0.85,16745.0,United Kingdom
549276,22771,CLEAR DRAWER KNOB ACRYLIC EDWARDIAN,12,4/7/11 13:11,1.25,16745.0,United Kingdom
549278,21498,RED RETROSPOT WRAP,25,4/7/11 13:16,0.42,16255.0,United Kingdom
549278,22197,SMALL POPCORN HOLDER,13,4/7/11 13:16,0.85,16255.0,United Kingdom
549278,22151,PLACE SETTING WHITE HEART,96,4/7/11 13:16,0.42,16255.0,United Kingdom
549280,84789,ENCHANTED BIRD PLANT CAGE,16,4/7/11 13:18,2.95,13854.0,United Kingdom
549280,22423,REGENCY CAKESTAND 3 TIER,16,4/7/11 13:18,10.95,13854.0,United Kingdom


**Wait** until the stream is up and running (results appear) before executing the code below.

In [0]:
import time

# Give the stream time to initialize before writing to the table
time.sleep(20)
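
A fixed sleep can be too short on a cold cluster and needlessly long on a warm one. As an alternative sketch, the cell could poll `spark.streams.active` until every running query has reported at least one progress update. The helper name `wait_for_streams` and its timeout defaults are assumptions, not part of the original notebook:

```python
import time

def wait_for_streams(spark, timeout_s=60.0, poll_s=1.0):
    """Wait until every active streaming query has reported progress at least once.

    Returns True once that is the case, False if timeout_s elapses first.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        queries = spark.streams.active
        # recentProgress is an empty list until the first micro-batch completes
        if queries and all(q.recentProgress for q in queries):
            return True
        time.sleep(poll_s)
    return False
```

`wait_for_streams(spark)` would then take the place of `time.sleep(20)`.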

##![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Reliably writes to the Delta table in parallel while streaming

In [0]:
i = 1
while i <= 6:
  # Execute Insert statement
  insert_sql = "INSERT INTO bronze_retail_delta VALUES ('100', null, null, null, null, null, null, null)"
  spark.sql(insert_sql)
  print('bronze retail delta: inserted new row of data, loop: [%s]' % i)
    
  # Loop through
  i = i + 1
  time.sleep(1)

In [0]:
# Stop active stream
for stream in spark.streams.active:
  s = spark.streams.get(stream.id)
  s.stop()

**Verify** that the inserted rows exist in the bronze table

In [0]:
%sql
SELECT 
*
FROM bronze_retail_delta
WHERE InvoiceNo < 115

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
100,,,,,,,
100,,,,,,,
100,,,,,,,
100,,,,,,,
100,,,,,,,
100,,,,,,,


### Write to Silver

</br>

We can now write out to Silver, cleaning up column names as we go.

In [0]:
import pyspark.sql.functions as f

bronze_data = spark.read.format("delta").table("demo.bronze_retail_delta") \
  .withColumn("invoice_no", f.col("InvoiceNo")) \
  .withColumn("stock_code", f.col("StockCode")) \
  .withColumn("description", f.col("Description")) \
  .withColumn("quantity", f.col("Quantity")) \
  .withColumn("invoice_date", f.col("InvoiceDate")) \
  .withColumn("unit_price", f.col("UnitPrice")) \
  .withColumn("customer_id", f.col("CustomerID")) \
  .withColumn("country", f.col("Country")) \
  .select("invoice_no", "stock_code", "description", "quantity", "invoice_date", "unit_price", "customer_id", "country")

bronze_data.write.format("delta").mode("Overwrite").option("path", "dbfs:/mnt/demo/silver_retail_delta/").option("delta.enableChangeDataFeed", True).saveAsTable("demo.silver_retail_delta")
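
The eight `withColumn` calls above all apply the same CamelCase to snake_case rule, so the renaming could also be derived from the column names themselves. A small sketch of that idea; `to_snake_case` and `with_snake_case_columns` are illustrative helpers, and the regular expression only covers the naming patterns that occur in this dataset:

```python
import re

def to_snake_case(name):
    """Convert a CamelCase column name such as 'InvoiceNo' to 'invoice_no'."""
    # Insert '_' wherever a lowercase letter or digit is followed by an uppercase letter
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()

def with_snake_case_columns(df):
    """Rename every column of a Spark DataFrame; toDF takes the new names positionally."""
    return df.toDF(*[to_snake_case(c) for c in df.columns])

bronze_columns = ["InvoiceNo", "StockCode", "Description", "Quantity",
                  "InvoiceDate", "UnitPrice", "CustomerID", "Country"]
print([to_snake_case(c) for c in bronze_columns])
# ['invoice_no', 'stock_code', 'description', 'quantity',
#  'invoice_date', 'unit_price', 'customer_id', 'country']
```

With this, `with_snake_case_columns(spark.read.table("demo.bronze_retail_delta"))` should yield the same column names as `bronze_data`.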

In [0]:
spark.read.table("demo.silver_retail_delta").display()

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
549275,20936,FORKED CACTUS CANDLE,6,4/7/11 13:09,2.95,15916.0,United Kingdom
549275,21457,2 PICTURE BOOK EGGS EASTER DUCKS,8,4/7/11 13:09,1.25,15916.0,United Kingdom
549275,21316,SMALL CHUNKY GLASS ROMAN BOWL,1,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,20801,LARGE PINK GLASS SUNDAE DISH,7,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,48138,DOORMAT UNION FLAG,4,4/7/11 13:09,7.95,15916.0,United Kingdom
549275,22499,WOODEN UNION JACK BUNTING,4,4/7/11 13:09,5.95,15916.0,United Kingdom
549276,21213,PACK OF 72 SKULL CAKE CASES,24,4/7/11 13:11,0.55,16745.0,United Kingdom
549276,22895,SET OF 2 TEA TOWELS APPLE AND PEARS,2,4/7/11 13:11,3.25,16745.0,United Kingdom
549276,22897,OVEN MITT APPLES DESIGN,2,4/7/11 13:11,1.45,16745.0,United Kingdom
549276,22558,CLOTHES PEGS RETROSPOT PACK 24,20,4/7/11 13:11,1.65,16745.0,United Kingdom


##![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Full DML Support

Delta Lake supports standard DML, including UPDATE, DELETE and MERGE INTO, giving developers more control when managing large datasets.

Let's start by creating a traditional Parquet table

In [0]:
# Clean up old parquet resources, if they exist
dbutils.fs.rm('dbfs:/mnt/demo/silver_retail_parquet/', True)

In [0]:
%sql 
-- Creating a new parquet table
DROP TABLE IF EXISTS silver_retail_parquet;

CREATE TABLE silver_retail_parquet
USING parquet
LOCATION "dbfs:/mnt/demo/silver_retail_parquet/"
AS SELECT * FROM silver_retail_delta;

-- View Parquet table
SELECT * FROM silver_retail_parquet

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
549275,20936,FORKED CACTUS CANDLE,6,4/7/11 13:09,2.95,15916.0,United Kingdom
549275,21457,2 PICTURE BOOK EGGS EASTER DUCKS,8,4/7/11 13:09,1.25,15916.0,United Kingdom
549275,21316,SMALL CHUNKY GLASS ROMAN BOWL,1,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,20801,LARGE PINK GLASS SUNDAE DISH,7,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,48138,DOORMAT UNION FLAG,4,4/7/11 13:09,7.95,15916.0,United Kingdom
549275,22499,WOODEN UNION JACK BUNTING,4,4/7/11 13:09,5.95,15916.0,United Kingdom
549276,21213,PACK OF 72 SKULL CAKE CASES,24,4/7/11 13:11,0.55,16745.0,United Kingdom
549276,22895,SET OF 2 TEA TOWELS APPLE AND PEARS,2,4/7/11 13:11,3.25,16745.0,United Kingdom
549276,22897,OVEN MITT APPLES DESIGN,2,4/7/11 13:11,1.45,16745.0,United Kingdom
549276,22558,CLOTHES PEGS RETROSPOT PACK 24,20,4/7/11 13:11,1.65,16745.0,United Kingdom


In [0]:
%sql
DESCRIBE DETAIL silver_retail_parquet

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
parquet,,demo.silver_retail_parquet,,dbfs:/mnt/demo/silver_retail_parquet,2022-03-30T16:05:46.000+0000,,List(),,,Map(),,


###![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) DELETE Support

In [0]:
%sql
-- Attempting to run `DELETE` on the Parquet table
DELETE FROM silver_retail_parquet WHERE invoice_no = 100

**Note**: This command fails because `DELETE` statements are not supported on Parquet tables, but they are supported on Delta Lake tables.

In [0]:
%sql
-- Running `DELETE` on the Delta Lake table
DELETE FROM silver_retail_delta WHERE invoice_no = 100

num_affected_rows
6


In [0]:
%sql
-- Review silver delta lake table
select * from silver_retail_delta where invoice_no = 100

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country


###![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) UPDATE Support

In [0]:
%sql
-- Attempting to run `UPDATE` on the Parquet table
UPDATE silver_retail_parquet SET customer_id = -1 WHERE invoice_no = 100

**Note**: This command fails because `UPDATE` statements are not supported on Parquet tables, but they are supported on Delta Lake tables.

In [0]:
%sql
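-- Re-insert a row with invoice_no 100 (the earlier `DELETE` removed all of them) so there is something to update
INSERT INTO silver_retail_delta VALUES ('100', null, null, null, null, null, null, null);
-- Running `UPDATE` on the Delta Lake table
UPDATE silver_retail_delta SET customer_id = -1 WHERE invoice_no = 100;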

num_affected_rows
1


In [0]:
%sql
-- Review silver delta lake table
select * from silver_retail_delta where invoice_no = 100

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
100,,,,,,-1,


###![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) MERGE INTO Support

#### INSERT or UPDATE parquet: 7-step process

With a legacy data pipeline, to insert or update a table, you must:
1. Identify the new rows to be inserted
2. Identify the rows that will be replaced (i.e. updated)
3. Identify all of the rows that are not impacted by the insert or update
4. Create a new temp table based on all three sets of rows
5. Delete the original table (and all of those associated files)
6. "Rename" the temp table back to the original table name
7. Drop the temp table

<img src="https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif" alt='Merge process' width=700/>


#### INSERT or UPDATE with Delta Lake

2-step process: 
1. Identify rows to insert or update
2. Use `MERGE`

In [0]:
# Let's create a simple table to merge
# Note: 'null' below is the literal string "null", not a SQL NULL (that would be None)
items = [
  ('549275', '20936', 'null', 'null', 'null', 'null', 'null', 'null'),
  ('100', 'null', 'null', 'null', 'null', 'null', '-2', 'null')
]

cols = [
  "invoice_no", "stock_code", "description", "quantity", "invoice_date", "unit_price", "customer_id", "country"
]

merge_table = spark.createDataFrame(items, cols)
merge_table.createOrReplaceTempView("merge_table")
display(merge_table)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
549275,20936.0,,,,,,
100,,,,,,-2.0,


Instead of writing separate `INSERT` and `UPDATE` statements, we can use a `MERGE` statement.

In [0]:
from delta.tables import DeltaTable

target_silver_table = DeltaTable.forName(spark, 'demo.silver_retail_delta')

target_silver_table.alias('target') \
  .merge(
    merge_table.alias('updates'),
    'target.invoice_no <=> updates.invoice_no and target.stock_code <=> updates.stock_code'
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

In [0]:
%sql
-- Here is the SQL syntax for the above
-- MERGE INTO silver_retail_delta as target
-- USING merge_table as updates
-- on target.invoice_no <=> updates.invoice_no and target.stock_code <=> updates.stock_code
-- WHEN MATCHED THEN 
--   UPDATE SET *
-- WHEN NOT MATCHED 
--   THEN INSERT *
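
To make the matched / not-matched logic concrete, here is a toy model of the merge on plain Python dictionaries. It is only an illustration of the semantics, not how Delta Lake executes `MERGE`, and `merge_rows` is a hypothetical helper. Keys are compared with `==`, so `None` matches `None`, mirroring the null-safe `<=>` operator in the merge condition. The sample rows are trimmed to three columns.

```python
def merge_rows(target, updates, keys):
    """Toy MERGE: update all columns of matching rows, insert the rest."""
    def key_of(row):
        return tuple(row[k] for k in keys)

    by_key = {key_of(u): u for u in updates}
    matched = set()
    result = []
    for row in target:
        k = key_of(row)
        if k in by_key:
            result.append(dict(by_key[k]))  # WHEN MATCHED THEN UPDATE SET *
            matched.add(k)
        else:
            result.append(row)              # row not touched by the merge
    # WHEN NOT MATCHED THEN INSERT *
    result.extend(dict(u) for k, u in by_key.items() if k not in matched)
    return result

target = [
    {"invoice_no": "549275", "stock_code": "20936", "customer_id": "15916.0"},
    {"invoice_no": "100", "stock_code": None, "customer_id": "-1"},
]
updates = [
    {"invoice_no": "549275", "stock_code": "20936", "customer_id": "null"},
    {"invoice_no": "100", "stock_code": "null", "customer_id": "-2"},
]

merged = merge_rows(target, updates, ["invoice_no", "stock_code"])
print(len(merged))  # 3
```

The second source row carries the string `'null'` as its `stock_code`, which does not equal the target's real null, so it is inserted rather than matched. That is consistent with the one updated row and one inserted row recorded for the `MERGE` in the table history later in this notebook.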

In [0]:
%sql
-- Review silver delta lake table
select * from silver_retail_delta where invoice_no = 549275 or invoice_no = 100

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
549275,21457,2 PICTURE BOOK EGGS EASTER DUCKS,8.0,4/7/11 13:09,1.25,15916.0,United Kingdom
549275,21316,SMALL CHUNKY GLASS ROMAN BOWL,1.0,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,20801,LARGE PINK GLASS SUNDAE DISH,7.0,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,48138,DOORMAT UNION FLAG,4.0,4/7/11 13:09,7.95,15916.0,United Kingdom
549275,22499,WOODEN UNION JACK BUNTING,4.0,4/7/11 13:09,5.95,15916.0,United Kingdom
549275,21742,LARGE ROUND WICKER PLATTER,1.0,4/7/11 13:09,5.95,15916.0,United Kingdom
549275,22485,SET OF 2 WOODEN MARKET CRATES,1.0,4/7/11 13:09,12.75,15916.0,United Kingdom
549275,84751B,BLACK MEDIUM GLASS CAKE STAND,2.0,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,21320,GLASS CHALICE GREEN LARGE,11.0,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,22427,ENAMEL FLOWER JUG CREAM,2.0,4/7/11 13:09,5.95,15916.0,United Kingdom


##![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Schema Evolution
With the `mergeSchema` option, you can evolve your Delta Lake table schema

In [0]:
# Generate a new column
new_silver_df = spark.read.table("silver_retail_delta") \
  .withColumn("line_item_total", f.round(f.col("quantity") * f.col("unit_price"), 2))

display(new_silver_df)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country,line_item_total
574862,20668,DISCO BALL CHRISTMAS DECORATION,24,11/7/11 12:27,0.12,12408.0,Belgium,2.88
574862,23345,DOLLY GIRL BEAKER,12,11/7/11 12:27,1.25,12408.0,Belgium,15.0
574862,22725,ALARM CLOCK BAKELIKE CHOCOLATE,16,11/7/11 12:27,3.75,12408.0,Belgium,60.0
574862,22726,ALARM CLOCK BAKELIKE GREEN,16,11/7/11 12:27,3.75,12408.0,Belgium,60.0
574862,22727,ALARM CLOCK BAKELIKE RED,16,11/7/11 12:27,3.75,12408.0,Belgium,60.0
574862,22898,CHILDRENS APRON APPLES DESIGN,8,11/7/11 12:27,1.95,12408.0,Belgium,15.6
574862,23115,RED APPLES CHOPPING BOARD,9,11/7/11 12:27,4.95,12408.0,Belgium,44.55
574862,23114,VINTAGE LEAF CHOPPING BOARD,9,11/7/11 12:27,4.95,12408.0,Belgium,44.55
574862,23206,LUNCH BAG APPLE DESIGN,10,11/7/11 12:27,1.65,12408.0,Belgium,16.5
574862,23208,LUNCH BAG VINTAGE LEAF DESIGN,10,11/7/11 12:27,1.65,12408.0,Belgium,16.5


In [0]:
# Let's write this data out to our Delta table
new_silver_df.write.format("delta").mode("Overwrite").save("dbfs:/mnt/demo/silver_retail_delta/")

**Note**: This command fails because the schema of our new data does not match the schema of our original data

In [0]:
# Add the mergeSchema option
new_silver_df.write.option("mergeSchema","true").format("delta").mode("Overwrite").save("dbfs:/mnt/demo/silver_retail_delta/")

**Note**: With the `mergeSchema` option, we can merge these different schemas together.
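
Before opting in to schema evolution, it can help to see exactly which columns the write would add. A minimal sketch, assuming a plain comparison of column names is enough (it ignores type changes); `new_columns` and `write_options` are illustrative helpers, not Delta Lake APIs:

```python
def new_columns(table_columns, incoming_columns):
    """Columns present in the incoming data but missing from the table, in order."""
    existing = set(table_columns)
    return [c for c in incoming_columns if c not in existing]

def write_options(table_columns, incoming_columns):
    """Only request mergeSchema when the incoming data actually adds columns."""
    added = new_columns(table_columns, incoming_columns)
    return {"mergeSchema": "true"} if added else {}

silver_columns = ["invoice_no", "stock_code", "description", "quantity",
                  "invoice_date", "unit_price", "customer_id", "country"]
incoming = silver_columns + ["line_item_total"]

print(new_columns(silver_columns, incoming))    # ['line_item_total']
print(write_options(silver_columns, incoming))  # {'mergeSchema': 'true'}
```

In the notebook, the two lists would come from `spark.table("silver_retail_delta").columns` and `new_silver_df.columns`, and the resulting dictionary could be passed to the writer with `.options(**opts)`.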

In [0]:
%sql
-- Review silver delta lake table
select * from silver_retail_delta 

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country,line_item_total
549275,21457,2 PICTURE BOOK EGGS EASTER DUCKS,8,4/7/11 13:09,1.25,15916.0,United Kingdom,10.0
549275,21316,SMALL CHUNKY GLASS ROMAN BOWL,1,4/7/11 13:09,0.79,15916.0,United Kingdom,0.79
549275,20801,LARGE PINK GLASS SUNDAE DISH,7,4/7/11 13:09,0.79,15916.0,United Kingdom,5.53
549275,48138,DOORMAT UNION FLAG,4,4/7/11 13:09,7.95,15916.0,United Kingdom,31.8
549275,22499,WOODEN UNION JACK BUNTING,4,4/7/11 13:09,5.95,15916.0,United Kingdom,23.8
549276,21213,PACK OF 72 SKULL CAKE CASES,24,4/7/11 13:11,0.55,16745.0,United Kingdom,13.2
549276,22895,SET OF 2 TEA TOWELS APPLE AND PEARS,2,4/7/11 13:11,3.25,16745.0,United Kingdom,6.5
549276,22897,OVEN MITT APPLES DESIGN,2,4/7/11 13:11,1.45,16745.0,United Kingdom,2.9
549276,22558,CLOTHES PEGS RETROSPOT PACK 24,20,4/7/11 13:11,1.65,16745.0,United Kingdom,33.0
549276,85135B,BLUE DRAGONFLY HELICOPTER,2,4/7/11 13:11,7.95,16745.0,United Kingdom,15.9


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Let's Travel back in Time!
Databricks Delta’s time travel capabilities simplify building data pipelines for the following use cases. 

* Audit Data Changes
* Reproduce experiments & reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

<img src="https://pages.databricks.com/rs/094-YMS-629/images/timetravel.png?raw=true" width=250/>

You can query by:
1. Using a timestamp
1. Using a version number

using Python, Scala, and/or SQL syntax; for these examples we will use the SQL syntax.  

For more information, refer to [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html)
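
For comparison with the SQL cells that follow, here is a sketch of the same two query styles from Python, using the Delta reader options `versionAsOf` and `timestampAsOf`. The wrapper `read_delta_as_of` is a hypothetical helper and assumes the table is read by path:

```python
def read_delta_as_of(spark, path, version=None, timestamp=None):
    """Read the Delta table at `path` as of a version number or a timestamp string."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    else:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)
```

For example, `read_delta_as_of(spark, "dbfs:/mnt/demo/silver_retail_delta/", version=0)` would return the table as it was first written.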

In [0]:
%sql
DESCRIBE HISTORY silver_retail_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2022-03-30T16:08:43.000+0000,4066625165483276,odl_user_577628@databrickslabs.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(3720359278619703),0330-150634-qdjwo0t5,4.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 541911, numOutputBytes -> 4423444)",,Databricks-Runtime/10.4.x-scala2.12
4,2022-03-30T16:06:52.000+0000,4066625165483276,odl_user_577628@databrickslabs.com,MERGE,"Map(predicate -> ((target.invoice_no <=> updates.invoice_no) AND (target.stock_code <=> updates.stock_code)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(3720359278619703),0330-150634-qdjwo0t5,3.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 445781, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 3, executionTimeMs -> 4569, numTargetRowsInserted -> 1, scanTimeMs -> 2289, numTargetRowsUpdated -> 1, numOutputRows -> 445783, numTargetChangeFilesAdded -> 2, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 2202)",,Databricks-Runtime/10.4.x-scala2.12
3,2022-03-30T16:06:25.000+0000,4066625165483276,odl_user_577628@databrickslabs.com,UPDATE,Map(predicate -> (cast(invoice_no#15124 as int) = 100)),,List(3720359278619703),0330-150634-qdjwo0t5,2.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 2, numAddedChangeFiles -> 1, executionTimeMs -> 1175, scanTimeMs -> 60, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 1115)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-03-30T16:06:23.000+0000,4066625165483276,odl_user_577628@databrickslabs.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3720359278619703),0330-150634-qdjwo0t5,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 2075)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-03-30T16:06:07.000+0000,4066625165483276,odl_user_577628@databrickslabs.com,DELETE,"Map(predicate -> [""(CAST(spark_catalog.demo.silver_retail_delta.invoice_no AS INT) = 100)""])",,List(3720359278619703),0330-150634-qdjwo0t5,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 3, numCopiedRows -> 96133, numAddedChangeFiles -> 3, executionTimeMs -> 1726, numDeletedRows -> 6, scanTimeMs -> 505, numAddedFiles -> 1, rewriteTimeMs -> 1221)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-03-30T16:05:30.000+0000,4066625165483276,odl_user_577628@databrickslabs.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> false, description -> null, partitionBy -> [], properties -> {""delta.enableChangeDataFeed"":""true""})",,List(3720359278619703),0330-150634-qdjwo0t5,,WriteSerializable,False,"Map(numFiles -> 4, numOutputRows -> 541915, numOutputBytes -> 3568087)",,Databricks-Runtime/10.4.x-scala2.12


### ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Time Travel via Version Number
Below are SQL syntax examples of Delta Time Travel by using a Version Number

In [0]:
%sql
SELECT * FROM silver_retail_delta VERSION AS OF 0

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
549275,20936,FORKED CACTUS CANDLE,6,4/7/11 13:09,2.95,15916.0,United Kingdom
549275,21457,2 PICTURE BOOK EGGS EASTER DUCKS,8,4/7/11 13:09,1.25,15916.0,United Kingdom
549275,21316,SMALL CHUNKY GLASS ROMAN BOWL,1,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,20801,LARGE PINK GLASS SUNDAE DISH,7,4/7/11 13:09,0.79,15916.0,United Kingdom
549275,48138,DOORMAT UNION FLAG,4,4/7/11 13:09,7.95,15916.0,United Kingdom
549275,22499,WOODEN UNION JACK BUNTING,4,4/7/11 13:09,5.95,15916.0,United Kingdom
549276,21213,PACK OF 72 SKULL CAKE CASES,24,4/7/11 13:11,0.55,16745.0,United Kingdom
549276,22895,SET OF 2 TEA TOWELS APPLE AND PEARS,2,4/7/11 13:11,3.25,16745.0,United Kingdom
549276,22897,OVEN MITT APPLES DESIGN,2,4/7/11 13:11,1.45,16745.0,United Kingdom
549276,22558,CLOTHES PEGS RETROSPOT PACK 24,20,4/7/11 13:11,1.65,16745.0,United Kingdom


In [0]:
%sql
SELECT * FROM silver_retail_delta VERSION AS OF 5

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country,line_item_total
549275,21457,2 PICTURE BOOK EGGS EASTER DUCKS,8,4/7/11 13:09,1.25,15916.0,United Kingdom,10.0
549275,21316,SMALL CHUNKY GLASS ROMAN BOWL,1,4/7/11 13:09,0.79,15916.0,United Kingdom,0.79
549275,20801,LARGE PINK GLASS SUNDAE DISH,7,4/7/11 13:09,0.79,15916.0,United Kingdom,5.53
549275,48138,DOORMAT UNION FLAG,4,4/7/11 13:09,7.95,15916.0,United Kingdom,31.8
549275,22499,WOODEN UNION JACK BUNTING,4,4/7/11 13:09,5.95,15916.0,United Kingdom,23.8
549276,21213,PACK OF 72 SKULL CAKE CASES,24,4/7/11 13:11,0.55,16745.0,United Kingdom,13.2
549276,22895,SET OF 2 TEA TOWELS APPLE AND PEARS,2,4/7/11 13:11,3.25,16745.0,United Kingdom,6.5
549276,22897,OVEN MITT APPLES DESIGN,2,4/7/11 13:11,1.45,16745.0,United Kingdom,2.9
549276,22558,CLOTHES PEGS RETROSPOT PACK 24,20,4/7/11 13:11,1.65,16745.0,United Kingdom,33.0
549276,85135B,BLUE DRAGONFLY HELICOPTER,2,4/7/11 13:11,7.95,16745.0,United Kingdom,15.9


# ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake Optimizations

In this section, we'll examine several features of Delta Lake that make it highly performant for reading data.

- Optimize
- Z-Ordering
- Caching
- Data Skipping

NOTE: See the reference links at the end of the notebook for the complete list of optimization topics.

#### Run the OPTIMIZE command on our silver_retail_delta table and let's see how many files are compacted!

In [0]:
%sql
OPTIMIZE silver_retail_delta

path,metrics
dbfs:/mnt/demo/silver_retail_delta,"List(1, 3, List(4408016, 4408016, 4408016.0, 1, 4408016), List(2736, 3632493, 1474481.3333333333, 3, 4423444), 0, null, 1, 3, 0, true)"


### Z-Ordering via Optimize <img src="https://pages.databricks.com/rs/094-YMS-629/images/zorro.jpg?raw=true" width=150/>

Z-Ordering is a technique to colocate related information in the same set of files. It maps multidimensional data to one dimension while preserving the locality of the data points: within each partition, the data is sorted on the specified Z-Order columns using the well-known Z-order curve. A Z-Order column should be different from the partition column.

``
OPTIMIZE silver_retail_delta ZORDER BY (invoice_no)
``
In this example, we have specified only one column (`invoice_no`) for Z-Ordering. It is possible to specify multiple columns; however, the effectiveness of Z-Ordering diminishes rapidly beyond three or four columns. Z-Ordering works best when the data is partitioned.

## <img src="https://pages.databricks.com/rs/094-YMS-629/images/z-order.png?raw=true" width=450/> 
Refer to <a href="https://databricks.com/blog/2018/07/31/processing-petabytes-of-data-in-seconds-with-databricks-delta.html" target="_blank">this blog</a>. Also note that Z-Ordering via OPTIMIZE is a Databricks custom feature.

In [0]:
%sql
OPTIMIZE silver_retail_delta ZORDER BY invoice_no;

path,metrics
dbfs:/mnt/demo/silver_retail_delta,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 4408016), 0, List(0, 0), 0, null), 0, 1, 1, false)"


In [0]:
%sql 
DESCRIBE DETAIL silver_retail_delta

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,86b382ee-7abe-4d10-a1c0-0eec8958e67b,demo.silver_retail_delta,,dbfs:/mnt/demo/silver_retail_delta,2022-03-30T16:05:27.783+0000,2022-03-30T16:09:22.000+0000,List(),1,4408016,Map(delta.enableChangeDataFeed -> true),1,4


### Change Data Feed (CDF)

</br>

- This is a property of Delta tables that enables efficient processing in CDC use-cases. </br>
- Data that has changed between Delta versions is accessible using `.option("readChangeFeed", "true")`
- Use the additional options `startingVersion`/`endingVersion` or `startingTimestamp`/`endingTimestamp` to bound the range of changes that is read
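
As a rough illustration of what a downstream consumer does with change rows, the sketch below applies them to a keyed copy of a table in plain Python (no Spark). The `_change_type` and `_commit_version` columns are the ones the change feed returns. The `id` and `qty` columns, the sample rows, and the `apply_changes` helper are hypothetical. The `delete` change type does not occur in this notebook's data, but it is one of the types the feed can emit.

```python
def apply_changes(target, changes, key="id"):
    """Apply change feed rows to `target` (a dict keyed by `key`) in commit order."""
    result = dict(target)
    # sorted() is stable, so rows within one commit keep their original order.
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        change_type = row["_change_type"]
        if change_type == "update_preimage":
            continue  # the old value of an updated row; not needed for the new state
        data = {k: v for k, v in row.items() if not k.startswith("_")}
        if change_type in ("insert", "update_postimage"):
            result[row[key]] = data
        elif change_type == "delete":
            result.pop(row[key], None)
    return result


# Hypothetical sample: current table state plus three commits of changes.
table = {1: {"id": 1, "qty": 6}, 3: {"id": 3, "qty": 1}}
changes = [
    {"id": 1, "qty": 6, "_change_type": "update_preimage", "_commit_version": 4},
    {"id": 1, "qty": 8, "_change_type": "update_postimage", "_commit_version": 4},
    {"id": 2, "qty": 3, "_change_type": "insert", "_commit_version": 3},
    {"id": 3, "qty": 1, "_change_type": "delete", "_commit_version": 5},
]

new_state = apply_changes(table, changes)
print(new_state)
```

Only the changed rows are processed, which is what makes this pattern cheaper than comparing two full snapshots of the table.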

In [0]:
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 3) \
  .option("endingVersion", 5) \
  .table("demo.silver_retail_delta") \
  .display()

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country,line_item_total,_change_type,_commit_version,_commit_timestamp
549275,20936,FORKED CACTUS CANDLE,6.0,4/7/11 13:09,2.95,15916.0,United Kingdom,,update_preimage,4,2022-03-30T16:06:52.000+0000
549275,20936,,,,,,,,update_postimage,4,2022-03-30T16:06:52.000+0000
100,,,,,,-2.0,,,insert,4,2022-03-30T16:06:52.000+0000
100,,,,,,,,,update_preimage,3,2022-03-30T16:06:25.000+0000
100,,,,,,-1.0,,,update_postimage,3,2022-03-30T16:06:25.000+0000
549275,21457,2 PICTURE BOOK EGGS EASTER DUCKS,8.0,4/7/11 13:09,1.25,15916.0,United Kingdom,10.0,insert,5,2022-03-30T16:08:43.000+0000
549275,21316,SMALL CHUNKY GLASS ROMAN BOWL,1.0,4/7/11 13:09,0.79,15916.0,United Kingdom,0.79,insert,5,2022-03-30T16:08:43.000+0000
549275,20801,LARGE PINK GLASS SUNDAE DISH,7.0,4/7/11 13:09,0.79,15916.0,United Kingdom,5.53,insert,5,2022-03-30T16:08:43.000+0000
549275,48138,DOORMAT UNION FLAG,4.0,4/7/11 13:09,7.95,15916.0,United Kingdom,31.8,insert,5,2022-03-30T16:08:43.000+0000
549275,22499,WOODEN UNION JACK BUNTING,4.0,4/7/11 13:09,5.95,15916.0,United Kingdom,23.8,insert,5,2022-03-30T16:08:43.000+0000


### Caching  <img src="https://pages.databricks.com/rs/094-YMS-629/images/treasure.png?raw=true" width=200 />

The <a href="https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" target="_black"> Delta cache </a> accelerates data reads by creating copies of remote files in nodes’ local storage using a fast intermediate data format. The data is cached automatically whenever a file has to be fetched from a remote location. Successive reads of the same data are then performed locally, which results in significantly improved reading speed.

In summary:

- __Spark caching__ takes place in __memory__
- __Delta Lake caching__ takes place on the __SSD drives__ of worker machines, and does not "steal" memory from Spark.  SSD drives provide very fast response times, and avoid network usage.

**Note to Unilever, this is just an example**

In an example workload, data is read from the cache as illustrated below:

<img src="https://pages.databricks.com/rs/094-YMS-629/images/cache_write_and_read.png?raw=true" width=800/>

For this small demo dataset, there isn't much response time difference.  However, the time savings can be significant for a large dataset, since a great deal of network traffic is eliminated.
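
The read path described above can be sketched as a simple read-through cache. This is only a conceptual illustration in plain Python, assuming a temporary directory stands in for remote storage. The `LocalFileCache` class is hypothetical, and it leaves out the intermediate data format and cache invalidation that a real cache would need.

```python
import os
import shutil
import tempfile


class LocalFileCache:
    """Read-through cache: copy a 'remote' file to local disk on first read."""

    def __init__(self, cache_dir):
        self.cache_dir = cache_dir
        self.hits = 0
        self.misses = 0

    def read(self, remote_path):
        # Simplification: files are keyed by base name only.
        local_path = os.path.join(self.cache_dir, os.path.basename(remote_path))
        if os.path.exists(local_path):
            self.hits += 1
        else:
            self.misses += 1
            shutil.copyfile(remote_path, local_path)  # stands in for a network fetch
        with open(local_path, "rb") as f:
            return f.read()


# Hypothetical setup: one "remote" file and an empty local cache directory.
remote_dir = tempfile.mkdtemp()
cache_dir = tempfile.mkdtemp()
remote_file = os.path.join(remote_dir, "part-0000.parquet")
with open(remote_file, "wb") as f:
    f.write(b"retail rows")

cache = LocalFileCache(cache_dir)
first = cache.read(remote_file)   # miss: fetched from "remote" and stored locally
second = cache.read(remote_file)  # hit: served from the local copy
print(cache.misses, cache.hits)
```

The first read pays the cost of the fetch; every later read of the same file is served locally.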

In [0]:
%scala
spark.conf.set("spark.databricks.io.cache.enabled", "false")

In [0]:
%scala
spark.conf.get("spark.databricks.io.cache.enabled")

After disabling the delta cache, we can go to the following link:

#<img src="https://pages.databricks.com/rs/094-YMS-629/images/disablecache.png?raw=true" width=850/>

Inside the "Storage" tab, something similar to the image below would show:

<img src="https://pages.databricks.com/rs/094-YMS-629/images/cache_unused.png?raw=true" width=800 />

### Data Skipping <img src="https://pages.databricks.com/rs/094-YMS-629/images/skippingstone.jpg?raw=true" width=200/>

Databricks' Data Skipping feature takes advantage of the multi-file structure we saw above.  As new data is inserted into a Databricks Delta table, file-level min/max statistics are collected for the first 32 columns. Then, when there’s a lookup query against the table, Databricks Runtime first consults these statistics in order to determine which files can safely be skipped.  The picture below illustrates the process:

<img src="https://pages.databricks.com/rs/094-YMS-629/images/dataskipping.png?raw=true" />

In this example, we skip file 1 because its minimum value is higher than our desired value.  Similarly, we skip file 3 based on its maximum value.  File 2 is the only one we need to access.
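
The same decision can be written as a few lines of plain Python. This is a sketch of the idea, not the Databricks Runtime's logic: the `FileStats` class, the `files_to_read` helper, and the min/max numbers are hypothetical values chosen to mirror the three-file example.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    name: str
    min_value: int
    max_value: int


def files_to_read(files, lookup_value):
    """Keep only the files whose [min, max] range could contain the lookup value."""
    return [f.name for f in files if f.min_value <= lookup_value <= f.max_value]


# Hypothetical per-file statistics for one column.
files = [
    FileStats("file1", min_value=50, max_value=80),  # min is above the lookup value
    FileStats("file2", min_value=20, max_value=45),  # range contains the lookup value
    FileStats("file3", min_value=1, max_value=15),   # max is below the lookup value
]

selected = files_to_read(files, lookup_value=30)
print(selected)
```

Note that the statistics can only rule files out. A file whose range contains the value still has to be read to find out whether a matching row actually exists.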

For certain classes of queries, Data Skipping can provide dramatically faster response times. Refer to this useful <a href="https://databricks.com/blog/2018/07/31/processing-petabytes-of-data-in-seconds-with-databricks-delta.html" target="_blank">blog</a>.

## How do we automate a similar workload that depends on this one?

Open in new tab: <a href="$./02-Intro-to-Data-Engineering-(job)">Job notebook for automation exercise.</a>