## Overview of Delta Lake and some use cases

- https://github.com/delta-io

![images/delta-lake.jpg](images/delta-lake.jpg)

In [1]:
# notebook parameters

import os

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.9"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.9"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-11.0.12.0.7-4.fc34.x86_64"

spark_master = "local[*]"
app_name = "churn-etl"
input_files = dict(
    billing="billing_events", 
    account_features="customer_account_features", 
    internet_features="customer_internet_features", 
    meta="customer_meta", 
    phone_features="customer_phone_features"
)
output_file = "churn-etl"
output_prefix = ""
output_mode = "overwrite"
output_kind = "parquet"
input_kind = "parquet"
driver_memory = '8g'
executor_memory = '8g'

In [3]:
import pyspark

session = pyspark.sql.SparkSession.builder \
    .master(spark_master) \
    .appName(app_name) \
    .config("spark.eventLog.enabled", True) \
    .config("spark.eventLog.dir", ".") \
    .config("spark.driver.memory", driver_memory) \
    .config("spark.executor.memory", executor_memory) \
    .config("spark.executor.cores", 1) \
    .config("spark.rapids.sql.concurrentGpuTasks", 1) \
    .config("spark.rapids.memory.pinnedPool.size", "2G") \
    .config("spark.locality.wait", "0s") \
    .config("spark.sql.files.maxPartitionBytes", "512m") \
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
    .config("spark.jars", "/opt/sparkRapidsPlugin/cudf-21.08.2-cuda11.jar,/opt/sparkRapidsPlugin/rapids-4-spark_2.12-21.08.0.jar") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
session

21/10/08 07:46:00 WARN Utils: Your hostname, virt resolves to a loopback address: 127.0.0.1; using 192.168.86.109 instead (on interface wlp2s0)
21/10/08 07:46:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/mike/.local/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mike/.ivy2/cache
The jars for the packages stored in: /home/mike/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ed1a0eb0-0841-4bac-97f9-226b64fc54f7;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in local-m2-cache
	found org.antlr#antlr-runtime;3.5.2 in local-m2-cache
	found org.antlr#ST4;4.0.8 in local-m2-cache
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in local-m2-cache
	found org.glassfish#javax.json;1.0.4 in local-m2-cache
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 371ms :: artifacts dl 13ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from local-m2-cache in [default]
	org.antlr#ST4;4.0.8 from loc

### Schema Enforcement

Delta Lake follows “schema on write”: the schema of incoming data is checked against the table's schema at write time, and any mismatch raises an exception.

The code below creates a DataFrame with the numbers 1 through 4 (`range(1, 5)` excludes the upper bound) and writes it as a Delta table.

In [6]:
import shutil
shutil.rmtree("/tmp/data/delta_sample", ignore_errors=True)

data = session.range(1,5)
data.write.format("delta").mode("overwrite").save("/tmp/data/delta_sample")

In [7]:
import glob
print(glob.glob("/tmp/data/delta_sample/*"))

['/tmp/data/delta_sample/part-00000-ad15ad29-9872-4b62-a615-a844b3916668-c000.snappy.parquet', '/tmp/data/delta_sample/part-00003-a07883b7-0d90-4f3f-a624-741d6ea7fa10-c000.snappy.parquet', '/tmp/data/delta_sample/part-00005-fa05c5ac-e8aa-4fe5-9830-44bf2e6af6dd-c000.snappy.parquet', '/tmp/data/delta_sample/part-00001-b6b06197-7bfb-4b30-82cb-a7c944cd6254-c000.snappy.parquet', '/tmp/data/delta_sample/part-00007-0181aff8-c62b-46c9-9c7b-677f78c9a8c7-c000.snappy.parquet', '/tmp/data/delta_sample/_delta_log']


Now create a DataFrame with the numbers 5 through 9, cast its `id` column to string, and try to append it to the existing table.

In [8]:
import pyspark.sql.functions as fn
new_data = session.range(5,10)
new_data = new_data.withColumn("id",fn.col("id").cast("String"))
new_data.write.format("delta").mode("append").save("/tmp/data/delta_sample")

AnalysisException: Failed to merge fields 'id' and 'id'. Failed to merge incompatible data types LongType and StringType

The write fails with **AnalysisException: Failed to merge fields 'id' and 'id'. Failed to merge incompatible data types LongType and StringType**, which is what we want.

Delta Lake stopped the incorrectly typed data from entering the table.

Let's append the data with the correct schema.
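
As a rough mental model of that check, here is a minimal sketch in plain Python. The dict-based schemas and the `check_schema` helper are hypothetical and only model the type comparison for columns that already exist; they are not part of the Delta Lake API.

```python
# Toy model of schema-on-write: compare an incoming schema with the table's.
# The dict-based schemas and check_schema() are illustrative, not Delta Lake APIs.

def check_schema(table_schema: dict, incoming_schema: dict) -> None:
    """Raise ValueError if a column that already exists arrives with a new type."""
    for name, incoming_type in incoming_schema.items():
        existing_type = table_schema.get(name)
        if existing_type is not None and existing_type != incoming_type:
            raise ValueError(
                f"Failed to merge fields '{name}' and '{name}': "
                f"{existing_type} vs {incoming_type}"
            )

table_schema = {"id": "LongType"}

# Accepted: the incoming type matches the table's type.
check_schema(table_schema, {"id": "LongType"})

# Rejected: the same column arrives as a string.
try:
    check_schema(table_schema, {"id": "StringType"})
    rejected = False
except ValueError as err:
    rejected = True
    print(err)
```

The point of failing at write time is that the bad rows never reach the table, so readers do not have to defend against mixed types later.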

In [9]:
new_data = session.range(5,10)
new_data.write.format("delta").mode("append").save("/tmp/data/delta_sample")



We can inspect the Delta log and see that the commit adds the newly written part files, along with information such as the write mode and each file's modification time.

In [10]:
os.chdir("/tmp/data/delta_sample/_delta_log")
for file in glob.glob("*.json"):
    print(file)
    print(open(file).read())


00000000000000000001.json
{"commitInfo":{"timestamp":1633643252551,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputBytes":"2611","numOutputRows":"5"}}}
{"add":{"path":"part-00000-f10cca4d-b05e-4344-8f90-22b9f876e10a-c000.snappy.parquet","partitionValues":{},"size":296,"modificationTime":1633643252536,"dataChange":true}}
{"add":{"path":"part-00001-2c74eeac-a084-44d5-bc34-c0e79ec5e238-c000.snappy.parquet","partitionValues":{},"size":463,"modificationTime":1633643252540,"dataChange":true}}
{"add":{"path":"part-00003-485621f9-2150-44ea-88b7-02735ff4b6d4-c000.snappy.parquet","partitionValues":{},"size":463,"modificationTime":1633643252547,"dataChange":true}}
{"add":{"path":"part-00004-decb191e-f6a7-44c1-b51a-1e315ca21b70-c000.snappy.parquet","partitionValues":{},"size":463,"modificationTime":1633643252536,"dataChange":true}}
{"add":{"path":"part-00006-68e8e7a1-2bd0-4482-a17d-e05c

### Deletion

Let's read the table we just wrote in Delta format.

In [11]:
from delta.tables import DeltaTable
delta_df = DeltaTable.forPath(session, "/tmp/data/delta_sample")
delta_df

<delta.tables.DeltaTable at 0x7f1e197d3c40>

Now we will delete the rows where id ≤ 2.

In [12]:
delta_df.delete("id<=2")



Let's check how the commit log records the delete operation.

The commit contains `remove` actions for the original part files that held matching rows and `add` actions for the rewritten part files that hold the remaining rows. The `commitInfo` entry names the operation (DELETE) and its predicate (id ≤ 2).
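
The remove/add bookkeeping can be pictured with a small copy-on-write sketch. The file names, the row layout and the `delete_where` helper are all hypothetical; this is a toy model of the idea, not Delta Lake's implementation.

```python
# Toy copy-on-write delete. File names, row layout and delete_where() are
# hypothetical; this only models the remove/add bookkeeping.

def delete_where(files: dict, predicate) -> list:
    """Return log-style actions for deleting rows that match `predicate`.

    `files` maps a file name to the list of row values it holds.
    """
    actions = []
    for name, rows in files.items():
        if not any(predicate(r) for r in rows):
            continue  # files without matching rows are left untouched
        actions.append({"remove": {"path": name}})
        kept = [r for r in rows if not predicate(r)]
        if kept:  # surviving rows are rewritten into a new file
            actions.append({"add": {"path": f"rewritten-{name}", "rows": kept}})
    return actions

files = {
    "part-a.parquet": [1, 2, 3],
    "part-b.parquet": [4, 5],
}
actions = delete_where(files, lambda row_id: row_id <= 2)
for action in actions:
    print(action)
```

In this model only `part-a.parquet` is touched: it is marked as removed and its surviving row is written to a new file, while `part-b.parquet` needs no action at all.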

In [15]:
os.chdir("/tmp/data/delta_sample/_delta_log")
for file in glob.glob("*.json"):
    print(file)
    print(open(file).read())

00000000000000000002.json
{"commitInfo":{"timestamp":1633643263596,"operation":"DELETE","operationParameters":{"predicate":"[\"(`id` <= 2L)\"]"},"readVersion":1,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"2","numCopiedRows":"2","executionTimeMs":"2082","numDeletedRows":"2","scanTimeMs":"1885","numAddedFiles":"2","rewriteTimeMs":"196"}}}
{"remove":{"path":"part-00005-fa05c5ac-e8aa-4fe5-9830-44bf2e6af6dd-c000.snappy.parquet","deletionTimestamp":1633643263592,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":463}}
{"remove":{"path":"part-00003-485621f9-2150-44ea-88b7-02735ff4b6d4-c000.snappy.parquet","deletionTimestamp":1633643263592,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":463}}
{"add":{"path":"part-00000-796bb328-7daf-4c1c-9df1-d953707886ec-c000.snappy.parquet","partitionValues":{},"size":463,"modificationTime":1633643263580,"dataChange":true}}
{"add":{"path":"part-00001-d5b863e7-c0dd-44d2-b57b-d15be86fb24a-c000.s

### Updates

We will read the table again and update the id value 5 to 500.

In [28]:
delta_df = DeltaTable.forPath(session, "/tmp/data/delta_sample")
delta_df.update(condition = "id = 5", set = { "id": "500" })
delta_df.toDF().show()

+---+
| id|
+---+
|  1|
|  2|
|500|
|  4|
|  7|
|  3|
|  9|
|  8|
|  6|
+---+



The operation above sets id to 500 where it was 5. The `DeltaTable` object automatically reflects the updated data.

As you can see, the syntax is very simple.

In [18]:
os.chdir("/tmp/data/delta_sample/_delta_log")
for file in glob.glob("*.json"):
    print(file)
    print(open(file).read())

00000000000000000003.json
{"commitInfo":{"timestamp":1633643484364,"operation":"UPDATE","operationParameters":{"predicate":"(id#2755L = 5)"},"readVersion":2,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"0","executionTimeMs":"1400","scanTimeMs":"1284","numAddedFiles":"1","numUpdatedRows":"1","rewriteTimeMs":"116"}}}
{"remove":{"path":"part-00001-2c74eeac-a084-44d5-bc34-c0e79ec5e238-c000.snappy.parquet","deletionTimestamp":1633643484246,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":463}}
{"add":{"path":"part-00000-4402aca0-1cac-4f9e-97a9-14c20b96e0f4-c000.snappy.parquet","partitionValues":{},"size":463,"modificationTime":1633643484359,"dataChange":true}}

00000000000000000002.json
{"commitInfo":{"timestamp":1633643263596,"operation":"DELETE","operationParameters":{"predicate":"[\"(`id` <= 2L)\"]"},"readVersion":1,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"2","numCopiedRows":"2","executionTimeMs":"2082","numD

### Merge

Now we will perform a merge operation on a Delta table. First, load a dataset containing country, year and temperature columns and write it as a Delta table.

In [39]:
shutil.rmtree("/tmp/data/delta_merge", ignore_errors=True)

df = session.read.csv("/home/mike/tmp/dataset", inferSchema=True, sep=',', header=True)
df.write.format("delta").save("/tmp/data/delta_merge")



In [40]:
delta_merge_df = DeltaTable.forPath(session, "/tmp/data/delta_merge")
delta_merge_df.toDF().show()

+---------+----+-----------+
|  country|year|temperature|
+---------+----+-----------+
|Australia|2019|      23.34|
| Pakistan|2021|   27.89892|
+---------+----+-----------+



In [41]:
update_df = session.read.csv("/home/mike/tmp/update-dataset", inferSchema=True, sep=',', header=True)
update_df.show()

+-----------+----+-----------+
|    country|year|temperature|
+-----------+----+-----------+
|  Australia|2021|      100.0|
|New Zealand|2019|   19.34534|
+-----------+----+-----------+



In [42]:
delta_merge_df.alias("delta_merge").merge(
    update_df.alias("updates"),
    "delta_merge.country = updates.country") \
  .whenMatchedUpdate(set = { 
        "temperature" : "updates.temperature",
        "year" : "updates.year"
  } ) \
  .whenNotMatchedInsert(values =
    {
      "country": "updates.country",
      "year": "updates.year",
      "temperature": "updates.temperature"
    }
  ) \
  .execute()



Final merged records: the Australia row was updated to temperature 100.0 and year 2021, and New Zealand was inserted as a new row.

In [43]:
delta_merge_df.toDF().show()

+-----------+----+-----------+
|    country|year|temperature|
+-----------+----+-----------+
|New Zealand|2019|   19.34534|
|  Australia|2021|      100.0|
|   Pakistan|2021|   27.89892|
+-----------+----+-----------+
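
The matched/not-matched logic of the merge above can be mimicked in plain Python. This is only a sketch of the upsert semantics, using dicts in place of the Delta table and the updates DataFrame; it says nothing about how Delta Lake executes the merge.

```python
# Toy upsert keyed on `country`, mirroring the merge condition above.
# Plain dicts stand in for the Delta table and the updates DataFrame.

target = [
    {"country": "Australia", "year": 2019, "temperature": 23.34},
    {"country": "Pakistan", "year": 2021, "temperature": 27.89892},
]
updates = [
    {"country": "Australia", "year": 2021, "temperature": 100.0},
    {"country": "New Zealand", "year": 2019, "temperature": 19.34534},
]

merged = {row["country"]: dict(row) for row in target}
for row in updates:
    if row["country"] in merged:
        # whenMatchedUpdate: overwrite year and temperature only
        merged[row["country"]].update(
            year=row["year"], temperature=row["temperature"]
        )
    else:
        # whenNotMatchedInsert: add the whole row
        merged[row["country"]] = dict(row)

for row in sorted(merged.values(), key=lambda r: r["country"]):
    print(row)
```

Rows in the target with no counterpart in the updates (Pakistan here) are left as they are, which matches the merged table shown above.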



### Time Travel

With Delta Lake we can keep different versions of our dataset and read an earlier one back when needed.

In [44]:
delta_df.history().show(10, False)

+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                      |userMetadata|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|3      |2021-10

Current data looks like this:

In [48]:
delta_df = DeltaTable.forPath(session, "/tmp/data/delta_sample")
delta_df.toDF().show()

+---+
| id|
+---+
|  1|
|  2|
|500|
|  4|
|  7|
|  3|
|  9|
|  8|
|  6|
+---+



Let's get back version 1 of our data:

In [56]:
version_1 = session.read.format("delta").option("versionAsOf",1).load("/tmp/data/delta_sample")
version_1.show()

+---+
| id|
+---+
|  1|
|  6|
|  2|
|  3|
|  4|
|  7|
|  9|
|  8|
|  5|
+---+
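
Conceptually, reading with `versionAsOf` amounts to replaying the commit log only up to the requested version. The sketch below shows that idea with hand-written, heavily simplified commits; the file names and the `files_as_of` helper are hypothetical, and real commit files carry many more fields.

```python
import json

# Toy log replay. The commits are hand-written and simplified; the file
# names and files_as_of() are illustrative, not Delta Lake APIs.
commits = {
    0: ['{"add": {"path": "f0.parquet"}}'],
    1: ['{"add": {"path": "f1.parquet"}}'],
    2: ['{"remove": {"path": "f0.parquet"}}', '{"add": {"path": "f2.parquet"}}'],
}

def files_as_of(commits: dict, version: int) -> set:
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for v in sorted(commits):
        if v > version:
            break
        for line in commits[v]:
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live

print(sorted(files_as_of(commits, 1)))  # files visible at version 1
print(sorted(files_as_of(commits, 2)))  # files visible at version 2
```

In this model, `f0.parquet` is still part of version 1 even though version 2 removed it, which is why old data files have to stay on disk for time travel to work.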



To keep storage from growing without bound, we can use `deltaTable.vacuum()`:

    deltaTable.vacuum()     # vacuum files not required by versions more than 7 days old
    deltaTable.vacuum(100)  # vacuum files not required by versions more than 100 hours old
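
The retention window is simple arithmetic on timestamps: the default of 7 days is 168 hours. The sketch below is a toy model of that cutoff only; the tombstone list and the `vacuum_candidates` helper are hypothetical, and the real vacuum does more than this (for example, it inspects the files actually present in the table directory).

```python
from datetime import datetime, timedelta, timezone

# Toy retention check. The tombstone list and vacuum_candidates() are
# hypothetical; this only models the cutoff arithmetic.

def to_millis(moment: datetime) -> int:
    """Convert a datetime to epoch milliseconds, as used in the commit log."""
    return int(moment.timestamp() * 1000)

def vacuum_candidates(tombstones: list, now: datetime,
                      retention_hours: float = 168) -> list:
    """Return paths whose removal is older than the retention window."""
    cutoff = now - timedelta(hours=retention_hours)
    return [
        t["path"]
        for t in tombstones
        if datetime.fromtimestamp(t["deletionTimestamp"] / 1000,
                                  tz=timezone.utc) < cutoff
    ]

now = datetime(2021, 10, 20, tzinfo=timezone.utc)
tombstones = [
    # removed 13 days before `now`
    {"path": "old.parquet",
     "deletionTimestamp": to_millis(datetime(2021, 10, 7, tzinfo=timezone.utc))},
    # removed 1 day before `now`
    {"path": "recent.parquet",
     "deletionTimestamp": to_millis(datetime(2021, 10, 19, tzinfo=timezone.utc))},
]

print(vacuum_candidates(tombstones, now))                      # default 168 hours
print(vacuum_candidates(tombstones, now, retention_hours=12))  # shorter window
```

A shorter window frees more space but also shrinks how far back time travel can reach, since versions that depend on the deleted files can no longer be read.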

In [None]:
session.stop()