

# Delta Lake (delta.io) with Fake Customer Data

In [2]:
# Display the active SparkSession. `spark` is never created in this notebook, so it is
# presumably pre-created by the launcher (e.g. a PySpark kernel configured for Delta) — confirm.
spark

In [3]:
# Make sure to have a clean start for the demo: remove any Delta table left over from a
# previous run. Otherwise the save() below (default mode "errorifexists") would fail and
# the table version numbers used for time travel later would not line up.
# shutil.rmtree is the portable equivalent of `rm -rf`; ignore_errors=True makes it a
# no-op when the directory does not exist yet.
import shutil

shutil.rmtree("mock_delta", ignore_errors=True)

In [5]:
# Load every JSON file in the mock/ directory into a single DataFrame. By default Spark's
# JSON reader expects one JSON record per line, so each file can hold many customers.
customersClassic = spark.read.json("mock/")

# Persist the same rows as a Delta table. (An existing Parquet directory could instead be
# converted to Delta in place with CONVERT TO DELTA.)
customersClassic.write.save("mock_delta", format="delta")

# For simplicity the demo reads from and writes to the local filesystem; the paths could
# just as well point to S3 or another object store.

In [4]:
# List the raw JSON input files in mock/ (the directory loaded with spark.read).
!ls mock

people1.json people2.json people3.json


In [6]:
# Re-read the customer data, this time from the Delta table written to mock_delta.
customers = spark.read.load("mock_delta", format="delta")

In [7]:
# Number of customer rows in the Delta table.
customers.count()

30

In [8]:
# Column names and types of the table (the schema Spark inferred from the JSON input).
customers.printSchema()

root
 |-- car_make: string (nullable = true)
 |-- car_year: long (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)



In [9]:
# Preview the first five rows (long strings such as e-mails are truncated by default).
customers.show(n=5)

+--------+--------+--------------------+----------+---+----------+
|car_make|car_year|               email|first_name| id| last_name|
+--------+--------+--------------------+----------+---+----------+
|    Jeep|    2008|  creddihoughk@de.vu|     Charo| 21|Reddihough|
|   Lexus|    2006|mbrewinl@yolasite...|    Maddie| 22|    Brewin|
|   Dodge|    1997|cblaskettm@siteme...|   Chelsea| 23|  Blaskett|
|Cadillac|    1994|kdullardn@indiego...|     Karee| 24|   Dullard|
|    Saab|    2006|etunsleyo@reuters...|    Eduard| 25|   Tunsley|
+--------+--------+--------------------+----------+---+----------+
only showing top 5 rows



# Check for old Audis (built before 2002)

In [10]:
# Old Audis, i.e. built before 2002: both conditions combined into one filter predicate.
customers.filter(
    (customers['car_year'] < 2002) & (customers['car_make'] == "Audi")
).show()

+--------+--------+--------------------+----------+---+----------+
|car_make|car_year|               email|first_name| id| last_name|
+--------+--------+--------------------+----------+---+----------+
|    Audi|    1991|binkin5@washingto...|    Briana|  6|     Inkin|
|    Audi|    1986|fbuckthorpe6@syma...|   Findley|  7|Buckthorpe|
|    Audi|    1987|  cmoehled@imgur.com|     Camel| 14|    Moehle|
+--------+--------+--------------------+----------+---+----------+



In [12]:
# Register the DataFrame as a temporary view named "customers" so it can be queried
# (and updated through Delta) with SQL in the following cells.
customers.createOrReplaceTempView(name="customers")

In [13]:
# The same question as the DataFrame filter above, now in SQL:
# e-mail and make of every Audi built before 2002.
spark.sql(sqlQuery='SELECT email,car_make FROM customers WHERE car_make == "Audi" AND car_year < 2002 ').show()

+--------------------+--------+
|               email|car_make|
+--------------------+--------+
|binkin5@washingto...|    Audi|
|fbuckthorpe6@syma...|    Audi|
|  cmoehled@imgur.com|    Audi|
+--------------------+--------+



# Let's buy an old Audi

In [14]:
# Plain Spark tables backed by JSON/Parquet files do not support UPDATE; Delta Lake adds
# SQL DML (UPDATE, DELETE, MERGE) on top of Spark.
# The change runs as one ACID transaction: Delta rewrites the affected Parquet data files
# and commits a new table version (version 1) to _delta_log.
# Note: the WHERE clause matches every Audi, not only the old ones queried above.
spark.sql('UPDATE customers SET email = "hello@world.com" WHERE car_make = "Audi" ')

