This is based on this video :
https://www.youtube.com/watch?v=u1VfOiHVeMI

In [2]:
import findspark
import os

findspark.init(os.environ['SPARK_HOME'])

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import *

In [4]:
spark = SparkSession.builder.appName("DeltaLake Transaction Logs") \
    .master("local[4]") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

In [121]:
spark.conf.set('spark.sql.shuffle.partitions', '4')
spark.conf.set('spark.default.parallelism', '4')

In [6]:
# Load helper functions

%run Helpers.ipynb

In [8]:


# Source data
# Change paths to match your environment!

inputPath    = "/tmp/spark/data/source/"
sourceData   = inputPath + "online-retail-dataset.csv"

# Base location for all saved data
basePath     = "/tmp/spark/data" 

# Path for Parquet formatted data
parquetPath  = basePath + "/parquet/online_retail_data"

# Path for Delta formatted data
deltaPath    = basePath + "/delta/online_retail_data"
deltaLogPath = deltaPath + "/_delta_log"

# Clean up from last run.
! rm -Rf $deltaPath 2>/dev/null
print("Deleted path " + deltaPath)

! rm -Rf $parquetPath 2>/dev/null
print("Deleted path " + parquetPath)



Deleted path /tmp/spark/data/delta/online_retail_data
Deleted path /tmp/spark/data/parquet/online_retail_data


In [10]:
# Data sourced from "Spark - The Definitive Guide", located at: https://github.com/databricks/Spark-The-Definitive-Guide
# Data origin: http://archive.ics.uci.edu/ml/datasets/Online+Retail
import os.path

file_exists = os.path.isfile(f'{sourceData}')
 
if not file_exists:
    print("-> Downloading dataset.")
    os.system(f'mkdir -p {inputPath}')
    os.system(f'curl https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/retail-data/all/online-retail-dataset.csv -o {sourceData}')
    file_exists = os.path.isfile(f'{sourceData}')
    
if file_exists:
    print("-> Dataset is present.\n")
    
    fileSize = ! du -m "$sourceData" | cut -f1 # Posix compliant
    print(f"File [{sourceData}] is {fileSize} MB in size.")

-> Downloading dataset.
-> Dataset is present.

File [/tmp/spark/data/source/online-retail-dataset.csv] is ['44'] MB in size.


In [11]:
! head -5 {sourceData}

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom


In [12]:
# Provide schema for source data
# Schema source: http://archive.ics.uci.edu/ml/datasets/Online+Retail#

# SQL DDL method
schemaDDL = """InvoiceNo Integer, StockCode String, Description String, Quantity Integer, 
               InvoiceDate String, UnitPrice Double, CustomerID Integer, Country String """

In [16]:
rawSalesDataDF = spark.read \
.schema(schemaDDL) \
.option("header", True) \
.csv(sourceData)

rwCnt = rawSalesDataDF.count()
rwPart = rawSalesDataDF.rdd.getNumPartitions()

print(f'Count is {rwCnt} and Partition cnt os {rwPart}')

Count is 541909 and Partition cnt os 4


In [24]:
# Identify Columns with null values

rawSalesDataDF.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in rawSalesDataDF.columns]).show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|     9291|        0|       1454|       0|          0|        0|    135080|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [33]:
cleanSalesDataDF = rawSalesDataDF.filter("InvoiceNo is not null and CustomerID is not null")
print(f" Total count is {cleanSalesDataDF.count()}")

cleanSalesDataDF.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in cleanSalesDataDF.columns]).show()

 Total count is 397924
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [34]:
# Define new dataframe based on cleansed data but only use a subset of the data to make things run faster

# Random sample of 25%, with seed and without replacement
retailSalesData1 = cleanSalesDataDF.sample(withReplacement=False, fraction=.25, seed=75)

# Count rows and partitions
rowCount = retailSalesData1.count() 
partCount = retailSalesData1.rdd.getNumPartitions()

print(f'Row Count: {rowCount} Partition Count: {partCount}')

Row Count: 99226 Partition Count: 4


In [35]:
retailSalesData1.show(5, truncate= False)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |21730    |GLASS STAR FROSTED T-LIGHT HOLDER  |6       |12/1/2010 8:26|4.25     |17850     |United Kingdom|
|536367   |22745    |POPPY'S PLAYHOUSE BEDROOM          |6       |12/1/2010 8:34|2.1      |13047     |United Kingdom|
|536367   |84969    |BOX OF 6 ASSORTED COLOUR TEASPOONS |6       |12/1/2010 8:34|4.25     |13047     |United Kingdom|
+---------+---------+-----------------------------------

In [37]:


# Create database to hold demo objects
spark.sql("CREATE DATABASE IF NOT EXISTS deltademo")
spark.sql("SHOW DATABASES").show()

# Current DB should be deltademo
spark.sql("USE deltademo")
spark.sql("SELECT CURRENT_DATABASE()").show()
#spark.sql("DESCRIBE DATABASE deltademo").show(truncate = False)

# Clean-up from last run
spark.sql("DROP TABLE IF EXISTS SalesParquetFormat")
spark.sql("DROP TABLE IF EXISTS SalesDeltaFormat")
spark.sql("DROP TABLE IF EXISTS tbl_CheckpointFile")
spark.sql("SHOW TABLES").show()



+---------+
|namespace|
+---------+
|  default|
|deltademo|
+---------+

