In [None]:
#!pip install delta-spark==1.2.1
#!pyspark --version

In [3]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .master('local[*]')
    .appName('Quickstart DeltaLake')
    .config('spark.jars.packages', 'io.delta:delta-core_2.12:1.2.1')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
    .getOrCreate()
)

In [4]:
sc = spark.sparkContext
sc.setLogLevel('ERROR')

In [5]:
df = spark.read.format('json').load('../data/')

                                                                                

In [7]:
columns = ['user_id', 'email']
df = df.select(columns)
df.columns

['user_id', 'email']

In [8]:
df.show(n=1, vertical=True, truncate=False)

-RECORD 0-------------------------
 user_id | 1703                   
 email   | daron.bailey@email.com 
only showing top 1 row



In [9]:
if df.rdd.getNumPartitions() > 3:
    df.coalesce(1).write.format('delta').mode('overwrite').save('../delta/bronze/user')
else:
    df.write.format('delta').mode('overwrite').save('../delta/bronze/user')

                                                                                

In [10]:
from delta.tables import DeltaTable

DeltaTable.forPath(spark, '../delta/bronze/user').history(1).show(vertical=True, truncate=False)

_ = DeltaTable.forPath(spark, '../delta/bronze/user').toDF()

-RECORD 0-----------------------------------------------------------------------------
 version             | 0                                                              
 timestamp           | 2022-07-11 11:30:51.041                                        
 userId              | null                                                           
 userName            | null                                                           
 operation           | WRITE                                                          
 operationParameters | {mode -> Overwrite, partitionBy -> []}                         
 job                 | null                                                           
 notebook            | null                                                           
 clusterId           | null                                                           
 readVersion         | null                                                           
 isolationLevel      | Serializable        

In [11]:
ids = '1703, 3650'

In [12]:
_.where(f'user_id in ({ids})').show()

                                                                                

+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   1703|daron.bailey@emai...|
|   3650|jonah.barrows@ema...|
+-------+--------------------+



In [13]:
delta_object = DeltaTable.forPath(spark, '../delta/bronze/user')
delta_object

<delta.tables.DeltaTable at 0x103d0e460>

In [14]:
# before delete
_.where('user_id == 1703').show(truncate=False)
_.count()

+-------+----------------------+
|user_id|email                 |
+-------+----------------------+
|1703   |daron.bailey@email.com|
+-------+----------------------+



600

In [15]:
df.select('user_id').distinct().show()

+-------+
|user_id|
+-------+
|   5385|
|    720|
|   8887|
|   9952|
|   7032|
|   4161|
|   2364|
|   5099|
|   2961|
|   6762|
|   3909|
|    343|
|   9000|
|   9045|
|   8222|
|    402|
|   1312|
|   4746|
|   3650|
|   1805|
+-------+
only showing top 20 rows



In [16]:
ids_delete = '5385, 5099, 9000'


In [17]:
# delete
delta_object.delete(condition=f'user_id in ({ids_delete})')

                                                                                

In [18]:
_.count()

596

In [19]:
_.where('user_id == 1805').show()

+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   1805|renee.hahn@email.com|
+-------+--------------------+



In [20]:
# update
delta_object.update(
    condition='email = "renee.hahn@email.com"',
    set = { 'email': '"carlos.barbosa@owshq.com"'}
)

                                                                                

In [21]:
_.where('user_id == 1805').show(truncate=False)

+-------+------------------------+
|user_id|email                   |
+-------+------------------------+
|1805   |carlos.barbosa@owshq.com|
+-------+------------------------+



In [22]:
_.show()

+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   1703|daron.bailey@emai...|
|   3650|jonah.barrows@ema...|
|   8809|carla.hansen@emai...|
|   4606|tomas.ledner@emai...|
|      1|alyse.ortiz@email...|
|   9245|russell.kulas@ema...|
|   3425|armida.lehner@ema...|
|   4264|tad.sanford@email...|
|   1668|rosia.jones@email...|
|    343|candy.conroy@emai...|
|   7393|dulcie.gottlieb@e...|
|   3909|rodrigo.reynolds@...|
|   9952|jenna.bode@email.com|
|   2364|dan.herman@email.com|
|   1611|stanley.witting@e...|
|   1723|clarinda.kilback@...|
|   7032|charley.carroll@e...|
|    549|cameron.harris@em...|
|   4161|reyes.stracke@ema...|
|    503|jolynn.schulist@e...|
+-------+--------------------+
only showing top 20 rows



In [23]:
values = [(1703, 'luan.moreno@owshq.com'), (3650, 'mateus.oliveira@owshq.com')] 
cols   = _.columns

In [24]:
values, cols

([(1703, 'luan.moreno@owshq.com'), (3650, 'mateus.oliveira@owshq.com')],
 ['user_id', 'email'])

In [25]:
new_data = spark.createDataFrame(values, cols)
new_data.show(truncate=False)

+-------+-------------------------+
|user_id|email                    |
+-------+-------------------------+
|1703   |luan.moreno@owshq.com    |
|3650   |mateus.oliveira@owshq.com|
+-------+-------------------------+



In [26]:
# upserts
(
    delta_object.alias('old_data')
        .merge(
            new_data.alias('new_data'), condition = 'old_data.user_id = new_data.user_id'
        )
        .whenMatchedUpdateAll(
            condition = 'old_data.user_id = new_data.user_id'
        )
        .whenNotMatchedInsertAll()
    .execute()
)

                                                                                

In [27]:
ids_update = '1703, 3650'
_.where(f'user_id in ({ids_update})').show(truncate=False)

