# <img src="https://files.training.databricks.com/images/DeltaLake-logo.png" width=80px> Open Source Delta Lake

[Delta Lake](https://delta.io/) is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.

<img src="https://www.evernote.com/l/AAF4VIILJtFNZLuvZjGGhZTr2H6Z0wh6rOYB/image.png" width=900px>

### Key Features

[Quick start intro to Delta Lake.](https://docs.delta.io/latest/quick-start.html#)

**ACID Transactions**:
Data lakes typically have multiple data pipelines reading and writing data concurrently, and, because there are no transactions, data engineers have to go through a tedious process to ensure data integrity. Delta Lake brings ACID transactions to your data lakes. It provides serializability, the strongest isolation level.

**Scalable Metadata Handling**:
In big data, even the metadata itself can be "big data". Delta Lake treats metadata just like data, leveraging Spark's distributed processing power to handle all its metadata. As a result, Delta Lake can handle petabyte-scale tables with billions of partitions and files with ease.

**Time Travel (data versioning)**:
Delta Lake provides snapshots of data, enabling developers to access and revert to earlier versions of data for audits, rollbacks, or reproducing experiments.
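
To make the idea of snapshots concrete, here is a minimal pure-Python sketch of versioning through an append-only log. It is a toy model, not Delta Lake's implementation: the `Commit` class, the `snapshot` function, and the file names are all hypothetical, and the only assumption carried over from Delta Lake is that each commit records files added and files removed.

```python
from dataclasses import dataclass, field


@dataclass
class Commit:
    """Toy commit: the data files it adds and the ones it logically removes."""
    add: list = field(default_factory=list)
    remove: list = field(default_factory=list)


def snapshot(log, version):
    """Return the set of live files as of `version` by replaying commits 0..version."""
    if not 0 <= version < len(log):
        raise ValueError(f"version {version} does not exist")
    live = set()
    for commit in log[: version + 1]:
        live |= set(commit.add)
        live -= set(commit.remove)
    return live


# Hypothetical history: an initial write, an append, then a rewrite of the first file.
log = [
    Commit(add=["part-0.parquet"]),
    Commit(add=["part-1.parquet"]),
    Commit(add=["part-2.parquet"], remove=["part-0.parquet"]),
]

print(sorted(snapshot(log, 2)))  # files visible in the latest version
print(sorted(snapshot(log, 0)))  # files visible when "time travelling" to version 0
```

Because old commits are never edited, reading an earlier version only means stopping the replay sooner.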

**Open Format**:
All data in Delta Lake is stored in Apache Parquet format, enabling Delta Lake to leverage the efficient compression and encoding schemes that are native to Parquet.

**Unified Batch and Streaming Source and Sink**:
A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.

**Schema Enforcement**:
Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption.

**Schema Evolution**:
Big data is continuously changing. Delta Lake enables you to make changes to a table schema that can be applied automatically, without the need for cumbersome DDL.
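
The difference between schema enforcement and schema evolution can be sketched in plain Python. This is a toy model under stated assumptions, not Delta Lake's logic: a schema is just a dict of column name to Python type, every column is treated as nullable, and the helper names `enforce` and `evolve` are hypothetical.

```python
def enforce(schema, row):
    """Raise ValueError if `row` has unknown columns or wrongly typed values.

    Missing or None values are allowed, i.e. every column is treated as nullable.
    """
    extra = set(row) - set(schema)
    if extra:
        raise ValueError(f"unexpected columns: {sorted(extra)}")
    for name, expected in schema.items():
        value = row.get(name)
        if value is not None and not isinstance(value, expected):
            raise ValueError(f"column {name!r} expects {expected.__name__}")
    return row


def evolve(schema, row):
    """Return a new schema that also contains any new columns found in `row`."""
    merged = dict(schema)
    for name, value in row.items():
        if name not in merged and value is not None:
            merged[name] = type(value)
    return merged


schema = {"InvoiceNo": int, "Country": str}

# Enforcement: a string where an integer is expected is rejected.
try:
    enforce(schema, {"InvoiceNo": "536365", "Country": "France"})
except ValueError as err:
    print("rejected:", err)

# Evolution: a new column widens the schema instead of failing the write.
wider = evolve(schema, {"InvoiceNo": 536365, "Country": "France", "Discount": 0.1})
print(sorted(wider))
```

Enforcement is the default stance (reject what does not fit), while evolution is an explicit choice to accept a wider schema.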

**100% Compatible with Apache Spark API**:
Developers can use Delta Lake with their existing data pipelines with minimal change as it is fully compatible with Spark, the commonly used big data processing engine.

### Getting Started

You will notice that throughout this course, there is a lot of context switching between PySpark/Scala and SQL.

This is because:
* `read` and `write` operations are performed on DataFrames using PySpark or Scala
* table creates and queries are performed directly off Delta Lake tables using SQL

Run the following cell to configure our "classroom."

In [0]:
%run "./Includes/Classroom-Setup"

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Key Concepts: Delta Lake Architecture</h2>

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> We'll touch on this further in future notebooks.

Throughout our Delta Lake discussions, we'll often refer to the concept of Bronze/Silver/Gold tables. These levels refer to the state of data refinement as data flows through a processing pipeline.

**These levels are conceptual guidelines, and implemented architectures may have any number of layers with various levels of enrichment.** Below are some general ideas about the state of data in each level.

* **Bronze** tables
  * Raw data (or very little processing)
  * Data will be stored in the Delta format (can encode raw bytes as a column)
* **Silver** tables
  * Data that is directly queryable and ready for insights
  * Bad records have been handled, types have been enforced
* **Gold** tables
  * Highly refined views of the data
  * Aggregate tables for BI
  * Feature tables for data scientists

For different workflows, things like schema enforcement and deduplication may happen in different places.
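
As a rough illustration of the three levels, the sketch below pushes a few records through a Bronze, Silver, and Gold step in plain Python. The sample lines, column choice, and function names are hypothetical, and dropping bad records in the Silver step is only one possible policy.

```python
from collections import defaultdict

# Bronze: raw lines kept exactly as they arrived (hypothetical sample data).
bronze = [
    "536365,6,2.55,United Kingdom",
    "536370,24,3.75,France",
    "536371,not-a-number,1.00,France",  # malformed quantity
    "536372,10,0.85,France",
]


def to_silver(raw_lines):
    """Parse and type each record; skip records that fail to parse."""
    silver = []
    for line in raw_lines:
        try:
            invoice, quantity, price, country = line.split(",")
            silver.append({
                "InvoiceNo": int(invoice),
                "Quantity": int(quantity),
                "UnitPrice": float(price),
                "Country": country,
            })
        except ValueError:
            continue  # a real pipeline might quarantine these rows instead
    return silver


def to_gold(silver_rows):
    """Aggregate revenue per country, the kind of table a BI dashboard reads."""
    revenue = defaultdict(float)
    for row in silver_rows:
        revenue[row["Country"]] += row["Quantity"] * row["UnitPrice"]
    return dict(revenue)


silver = to_silver(bronze)
gold = to_gold(silver)
print(len(bronze), "bronze rows ->", len(silver), "silver rows ->", len(gold), "gold rows")
```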

## Delta Lake Batch Operations - Create

Creating a Delta Lake table is as easy as changing the format while performing a write.

In this section, we'll read from a CSV and write to Delta.


![](https://files.training.databricks.com/images/adbcore/AAFxQkg_SzRC06GvVeatDBnNbDL7wUUgCg4B.png)


Set up relevant paths to the online retail datasets from `/mnt/training/online_retail`

In [0]:
inputPath = "/mnt/training/online_retail/data-001/data.csv"
DataPath = userhome + "/delta/customer-data/"

#remove directory if it exists
dbutils.fs.rm(DataPath, True)

Read the data into a DataFrame. We supply the schema.

Use overwrite mode so that there will not be an issue in rewriting the data in case you end up running the cell again.

Partition on `Country` because there are only a few unique countries and because we will use `Country` as a predicate in a `WHERE` clause.

More information on the how and why of partitioning is contained in the links at the bottom of this notebook.
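
To see why a low-cardinality column used in `WHERE` clauses makes a good partition column, here is a small pure-Python sketch of the `column=value` directory layout that a partitioned write produces. The rows and the helper names `partition_paths` and `prune` are hypothetical, and real partition pruning happens inside the query engine rather than in user code.

```python
from collections import defaultdict


def partition_paths(rows, column):
    """Group rows by `column` and return {directory name: rows stored in it}."""
    layout = defaultdict(list)
    for row in rows:
        layout[f"{column}={row[column]}"].append(row)
    return dict(layout)


def prune(layout, column, value):
    """Return only the rows in the directory matching `column = value`."""
    return layout.get(f"{column}={value}", [])


rows = [
    {"InvoiceNo": 1, "Country": "France"},
    {"InvoiceNo": 2, "Country": "Germany"},
    {"InvoiceNo": 3, "Country": "France"},
]

layout = partition_paths(rows, "Country")
print(sorted(layout))                               # one directory per distinct country
print(len(prune(layout, "Country", "France")))      # rows read for WHERE Country = 'France'
```

A column with millions of distinct values would create millions of tiny directories, which is why we pick one with only a few.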

Then write the data to Delta Lake.

In [0]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

inputSchema = StructType([
  StructField("InvoiceNo", IntegerType(), True),
  StructField("StockCode", StringType(), True),
  StructField("Description", StringType(), True),
  StructField("Quantity", IntegerType(), True),
  StructField("InvoiceDate", StringType(), True),
  StructField("UnitPrice", DoubleType(), True),
  StructField("CustomerID", IntegerType(), True),
  StructField("Country", StringType(), True)
])

rawDataDF = (spark.read
  .option("header", "true")
  .schema(inputSchema)
  .csv(inputPath)
)

# write to Delta Lake
rawDataDF.write.mode("overwrite").format("delta").partitionBy("Country").save(DataPath)


<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> While we show creating a table in the next section, Spark SQL queries can also run directly on a directory of data. For Delta, use the following syntax:
```
SELECT * FROM delta.`/path/to/delta_directory`
```

In [0]:
display(spark.sql("SELECT * FROM delta.`{}` LIMIT 5".format(DataPath)))

### CREATE A Table Using Delta Lake

Create a table called `customer_data_delta` using `DELTA` out of the above data.

The notation is:
> `CREATE TABLE <table-name>` <br>
  `USING DELTA` <br>
  `LOCATION <path-to-data>` <br>
  
Tables created with a specified `LOCATION` are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table’s files are not deleted when you `DROP` the table. However, changes to either the registered table or the files will be reflected in both locations.
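
The managed versus unmanaged distinction can be sketched with a toy metastore in plain Python. Everything here is hypothetical (the `ToyMetastore` class, the `/warehouse/` default location, the example paths); it only models the rule stated above that dropping an unmanaged table leaves its files alone.

```python
class ToyMetastore:
    """Toy model: maps table names to (path, managed) next to a fake file system."""

    def __init__(self):
        self.tables = {}
        self.files = {}  # path -> list of file names

    def create_table(self, name, path=None):
        managed = path is None
        if managed:
            path = f"/warehouse/{name}"  # hypothetical default location
            self.files.setdefault(path, [])
        self.tables[name] = (path, managed)

    def drop_table(self, name):
        path, managed = self.tables.pop(name)
        if managed:
            self.files.pop(path, None)  # managed: the data goes with the table


ms = ToyMetastore()
ms.files["/mnt/data/customer-data"] = ["part-0.parquet"]

ms.create_table("customer_data_delta", path="/mnt/data/customer-data")  # unmanaged
ms.create_table("managed_copy")                                          # managed

ms.drop_table("customer_data_delta")
ms.drop_table("managed_copy")
print(sorted(ms.files))  # only the unmanaged table's files remain
```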

<img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Managed tables require that the data for your table be stored in DBFS. Unmanaged tables only store metadata in DBFS. 

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Since Delta Lake stores schema (and partition) info in the `_delta_log` directory, we do not have to specify partition columns!

In [0]:
spark.sql("""
  DROP TABLE IF EXISTS customer_data_delta
""")
spark.sql("""
  CREATE TABLE customer_data_delta
  USING DELTA
  LOCATION '{}'
""".format(DataPath))

Perform a simple `count` query to verify the number of records.

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Notice that the count is correct right away; there is no need to repair the table first.

In [0]:
%sql
SELECT count(*) FROM customer_data_delta


### Metadata

Since we already have data backing `customer_data_delta` in place,
the table in the Hive metastore automatically inherits the schema, partitioning,
and table properties of the existing data.

Note that we only store the table name, path, and database info in the Hive metastore;
the actual schema is stored in the `_delta_log` directory as shown below.

In [0]:
# DataPath already ends with "/", so no extra separator is needed
display(dbutils.fs.ls(DataPath + "_delta_log"))

Metadata is displayed through `DESCRIBE DETAIL <tableName>`.

As long as some data is already in place for a Delta Lake table, the schema can be inferred from it.

In [0]:
%sql
DESCRIBE DETAIL customer_data_delta

### Key Takeaways

Saving to Delta Lake is as easy as saving to Parquet, but creates an additional log file.

Using Delta Lake to create tables is straightforward and you do not need to specify schemas.
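
The reason no schema is needed is that it can be read back from the log. The sketch below parses a hand-written, heavily simplified stand-in for a commit file to show the idea; the sample content and the helper names are hypothetical, and a real commit carries more actions and fields than shown here.

```python
import json

# Hand-written, simplified stand-in for the first commit file in `_delta_log`.
# Each line of a commit file is one JSON action.
sample_commit = "\n".join([
    json.dumps({"metaData": {
        "schemaString": json.dumps({"type": "struct", "fields": [
            {"name": "InvoiceNo", "type": "integer", "nullable": True, "metadata": {}},
            {"name": "Country", "type": "string", "nullable": True, "metadata": {}},
        ]}),
        "partitionColumns": ["Country"],
    }}),
    json.dumps({"add": {"path": "Country=France/part-00000.parquet"}}),
])


def commit_file_name(version):
    """Commit files are named by their zero-padded, 20-digit version number."""
    return f"{version:020d}.json"


def read_table_metadata(commit_text):
    """Return (column names, partition columns) from the metaData action, if any."""
    for line in commit_text.splitlines():
        action = json.loads(line)
        if "metaData" in action:
            meta = action["metaData"]
            schema = json.loads(meta["schemaString"])
            return [f["name"] for f in schema["fields"]], meta["partitionColumns"]
    return None


print(commit_file_name(0))
print(read_table_metadata(sample_commit))
```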

## Delta Lake Batch Operations - Append

In this section, we'll load a small amount of new data and show how easy it is to append this to our existing Delta table.

We'll start by setting up our relevant path and loading new consumer product data.

In [0]:
miniDataInputPath = "/mnt/training/online_retail/outdoor-products/outdoor-products-mini.csv"

newDataDF = (spark
  .read
  .option("header", "true")
  .schema(inputSchema)
  .csv(miniDataInputPath)
)

Do a simple count of the number of new items to be added to the production data.

In [0]:
newDataDF.count()

### APPEND Using Delta Lake

Adding to our existing Delta Lake is as easy as modifying our write statement and specifying the `append` mode. 

Here we save to our previously created Delta Lake at `delta/customer-data/`.

In [0]:
(newDataDF
  .write
  .format("delta")
  .partitionBy("Country")
  .mode("append")
  .save(DataPath)
)

Perform a simple `count` query to verify that the number of records is correct.

The count should be `65535`.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> The changes to our files have been immediately reflected in the table that we've registered.

In [0]:
%sql
SELECT count(*) FROM customer_data_delta

### Key Takeaways
With Delta Lake, you can easily append new data without schema-on-read issues.

Changes to Delta Lake files will immediately be reflected in registered Delta tables.

## Delta Lake Batch Operations - Upsert

To UPSERT means to "UPdate" and "inSERT". In other words, UPSERT is literally TWO operations. It is not supported in traditional data lakes, as running an UPDATE could invalidate data that is accessed by the subsequent INSERT operation.

Using Delta Lake, however, we can do UPSERTS. Delta Lake combines these operations into a single atomic operation that will
- INSERT a row if it does not already exist
- UPDATE the row if it already exists.
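
The matched / not-matched rule above can be sketched in a few lines of plain Python. This is a toy model, not how Delta Lake executes a merge: the `merge` function and sample rows are hypothetical, rows are plain dicts, and we assume each key appears at most once in the source.

```python
def merge(target, source, keys):
    """Return a new list of rows: source rows replace matching target rows,
    and source rows with no match are inserted."""
    def key_of(row):
        return tuple(row[k] for k in keys)

    merged = {key_of(row): row for row in target}
    for row in source:
        merged[key_of(row)] = row  # update if the key exists, insert if it does not
    return list(merged.values())


target = [
    {"InvoiceNo": 1, "StockCode": "A", "Country": "France"},
    {"InvoiceNo": 2, "StockCode": "B", "Country": "France"},
]
source = [
    {"InvoiceNo": 2, "StockCode": "B", "Country": "Iceland"},  # matches: update
    {"InvoiceNo": 3, "StockCode": "C", "Country": "Iceland"},  # no match: insert
]

result = merge(target, source, keys=("InvoiceNo", "StockCode"))
for row in result:
    print(row)
```

Building a complete new result before anyone reads it is what stands in for atomicity here: a reader sees either the old list or the new one, never a half-applied mix.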

### Scenario
You have a small amount of batch data to write to your Delta table. This is currently staged in a JSON file in a mounted blob store.

In [0]:
upsertDF = spark.read.format("json").load("/mnt/training/enb/commonfiles/upsert-data.json")
display(upsertDF)


We'll register this as a temporary view so that this table doesn't persist in DBFS (but we can still use SQL to query it).

In [0]:
upsertDF.createOrReplaceTempView("upsert_data")

Included in this data are:
- Some new orders for customer 20993
- An update to a previous order correcting the country for customer 20993 to Iceland
- Corrections to some records for StockCode 22837 where the Description was incorrect

We can use UPSERT to simultaneously INSERT our new data and UPDATE our previous records.

In [0]:
%sql
MERGE INTO customer_data_delta
USING upsert_data
ON customer_data_delta.InvoiceNo = upsert_data.InvoiceNo
  AND customer_data_delta.StockCode = upsert_data.StockCode
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

Notice how this data is seamlessly incorporated into `customer_data_delta`.

In [0]:
%sql
SELECT * FROM customer_data_delta WHERE CustomerID=20993

In [0]:
%sql
SELECT DISTINCT(Description) 
FROM customer_data_delta 
WHERE StockCode = 22837

## Summary
In this lesson, we:
- Saved files using Delta Lake
- Created a table over existing Delta Lake data and appended new records to it
- Used Delta Lake to UPSERT data into existing Delta Lake tables

## Additional Topics & Resources

* <a href="https://docs.databricks.com/delta/delta-batch.html#" target="_blank">Table Batch Read and Writes</a>
