# Delta Lake

Delta Lake is an open-source storage layer that brings reliability to data lakes. Delta Lake provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing. Delta Lake runs on top of your existing data lake and is fully compatible with Apache Spark APIs.

This notebook demonstrates some basic Delta Lake functionality.

### Prerequisite

To use Delta Lake, you first need to have **Apache Spark** installed.

### Installation

To install Delta Lake, go to the [Delta Core repository on Maven](https://mvnrepository.com/artifact/io.delta/delta-core). Make sure to download the jar file built for your Scala version. After downloading the jar file, copy it into the `$SPARK_HOME/jars` folder.

To check your Scala version, execute the following command in your terminal.

```sh
pyspark --version
```
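
The Scala version reported by that command determines which artifact to pick. As a small sketch, the helper below (a hypothetical function `delta_core_coordinate`, not part of Delta Lake or Spark) assembles the Maven coordinate in the `group:artifact_scalaVersion:version` form used by the `spark.jars.packages` setting later in this notebook. It assumes that only the binary Scala version (for example `2.12`) goes into the artifact name.

```python
def delta_core_coordinate(scala_version: str, delta_version: str) -> str:
    """Build the Maven coordinate for delta-core (hypothetical helper).

    Only the binary Scala version (major.minor) appears in the artifact name,
    so a full version such as '2.12.10' is reduced to '2.12'.
    """
    parts = scala_version.strip().split(".")
    if len(parts) < 2:
        raise ValueError(f"expected a version like '2.12', got {scala_version!r}")
    scala_binary = ".".join(parts[:2])
    return f"io.delta:delta-core_{scala_binary}:{delta_version}"


coordinate = delta_core_coordinate("2.12.10", "0.7.0")
print(coordinate)  # io.delta:delta-core_2.12:0.7.0
```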

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

### SparkSession

Creating the SparkSession with the following configuration integrates the Delta Lake layer.

In [2]:
spark = SparkSession.builder.appName("delta_lake") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [3]:
from delta.tables import *

### Reading CSV data

I'm using CSV files to start so that different ways of creating Delta Tables can be demonstrated.

In [4]:
movies_df = spark.read.csv('movie.csv', sep=',', inferSchema=True, header=True)
ratings_df = spark.read.csv('ratings.csv', sep=',', inferSchema=True, header=True)

In [5]:
movies_df.show(5, truncate=False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



In [6]:
ratings_df.show(5, truncate=False)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|196   |242    |3     |881250949|
|186   |302    |3     |891717742|
|22    |377    |1     |878887116|
|244   |51     |2     |880606923|
|166   |346    |1     |886397596|
+------+-------+------+---------+
only showing top 5 rows



### Writing the Delta Table

The first way to create a Delta Table is to write a Spark DataFrame using `.format('delta')`. The data is written as Parquet files alongside a folder called `_delta_log`, which holds a log of every operation performed on the table. This log lets you keep track of changes and travel back in time to a specific version of the table.

In [7]:
movies_df.write.format('delta').save('movies_delta')
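
To see what the log looks like, the sketch below walks a `_delta_log` folder with the Python standard library only. It assumes the commit files are newline-delimited JSON named after the zero-padded version number (for example `00000000000000000000.json`) and that each line holds one action keyed by its type, such as `commitInfo`, `metaData`, or `add`. The function name `summarize_delta_log` is made up for this example, and the demo builds a tiny stand-in log so the sketch runs without Spark.

```python
import json
import os
import tempfile


def summarize_delta_log(table_path):
    """Return {version: [action types]} for every JSON commit in _delta_log."""
    log_dir = os.path.join(table_path, "_delta_log")
    summary = {}
    for name in sorted(os.listdir(log_dir)):
        stem, ext = os.path.splitext(name)
        # Skip checkpoints (.parquet), .crc files, and anything else.
        if ext != ".json" or not stem.isdigit():
            continue
        with open(os.path.join(log_dir, name), encoding="utf-8") as handle:
            # Each non-empty line is one JSON object with a single top-level key.
            actions = [next(iter(json.loads(line))) for line in handle if line.strip()]
        summary[int(stem)] = actions
    return summary


# Demo on a hand-written stand-in log, so that no Spark session is required.
with tempfile.TemporaryDirectory() as table_path:
    os.makedirs(os.path.join(table_path, "_delta_log"))
    commit = os.path.join(table_path, "_delta_log", f"{0:020d}.json")
    with open(commit, "w", encoding="utf-8") as handle:
        handle.write(json.dumps({"commitInfo": {"operation": "WRITE"}}) + "\n")
        handle.write(json.dumps({"add": {"path": "part-00000.snappy.parquet"}}) + "\n")
    demo_summary = summarize_delta_log(table_path)

print(demo_summary)  # {0: ['commitInfo', 'add']}
```

Under the same assumptions, pointing `summarize_delta_log` at the `movies_delta` folder should list one entry per table version.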

### Writing the Delta Table (cont.)

An alternative way to create a Delta Table is to call the function `convertToDelta` on data that is already in Parquet format. This function creates the `_delta_log` folder for the given data.

The function `isDeltaTable` returns `True` or `False` depending on whether the given location contains a Delta Table.

In [8]:
ratings_df.write.format('parquet').save('ratings_parquet')
DeltaTable.isDeltaTable(spark, 'ratings_parquet')

False

In [9]:
DeltaTable.convertToDelta(spark, "parquet.`ratings_parquet`")
DeltaTable.isDeltaTable(spark, 'ratings_parquet')

True

### Reading the Delta Table

The first way to read a Delta Table is the function `forPath`, which creates a `DeltaTable` object from the given path.

In [10]:
movies_dt = DeltaTable.forPath(spark, 'movies_delta')

### Reading the Delta Table (cont.)

Alternatively, a Delta Table can be loaded by the name under which it was saved as a table.

```py
# The 'path' option sets the location where the table's files are stored.
movies_df.write.format('delta').option('path', 'movies_delta').saveAsTable('movies')
movies_dt_by_name = DeltaTable.forName(spark, 'movies')
```

### Working with the Delta Table

On the surface, Delta Tables offer fewer methods than DataFrames because the `DeltaTable` API is small. A Delta Table can, however, be converted into a DataFrame for querying, or registered as a temporary table so that Hive-compatible queries can be executed on it using the function `SparkSession.sql`.

In [11]:
# createOrReplaceTempView replaces the deprecated registerTempTable.
movies_dt.toDF().createOrReplaceTempView('movies_dt')

In [12]:
spark.sql('SELECT * FROM movies_dt').show(5, truncate=False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



### Working with the Delta Table (cont.)

Let's create a DataFrame for an update. I am keeping the IDs of all new entries below zero so that the changes can be observed when the table is sorted.

In [13]:
update_movies_df = spark.createDataFrame([
    [-1, 'Life of Faraz', 'Adventure|Comedy|Thriller|Drama|Sci-Fi'],
    [-2, 'A Very Cool Movie', 'Children|Fantasy'],
    [1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy|Sci-Fi'],
    [2, 'Jumanji (1995)', 'Comedy|Romance|Adventure']
], ['movieId', 'title', 'genres'])

update_movies_df.show(truncate=False)

+-------+-----------------+--------------------------------------------------+
|movieId|title            |genres                                            |
+-------+-----------------+--------------------------------------------------+
|-1     |Life of Faraz    |Adventure|Comedy|Thriller|Drama|Sci-Fi            |
|-2     |A Very Cool Movie|Children|Fantasy                                  |
|1      |Toy Story (1995) |Adventure|Animation|Children|Comedy|Fantasy|Sci-Fi|
|2      |Jumanji (1995)   |Comedy|Romance|Adventure                          |
+-------+-----------------+--------------------------------------------------+



### Working with the Delta Table: Upsert

The starting point of the upsert operation is the function `merge`. It accepts two parameters: `source`, which is a DataFrame, and `condition`, which works like the `ON` clause of a SQL `MERGE` and matches source rows with table rows. The function returns a `DeltaMergeBuilder`, which provides multiple upsert-related functions.

```md
Parameters:	
    source (pyspark.sql.DataFrame) – Source DataFrame
    condition (str or pyspark.sql.Column) – Condition to match sources rows with the Delta table rows.
```

The first two are as follows:

- Function `whenMatchedUpdate` updates rows when the merge conditions are met.
```md
    Parameters:	
        condition (str or pyspark.sql.Column) – Optional condition of the update
        set (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.
```

- Function `whenNotMatchedInsert` inserts new rows when the merge conditions are not met.
```md
    Parameters:	
        condition (str or pyspark.sql.Column) – Optional condition of the insert
        values (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be inserted. Note: This param is required. Default value None is present to allow positional args in same order across languages.
```

**Note:** Since the function `merge` returns DeltaMergeBuilder, it requires the function `execute` to execute the upsert operation.

In [14]:
movies_dt.alias('movies_dt') \
    .merge(source=update_movies_df.alias('update'),
           condition='movies_dt.movieId = update.movieId') \
    .whenMatchedUpdate(condition='movies_dt.movieId = 1',
                       set={'title': F.upper('movies_dt.title')}) \
    .whenNotMatchedInsert(condition='update.movieId = -1',
                          values={'movieId': 'update.movieId',
                                  'title': 'update.title',
                                  'genres': 'update.genres'}) \
    .execute()
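
To make the matching rules concrete, here is a plain-Python model of what the cell above asks for. It only illustrates the semantics with dictionaries keyed by `movieId` and is not how Delta Lake executes a merge; the function `upsert` and the two-row target are assumptions made for this sketch.

```python
def upsert(target, updates):
    """Model of the merge above: upper-case the title of matched movie 1,
    insert unmatched movie -1, and leave every other row untouched."""
    result = {movie_id: dict(row) for movie_id, row in target.items()}
    for movie_id, row in updates.items():
        if movie_id in result:
            # whenMatchedUpdate with condition 'movies_dt.movieId = 1'
            if movie_id == 1:
                result[movie_id]["title"] = result[movie_id]["title"].upper()
        elif movie_id == -1:
            # whenNotMatchedInsert with condition 'update.movieId = -1'
            result[movie_id] = dict(row)
    return result


target = {
    1: {"title": "Toy Story (1995)", "genres": "Adventure|Animation|Children|Comedy|Fantasy"},
    2: {"title": "Jumanji (1995)", "genres": "Adventure|Children|Fantasy"},
}
updates = {
    -1: {"title": "Life of Faraz", "genres": "Adventure|Comedy|Thriller|Drama|Sci-Fi"},
    -2: {"title": "A Very Cool Movie", "genres": "Children|Fantasy"},
    1: {"title": "Toy Story (1995)", "genres": "Adventure|Animation|Children|Comedy|Fantasy|Sci-Fi"},
    2: {"title": "Jumanji (1995)", "genres": "Comedy|Romance|Adventure"},
}

merged = upsert(target, updates)
print(sorted(merged))      # [-1, 1, 2]
print(merged[1]["title"])  # TOY STORY (1995)
```

In this model, movie `-2` is not inserted and movie `2` keeps its genres, because neither row satisfies the condition of a clause.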

### Working with the Delta Table (cont.)

You can query the results of the upsert operation using Hive compatible queries in the function `SparkSession.sql`.

One point to note is that, unlike DataFrames, Delta Tables are updated in place: the changes are committed to the table as soon as the operation executes, without the need to write or register it again. In the next cell, I query the same table name I registered earlier without making any modifications, and I still get the updated results.

In [15]:
spark.sql('SELECT * FROM movies_dt ORDER BY movieId').show(5, truncate=False)

+-------+------------------------+-------------------------------------------+
|movieId|title                   |genres                                     |
+-------+------------------------+-------------------------------------------+
|-1     |Life of Faraz           |Adventure|Comedy|Thriller|Drama|Sci-Fi     |
|1      |TOY STORY (1995)        |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)          |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995) |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)|Comedy|Drama|Romance                       |
+-------+------------------------+-------------------------------------------+
only showing top 5 rows



### Working with the Delta Table: History

Since Delta Tables maintain a log, peeking into the history is just a matter of calling the function `history`.

Although the function returns a DataFrame with a lot of columns, I've decided to show only a few relevant ones.

In [16]:
movies_dt.history().select('version', 'timestamp', 'operation', 'operationParameters').show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 1                                                                                                                                                                                 
 timestamp           | 2020-11-29 07:44:07.591                                                                                                                                                           
 operation           | MERGE                                                                                                                                                                             
 operationParameters | [predicate -> (CAST(movies_dt.`movieId` AS BIGINT) = update.`movieId`), updatePredicate -> (movies_dt.`movieId` = 1), insertPredicate -> (update.`movieId` = CAST(-1 AS B

### Working with the Delta Table: Upsert (cont.)

The merge builder provides a few more upsert functions, which are as follows:

- Function `whenMatchedDelete` deletes rows where merge conditions are met.
```md
    Parameters:	
        condition (str or pyspark.sql.Column) – Optional condition of the delete
```

- Function `whenMatchedUpdateAll` updates all columns of the matched rows with the values from the source.
```md
    Parameters:	
        condition (str or pyspark.sql.Column) – Optional condition of the update
```

- Function `whenNotMatchedInsertAll` inserts the source rows that have no match, copying all of their columns.
```md
    Parameters:	
        condition (str or pyspark.sql.Column) – Optional condition of the insert
```

**Note:** Each `whenMatched` clause can have an optional condition. However, if there are two `whenMatched` clauses, then the first one must have a condition.

In [17]:
movies_dt.alias('movies_dt') \
    .merge(source=update_movies_df.alias('update'),
           condition='movies_dt.movieId = update.movieId') \
    .whenMatchedDelete(condition='movies_dt.movieId = 1') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
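
The order of the clauses matters: for a matched row, the clauses are checked in the order they were added, and the first one whose condition holds is applied. The following plain-Python sketch models that rule for the cell above, tracking only the `genres` column. It is an illustration, not Delta Lake's implementation, and `merge_with_clauses` is a name invented here.

```python
def merge_with_clauses(target, updates, matched_clauses):
    """Apply the first matched clause whose condition holds; insert unmatched rows.

    matched_clauses is an ordered list of (condition, action) pairs, where
    condition takes the target movieId and action is 'delete' or 'update_all'.
    """
    result = dict(target)
    for movie_id, row in updates.items():
        if movie_id not in result:
            result[movie_id] = row  # whenNotMatchedInsertAll()
            continue
        for condition, action in matched_clauses:
            if condition(movie_id):
                if action == "delete":
                    del result[movie_id]
                else:
                    result[movie_id] = row  # copy every column from the source
                break  # later clauses are skipped for this row
    return result


clauses = [
    (lambda movie_id: movie_id == 1, "delete"),  # whenMatchedDelete(condition=...)
    (lambda movie_id: True, "update_all"),       # whenMatchedUpdateAll()
]
target = {-1: "Adventure|Comedy|Thriller|Drama|Sci-Fi",
          1: "Adventure|Animation|Children|Comedy|Fantasy",
          2: "Adventure|Children|Fantasy"}
updates = {-1: "Adventure|Comedy|Thriller|Drama|Sci-Fi",
           -2: "Children|Fantasy",
           1: "Adventure|Animation|Children|Comedy|Fantasy|Sci-Fi",
           2: "Comedy|Romance|Adventure"}

merged = merge_with_clauses(target, updates, clauses)
print(sorted(merged))  # [-2, -1, 2]
print(merged[2])       # Comedy|Romance|Adventure
```

Because the delete clause comes first, movie `1` is removed rather than updated, while movie `2` falls through to the unconditional update.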

In [18]:
spark.sql('SELECT * FROM movies_dt ORDER BY movieId').show(5, truncate=False)

+-------+------------------------+--------------------------------------+
|movieId|title                   |genres                                |
+-------+------------------------+--------------------------------------+
|-2     |A Very Cool Movie       |Children|Fantasy                      |
|-1     |Life of Faraz           |Adventure|Comedy|Thriller|Drama|Sci-Fi|
|2      |Jumanji (1995)          |Comedy|Romance|Adventure              |
|3      |Grumpier Old Men (1995) |Comedy|Romance                        |
|4      |Waiting to Exhale (1995)|Comedy|Drama|Romance                  |
+-------+------------------------+--------------------------------------+
only showing top 5 rows



In [19]:
movies_dt.history(limit=3) \
    .select('version', 'timestamp', 'operation', 'operationParameters').show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 2                                                                                                                                                                                 
 timestamp           | 2020-11-29 07:44:18.637                                                                                                                                                           
 operation           | MERGE                                                                                                                                                                             
 operationParameters | [predicate -> (CAST(movies_dt.`movieId` AS BIGINT) = update.`movieId`), deletePredicate -> (movies_dt.`movieId` = 1)]                                                    

### Working with the Delta Table: Update

The function `update` updates the rows of the Delta Table that match the given condition, applying the provided `set` rules.
```md
Parameters:	
    condition (str or pyspark.sql.Column) – Optional condition of the update
    set (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.
```

In [20]:
movies_dt \
    .update(condition='movieId = 2',
            set={'genres': F.lit('Adventure|Children|Fantasy')})

In [21]:
spark.sql('SELECT * FROM movies_dt ORDER BY movieId').show(5, truncate=False)

+-------+------------------------+--------------------------------------+
|movieId|title                   |genres                                |
+-------+------------------------+--------------------------------------+
|-2     |A Very Cool Movie       |Children|Fantasy                      |
|-1     |Life of Faraz           |Adventure|Comedy|Thriller|Drama|Sci-Fi|
|2      |Jumanji (1995)          |Adventure|Children|Fantasy            |
|3      |Grumpier Old Men (1995) |Comedy|Romance                        |
|4      |Waiting to Exhale (1995)|Comedy|Drama|Romance                  |
+-------+------------------------+--------------------------------------+
only showing top 5 rows



In [22]:
movies_dt.history(limit=3) \
    .select('version', 'timestamp', 'operation', 'operationParameters').show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 3                                                                                                                                                                                 
 timestamp           | 2020-11-29 07:44:24.622                                                                                                                                                           
 operation           | UPDATE                                                                                                                                                                            
 operationParameters | [predicate -> (movieId#1015 = 2)]                                                                                                                                        

### Working with the Delta Table: Delete

The function `delete` deletes rows from the Delta Table based on the given condition.
```md
Parameters:	
    condition (str or pyspark.sql.Column) – Optional condition of the delete
```

In [23]:
spark.sql('SELECT COUNT(*) FROM movies_dt').show(5, truncate=False)

+--------+
|count(1)|
+--------+
|27279   |
+--------+



In [24]:
movies_dt.delete(condition='movieId < 0')

In [25]:
spark.sql('SELECT COUNT(*) FROM movies_dt').show(5, truncate=False)

+--------+
|count(1)|
+--------+
|27277   |
+--------+



In [26]:
spark.sql('SELECT * FROM movies_dt ORDER BY movieId').show(5, truncate=False)

+-------+----------------------------------+--------------------------+
|movieId|title                             |genres                    |
+-------+----------------------------------+--------------------------+
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy|
|3      |Grumpier Old Men (1995)           |Comedy|Romance            |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance      |
|5      |Father of the Bride Part II (1995)|Comedy                    |
|6      |Heat (1995)                       |Action|Crime|Thriller     |
+-------+----------------------------------+--------------------------+
only showing top 5 rows



In [27]:
movies_dt.history(limit=3) \
    .select('version', 'timestamp', 'operation', 'operationParameters').show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------
 version             | 4                                                                                                                     
 timestamp           | 2020-11-29 07:44:31.314                                                                                               
 operation           | DELETE                                                                                                                
 operationParameters | [predicate -> ["(`movieId` < 0)"]]                                                                                    
-RECORD 1------------------------------------------------------------------------------------------------------------------------------------
 version             | 3                                                                                                                     
 times

### Working with the Delta Table: Vacuum

The function `vacuum` recursively deletes files and directories in the table that are not needed for maintaining older versions up to the given retention threshold. It returns an empty DataFrame on successful completion.

```md
Parameters:	
    retentionHours – Optional number of hours to retain history. If not specified, the default retention period of 168 hours (7 days) is used.
```

In [28]:
movies_dt.vacuum()

DataFrame[]
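
As a rough model of the retention rule, the sketch below decides which files would be old enough to remove. It assumes a file is a candidate only when it is no longer referenced by the current table version and was last modified before the retention cutoff. The function `vacuum_candidates` and the file names are hypothetical, and the real implementation works from the table's log rather than from a dictionary.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION_HOURS = 168  # 7 days, the default mentioned above


def vacuum_candidates(files, referenced, now, retention_hours=DEFAULT_RETENTION_HOURS):
    """Return unreferenced files last modified before the retention cutoff.

    files maps a file name to its last-modified datetime; referenced is the
    set of file names still used by the current table version.
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(name for name, modified in files.items()
                  if name not in referenced and modified < cutoff)


now = datetime(2020, 12, 10, 12, 0, 0)
files = {
    "part-old.parquet": datetime(2020, 11, 29, 7, 44, 7),       # stale and unreferenced
    "part-recent.parquet": datetime(2020, 12, 9, 9, 0, 0),      # unreferenced but too new
    "part-current.parquet": datetime(2020, 11, 29, 7, 44, 31),  # still referenced
}
removable = vacuum_candidates(files, {"part-current.parquet"}, now)
print(removable)  # ['part-old.parquet']
```

Under this model, a table whose versions were all written a few minutes before `vacuum()` runs has nothing older than the default threshold, so no files would be removed.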

### Working with the Delta Table: Time Travel

Since Delta Tables keep a log of every operation, the read options `timestampAsOf` and `versionAsOf` can be used to travel back in time and get data from specific Delta Table versions.

In [29]:
# Timestamp of the second-to-last version.
timestamp_df = spark.read.format("delta") \
    .option("timestampAsOf", '2020-11-29 07:44:24.622') \
    .load("movies_delta")

# Version after first modification.
version_df = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("movies_delta")

In [30]:
timestamp_df.orderBy('movieId').show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|     -2|   A Very Cool Movie|    Children|Fantasy|
|     -1|       Life of Faraz|Adventure|Comedy|...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
+-------+--------------------+--------------------+
only showing top 5 rows



In [31]:
version_df.orderBy('movieId').show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|     -1|       Life of Faraz|Adventure|Comedy|...|
|      1|    TOY STORY (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
+-------+--------------------+--------------------+
only showing top 5 rows
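
The relationship between the two options can be modelled in a few lines. The sketch assumes that `timestampAsOf` resolves to the latest version committed at or before the requested time, and it uses the commit timestamps printed by `history` earlier in this notebook (version 0 is left out because its timestamp is not shown). `version_as_of` is a hypothetical helper, not part of the Delta Lake API.

```python
from bisect import bisect_right
from datetime import datetime

# (version, commit timestamp) pairs taken from the history output above.
COMMITS = [
    (1, datetime(2020, 11, 29, 7, 44, 7, 591000)),
    (2, datetime(2020, 11, 29, 7, 44, 18, 637000)),
    (3, datetime(2020, 11, 29, 7, 44, 24, 622000)),
    (4, datetime(2020, 11, 29, 7, 44, 31, 314000)),
]


def version_as_of(timestamp, commits=COMMITS):
    """Return the latest version committed at or before `timestamp`."""
    times = [committed for _, committed in commits]
    index = bisect_right(times, timestamp)
    if index == 0:
        raise ValueError("timestamp is earlier than the first known commit")
    return commits[index - 1][0]


requested = datetime.strptime("2020-11-29 07:44:24.622", "%Y-%m-%d %H:%M:%S.%f")
print(version_as_of(requested))  # 3
```

Version 3 is the state after the `update` call, which matches the rows shown for `timestamp_df` above.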



### Working with the Delta Table: symlink_format_manifest

The function `generate` with the mode `symlink_format_manifest` generates manifests in symlink format, which provide read support for Presto and Athena.

In [32]:
movies_dt.generate(mode='symlink_format_manifest')
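
The generated manifest can be inspected without Spark. The sketch below assumes that, for an unpartitioned table, the manifest is a plain-text file at `_symlink_format_manifest/manifest` inside the table folder, listing one data-file path per line; that layout is an assumption worth checking against the Delta Lake documentation for your version. `read_manifest` is a hypothetical helper, and the demo writes a stand-in manifest with made-up paths so the example is self-contained.

```python
import os
import tempfile


def read_manifest(table_path):
    """Return the data-file paths listed in the table's symlink manifest."""
    manifest = os.path.join(table_path, "_symlink_format_manifest", "manifest")
    with open(manifest, encoding="utf-8") as handle:
        return [line.strip() for line in handle if line.strip()]


# Demo with a hand-written stand-in manifest (the paths are made up).
with tempfile.TemporaryDirectory() as table_path:
    manifest_dir = os.path.join(table_path, "_symlink_format_manifest")
    os.makedirs(manifest_dir)
    with open(os.path.join(manifest_dir, "manifest"), "w", encoding="utf-8") as handle:
        handle.write("file:/data/movies_delta/part-00000.snappy.parquet\n")
        handle.write("file:/data/movies_delta/part-00001.snappy.parquet\n")
    listed_files = read_manifest(table_path)

print(len(listed_files))  # 2
```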

## Conclusion

Delta Lake introduces a number of new features to Apache Spark that can be helpful in many applications, a few of which have been covered in this notebook. Because of these features, some tasks need to be done slightly differently than with Spark DataFrames.

Please use the following references for more information.

## References

- [Delta Lake documentation](https://docs.delta.io/latest/delta-intro.html)
- [Delta Lake Python API documentation](https://docs.delta.io/latest/api/python/index.html)
- [Delta Core Maven Repository](https://mvnrepository.com/artifact/io.delta/delta-core)
- [Apache Spark documentation](https://spark.apache.org/docs/latest/)
- [Apache Spark Python API documentation](https://spark.apache.org/docs/latest/api/python/index.html)

## Data Source

- [MovieLens-20M by GroupLens](https://grouplens.org/datasets/movielens/20m/)