DataFrame[]

In [15]:
# Verify the update: the old Audis should now all carry the new e-mail address.
spark.sql(sqlQuery='SELECT email, car_make FROM customers WHERE car_make == "Audi" AND car_year < 2002 ').show()

+---------------+--------+
|          email|car_make|
+---------------+--------+
|hello@world.com|    Audi|
|hello@world.com|    Audi|
|hello@world.com|    Audi|
+---------------+--------+



# Let's change the Audis to Porsches (all Audis, not only the old ones)

In [16]:
# Rename every Audi (no car_year filter) to Porsche; this commits table version 2.
spark.sql(sqlQuery='UPDATE customers SET car_make = "Porsche" WHERE car_make = "Audi" ')

DataFrame[]

# Time travel (backwards!)

In [19]:
# Time travel: read the table as of a given version and compare the results
# (0 = initial write, 1 = e-mail update, 2 = Audi -> Porsche).
df = spark.read.format("delta").load("mock_delta", versionAsOf=2)

df.show(n=30)


+-----------+--------+--------------------+----------+---+------------+
|   car_make|car_year|               email|first_name| id|   last_name|
+-----------+--------+--------------------+----------+---+------------+
|       Jeep|    2008|  creddihoughk@de.vu|     Charo| 21|  Reddihough|
|      Lexus|    2006|mbrewinl@yolasite...|    Maddie| 22|      Brewin|
|      Dodge|    1997|cblaskettm@siteme...|   Chelsea| 23|    Blaskett|
|   Cadillac|    1994|kdullardn@indiego...|     Karee| 24|     Dullard|
|       Saab|    2006|etunsleyo@reuters...|    Eduard| 25|     Tunsley|
|     Nissan|    1992|dyakobovitzp@beha...|    Drusie| 26|  Yakobovitz|
|       Ford|    1984|  tvondraq@google.de|      Tome| 27|      Vondra|
|      Mazda|    1992|hmcasterr@census.gov|  Halimeda| 28|     McAster|
|      Dodge|    2006|     coneils@sun.com|    Cassie| 29|      O'Neil|
|      Acura|    1996|avoset@amazonaws.com|     Angie| 30|        Vose|
|      Eagle|    1994|jmissona@craigsli...|     James| 11|      

# Use Python API to show full history

In [20]:
from delta.tables import *
from pyspark.sql.functions import *
# NOTE(review): only DeltaTable is used in this cell. The pyspark.sql.functions star import
# shadows Python builtins such as sum, max, min, abs, round and filter; prefer
# `import pyspark.sql.functions as F` once no other cell relies on the bare names.

# Access the Delta Lake table by its path.
deltaTable = DeltaTable.forPath(spark, "mock_delta")

