## Practical 5a: Delta Lake

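This notebook writes a small Delta table, appends to it, and updates a row, inspecting the transaction log entry (`_delta_log/<version>.json`) that each operation produces.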

Source: Bennie Haelen, “Delta Lake: Up & Running”

In [0]:
# Four sample patients; the column names are supplied as a plain list.
df = spark.createDataFrame(
    data=[(1, "P1"), (2, "P2"), (3, "P3"), (4, "P4")],
    schema=["patientID", "name"],
)

In [0]:
# Write the DataFrame as a Delta table, replacing anything already at the path.
# coalesce(2) reduces the data to two partitions before writing; the listing
# in the next cell shows two Parquet part files.
(
    df.coalesce(2)
    .write
    .format("delta")
    .mode("overwrite")
    .save("/tmp/deltaPath")
)

In [0]:
# List the table directory: after the write it holds the _delta_log/ folder
# alongside the Parquet part files.
dbutils.fs.ls("/tmp/deltaPath")

Out[10]: [FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/', name='_delta_log/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/tmp/deltaPath/part-00000-f5fe1f7f-23b9-4cf0-a17a-005a556926a2-c000.snappy.parquet', name='part-00000-f5fe1f7f-23b9-4cf0-a17a-005a556926a2-c000.snappy.parquet', size=847, modificationTime=1677205162000),
 FileInfo(path='dbfs:/tmp/deltaPath/part-00001-00120c57-c3fb-459f-aa67-1a34b3d5bea5-c000.snappy.parquet', name='part-00001-00120c57-c3fb-459f-aa67-1a34b3d5bea5-c000.snappy.parquet', size=847, modificationTime=1677205162000)]

In [0]:
# List the transaction log directory: the first commit shows up as a
# zero-padded <version>.json entry with a companion .crc file.
dbutils.fs.ls("/tmp/deltaPath/_delta_log")

Out[11]: [FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/00000000000000000000.crc', name='00000000000000000000.crc', size=2922, modificationTime=1677205167000),
 FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/00000000000000000000.json', name='00000000000000000000.json', size=1934, modificationTime=1677205163000)]

In [0]:
# Load log entry 0 (the initial overwrite) as a DataFrame. Each row is one
# action, so the columns are the action types: add, commitInfo, metaData, protocol.
delta_log0 = spark.read.json(
    "/tmp/deltaPath/_delta_log/00000000000000000000.json"
)
delta_log0.show()

+--------------------+--------------------+--------------------+--------+
|                 add|          commitInfo|            metaData|protocol|
+--------------------+--------------------+--------------------+--------+
|                null|{0224-010716-k9oi...|                null|    null|
|                null|                null|                null|  {1, 2}|
|                null|                null|{1677205160905, {...|    null|
|{true, 1677205162...|                null|                null|    null|
|{true, 1677205162...|                null|                null|    null|
+--------------------+--------------------+--------------------+--------+



In [0]:
# Inspect the `add` actions: one per Parquet file written, each carrying the
# file path, size and per-column statistics; the other rows are null here.
delta_log0.select("add").collect()

Out[13]: [Row(add=None),
 Row(add=None),
 Row(add=None),
 Row(add=Row(dataChange=True, modificationTime=1677205162000, path='part-00000-f5fe1f7f-23b9-4cf0-a17a-005a556926a2-c000.snappy.parquet', size=847, stats='{"numRecords":2,"minValues":{"patientID":1,"name":"P1"},"maxValues":{"patientID":2,"name":"P2"},"nullCount":{"patientID":0,"name":0}}', tags=Row(INSERTION_TIME='1677205162000000', MAX_INSERTION_TIME='1677205162000000', MIN_INSERTION_TIME='1677205162000000', OPTIMIZE_TARGET_SIZE='268435456'))),
 Row(add=Row(dataChange=True, modificationTime=1677205162000, path='part-00001-00120c57-c3fb-459f-aa67-1a34b3d5bea5-c000.snappy.parquet', size=847, stats='{"numRecords":2,"minValues":{"patientID":3,"name":"P3"},"maxValues":{"patientID":4,"name":"P4"},"nullCount":{"patientID":0,"name":0}}', tags=Row(INSERTION_TIME='1677205162000001', MAX_INSERTION_TIME='1677205162000001', MIN_INSERTION_TIME='1677205162000001', OPTIMIZE_TARGET_SIZE='268435456')))]

In [0]:
# Inspect the `commitInfo` action: the operation that ran (WRITE), its
# parameters and metrics, and who/what issued the commit.
delta_log0.select("commitInfo").collect()

Out[14]: [Row(commitInfo=Row(clusterId='0224-010716-k9oi2cwj', engineInfo='Databricks-Runtime/11.3.x-scala2.12', isBlindAppend=False, isolationLevel='WriteSerializable', notebook=Row(notebookId='2632371252532864'), operation='WRITE', operationMetrics=Row(numFiles='2', numOutputBytes='1694', numOutputRows='4'), operationParameters=Row(mode='Overwrite', partitionBy='[]'), timestamp=1677205162769, txnId='73338f9f-0f3c-4e8d-bee4-68c90baa238f', userId='7450770214914438', userName='aixin@comp.nus.edu.sg')),
 Row(commitInfo=None),
 Row(commitInfo=None),
 Row(commitInfo=None),
 Row(commitInfo=None)]

In [0]:
# Inspect the `metaData` action: table id, storage format, partition columns
# and the schema serialized as a JSON string.
delta_log0.select("metaData").collect()

Out[15]: [Row(metaData=None),
 Row(metaData=None),
 Row(metaData=Row(createdTime=1677205160905, format=Row(provider='parquet'), id='4fe8f773-d6ab-47f3-bb59-881300fb3676', partitionColumns=[], schemaString='{"type":"struct","fields":[{"name":"patientID","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}')),
 Row(metaData=None),
 Row(metaData=None)]

In [0]:
# Two more patients, appended to the existing table as a single partition.
# NOTE: re-running this cell appends the same rows again.
df = spark.createDataFrame(
    data=[(5, "P5"), (6, "P6")],
    schema=["patientID", "name"],
)
(
    df.coalesce(1)
    .write
    .format("delta")
    .mode("append")
    .save("/tmp/deltaPath")
)

In [0]:
# Load log entry 1, produced by the append. It contains only commitInfo and
# add actions.
delta_log1 = spark.read.json(
    "/tmp/deltaPath/_delta_log/00000000000000000001.json"
)
delta_log1.show()

+--------------------+--------------------+
|                 add|          commitInfo|
+--------------------+--------------------+
|                null|{0224-010716-k9oi...|
|{true, 1677205246...|                null|
+--------------------+--------------------+



In [0]:
# Read the table back through the Delta reader: all six rows are visible.
spark.read.load("/tmp/deltaPath", format="delta").show()


+---------+----+
|patientID|name|
+---------+----+
|        1|  P1|
|        2|  P2|
|        3|  P3|
|        4|  P4|
|        5|  P5|
|        6|  P6|
+---------+----+



In [0]:
# Import only the class this notebook uses; a wildcard import hides where
# names come from and can silently overwrite existing ones.
from delta.tables import DeltaTable

# Handle to the existing Delta table at this path, used for the update below.
deltaTable = DeltaTable.forPath(spark, "/tmp/deltaPath")

In [0]:
# Import just the two helpers needed. `from pyspark.sql.functions import *`
# would shadow Python builtins such as sum, min, max and round for every
# later cell.
from pyspark.sql.functions import col, lit

# Rename patient 1. The column is referenced with the exact casing of the
# schema ("patientID"); the previous "PatientID" only resolved because Spark
# matches column names case-insensitively by default.
deltaTable.update(
    condition=col("patientID") == 1,
    set={"name": lit("P11")},
)

In [0]:
# Read the table again: patient 1 now has the updated name.
spark.read.load("/tmp/deltaPath", format="delta").show()

+---------+----+
|patientID|name|
+---------+----+
|        1| P11|
|        2|  P2|
|        3|  P3|
|        4|  P4|
|        5|  P5|
|        6|  P6|
+---------+----+



In [0]:
# Load log entry 2, produced by the update. Besides commitInfo it holds a
# `remove` action and an `add` action.
delta_log2 = spark.read.json(
    "/tmp/deltaPath/_delta_log/00000000000000000002.json"
)
delta_log2.show()

+--------------------+--------------------+--------------------+
|                 add|          commitInfo|              remove|
+--------------------+--------------------+--------------------+
|                null|{0224-010716-k9oi...|                null|
|                null|                null|{true, 1677205283...|
|{true, 1677205284...|                null|                null|
+--------------------+--------------------+--------------------+



In [0]:
# List the log directory again: each of the three operations (overwrite,
# append, update) has its own numbered .json entry with a companion .crc file.
dbutils.fs.ls("/tmp/deltaPath/_delta_log")

Out[23]: [FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/00000000000000000000.crc', name='00000000000000000000.crc', size=2922, modificationTime=1677205167000),
 FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/00000000000000000000.json', name='00000000000000000000.json', size=1934, modificationTime=1677205163000),
 FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/00000000000000000001.crc', name='00000000000000000001.crc', size=3413, modificationTime=1677205249000),
 FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/00000000000000000001.json', name='00000000000000000001.json', size=1014, modificationTime=1677205247000),
 FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/00000000000000000002.crc', name='00000000000000000002.crc', size=3414, modificationTime=1677205288000),
 FileInfo(path='dbfs:/tmp/deltaPath/_delta_log/00000000000000000002.json', name='00000000000000000002.json', size=1499, modificationTime=1677205284000)]