In [1]:
!pip install delta-spark
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.hadoop.fs.s3a.access.key","minio_access_key") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio_secret_key") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://172.24.0.10:9000") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled","false") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()



In [2]:
from IPython.display import display, HTML

# Inject CSS so <pre> blocks keep their whitespace and do not wrap
# (e.g. wide DataFrame.show() tables stay aligned).
no_wrap_css = "<style>pre { white-space: pre !important; }</style>"
display(HTML(no_wrap_css))

# Image saving and versioning demonstration

In the code below, we download a flower dataset published by the TensorFlow team and store it in Delta Lake. We also capture metadata from each picture's file path (the flower type and the filename) and store it as attributes of the corresponding picture. This lets us query for pictures that meet certain criteria (e.g. flowers of a certain type).

In [3]:
import os
import shutil
from urllib import request

# Flowers dataset from the TensorFlow team - https://www.tensorflow.org/datasets/catalog/tf_flowers
imageGzipUrl = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
imageGzipPath = "/tmp/flower_photos.tgz"
imagePath = "/tmp/image-folder"
deltaPath = "s3a://delta-lake/iris"

# Remove the archive and the extracted image folder left behind by a previous run.
# (Only local files are cleaned up here; the Delta table at deltaPath is not touched.)
if os.path.exists(imageGzipPath):
    os.remove(imageGzipPath)
shutil.rmtree(imagePath, ignore_errors=True)

# Download the archive, then extract it under imagePath.
request.urlretrieve(imageGzipUrl, imageGzipPath)
shutil.unpack_archive(imageGzipPath, imagePath)

In [4]:
# Load every .jpg found under imagePath (sub-folders included) using Spark's binaryFile source.
images = (
    spark.read.format("binaryFile")
    .option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "*.jpg")
    .load(imagePath)
)

images.show()

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/tmp/image-f...|2016-01-11 06:54:55|281953|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:18:33|277326|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:55:53|265806|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:19:25|257418|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:06:37|248540|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:56:37|246285|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:55:52|244431|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:17:37|243637|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:19:11|242637|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:03:56|240957|[FF D8 FF E0 00 1...|
|file:/tmp/image-f...|2016-01-11 06:57:13|239435|[FF D8 FF E0 00 1...|
|file:

### Capture picture attributes

In [5]:
import pyspark.sql.functions as fn

# The last two "/"-separated segments of `path` are "<flowerType>/<filename>".
# Pull them out into a temporary column, split it into the two attributes, then drop it.
# DataFrames are immutable: every step returns a new DataFrame, and the name `images`
# is simply rebound to the final result.
images = (
    images
    .withColumn("flowerType_filename", fn.substring_index(fn.col("path"), "/", -2))
    .withColumn("flowerType", fn.substring_index(fn.col("flowerType_filename"), "/", 1))
    .withColumn("filename", fn.substring_index(fn.col("flowerType_filename"), "/", -1))
    .drop("flowerType_filename")
)
images.show()

+--------------------+-------------------+------+--------------------+----------+--------------------+
|                path|   modificationTime|length|             content|flowerType|            filename|
+--------------------+-------------------+------+--------------------+----------+--------------------+
|file:/tmp/image-f...|2016-01-11 06:54:55|281953|[FF D8 FF E0 00 1...|    tulips|2431737309_146852...|
|file:/tmp/image-f...|2016-01-11 06:18:33|277326|[FF D8 FF E0 00 1...|sunflowers|4932735362_6e1017...|
|file:/tmp/image-f...|2016-01-11 06:55:53|265806|[FF D8 FF E0 00 1...|    tulips|8717900362_2aa508...|
|file:/tmp/image-f...|2016-01-11 06:19:25|257418|[FF D8 FF E0 00 1...|sunflowers|4341530649_c17bbc...|
|file:/tmp/image-f...|2016-01-11 06:06:37|248540|[FF D8 FF E0 00 1...|     daisy|5693459303_e61d9a...|
|file:/tmp/image-f...|2016-01-11 06:56:37|246285|[FF D8 FF E0 00 1...|    tulips|5674170543_73e3f4...|
|file:/tmp/image-f...|2016-01-11 06:55:52|244431|[FF D8 FF E0 00 1...|   

In [6]:
# Persist the images DataFrame as a Delta table at deltaPath.
# "overwrite" mode replaces whatever table contents are already stored at that path.
delta_writer = images.write.format("delta").mode("overwrite")
delta_writer.save(deltaPath)

In [None]:
# Load the Delta table stored at deltaPath back into a DataFrame and preview it.
delta_reader = spark.read.format("delta")
dfDelta = delta_reader.load(deltaPath)
dfDelta.show()

### Select only dandelion pictures

In [None]:
# Keep only the rows whose flowerType attribute is "dandelion" and display them.
dandelions = dfDelta.where(dfDelta["flowerType"] == "dandelion")
dandelions.show()

In [None]:
# Remove the saved images: delete() is called without a condition, so it applies to all rows.
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.delete()

# Reload the table from deltaPath and display what is left.
dfDelta = (
    spark.read
    .format("delta")
    .load(deltaPath)
)
dfDelta.show()

# Sequential read and history versioning

To showcase versioning, we append flower images one at a time to a new Delta table. By iterating through the table's versions, we can see the images being added one at a time. After running the code below, the Delta table's history also shows 15 recorded append operations.

In [None]:
from itertools import islice
from pathlib import Path

# From here on, deltaPath points at a second table used for the versioning demo.
deltaPath = "s3a://delta-lake/iris2"
path = Path(imagePath)
insert_count = 15  # number of single-image appends (one Delta table version each)

# Sort the matches so the same images are selected on every run; the order in which
# glob yields files depends on the filesystem. Anything that is not a regular file is skipped.
jpg_files = (p for p in sorted(path.glob("**/*.jpg")) if p.is_file())

# NOTE: nothing in this cell clears an existing table at deltaPath, so re-running it
# appends insert_count further versions on top of the ones already there.
for file_path in islice(jpg_files, insert_count):
    print("Processing:", str(file_path))
    df = (
        spark.read.format("binaryFile")
        .option("pathGlobFilter", "*.jpg")
        .load(str(file_path))
    )
    # One append per image, so each image lands in its own table version.
    df.write.format("delta").mode("append").save(deltaPath)

In [None]:
# Load the table at the current deltaPath and preview its rows.
dfDelta = (
    spark.read
    .format("delta")
    .load(deltaPath)
)
print("Delta Table: Iris2")
dfDelta.show()

# Get a DeltaTable handle for the same path; it is used for the detail() call below.
deltaTable = DeltaTable.forPath(spark, deltaPath)

print("Meta data tracked by Delta Lake Automatically")
detailDF = deltaTable.detail()
detailDF.show(truncate=False)

In [None]:
# Show the table's operation history (up to 50 rows, untruncated).
print("Historical Operations on Iris2")
fullHistoryDF = deltaTable.history()
fullHistoryDF.show(n=50, truncate=False)

# Time travel: read versions 0 through 4 of the table and display each one.
print("Retrieving the first five versions of Iris2")
for version in range(5):
    print(f"version: {version}")
    snapshot_reader = spark.read.format("delta").option("versionAsOf", version)
    df = snapshot_reader.load(deltaPath)
    df.show()