+------------------+
|current_database()|
+------------------+
|         deltademo|
+------------------+

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [38]:
retailSalesData1.write.saveAsTable('SalesParquetFormat', format='parquet', mode='overwrite', path=parquetPath)

In [45]:
files_in_dir(parquetPath, "parquet")

195KB    2020-09-26 10:09:11  part-00003-038c8e77-b750-4f8a-8240-7bbce1bacf37-c000.snappy.parquet
282KB    2020-09-26 10:09:11  part-00000-038c8e77-b750-4f8a-8240-7bbce1bacf37-c000.snappy.parquet
312KB    2020-09-26 10:09:11  part-00001-038c8e77-b750-4f8a-8240-7bbce1bacf37-c000.snappy.parquet
314KB    2020-09-26 10:09:11  part-00002-038c8e77-b750-4f8a-8240-7bbce1bacf37-c000.snappy.parquet

Number of file/s: 4 | Total size: 1.2M


In [44]:
spark.sql("describe formatted SalesParquetFormat").show(truncate=False)

+----------------------------+-----------------------------------------------+-------+
|col_name                    |data_type                                      |comment|
+----------------------------+-----------------------------------------------+-------+
|InvoiceNo                   |int                                            |null   |
|StockCode                   |string                                         |null   |
|Description                 |string                                         |null   |
|Quantity                    |int                                            |null   |
|InvoiceDate                 |string                                         |null   |
|UnitPrice                   |double                                         |null   |
|CustomerID                  |int                                            |null   |
|Country                     |string                                         |null   |
|                            |             

In [46]:
spark.sql("select * from SalesParquetFormat").show(3, truncate=False)

+---------+---------+----------------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                 |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country       |
+---------+---------+----------------------------+--------+---------------+---------+----------+--------------+
|563016   |21497    |FANCY FONTS BIRTHDAY WRAP   |25      |8/11/2011 12:44|0.42     |15358     |United Kingdom|
|563016   |22993    |SET OF 4 PANTRY JELLY MOULDS|12      |8/11/2011 12:44|1.25     |15358     |United Kingdom|
|563016   |22961    |JAM MAKING SET PRINTED      |12      |8/11/2011 12:44|1.45     |15358     |United Kingdom|
+---------+---------+----------------------------+--------+---------------+---------+----------+--------------+
only showing top 3 rows



In [47]:
spark.sql(""" insert into SalesParquetFormat 
              VALUES(963316, 2291, "WORLD'S BEST JAM MAKING SET", 5, "08/13/2011 07:58", 1.45, 15358, "United Kingdom")
          """)

DataFrame[]

In [48]:
files_in_dir(parquetPath, "parquet")

195KB    2020-09-26 10:09:11  part-00003-038c8e77-b750-4f8a-8240-7bbce1bacf37-c000.snappy.parquet
282KB    2020-09-26 10:09:11  part-00000-038c8e77-b750-4f8a-8240-7bbce1bacf37-c000.snappy.parquet
312KB    2020-09-26 10:09:11  part-00001-038c8e77-b750-4f8a-8240-7bbce1bacf37-c000.snappy.parquet
314KB    2020-09-26 10:09:11  part-00002-038c8e77-b750-4f8a-8240-7bbce1bacf37-c000.snappy.parquet
2KB      2020-09-26 10:16:08  part-00000-98d508b8-040b-487f-9586-cd402154f430-c000.snappy.parquet

Number of file/s: 5 | Total size: 1.2M


In [62]:
retailSalesData1.write.mode('overwrite').format('delta').save(deltaPath)

# Query directly into the location with delta
spark.sql(f"select * from delta.`{deltaPath}` limit 3").show(truncate=False)

+---------+---------+----------------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                 |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country       |
+---------+---------+----------------------------+--------+---------------+---------+----------+--------------+
|563016   |21497    |FANCY FONTS BIRTHDAY WRAP   |25      |8/11/2011 12:44|0.42     |15358     |United Kingdom|
|563016   |22993    |SET OF 4 PANTRY JELLY MOULDS|12      |8/11/2011 12:44|1.25     |15358     |United Kingdom|
|563016   |22961    |JAM MAKING SET PRINTED      |12      |8/11/2011 12:44|1.45     |15358     |United Kingdom|
+---------+---------+----------------------------+--------+---------------+---------+----------+--------------+



In [63]:
deltaTable = DeltaTable.forPath(spark, deltaPath)

In [64]:
history = deltaTable.history().select('version','timestamp','operation', 'operationParameters', \
                                      'operationMetrics') \
                              .withColumnRenamed("version", "ver")

history.show(truncate=False)

+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+
|ver|timestamp          |operation|operationParameters                   |operationMetrics                                                  |
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+
|0  |2020-09-26 10:55:56|WRITE    |[mode -> Overwrite, partitionBy -> []]|[numFiles -> 4, numOutputBytes -> 1129731, numOutputRows -> 99226]|
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+



In [66]:
files_in_dir(deltaLogPath, 'json')

2KB      2020-09-26 10:55:56  00000000000000000000.json

Number of file/s: 1 | Total size: 4.0K


In [69]:
spark.read.format('json').load(deltaLogPath).collect()

