### Working with Delta Lake and Spark via Databricks

In [0]:
from delta import *
from pyspark.sql import SparkSession

In [0]:
# Obtain a Spark session (new or already running) named "MyApp", requesting
# the Delta Lake SQL extension and the Delta catalog implementation.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [0]:
# Bare expression: the notebook renders the session object as this cell's output,
# which confirms the session was created.
spark

In [0]:
# Create the Delta table: write the integers 0..9 (one `id` column) to
# /tmp/delta-table. No save mode is set, so the writer's default mode applies.
data = spark.range(0, 10)
(
    data.write
    .format("delta")
    .save("/tmp/delta-table")
)

In [0]:
# Drop the local name; the rows have already been written to /tmp/delta-table,
# and later cells read them back from there.
del data

In [0]:
# Read the table back from its path and print the rows.
# show() is called without an ordering, so row order is not guaranteed.
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-table")
)
df.show()

+---+
| id|
+---+
|  3|
|  4|
|  8|
|  9|
|  0|
|  1|
|  2|
|  5|
|  6|
|  7|
+---+



In [0]:
# Example of updating data: replace the table's contents with the integers
# 10..19 by writing to the same path in "overwrite" mode.
data = spark.range(10, 20)
(
    data.write
    .format("delta")
    .mode("overwrite")
    .save("/tmp/delta-table")
)

In [0]:
# Re-read the table to see its current contents after the overwrite.
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-table")
)
df.show()

+---+
| id|
+---+
| 13|
| 14|
| 18|
| 19|
| 11|
| 12|
| 15|
| 16|
| 17|
| 10|
+---+



In [0]:
# Time travel: read an older snapshot of the table instead of the latest one.
# versionAsOf=0 selects the very first version written to this path.
df0 = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load("/tmp/delta-table")
)
df0.show()

+---+
| id|
+---+
|  3|
|  4|
|  8|
|  9|
|  0|
|  1|
|  2|
|  5|
|  6|
|  7|
+---+



In [0]:
# updating odd values by adding 100

# Import only the names this cell uses. The wildcard form of
# pyspark.sql.functions would pull in functions named like Python builtins
# (sum, min, max, ...) and silently shadow them for the rest of the notebook.
from delta.tables import DeltaTable
from pyspark.sql.functions import expr

# Bind a DeltaTable handle to the table stored at this path
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update in place: rows matching `condition` (odd id) get the column
# assignments in `set` applied (id becomes id + 100); other rows are unchanged.
deltaTable.update(
    condition=expr("id % 2 == 1"),
    set={"id": expr("id + 100")},
)

# Show the table's contents after the update
deltaTable.toDF().show()

+---+
| id|
+---+
|113|
| 14|
| 18|
|119|
| 12|
| 16|
|111|
|115|
|117|
| 10|
+---+



In [0]:
# Explicit import instead of a wildcard: DeltaTable is the only name needed here.
from delta.tables import DeltaTable

# Handle to the Delta table stored at this path
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Full commit history of the table, one row per version
fullHistoryDF = deltaTable.history()

# History limited to the single most recent operation
lastOperationDF = deltaTable.history(1)

In [0]:
# Print the table history; with default show() settings long cell values are
# truncated (e.g. operationParameters / operationMetrics end in "...").
fullHistoryDF.show()

+-------+-------------------+----------------+-------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+
|version|          timestamp|          userId|           userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+----------------+-------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+
|      2|2022-01-28 18:09:12|6584847049774054|dbw2tn@virginia.edu|   UPDATE|{predicate -> ((i...|null|{3205054796946383}|0128-165643-erqkptyh|          1|WriteSerializable|        false|{numRemovedFiles ...|        null|
|      1|2022-01-28 18:08:55|6584847049774054|dbw2tn@virginia.edu|    WRITE|{mode -> Overwrit...|null|{3205054796946

In [0]:
# Keep only the version, operation and metrics columns, and print them
# without truncation so the full operationMetrics map is readable.
(
    fullHistoryDF
    .select('version', 'operation', 'operationMetrics')
    .show(truncate=False)
)

+-------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationMetrics                                                                                                                                                                 |
+-------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2      |UPDATE   |{numRemovedFiles -> 5, numCopiedRows -> 2, numAddedChangeFiles -> 0, executionTimeMs -> 6432, scanTimeMs -> 3737, numAddedFiles -> 5, numUpdatedRows -> 5, rewriteTimeMs -> 2694}|
|1      |WRITE    |{numFiles -> 8, numOutputBytes -> 4415, numOutputRows -> 10}                                                                                                                     |
|0      |W

In [0]:
%sql

-- Table-level metadata for the Delta table at this path
-- (format, location, timestamps, file count, size, protocol versions).
DESCRIBE DETAIL "/tmp/delta-table"

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,6b6a802a-cca8-480f-a6e6-f5ddee06eb56,,,dbfs:/tmp/delta-table,2022-01-28T18:08:37.012+0000,2022-01-28T18:09:12.000+0000,List(),8,4415,Map(),1,2


In [0]:
# Clean up: recursively delete the demo table directory.
# NOTE(review): presumably this is what lets the create-table cell succeed on a fresh re-run — confirm.
dbutils.fs.rm('/tmp/delta-table',recurse=True)

Out[30]: True