+-------+-------------------------+
|user_id|email                    |
+-------+-------------------------+
|1703   |luan.moreno@owshq.com    |
|3650   |mateus.oliveira@owshq.com|
+-------+-------------------------+



In [28]:
_.count()

596

In [29]:
delta_object.history().show(truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 3                                                                                                                                                                                                                                                                                  
 timestamp           | 2022-07-11 11:31:28.299                                                                                                                                                                                                                                                            
 userId              | null                                                                            

In [30]:
cols_history = ['version', 'timestamp', 'operation']
delta_object.history().select(cols_history).show(truncate=False)

+-------+-----------------------+---------+
|version|timestamp              |operation|
+-------+-----------------------+---------+
|3      |2022-07-11 11:31:28.299|MERGE    |
|2      |2022-07-11 11:31:20.207|UPDATE   |
|1      |2022-07-11 11:31:14.803|DELETE   |
|0      |2022-07-11 11:30:51.041|WRITE    |
+-------+-----------------------+---------+



In [31]:
# time travel per version
time_travel_version_0 = (
    spark
        .read
        .format('delta')
        .option('versionAsOf', '0')
        .load('../delta/bronze/user/')
)

time_travel_version_0.show()
time_travel_version_0.count()

+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   1703|daron.bailey@emai...|
|   3650|jonah.barrows@ema...|
|   8809|carla.hansen@emai...|
|   4606|tomas.ledner@emai...|
|      1|alyse.ortiz@email...|
|   9245|russell.kulas@ema...|
|   3425|armida.lehner@ema...|
|   4264|tad.sanford@email...|
|   1668|rosia.jones@email...|
|    343|candy.conroy@emai...|
|   7393|dulcie.gottlieb@e...|
|   3909|rodrigo.reynolds@...|
|   9952|jenna.bode@email.com|
|   2364|dan.herman@email.com|
|   1611|stanley.witting@e...|
|   1723|clarinda.kilback@...|
|   7032|charley.carroll@e...|
|    549|cameron.harris@em...|
|   4161|reyes.stracke@ema...|
|    503|jolynn.schulist@e...|
+-------+--------------------+
only showing top 20 rows



600

In [33]:
# time travel per timestamp
time_travel_timestamp = (
    spark
        .read
        .format('delta')
        .option('timestampAsOf', '2022-07-05 17:43:43.85')
        .load('../delta/bronze/user/')
)

time_travel_timestamp.where(f'user_id in ({ids_update})').show(truncate=False)
time_travel_timestamp.count()

In [None]:
# verify changes beetween to versions delta
time_travel_version_0.exceptAll(time_travel_timestamp).show(truncate=False)

+-------+-------------------------+
|user_id|email                    |
+-------+-------------------------+
|9000   |josefa.herman@email.com  |
|3650   |jonah.barrows@email.com  |
|9000   |herschel.turner@email.com|
|5099   |chang.senger@email.com   |
|1703   |daron.bailey@email.com   |
|5385   |winston.kirlin@email.com |
|1805   |renee.hahn@email.com     |
+-------+-------------------------+



In [35]:
# evolution schema
df = spark.read.format('json').load('../data/')

columns = ['user_id', 'email', 'gender']
df = df.select(columns)
df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)



In [36]:
df.write.format('delta').mode('overwrite').option('mergeSchema', True).save('../delta/bronze/user') # mergeSchema / overwriteSchema

In [37]:
delta_object.history(1).show(vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------------------------------
 version             | 4                                                              
 timestamp           | 2022-07-11 11:33:30.445                                        
 userId              | null                                                           
 userName            | null                                                           
 operation           | WRITE                                                          
 operationParameters | {mode -> Overwrite, partitionBy -> []}                         
 job                 | null                                                           
 notebook            | null                                                           
 clusterId           | null                                                           
 readVersion         | 3                                                              
 isolationLevel      | Serializable        

In [38]:
_.show()

+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   3395|marcos.collier@em...|
|   1556|elina.hills@email...|
|   1879|enedina.schroeder...|
|   7805|colin.ryan@email.com|
|   3982|dallas.boyle@emai...|
|   7274|grover.towne@emai...|
|   3184|dexter.schmitt@em...|
|    550|novella.weber@ema...|
|   8365|lesley.mccullough...|
|   4942|marti.marks@email...|
|   8327|shawnna.keebler@e...|
|   9464|guillermo.beahan@...|
|   4123|sid.bechtelar@ema...|
|   2281|merrill.upton@ema...|
|   6998|felipe.ward@email...|
|   7440|willie.walsh@emai...|
|   8397|jae.krajcik@email...|
|   9437|wilfredo.bailey@e...|
|   3148|josefa.marvin@ema...|
|   2535|loyd.hintz@email.com|
+-------+--------------------+
only showing top 20 rows



In [39]:
delta_object.generate('symlink_format_manifest')

In [40]:
df.coalesce(1).write.format('delta').mode('overwrite').save('../delta/bronze/user')

In [41]:
df.coalesce(1).write.format('parquet').mode('overwrite').save('../parquet/user')

In [42]:
# migrate data lake parquet to delta
delta_object.convertToDelta(spark, 'parquet.`../parquet/user/`')

<delta.tables.DeltaTable at 0x103fcfa60>

In [43]:
delta_object.history(1).show(truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------
 version             | 5                                                              
 timestamp           | 2022-07-11 11:33:49.611                                        
 userId              | null                                                           
 userName            | null                                                           
 operation           | WRITE                                                          
 operationParameters | {mode -> Overwrite, partitionBy -> []}                         
 job                 | null                                                           
 notebook            | null                                                           
 clusterId           | null                                                           
 readVersion         | 4                                                              
 isolationLevel      | Serializable        