[Row(add=None, commitInfo=Row(isBlindAppend=False, operation='WRITE', operationMetrics=Row(numFiles='4', numOutputBytes='1129731', numOutputRows='99226'), operationParameters=Row(mode='Overwrite', partitionBy='[]'), timestamp=1601142956522), metaData=None, protocol=None),
 Row(add=None, commitInfo=None, metaData=None, protocol=Row(minReaderVersion=1, minWriterVersion=2)),
 Row(add=None, commitInfo=None, metaData=Row(createdTime=1601142955491, format=Row(provider='parquet'), id='1cdd1401-0952-4573-acd4-69331ee931e3', partitionColumns=[], schemaString='{"type":"struct","fields":[{"name":"InvoiceNo","type":"integer","nullable":true,"metadata":{}},{"name":"StockCode","type":"string","nullable":true,"metadata":{}},{"name":"Description","type":"string","nullable":true,"metadata":{}},{"name":"Quantity","type":"integer","nullable":true,"metadata":{}},{"name":"InvoiceDate","type":"string","nullable":true,"metadata":{}},{"name":"UnitPrice","type":"double","nullable":true,"metadata":{}},{"name":"

In [70]:
retailSalesData2 = cleanSalesDataDF.sample(withReplacement=False, fraction=.25, seed=31)
retailSalesData2.count()

99149

In [71]:
retailSalesData2.write.mode('append').format('delta').save(deltaPath)

In [75]:
# Have to run this again , cannot directly use the previous history DF and expect to see updated info
# See the readVersion , this will tell the order of transaction , we can use version column also
history = deltaTable.history().select('version','timestamp','operation', 'operationParameters', \
                                      'operationMetrics','readVersion') \
                              .withColumnRenamed("version", "ver")
history.show(truncate=False)

+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+-----------+
|ver|timestamp          |operation|operationParameters                   |operationMetrics                                                  |readVersion|
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+-----------+
|1  |2020-09-26 11:01:54|WRITE    |[mode -> Append, partitionBy -> []]   |[numFiles -> 4, numOutputBytes -> 1125004, numOutputRows -> 99149]|0          |
|0  |2020-09-26 10:55:56|WRITE    |[mode -> Overwrite, partitionBy -> []]|[numFiles -> 4, numOutputBytes -> 1129731, numOutputRows -> 99226]|null       |
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+-----------+



In [76]:
spark.catalog.listTables()

[Table(name='salesparquetformat', database='deltademo', description=None, tableType='EXTERNAL', isTemporary=False)]

In [77]:
spark.sql("""
    DROP TABLE IF EXISTS SalesDeltaFormat
  """)
spark.sql("""
    CREATE TABLE SalesDeltaFormat
    USING DELTA
    LOCATION '{}'
  """.format(deltaPath))


DataFrame[]

In [78]:
spark.catalog.listTables()

[Table(name='salesdeltaformat', database='deltademo', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='salesparquetformat', database='deltademo', description=None, tableType='EXTERNAL', isTemporary=False)]

In [84]:
spark.sql("desc formatted SalesDeltaFormat").show(truncate=False)

+----------------------------+---------------------------------------------+-------+
|col_name                    |data_type                                    |comment|
+----------------------------+---------------------------------------------+-------+
|InvoiceNo                   |int                                          |       |
|StockCode                   |string                                       |       |
|Description                 |string                                       |       |
|Quantity                    |int                                          |       |
|InvoiceDate                 |string                                       |       |
|UnitPrice                   |double                                       |       |
|CustomerID                  |int                                          |       |
|Country                     |string                                       |       |
|                            |                                   

In [79]:
# Let's find a Invoice with only 1 count and use it to test DML.
oneRandomInvoice = spark.sql(""" SELECT InvoiceNo, count(*)
                                 FROM SalesDeltaFormat
                                 GROUP BY InvoiceNo
                                 ORDER BY 2 asc
                                 LIMIT 1
                             """).collect()[0][0]

print(f"Random Invoice # => {oneRandomInvoice}")

Random Invoice # => 569522


In [85]:
# Before DML (insert)
# See the function input_file_name , tells from which file the record is coming from.Not a delta specific feature
spark.sql(f"""
              SELECT SUBSTRING(input_file_name(), -67, 67) AS FileName,
                     * FROM SalesDeltaFormat 
              WHERE InvoiceNo = {oneRandomInvoice}
           """).show(truncate = False)

+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+---------------+---------+----------+-------+
|FileName                                                           |InvoiceNo|StockCode|Description                    |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country|
+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+---------------+---------+----------+-------+
|part-00002-bc283a61-7bc1-4f81-ab11-813ad73ae653-c000.snappy.parquet|569522   |84997D   |CHILDRENS CUTLERY POLKADOT PINK|72      |10/4/2011 14:41|3.75     |12664     |Finland|
+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+---------------+---------+----------+-------+



In [88]:
spark.sql(f"""
               INSERT INTO SalesDeltaFormat
               VALUES({oneRandomInvoice}, 2291, "WORLD'S BEST JAM MAKING SET", 5, "08/13/2011 07:58", 1.45, 15358, "France");
          """)

DataFrame[]

In [89]:
files_in_dir(deltaLogPath,"*")

2KB      2020-09-26 10:55:56  00000000000000000000.json
938B     2020-09-26 11:01:54  00000000000000000001.json
410B     2020-09-26 11:15:32  00000000000000000002.json

Number of file/s: 3 | Total size: 12K


In [90]:
logDF = spark.read.format("json").load(deltaLogPath + "/00000000000000000002.json")
#dfLog.printSchema()
logDF.collect()

[Row(add=None, commitInfo=Row(isBlindAppend=True, operation='WRITE', operationMetrics=Row(numFiles='1', numOutputBytes='2369', numOutputRows='1'), operationParameters=Row(mode='Append', partitionBy='[]'), readVersion=1, timestamp=1601144132398)),
 Row(add=Row(dataChange=True, modificationTime=1601144132000, path='part-00000-34347f0c-d248-425a-bcdf-a1d0f9ddf145-c000.snappy.parquet', size=2369), commitInfo=None)]

In [91]:
# After DML (insert)

spark.sql(f"""
              SELECT SUBSTRING(input_file_name(), -67, 67) AS FileName, *
                     FROM SalesDeltaFormat 
                     WHERE InvoiceNo = {oneRandomInvoice}
           """).show(truncate = False)



+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+----------------+---------+----------+-------+
|FileName                                                           |InvoiceNo|StockCode|Description                    |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country|
+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+----------------+---------+----------+-------+
|part-00002-bc283a61-7bc1-4f81-ab11-813ad73ae653-c000.snappy.parquet|569522   |84997D   |CHILDRENS CUTLERY POLKADOT PINK|72      |10/4/2011 14:41 |3.75     |12664     |Finland|
|part-00000-34347f0c-d248-425a-bcdf-a1d0f9ddf145-c000.snappy.parquet|569522   |2291     |WORLD'S BEST JAM MAKING SET    |5       |08/13/2011 07:58|1.45     |15358     |France |
+-------------------------------------------------------------------+---------+---------+--------------------------

In [92]:
# Update one invoice

spark.sql(f"""
              UPDATE SalesDeltaFormat
              SET Quantity = Quantity + 1000
              WHERE InvoiceNo = {oneRandomInvoice}
           """)

#deltaTable.update(
#    condition=("InvoiceNo = oneRandomInvoice"),
#    set={"Quantity": expr("Quantity + 1000")}
#)

DataFrame[]

In [93]:
# After Update
# Creates two new parquet file and one new transaction json file
# Reads parquet file and writes it back with a new file with new data
spark.sql(f"""
              SELECT 
              SUBSTRING(input_file_name(), -67, 67) AS FileName, *
              FROM SalesDeltaFormat 
              WHERE InvoiceNo = {oneRandomInvoice}
           """).show(truncate = False)

+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+----------------+---------+----------+-------+
|FileName                                                           |InvoiceNo|StockCode|Description                    |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country|
+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+----------------+---------+----------+-------+
|part-00000-6a1b9713-b947-44e7-ba46-1be65ce2df31-c000.snappy.parquet|569522   |84997D   |CHILDRENS CUTLERY POLKADOT PINK|1072    |10/4/2011 14:41 |3.75     |12664     |Finland|
|part-00001-b3950e95-b902-47be-8203-792f0d9b62cc-c000.snappy.parquet|569522   |2291     |WORLD'S BEST JAM MAKING SET    |1005    |08/13/2011 07:58|1.45     |15358     |France |
+-------------------------------------------------------------------+---------+---------+--------------------------

In [94]:
history = deltaTable.history().select('version','timestamp','operation', 'operationParameters', \
                                      'operationMetrics','readVersion') \
                              .withColumnRenamed("version", "ver")
history.show(truncate=False)

+---+-------------------+---------+----------------------------------------+---------------------------------------------------------------------------------------+-----------+
|ver|timestamp          |operation|operationParameters                     |operationMetrics                                                                       |readVersion|
+---+-------------------+---------+----------------------------------------+---------------------------------------------------------------------------------------+-----------+
|3  |2020-09-26 11:22:59|UPDATE   |[predicate -> (InvoiceNo#5861 = 569522)]|[numRemovedFiles -> 2, numAddedFiles -> 2, numUpdatedRows -> 2, numCopiedRows -> 29509]|2          |
|2  |2020-09-26 11:15:32|WRITE    |[mode -> Append, partitionBy -> []]     |[numFiles -> 1, numOutputBytes -> 2369, numOutputRows -> 1]                            |1          |
|1  |2020-09-26 11:01:54|WRITE    |[mode -> Append, partitionBy -> []]     |[numFiles -> 4, numOutputBytes -> 11250

In [95]:
files_in_dir(deltaPath, "parquet")

195KB    2020-09-26 10:55:56  part-00003-cb420873-82cb-4d22-b84a-480e369894ce-c000.snappy.parquet
282KB    2020-09-26 10:55:56  part-00000-c031708e-6649-488a-ad2b-7b4988e9e4b4-c000.snappy.parquet
314KB    2020-09-26 10:55:56  part-00002-9e8e8b63-37c5-488d-b11d-6d17c0559893-c000.snappy.parquet
312KB    2020-09-26 10:55:56  part-00001-97510b30-dd26-479f-ab9e-a6372faa609d-c000.snappy.parquet
193KB    2020-09-26 11:01:54  part-00003-168600ad-ee98-43c9-90bf-a8edb8de8716-c000.snappy.parquet
310KB    2020-09-26 11:01:54  part-00001-848f8231-6f8d-47c0-8599-3321cc8f720f-c000.snappy.parquet
281KB    2020-09-26 11:01:54  part-00000-1218c889-b29d-4078-a213-f8fbfbfe7562-c000.snappy.parquet
315KB    2020-09-26 11:01:54  part-00002-bc283a61-7bc1-4f81-ab11-813ad73ae653-c000.snappy.parquet
2KB      2020-09-26 11:15:32  part-00000-34347f0c-d248-425a-bcdf-a1d0f9ddf145-c000.snappy.parquet
2KB      2020-09-26 11:22:59  part-00001-b3950e95-b902-47be-8203-792f0d9b62cc-c000.snappy.parquet
315KB    2020-09-26 

In [96]:
files_in_dir(deltaLogPath,"*")

2KB      2020-09-26 10:55:56  00000000000000000000.json
938B     2020-09-26 11:01:54  00000000000000000001.json
410B     2020-09-26 11:15:32  00000000000000000002.json
902B     2020-09-26 11:22:59  00000000000000000003.json

Number of file/s: 4 | Total size: 16K


In [97]:
# Before DML (delete)

spark.sql(f"""select 
          substring(input_file_name(), -67, 67) as FileName,
          * from SalesDeltaFormat 
          where InvoiceNo = {oneRandomInvoice}""").show(truncate = False)



+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+----------------+---------+----------+-------+
|FileName                                                           |InvoiceNo|StockCode|Description                    |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country|
+-------------------------------------------------------------------+---------+---------+-------------------------------+--------+----------------+---------+----------+-------+
|part-00000-6a1b9713-b947-44e7-ba46-1be65ce2df31-c000.snappy.parquet|569522   |84997D   |CHILDRENS CUTLERY POLKADOT PINK|1072    |10/4/2011 14:41 |3.75     |12664     |Finland|
|part-00001-b3950e95-b902-47be-8203-792f0d9b62cc-c000.snappy.parquet|569522   |2291     |WORLD'S BEST JAM MAKING SET    |1005    |08/13/2011 07:58|1.45     |15358     |France |
+-------------------------------------------------------------------+---------+---------+--------------------------

In [98]:
# https://github.com/delta-io/delta/blob/master/examples/python/quickstart.py
# Delete and invoice (two records)

# This results in one new file being created.  One file had just the one record so it does not have to be re-created
# Each of the two records were in two different files. One of those files had only one record so it did not have to be re-created.

spark.sql(f"DELETE FROM SalesDeltaFormat WHERE InvoiceNo = {oneRandomInvoice}")

# deltaTable.delete(
#    condition=("InvoiceNo = {537617}")
# )

DataFrame[]

In [99]:


# After DML (delete)

spark.sql(f"""
              SELECT 
              SUBSTRING(input_file_name(), -67, 67) as FileName, *
              FROM SalesDeltaFormat 
              WHERE InvoiceNo = {oneRandomInvoice}
          """).show(truncate = False)



+--------+---------+---------+-----------+--------+-----------+---------+----------+-------+
|FileName|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+--------+---------+---------+-----------+--------+-----------+---------+----------+-------+
+--------+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [100]:
history = deltaTable.history().select('version','timestamp','operation', 'operationParameters', \
                                      'operationMetrics','readVersion') \
                              .withColumnRenamed("version", "ver")
history.show(truncate=False)

+---+-------------------+---------+----------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+-----------+
|ver|timestamp          |operation|operationParameters                                                               |operationMetrics                                                                       |readVersion|
+---+-------------------+---------+----------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+-----------+
|4  |2020-09-26 11:36:54|DELETE   |[predicate -> ["(spark_catalog.deltademo.SalesDeltaFormat.`InvoiceNo` = 569522)"]]|[numRemovedFiles -> 2, numDeletedRows -> 2, numAddedFiles -> 1, numCopiedRows -> 29509]|3          |
|3  |2020-09-26 11:22:59|UPDATE   |[predicate -> (InvoiceNo#5861 = 569522)]                                          |[numRe

In [101]:
# Randomy update 5 random invoices to force a checkpoint

count = 0
anInvoice = retailSalesData2.select("InvoiceNo").orderBy(rand()).limit(1).collect()[0][0]

while (count <= 5):
  deltaTable.update(
    condition=(f"InvoiceNo = {anInvoice}"),
    set={"Quantity": expr("Quantity + 100")})

  count = count + 1
  anInvoice = retailSalesData2.select("InvoiceNo").orderBy(rand()).limit(1).collect()[0][0]

In [102]:
# Checkpoint after 10 transaction json file is created
# Checkpoint parquetfile will usually have metadata to show lastest snapshot of the data , if you need time travle we will have to raed the json metadata file.

checkPointDF = spark.read.format("parquet").load(deltaLogPath + "/00000000000000000010.checkpoint.parquet")
checkPointDF.show(10, truncate = False)

+----+---------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+--------+--------+----------+
|txn |add                                                                                                      |remove                                                                                     |metaData|protocol|commitInfo|
+----+---------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+--------+--------+----------+
|null|null                                                                                                     |[part-00000-faebb022-ac41-4e64-b1f1-01155a0a1174-c000.snappy.parquet, 1601145781282, false]|null    |null    |null      |
|null|null                                                      

In [103]:
checkPointDF.printSchema()

root
 |-- txn: struct (nullable = true)
 |    |-- appId: string (nullable = true)
 |    |-- version: long (nullable = true)
 |    |-- lastUpdated: long (nullable = true)
 |-- add: struct (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- partitionValues: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |    |-- size: long (nullable = true)
 |    |-- modificationTime: long (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |    |-- stats: string (nullable = true)
 |    |-- tags: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- remove: struct (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- deletionTimestamp: long (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |-- metaData: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- description:

In [105]:
checkPointFile10 =(
    checkPointDF.select(col("add.path").alias("FileAdded"),
                        col("add.modificationTime").alias("DateAdded"),
                        col("remove.path").alias("FileDeleted"),
                        col("remove.deletionTimestamp").alias("DateDeleted"))
                .orderBy(["DateAdded","DateDeleted"], ascending=[True,False])
)

spark.sql("DROP TABLE IF EXISTS tbl_checkpointfile")
spark.sql("CREATE TABLE IF NOT EXISTS tbl_checkpointfile (Action string, filename string, ActionDate Long)")

checkPointFile10.createOrReplaceTempView("vw_checkpointfile")


In [106]:
spark.sql("select * from vw_checkpointfile limit 100").show(100, truncate=False)

+-------------------------------------------------------------------+-------------+-------------------------------------------------------------------+-------------+
|FileAdded                                                          |DateAdded    |FileDeleted                                                        |DateDeleted  |
+-------------------------------------------------------------------+-------------+-------------------------------------------------------------------+-------------+
|null                                                               |null         |part-00000-faebb022-ac41-4e64-b1f1-01155a0a1174-c000.snappy.parquet|1601145781282|
|null                                                               |null         |part-00001-dd868f0f-7575-4ec2-b939-eb32ca0e5257-c000.snappy.parquet|1601145781282|
|null                                                               |null         |part-00001-16bca056-f8f2-4a12-bc1b-b9201e57172a-c000.snappy.parquet|1601145777891|
|nul

In [107]:
spark.sql("""
          INSERT INTO tbl_checkpointfile
          SELECT "Add", FileAdded, DateAdded
          FROM vw_checkpointfile
          WHERE FileAdded IS NOT NULL
          """)

spark.sql("""
          INSERT INTO tbl_checkpointfile
          SELECT "Delete", FileDeleted, DateDeleted
          FROM vw_checkpointfile
          WHERE FileDeleted IS NOT NULL
          """)

DataFrame[]

In [109]:
spark.sql("""
           SELECT Action, 
                  filename as `File Name`, 
                  from_unixtime(actiondate/1e3) AS `ActionDate`
           FROM tbl_checkpointfile 
           order by ActionDate asc
          """).show(200, truncate=False)



+------+-------------------------------------------------------------------+-------------------+
|Action|File Name                                                          |ActionDate         |
+------+-------------------------------------------------------------------+-------------------+
|Add   |part-00003-cb420873-82cb-4d22-b84a-480e369894ce-c000.snappy.parquet|2020-09-26 10:55:56|
|Add   |part-00003-168600ad-ee98-43c9-90bf-a8edb8de8716-c000.snappy.parquet|2020-09-26 11:01:54|
|Delete|part-00002-bc283a61-7bc1-4f81-ab11-813ad73ae653-c000.snappy.parquet|2020-09-26 11:22:58|
|Delete|part-00000-34347f0c-d248-425a-bcdf-a1d0f9ddf145-c000.snappy.parquet|2020-09-26 11:22:58|
|Delete|part-00001-b3950e95-b902-47be-8203-792f0d9b62cc-c000.snappy.parquet|2020-09-26 11:36:53|
|Delete|part-00000-6a1b9713-b947-44e7-ba46-1be65ce2df31-c000.snappy.parquet|2020-09-26 11:36:53|
|Delete|part-00001-97510b30-dd26-479f-ab9e-a6372faa609d-c000.snappy.parquet|2020-09-26 11:42:42|
|Delete|part-00001-848f8231-6f

In [111]:
history = deltaTable.history().select('version','timestamp','operation', 'operationParameters', \
                                      'operationMetrics','readVersion') \
                              .withColumnRenamed("version", "ver")
history.show(truncate=False)


+---+-------------------+---------+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+-----------+
|ver|timestamp          |operation|operationParameters                                                               |operationMetrics                                                                        |readVersion|
+---+-------------------+---------+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+-----------+
|10 |2020-09-26 11:43:01|UPDATE   |[predicate -> (InvoiceNo#3843 = 548808)]                                          |[numRemovedFiles -> 2, numAddedFiles -> 2, numUpdatedRows -> 20, numCopiedRows -> 50101]|9          |
|9  |2020-09-26 11:42:58|UPDATE   |[predicate -> (InvoiceNo#3843 = 571184)]                                          |[n

In [112]:
# Let's add some data to our table by doing a merge
# Do this to show how json logs pick up after checkpoint

# Create a tiny dataframe to use with merge
mergeSalesData= cleanSalesDataDF.sample(withReplacement=False, fraction=.0001, seed=13)
mergeSalesData.createOrReplaceTempView("vw_mergeSalesData")

# User-defined commit metadata
# Usermedata of history table will get this info
spark.sql(f"""
               SET spark.databricks.delta.commitInfo.userMetadata=08-25-2020 Data Merge;
          """)

spark.sql("""
          MERGE INTO SalesDeltaFormat
          USING vw_mergeSalesData
          ON SalesDeltaFormat.StockCode = vw_mergeSalesData.StockCode
           AND SalesDeltaFormat.InvoiceNo = vw_mergeSalesData.InvoiceNo
          WHEN MATCHED THEN
            UPDATE SET *
          WHEN NOT MATCHED
            THEN INSERT *
          """)

DataFrame[]

In [113]:
history = deltaTable.history().select('version','timestamp','operation', 'operationParameters', \
                                      'operationMetrics','readVersion') \
                              .withColumnRenamed("version", "ver")
history.show(truncate=False)
# I had not set spark.sql.shuffle.partitions to 4 while running the merge , so 200 files got created.

+---+-------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|ver|timestamp          |operation|operationParameters                                                                                                                                                                               |operationMetrics                                                                                                                                                                                                         |readVersion|
+---+-------------------+---------+-------------------------------------------

In [118]:
deltaTable.history().select("version","userMetadata").show(truncate=False)

+-------+----------------------+
|version|userMetadata          |
+-------+----------------------+
|11     |08-25-2020 Data Merge;|
|10     |null                  |
|9      |null                  |
|8      |null                  |
|7      |null                  |
|6      |null                  |
|5      |null                  |
|4      |null                  |
|3      |null                  |
|2      |null                  |
|1      |null                  |
|0      |null                  |
+-------+----------------------+



In [119]:
# Count files in deltaPath

numFiles = ! ls $deltaPath/*parquet | wc -l
print(f"There are {numFiles} files.")

There are ['     224'] files.


In [122]:


# Create an artificial "small file" problem

(spark.read
.format("delta")
.load(deltaPath)
.repartition(1000)
.write
.option("dataChange", True)
.format("delta")
.mode("overwrite")
.save(deltaPath)
)



In [123]:
# Count files in deltaPath

numFiles = ! ls $deltaPath/*parquet | wc -l
print(f"There are {numFiles} files.")

There are ['    1224'] files.


In [124]:
%%time

# spark.sql("select * from SalesDeltaFormat limit 2").show()

rowCount = spark.sql(""" SELECT CustomerID, count(Country) AS num_countries
                         FROM SalesDeltaFormat
                         GROUP BY CustomerID 
                     """).count()

print(f"Row Count => {rowCount}\n")

Row Count => 4244

CPU times: user 1.74 ms, sys: 2.12 ms, total: 3.86 ms
Wall time: 1.69 s


In [125]:
# Compact 1000 files to 4

(spark.read
.format("delta")
.load(deltaPath)
.repartition(4)
.write
.option("dataChange", False)
.format("delta")
.mode("overwrite")
.save(deltaPath)
)

In [126]:
# Count files in deltaPath

numFiles = ! ls $deltaPath/*parquet | wc -l
print(f"There are {numFiles} files.")

There are ['    1228'] files.


In [127]:
%%time

# spark.sql("select * from SalesDeltaFormat limit 2").show()
rowCount = spark.sql(""" select CustomerID, count(Country) as num_countries
                         from SalesDeltaFormat
                        group by CustomerID """).count()

print(f"Row Count => {rowCount}")

Row Count => 4244
CPU times: user 1.48 ms, sys: 1.89 ms, total: 3.37 ms
Wall time: 447 ms


In [128]:
# Time Travel Queries

#POO currentVersion = deltaTable.history(1).select("version").collect()[0][0]
# Determine latest version of the Delta table
currentVersion = spark.sql("DESCRIBE HISTORY SalesDeltaFormat LIMIT 1").collect()[0][0]

In [131]:
# Query table as of the current version to attain row count
currentRowCount = spark.read.format("delta").option("versionAsOf", currentVersion).load(deltaPath).count()

print(f"Row Count: {currentRowCount} as of table version {currentVersion}")
print("")

Row Count: 198400 as of table version 13



In [132]:
spark.read.format("delta").load(deltaPath).count()

198400

In [133]:
# Determine difference in record count between the current version and the original version of the table.

origRowCount = spark.read.format("delta").option("versionAsOf", 0).load(deltaPath).count()
print(f"There are {currentRowCount-origRowCount} more rows in version [{currentVersion}] than version [0] of the table.")



There are 99174 more rows in version [13] than version [0] of the table.


In [134]:
# Roll back current table to version 0 (original).
(
    spark
    .read
    .format("delta")
    .option("versionAsOf",0)
    .load(deltaPath)
    .write
    .format("delta")
    .mode("overwrite")
    .save(deltaPath)
)

In [137]:
# Current version should have same record count as version 0.

currentVersion = spark.sql("DESCRIBE HISTORY SalesDeltaFormat LIMIT 1").collect()[0][0]
print(currentVersion)
# If equal it will return "true"
spark.read.format("delta").option("versionAsOf", currentVersion).load(deltaPath).count() == spark.read.format("delta").option("versionAsOf", 0).load(deltaPath).count()

14


True

In [138]:
print("####### HISTORY ########")

# Observe history of actions taken on a Delta table
history = deltaTable.history().select('version','operation', 'operationParameters', \
                                      'operationMetrics') \
                              .withColumnRenamed("version", "ver")

history.show(100, truncate = False)

####### HISTORY ########
+---+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ver|operation|operationParameters                                                                                                                                                                               |operationMetrics                                                                                                                                                                                                         |
+---+---------+------------------------------------------------------------------------------------------------------


## Delta Vacuum - Data Retention

delta.logRetentionDuration - default 30 days   
delta.deletedFileRetentionDuration - default 30 days

    Don't need to set them to be the same. You may want to keep the log files around after the tombstoned files are purged.
    Time travel in order of months/years infeasible
    Initially desinged to correct mistakes



In [139]:
# files_in_dir(deltaPath,"parquet")
# Count files in deltaPath

numFiles = ! ls $deltaPath/*parquet | wc -l
print(f"There are {numFiles} files.")



There are ['    1232'] files.


In [140]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

In [141]:
%%time
# Vacuum Delta table to remove all history

spark.sql("VACUUM SalesDeltaFormat RETAIN 0 HOURS").show(truncate = False)
# ! Can use deltaTable.vacuum(0) against directory

+---------------------------------------------+
|path                                         |
+---------------------------------------------+
|file:/tmp/spark/data/delta/online_retail_data|
+---------------------------------------------+

CPU times: user 6.83 ms, sys: 5.18 ms, total: 12 ms
Wall time: 59.3 s


In [142]:
files_in_dir(deltaPath,"parquet")

195KB    2020-09-26 12:23:40  part-00003-005548cc-1df0-4598-a4c8-f0d374761cc0-c000.snappy.parquet
282KB    2020-09-26 12:23:40  part-00002-f24dfef8-ff99-4f73-980d-fcf7104acf95-c000.snappy.parquet
314KB    2020-09-26 12:23:40  part-00000-865a0c57-3839-4392-82b2-bfc931f75c4f-c000.snappy.parquet
312KB    2020-09-26 12:23:40  part-00001-ca10834b-9660-49de-bf41-9216be06a3ae-c000.snappy.parquet

Number of file/s: 4 | Total size: 1.5M


In [143]:
# So VACUUM DOES NOT CLEAN UP TRANSACTION LOG FILE, default is 30 days
files_in_dir(deltaLogPath,"*")

2KB      2020-09-26 10:55:56  00000000000000000000.json
938B     2020-09-26 11:01:54  00000000000000000001.json
410B     2020-09-26 11:15:32  00000000000000000002.json
902B     2020-09-26 11:22:59  00000000000000000003.json
775B     2020-09-26 11:36:54  00000000000000000004.json
905B     2020-09-26 11:42:43  00000000000000000005.json
905B     2020-09-26 11:42:47  00000000000000000006.json
905B     2020-09-26 11:42:50  00000000000000000007.json
905B     2020-09-26 11:42:54  00000000000000000008.json
905B     2020-09-26 11:42:58  00000000000000000009.json
905B     2020-09-26 11:43:01  00000000000000000010.json
17KB     2020-09-26 11:43:04  00000000000000000010.checkpoint.parquet
35KB     2020-09-26 11:59:58  00000000000000000011.json
196KB    2020-09-26 12:10:11  00000000000000000012.json
141KB    2020-09-26 12:11:36  00000000000000000013.json
2KB      2020-09-26 12:23:40  00000000000000000014.json

Number of file/s: 16 | Total size: 452K


In [144]:
spark.read.format("json").load(deltaLogPath + "/00000000000000000014.json").collect()

[Row(add=None, commitInfo=Row(isBlindAppend=False, operation='WRITE', operationMetrics=Row(numFiles='4', numOutputBytes='1129731', numOutputRows='99226'), operationParameters=Row(mode='Overwrite', partitionBy='[]'), readVersion=13, timestamp=1601148220630, userMetadata='08-25-2020 Data Merge;'), remove=None),
 Row(add=Row(dataChange=True, modificationTime=1601148220000, path='part-00000-865a0c57-3839-4392-82b2-bfc931f75c4f-c000.snappy.parquet', size=321712), commitInfo=None, remove=None),
 Row(add=Row(dataChange=True, modificationTime=1601148220000, path='part-00001-ca10834b-9660-49de-bf41-9216be06a3ae-c000.snappy.parquet', size=319754), commitInfo=None, remove=None),
 Row(add=Row(dataChange=True, modificationTime=1601148220000, path='part-00002-f24dfef8-ff99-4f73-980d-fcf7104acf95-c000.snappy.parquet', size=288465), commitInfo=None, remove=None),
 Row(add=Row(dataChange=True, modificationTime=1601148220000, path='part-00003-005548cc-1df0-4598-a4c8-f0d374761cc0-c000.snappy.parquet', si

In [145]:
# Configure Delta table to keep around 7 days of deleted data and 7 days of older log files
spark.sql("alter table SalesDeltaFormat set tblproperties ('delta.logRetentionDuration' = 'interval 7 days', 'delta.deletedFileRetentionDuration' = 'interval 7 days')")



DataFrame[]

In [146]:
# Verify our changes
spark.sql("describe extended SalesDeltaFormat").show(truncate = False)


+----------------------------+-----------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                      |comment|
+----------------------------+-----------------------------------------------------------------------------------------------+-------+
|InvoiceNo                   |int                                                                                            |       |
|StockCode                   |string                                                                                         |       |
|Description                 |string                                                                                         |       |
|Quantity                    |int                                                                                            |       |
|InvoiceDate                 |string                   

In [147]:
# Clean up from last run.
# Clean up from last run.
! rm -Rf $sourceData 2>/dev/null
print("Deleted path " + deltaPath)


! rm -Rf $deltaPath 2>/dev/null
print("Deleted path " + deltaPath)

! rm -Rf $parquetPath 2>/dev/null
print("Deleted path " + parquetPath)


Deleted path /tmp/spark/data/delta/online_retail_data
Deleted path /tmp/spark/data/delta/online_retail_data
Deleted path /tmp/spark/data/parquet/online_retail_data
