In [75]:
import shutil

from delta import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, IntegerType,StringType

In [76]:
# Build a local Spark session with Delta Lake enabled: register Delta's SQL
# extension, use DeltaCatalog as the session catalog (`spark_catalog`), then
# hand the builder to configure_spark_with_delta_pip (from the `delta` package).
delta_sql_extension = 'io.delta.sql.DeltaSparkSessionExtension'
delta_catalog_class = 'org.apache.spark.sql.delta.catalog.DeltaCatalog'

builder = SparkSession.builder.master('local[*]').appName('delta lake tutorial')
builder = builder.config('spark.sql.extensions', delta_sql_extension)
builder = builder.config('spark.sql.catalog.spark_catalog', delta_catalog_class)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [77]:
# Record which Spark version this session runs; the bare last expression is displayed.
spark_version = spark.version
spark_version

'3.2.1'

In [78]:
# Two-column contact schema; both fields are declared non-nullable.
schema = StructType([
    StructField('id', IntegerType(), nullable=False),
    StructField('name', StringType(), nullable=False),
])
data = [(1, 'zeh'), (2, 'maneh')]
df = spark.createDataFrame(data, schema)
df.show()

+---+-----+
| id| name|
+---+-----+
|  1|  zeh|
|  2|maneh|
+---+-----+



## Write the sample DataFrame as a Delta table

In [79]:
delta_table_path = '/tmp/delta-table'

# The default save mode ('errorifexists') fails when the path already holds a
# table, so re-running the notebook would crash here. Leftover commits from an
# earlier run would also shift the version numbers read back by time travel.
# This is a scratch tutorial location, so remove any previous copy first.
shutil.rmtree(delta_table_path, ignore_errors=True)
df.write.format('delta').save(delta_table_path)

In [80]:
# Read the table back from the same path the previous cell wrote to (reuse the
# variable rather than repeating the literal). Row order is not guaranteed.
contact_df = spark.read.format('delta').load(delta_table_path)
contact_df.show()

+---+-----+
| id| name|
+---+-----+
|  2|maneh|
|  1|  zeh|
+---+-----+



## Overwrite

In [81]:
# Replacement rows that reuse the contact schema defined above.
new_data = [
    (3, 'john'),
    (4, 'doe'),
]
new_contact_df = spark.createDataFrame(data=new_data, schema=schema)
new_contact_df.show()

+---+----+
| id|name|
+---+----+
|  3|john|
|  4| doe|
+---+----+



In [82]:
# mode='overwrite' replaces the table's contents instead of appending to them.
new_contact_df.write.save(path=delta_table_path, format='delta', mode='overwrite')

In [83]:
# Re-read the table at delta_table_path (not a repeated literal) to confirm the
# overwrite replaced the original rows.
contact_df = spark.read.format('delta').load(delta_table_path)
contact_df.show()

+---+----+
| id|name|
+---+----+
|  3|john|
|  4| doe|
+---+----+



## Conditional updates, deletes, and merges without overwriting the table

In [84]:
# Handle to the existing Delta table; the cells below use it for in-place
# update, delete, and merge operations.
delta_table = DeltaTable.forPath(spark, delta_table_path)
current_contacts = delta_table.toDF()
current_contacts.show()

+---+----+
| id|name|
+---+----+
|  3|john|
|  4| doe|
+---+----+



### Add 100 to every even `id`

In [85]:
# Add 100 to the id of every row whose id is even; other rows are untouched.
is_even_id = f.col('id') % 2 == 0
delta_table.update(condition=is_even_id, set={'id': f.col('id') + 100})
delta_table.toDF().show()

+---+----+
| id|name|
+---+----+
|  3|john|
|104| doe|
+---+----+



### Delete every row with an even `id`

In [86]:
# Remove every row whose id is even.
delta_table.delete(f.col('id') % 2 == 0)
delta_table.toDF().show()

+---+----+
| id|name|
+---+----+
|  3|john|
+---+----+



### Upsert (merge) new data

In [88]:
# ids 1-5 paired with names. Only id 3 already exists in the table, so the merge
# below updates that row and inserts the others.
more_names = ['stevie', 'wonder', 'arnold', 'schwarzenegger', 'somebody']
more_data = list(enumerate(more_names, start=1))
more_contact_df = spark.createDataFrame(schema=schema, data=more_data)
more_contact_df.show()

+---+--------------+
| id|          name|
+---+--------------+
|  1|        stevie|
|  2|        wonder|
|  3|        arnold|
|  4|schwarzenegger|
|  5|      somebody|
+---+--------------+



In [89]:
# Upsert: ids present in both tables take the incoming name; ids only in the
# source are inserted. Column references are given as SQL expression strings.
merge_condition = 'old_data.id == new_data.id'
merge_builder = delta_table.alias('old_data').merge(more_contact_df.alias('new_data'), merge_condition)
merge_builder = merge_builder.whenMatchedUpdate(set={'name': 'new_data.name'})
merge_builder = merge_builder.whenNotMatchedInsert(values={'id': 'new_data.id', 'name': 'new_data.name'})
merge_builder.execute()
delta_table.toDF().show()

+---+--------------+
| id|          name|
+---+--------------+
|  1|        stevie|
|  2|        wonder|
|  3|        arnold|
|  4|schwarzenegger|
|  5|      somebody|
+---+--------------+



## Reading earlier versions of the table with time travel

In [97]:
# Read back every committed version of the table, oldest first. The latest
# version number comes from the commit history (history(1) returns the most
# recent commit), so adding or removing cells above doesn't break the loop.
latest_version = delta_table.history(1).select('version').first()['version']
for version in range(latest_version + 1):
    time_machine_df = (spark.read
                       .format('delta')
                       .option('versionAsOf', version)
                       .load(delta_table_path))
    print(f'Version {version}:')
    time_machine_df.show()

+---+-----+
| id| name|
+---+-----+
|  2|maneh|
|  1|  zeh|
+---+-----+

+---+----+
| id|name|
+---+----+
|  3|john|
|  4| doe|
+---+----+

+---+----+
| id|name|
+---+----+
|  3|john|
|104| doe|
+---+----+

+---+----+
| id|name|
+---+----+
|  3|john|
+---+----+

+---+--------------+
| id|          name|
+---+--------------+
|  1|        stevie|
|  2|        wonder|
|  3|        arnold|
|  4|schwarzenegger|
|  5|      somebody|
+---+--------------+



In [98]:
# Stop the Spark session created at the top of the notebook.
spark.stop()