## Overview of Delta Lake and some use cases

- https://github.com/delta-io

Use Cases:

- Schema Enforcement
- Deletion
- Updates
- Merge
- Time Travel

![images/delta-lake.jpg](https://raw.githubusercontent.com/eformat/telco-churn-augmentation/develop/images/delta-lake.jpg)

In [1]:
# notebook parameters
import os, socket
from urllib.parse import urlparse

# Get the S3 URL information and use it in Spark Context
# NOTE: S3 Hadoop API for spark does not work with domain name, use IP address instead
def domain_to_ip(url):
    domain = urlparse(url).netloc.split(":")[0]
    ip_address = socket.gethostbyname(domain)
    ip_url = url.replace(domain, ip_address)
    return ip_url
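
Note that `url.replace(domain, ip_address)` rewrites every occurrence of the host name in the URL, including any that appear in the path. A minimal sketch of a stricter variant that touches only the host part is shown here. The name `host_to_ip` and the injectable `resolve` parameter are assumptions made for illustration and are not part of the original notebook; any user name or password in the URL is dropped.

```python
import socket
from urllib.parse import urlparse, urlunparse


def host_to_ip(url, resolve=socket.gethostbyname):
    """Return url with only its host name replaced by the resolved IP address."""
    parts = urlparse(url)
    host = parts.hostname  # host name without port or credentials
    if host is None:
        raise ValueError(f"no host name in URL: {url!r}")
    ip_address = resolve(host)
    # Rebuild the netloc, keeping the port if one was given.
    netloc = ip_address if parts.port is None else f"{ip_address}:{parts.port}"
    return urlunparse(parts._replace(netloc=netloc))
```

Passing a custom `resolve` function makes the helper easy to try without a real DNS lookup.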

In [2]:
import spark_util

os.environ['SPARK_CLUSTER'] = "spark-cluster-mhepburn"
os.environ['WORKER_NODES'] = "2"
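os.environ['S3_ENDPOINT'] = "http://minio-ml-workshop:9000"
# submit_args below reads S3_ENDPOINT_URL; default it to the same endpoint if it is not already set
os.environ.setdefault('S3_ENDPOINT_URL', os.environ['S3_ENDPOINT'])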

submit_args = f"--conf spark.hadoop.fs.s3a.endpoint={domain_to_ip(os.environ['S3_ENDPOINT_URL'])} \
--conf spark.hadoop.fs.s3a.access.key=minio \
--conf spark.hadoop.fs.s3a.secret.key=minio123 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.multipart.size=104857600 \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.jars.ivy=/tmp \
--packages io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.2.0"

session = spark_util.getOrCreateSparkSession("Deltalake Demo", submit_args)
session

Initializing environment variables for Spark
Cluter name: spark-cluster-mhepburn
PYSPARK_SUBMIT_ARGS: --conf spark.hadoop.fs.s3a.endpoint=http://172.30.29.255:9000 --conf spark.hadoop.fs.s3a.access.key=minio --conf spark.hadoop.fs.s3a.secret.key=minio123 --conf spark.hadoop.fs.s3a.path.style.access=true --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.multipart.size=104857600 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.jars.ivy=/tmp --packages io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.2.0 --master spark://spark-cluster-mhepburn:7077 pyspark-shell 
Driver IP address: 10.130.3.93
Creating a spark session...
Spark session created


### Schema Enforcement

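Delta Lake follows “schema on write”: every write is checked against the table's schema, and a mismatch raises an exception at write time instead of letting bad data into the table.

The code below first clears any files left in the bucket by a previous run, then creates a DataFrame of the numbers 1 to 4 (`session.range(1, 5)` excludes the upper bound) and writes it as a Delta table.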

In [3]:
# delete files
myPath = 's3a://delta-sample/*'
hadoopPath = session._jvm.org.apache.hadoop.fs.Path(myPath)

config = session._jvm.org.apache.hadoop.conf.Configuration()
config.set("fs.s3a.endpoint", "http://minio-ml-workshop:9000")
config.set("fs.s3a.access.key","minio")
config.set("fs.s3a.secret.key","minio123")
config.set("fs.s3a.path.style.access","true")
config.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

hadoopFs = hadoopPath.getFileSystem(config)
files = hadoopFs.globStatus(hadoopPath)
for file in files:
    print(file.getPath())
    hadoopFs.delete(file.getPath(), True)  # recursive, so directories such as _delta_log are removed too

s3a://delta-sample/_delta_log
s3a://delta-sample/delta_merge
s3a://delta-sample/part-00000-2f2298f8-d19d-494c-bb09-b91c396fd64d-c000.snappy.parquet
s3a://delta-sample/part-00000-3d72de99-6909-4ad1-9019-378e99d4aa2f-c000.snappy.parquet
s3a://delta-sample/part-00000-67e07194-933d-461f-a46e-0fdc51bbf875-c000.snappy.parquet
s3a://delta-sample/part-00000-7f2a635b-3419-40af-b453-83e874b5c755-c000.snappy.parquet
s3a://delta-sample/part-00001-a46db623-883d-470f-9264-5096c2fa082d-c000.snappy.parquet
s3a://delta-sample/part-00001-a4aa1b57-711c-4729-9579-1f8858408756-c000.snappy.parquet
s3a://delta-sample/part-00002-47e7ae8f-c643-43ed-86c6-5eeae69de3bb-c000.snappy.parquet
s3a://delta-sample/part-00002-b8e53f38-2045-4f75-bc37-5876d9bdbc72-c000.snappy.parquet
s3a://delta-sample/part-00003-aee13190-b28a-4f4e-99a5-5b39ffadc7ae-c000.snappy.parquet
s3a://delta-sample/part-00003-b200eb0b-31e0-479a-b64b-f7b3940b18a9-c000.snappy.parquet


In [4]:
data = session.range(1,5)
data.write.format("delta").mode("overwrite").save("s3a://delta-sample/")

In [5]:
# list all files in bucket
myPath = 's3a://delta-sample/*'
hadoopPath = session._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(session._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)

for status in statuses:
    print(status)

S3AFileStatus{path=s3a://delta-sample/_delta_log; isDirectory=true; modification_time=0; access_time=0; owner=1000760000; group=1000760000; permission=rwxrwxrwx; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=FALSE
S3AFileStatus{path=s3a://delta-sample/part-00000-05d9f0d5-cb9c-4854-84e8-36d63352ab22-c000.snappy.parquet; isDirectory=false; length=463; replication=1; blocksize=33554432; modification_time=1633995107045; access_time=0; owner=1000760000; group=1000760000; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=FALSE
S3AFileStatus{path=s3a://delta-sample/part-00001-65cdfd54-c254-4694-b22f-b2bf6b4faf44-c000.snappy.parquet; isDirectory=false; length=463; replication=1; blocksize=33554432; modification_time=1633995107090; access_time=0; owner=1000760000; group=1000760000; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=F

Next, create a DataFrame of the numbers 5 to 9 (`session.range(5, 10)`), cast `id` to string, and append it to the existing table. This _should_ fail:

In [6]:
import pyspark.sql.functions as fn
new_data = session.range(5,10)
new_data = new_data.withColumn("id",fn.col("id").cast("String"))
new_data.write.format("delta").mode("append").save("s3a://delta-sample/")

AnalysisException: Failed to merge fields 'id' and 'id'. Failed to merge incompatible data types LongType and StringType

The write fails with **AnalysisException: Failed to merge fields 'id' and 'id'. Failed to merge incompatible data types LongType and StringType**, which is the behavior we want.

Delta Lake stopped data with the wrong schema from reaching the table.
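
The check that rejected this append can be pictured as a comparison of column types before any data is written. The snippet is a simplified model in plain Python, not Delta Lake's actual implementation (its real rules are more detailed). The function name `schema_mismatches` is made up for illustration; the inputs have the shape of the `(column, type)` pairs that `DataFrame.dtypes` returns.

```python
def schema_mismatches(table_dtypes, incoming_dtypes):
    """Compare two lists of (column, type) pairs, as returned by DataFrame.dtypes.

    Returns a list of human-readable problems; an empty list means compatible.
    """
    table = dict(table_dtypes)
    problems = []
    for name, dtype in incoming_dtypes:
        if name not in table:
            problems.append(f"unexpected column {name!r} ({dtype})")
        elif table[name] != dtype:
            problems.append(
                f"column {name!r}: table has {table[name]}, incoming has {dtype}"
            )
    return problems


# The failed append: the table stores id as bigint, the new data as string.
print(schema_mismatches([("id", "bigint")], [("id", "string")]))
# A compatible append produces no problems.
print(schema_mismatches([("id", "bigint")], [("id", "bigint")]))
```

A pre-flight check like this could be run on `existing_df.dtypes` and `new_data.dtypes` to report a mismatch before the write is attempted.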

Let's append the dataset with correct schema.

In [7]:
new_data = session.range(5,10)
new_data.write.format("delta").mode("append").save("s3a://delta-sample/")

The Delta log now shows the newly written part files, along with details such as the write mode and modification time.

In [8]:
myPath = 's3a://delta-sample/_delta_log/*.json'
hadoopPath = session._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(session._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)

for status in statuses:
    session.read.text('s3a://delta-sample/' + status.getPath().toUri().getRawPath()).show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"commitInfo":{"timestamp":1633995111113,"operation":"WRITE","operationParameters":{"mode":"Overwrite","part
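
Each commit file in `_delta_log` is newline-delimited JSON, with one action (such as `commitInfo`, `add` or `remove`) per line. As a rough sketch, a commit can be summarized with the standard library alone. The helper name `summarize_commit` and the sample commit are made up for illustration; the sample uses placeholder values and is not output from this notebook.

```python
import json
from collections import Counter


def summarize_commit(commit_text):
    """Count the action types in one Delta commit file (newline-delimited JSON)."""
    counts = Counter()
    operation = None
    for line in commit_text.splitlines():
        if not line.strip():
            continue
        action = json.loads(line)
        # Each line is an object whose single top-level key names the action.
        for kind, body in action.items():
            counts[kind] += 1
            if kind == "commitInfo":
                operation = body.get("operation")
    return operation, dict(counts)


# Made-up sample with placeholder values, for illustration only.
sample = "\n".join([
    json.dumps({"commitInfo": {"operation": "WRITE",
                               "operationParameters": {"mode": "Append"}}}),
    json.dumps({"add": {"path": "part-00000-example.snappy.parquet",
                        "size": 463, "dataChange": True}}),
    json.dumps({"add": {"path": "part-00001-example.snappy.parquet",
                        "size": 463, "dataChange": True}}),
])
print(summarize_commit(sample))
```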

### Deletion

Let's read the table we just wrote in Delta format.

In [9]:
from delta.tables import DeltaTable
delta_df = DeltaTable.forPath(session, "s3a://delta-sample/")
delta_df

<delta.tables.DeltaTable at 0x7f57c024f6d0>

Now, we will delete the data where id is ≤2.

In [10]:
delta_df.delete("id<=2")

Let's check how the commit log records the delete operation.

The commit marks the original part files as removed through `remove` actions and registers the rewritten part files through `add` actions. The `commitInfo` entry records the operation (DELETE) and its predicate (id ≤ 2).

In [11]:
myPath = 's3a://delta-sample/_delta_log/*.json'
hadoopPath = session._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(session._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)

for status in statuses:
    session.read.text('s3a://delta-sample/' + status.getPath().toUri().getRawPath()).show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"commitInfo":{"timestamp":1633995111113,"operation":"WRITE","operationParameters":{"mode":"Overwrite","part
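
To see why a delete shows up as `remove` plus `add`, it helps to replay the log: the table's current contents are whatever files have been added and not yet removed. The toy model here assumes commits are given as lists of single-key dictionaries; the function name `live_files` and the file names are hypothetical.

```python
def live_files(commits):
    """Replay commits in order and return the set of data files still in the table.

    Each commit is a list of single-key dicts such as {"add": {"path": ...}}.
    """
    files = set()
    for actions in commits:
        for action in actions:
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


# Hypothetical file names, for illustration only.
commits = [
    # Initial write: two part files.
    [{"add": {"path": "a.parquet"}}, {"add": {"path": "b.parquet"}}],
    # Delete: the file holding matching rows is replaced by a rewritten one.
    [{"remove": {"path": "a.parquet"}}, {"add": {"path": "a2.parquet"}}],
]
print(sorted(live_files(commits)))      # state after the delete
print(sorted(live_files(commits[:1])))  # state before the delete
```

Replaying only the earlier commits yields the earlier state of the table, which is the idea behind time travel.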

### Updates

We will load the table again and update the row with id 5 to 500.

In [12]:
delta_df = DeltaTable.forPath(session, "s3a://delta-sample/")
delta_df.update(condition = "id = 5", set = { "id": "500" })
delta_df.toDF().show()

+---+
| id|
+---+
|  8|
|  9|
|500|
|  6|
|  4|
|  3|
|  7|
+---+



The operation above sets `id` to 500 in the row where it was 5. The `DeltaTable` reflects the change immediately, so there is no need to reload it.

As you can see, the syntax is very simple.

In [13]:
myPath = 's3a://delta-sample/_delta_log/*.json'
hadoopPath = session._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(session._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)

for status in statuses:
    session.read.text('s3a://delta-sample/' + status.getPath().toUri().getRawPath()).show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"commitInfo":{"timestamp":1633995111113,"operation":"WRITE","operationParameters":{"mode":"Overwrite","part

### Merge

Now we will perform a merge on a Delta table. First, create a new dataset with country, year and temperature columns and write it as a Delta table.

In [14]:
data = [
    ("Australia", 2019, 23.34),
    ("Pakistan", 2021, 27.89892)
]
df = session.createDataFrame(data, ["country", "year", "temperature"])
df.write.format("delta").save("s3a://delta-sample/delta_merge")

In [15]:
delta_merge_df = DeltaTable.forPath(session, "s3a://delta-sample/delta_merge")
delta_merge_df.toDF().show()

+---------+----+-----------+
|  country|year|temperature|
+---------+----+-----------+
|Australia|2019|      23.34|
| Pakistan|2021|   27.89892|
+---------+----+-----------+



In [16]:
update_data = [
    ("Australia", 2021, 100.00),
    ("New Zealand", 2019, 19.34534)
]
update_df = session.createDataFrame(update_data, ["country", "year", "temperature"])
update_df.show()

+-----------+----+-----------+
|    country|year|temperature|
+-----------+----+-----------+
|  Australia|2021|      100.0|
|New Zealand|2019|   19.34534|
+-----------+----+-----------+



In [17]:
delta_merge_df.alias("delta_merge").merge(
    update_df.alias("updates"),
    "delta_merge.country = updates.country") \
  .whenMatchedUpdate(set = { 
        "temperature" : "updates.temperature",
        "year" : "updates.year"
  } ) \
  .whenNotMatchedInsert(values =
    {
      "country": "updates.country",
      "year": "updates.year",
      "temperature": "updates.temperature"
    }
  ) \
  .execute()

Final merged records: Australia matched, so its temperature was updated to 100.0 and its year to 2021; New Zealand had no match and was inserted; Pakistan is unchanged.

In [18]:
delta_merge_df.toDF().show()

+-----------+----+-----------+
|    country|year|temperature|
+-----------+----+-----------+
|New Zealand|2019|   19.34534|
|  Australia|2021|      100.0|
|   Pakistan|2021|   27.89892|
+-----------+----+-----------+
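
The upsert semantics of this merge can be modelled in plain Python: rows are matched on `country`, matched rows take the values from the update, and unmatched rows are inserted. This is only a sketch of the logic, not how Delta Lake executes a merge; the function name `upsert` is made up for illustration.

```python
def upsert(target, updates, key="country"):
    """Merge updates into target: update rows whose key matches, insert the rest."""
    merged = {row[key]: dict(row) for row in target}
    for row in updates:
        if row[key] in merged:
            merged[row[key]].update(row)  # matched: overwrite with the new values
        else:
            merged[row[key]] = dict(row)  # not matched: insert as a new row
    return list(merged.values())


target = [
    {"country": "Australia", "year": 2019, "temperature": 23.34},
    {"country": "Pakistan", "year": 2021, "temperature": 27.89892},
]
updates = [
    {"country": "Australia", "year": 2021, "temperature": 100.00},
    {"country": "New Zealand", "year": 2019, "temperature": 19.34534},
]
for row in upsert(target, updates):
    print(row)
```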



### Time Travel

Delta Lake maintains the earlier versions of a dataset, so they can be read back when needed.

In [19]:
delta_df.history().show(10, False)

+-------+-------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|version|timestamp          |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                      |userMetadata|
+-------+-------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|3      |2021-10-11 23:33:34

Current data looks like this:

In [20]:
delta_df = DeltaTable.forPath(session, "s3a://delta-sample/")
delta_df.toDF().show()

+---+
| id|
+---+
|  8|
|  9|
|500|
|  6|
|  4|
|  3|
|  7|
+---+



Let's read back version 1 of our data, the state after the append and before the delete:

In [21]:
version_1 = session.read.format("delta").option("versionAsOf",1).load("s3a://delta-sample/")
version_1.show()

+---+
| id|
+---+
|  8|
|  9|
|  6|
|  1|
|  4|
|  2|
|  3|
|  7|
|  5|
+---+
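
Besides `versionAsOf`, the Delta reader also accepts a `timestampAsOf` option. A small wrapper, sketched here, makes the choice explicit; the helper name `read_delta_as_of` is an assumption for illustration, and `session` is expected to be a Spark session configured for Delta like the one created earlier.

```python
def read_delta_as_of(session, path, version=None, timestamp=None):
    """Load a Delta table at a given version or timestamp (exactly one is required)."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    reader = session.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    else:
        # A timestamp string such as the ones shown by history()
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)
```

For example, `read_delta_as_of(session, "s3a://delta-sample/", version=1)` is equivalent to the read in the cell above.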



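To keep storage from growing without bound, we can use `deltaTable.vacuum()` to remove data files that are no longer needed:

    deltaTable.vacuum()     # vacuum files not required by versions more than 7 days old
    deltaTable.vacuum(100)  # vacuum files not required by versions more than 100 hours old

Versions whose files have been vacuumed can no longer be read through time travel. Also note that a retention shorter than the default 7 days (168 hours), such as the 100 hours above, is rejected by Delta Lake's safety check unless `spark.databricks.delta.retentionDurationCheck.enabled` is set to `false`.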

In [22]:
session.stop()