# Show the table's commit history, newest version first: one row per version with its
# operation (WRITE, UPDATE, ...), parameters and metrics. For a narrower view, chain
# .select("version", "operation", "operationParameters") before .show().
deltaTable.history().show(5)

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      2|2021-06-08 17:38:...|  null|    null|   UPDATE|{predicate -> (ca...|null|    null|     null|          1|          null|        false|{numRemovedFiles ...|        null|
|      1|2021-06-08 17:38:...|  null|    null|   UPDATE|{predicate -> (ca...|null|    null|     null|          0|          null|        false|{numRemovedFiles ...|        null|
|      0|2021-06-08 17:36:...|  null|    null|    WRITE|{mode -> ErrorIfE...|null|    null|     null|       null|  

In [17]:
# List the table directory: the _delta_log transaction log plus Parquet data files.
# Files superseded by the UPDATEs stay on disk until VACUUM removes them.
! ls mock_delta 

[34m_delta_log[m[m
part-00000-27f5e15f-5f55-475d-b821-1be71eec623a-c000.snappy.parquet
part-00000-2a4b36a8-5de0-4412-b772-bebb02719798-c000.snappy.parquet
part-00000-e78901b1-9918-4392-8e2e-fb419f0e1088-c000.snappy.parquet
part-00001-968bf01f-ae30-47c5-9c06-19a26be948c0-c000.snappy.parquet
part-00001-b6c588cb-81ef-41a4-b688-4417a2233643-c000.snappy.parquet
part-00001-cc330c14-350e-4007-8e4e-9d7bac6bed67-c000.snappy.parquet
part-00002-f800694d-159f-4620-8736-614cb6bd77ef-c000.snappy.parquet


In [18]:
# For details on the Delta transaction log,
# see: https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
# Each committed table version has one numbered JSON commit file
# (00000000000000000000.json is version 0, and so on).

! ls -la mock_delta/_delta_log

total 24
drwxr-xr-x   5 frank.munz  staff   160 Jun  8 11:23 [34m.[m[m
drwxr-xr-x  17 frank.munz  staff   544 Jun  8 11:23 [34m..[m[m
-rw-r--r--   1 frank.munz  staff  1484 Jun  8 11:22 00000000000000000000.json
-rw-r--r--   1 frank.munz  staff  1080 Jun  8 11:23 00000000000000000001.json
-rw-r--r--   1 frank.munz  staff  1080 Jun  8 11:23 00000000000000000002.json


In [19]:
# Re-run of the earlier time-travel read: the table as of version 2.
df = spark.read.option("versionAsOf", 2).format("delta").load("mock_delta")

df.show(n=30)

+-----------+--------+--------------------+----------+---+------------+
|   car_make|car_year|               email|first_name| id|   last_name|
+-----------+--------+--------------------+----------+---+------------+
|       Jeep|    2008|  creddihoughk@de.vu|     Charo| 21|  Reddihough|
|      Lexus|    2006|mbrewinl@yolasite...|    Maddie| 22|      Brewin|
|      Dodge|    1997|cblaskettm@siteme...|   Chelsea| 23|    Blaskett|
|   Cadillac|    1994|kdullardn@indiego...|     Karee| 24|     Dullard|
|       Saab|    2006|etunsleyo@reuters...|    Eduard| 25|     Tunsley|
|     Nissan|    1992|dyakobovitzp@beha...|    Drusie| 26|  Yakobovitz|
|       Ford|    1984|  tvondraq@google.de|      Tome| 27|      Vondra|
|      Mazda|    1992|hmcasterr@census.gov|  Halimeda| 28|     McAster|
|      Dodge|    2006|     coneils@sun.com|    Cassie| 29|      O'Neil|
|      Acura|    1996|avoset@amazonaws.com|     Angie| 30|        Vose|
|      Eagle|    1994|jmissona@craigsli...|     James| 11|      

In [20]:
# VACUUM recursively removes data files that are no longer referenced by the latest
# version of the table and are older than a retention threshold.
# The threshold argument (retentionHours) is in hours. With no argument the default
# retention of 7 days applies, so the files superseded by the UPDATEs above are not
# deleted yet and the effect is not visible immediately.
deltaTable.vacuum()

DataFrame[]

# misc 


In [21]:
# List the databases, then the tables/views in the "default" database; the temp view
# registered earlier appears with tableType='TEMPORARY'.
# display() is needed for the first result: only a cell's last expression is shown
# automatically, so a bare listDatabases() call would be silently discarded.
# (SQL alternative for the first call: spark.sql('show databases').show())
display(spark.catalog.listDatabases())
spark.catalog.listTables('default')

[Table(name='customers', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [22]:
# List the IPython line and cell magics available in this kernel.
%lsmagic


Available line magics:
%alias  %alias_magic  %autoawait  %autocall  %automagic  %autosave  %bookmark  %cat  %cd  %clear  %colors  %conda  %config  %connect_info  %cp  %debug  %dhist  %dirs  %doctest_mode  %ed  %edit  %env  %gui  %hist  %history  %killbgscripts  %ldir  %less  %lf  %lk  %ll  %load  %load_ext  %loadpy  %logoff  %logon  %logstart  %logstate  %logstop  %ls  %lsmagic  %lx  %macro  %magic  %man  %matplotlib  %mkdir  %more  %mv  %notebook  %page  %pastebin  %pdb  %pdef  %pdoc  %pfile  %pinfo  %pinfo2  %pip  %popd  %pprint  %precision  %prun  %psearch  %psource  %pushd  %pwd  %pycat  %pylab  %qtconsole  %quickref  %recall  %rehashx  %reload_ext  %rep  %rerun  %reset  %reset_selective  %rm  %rmdir  %run  %save  %sc  %set_env  %store  %sx  %system  %tb  %time  %timeit  %unalias  %unload_ext  %who  %who_ls  %whos  %xdel  %xmode

Available cell magics:
%%!  %%HTML  %%SVG  %%bash  %%capture  %%debug  %%file  %%html  %%javascript  %%js  %%latex  %%markdown  %%perl  %%prun  %%pypy  %%