# Set up local SparkSession

In [1]:
from datetime import date

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from delta.tables import DeltaTable

In [2]:
# Settings for a local Spark session with Delta Lake support, kept in one
# mapping so everything the session is built with can be read at a glance.
spark_options = {
    # Delta Lake jars are resolved as a package dependency at session start-up.
    "spark.jars.packages": "io.delta:delta-core_2.12:2.2.0",
    # Register Delta's SQL extension and catalog implementation.
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    # Render a DataFrame as a table when it is the last expression of a cell,
    # limited to 10 rows and 500 characters per value.
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.repl.eagerEval.maxNumRows": 10,
    "spark.sql.repl.eagerEval.truncate": 500,
}

# Run on all local cores and apply the options in the order listed above.
session_builder = SparkSession.builder.master("local[*]")
for option_name, option_value in spark_options.items():
    session_builder = session_builder.config(option_name, option_value)

spark = session_builder.getOrCreate()

22/12/23 13:55:55 WARN Utils: Your hostname, HF-LPT-1115A.local resolves to a loopback address: 127.0.0.1; using 192.168.1.33 instead (on interface en0)
22/12/23 13:55:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/gk/Git/jupyter-demo/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/gk/.ivy2/cache
The jars for the packages stored in: /Users/gk/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b0c96c4e-6618-41fa-b647-7058dd1a1d85;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.2.0 in central
	found io.delta#delta-storage;2.2.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 115ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-core_2.12;2.2.0 from central in [default]
	io.delta#delta-storage;2.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |  

22/12/23 13:55:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Read flights data from CSV

In [3]:
# Load the raw flights file: the first line supplies the column names and the
# column types are inferred from the data instead of being declared.
df = spark.read.csv("flights.csv", header=True, inferSchema=True)
df

year,month,day,departure_time,arrival_time,carrier,flight_number,aircraft_registration,flight_time,airport_origin,airport_destination,distance,is_cancelled,is_diverted
2008,1,3,1343,1451,WN,588,N240WN,68,HOU,LIT,393,0,0
2008,1,3,1125,1247,WN,1343,N523SW,82,HOU,MAF,441,0,0
2008,1,3,2009,2136,WN,3841,N280WN,87,HOU,MAF,441,0,0
2008,1,3,903,1203,WN,3,N308SA,120,HOU,MCO,848,0,0
2008,1,3,1423,1726,WN,25,N462WN,123,HOU,MCO,848,0,0
2008,1,3,2024,2325,WN,51,N483WN,121,HOU,MCO,848,0,0
2008,1,3,1753,2053,WN,940,N493WN,120,HOU,MCO,848,0,0
2008,1,3,622,935,WN,2621,N266WN,133,HOU,MCO,848,0,0
2008,1,3,1944,2210,WN,389,N266WN,146,HOU,MDW,937,0,0
2008,1,3,1453,1716,WN,519,N514SW,143,HOU,MDW,937,0,0


# Perform some simple aggregations and transformations

In [4]:
# Daily route summary: number of flights and average flight time per
# (date, origin airport, destination airport), ignoring cancelled and
# diverted flights.
df = (
    df
    # Build a real date column out of the separate year / month / day columns.
    .withColumn("date", F.to_date(F.concat("year", F.lit("-"), "month", F.lit("-"), "day")))
    .where("is_cancelled = 0 and is_diverted = 0")
    .groupBy("date", "airport_origin", "airport_destination")
    # Name the aggregates explicitly instead of renaming Spark's generated
    # "count(1)" / "avg(flight_time)" columns afterwards: withColumnRenamed
    # silently does nothing when the generated name does not match.
    .agg(
        F.count("*").alias("count_flights"),
        # The average is truncated to a whole number by the int cast.
        F.avg("flight_time").cast("int").alias("avg_flight_time"),
    )
)
df.orderBy("count_flights", ascending=False)

date,airport_origin,airport_destination,count_flights,avg_flight_time
2008-01-03,LAX,OAK,20,80
2008-01-03,LAS,PHX,18,64
2008-01-03,LAX,LAS,14,63
2008-01-03,LAS,LAX,14,67
2008-01-03,LAS,SAN,14,63
2008-01-03,LAS,BUR,13,62
2008-01-03,LAX,PHX,13,66
2008-01-03,LAX,SJC,13,64
2008-01-03,LAS,RNO,13,74
2008-01-03,LAS,OAK,13,95


# Write to partitioned Delta table

In [5]:
# Persist the summary as the Delta table "flights", partitioned by date.
# "overwrite" replaces the table contents if the table already exists.
df.write.saveAsTable(
    "flights",
    format="delta",
    mode="overwrite",
    partitionBy="date",
)

                                                                                

22/12/23 13:56:10 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

# Perform updates, deletes and merges on the table

In [6]:
# Bind a DeltaTable handle to the catalog table "flights" written above; the
# update, delete, merge, history, restore, vacuum and optimize calls in the
# following cells all go through this handle.
flights_table = DeltaTable.forName(spark, "flights")

### Update and delete

In [7]:
# Blank out the average flight time on every route where it is exactly 80.
# Both the condition and the new value are SQL expression strings, so "null"
# here is the SQL NULL literal, not the text "null".
flights_table.update(
    "avg_flight_time = 80",
    {"avg_flight_time": "null"},
)

# Remove every route that starts or ends at LAS.
flights_table.delete(condition="airport_origin = 'LAS' or airport_destination = 'LAS'")

# Current table contents, busiest routes first.
flights_table.toDF().orderBy(F.col("count_flights").desc())

date,airport_origin,airport_destination,count_flights,avg_flight_time
2008-01-03,LAX,OAK,20,
2008-01-03,LAX,PHX,13,66.0
2008-01-03,LAX,SJC,13,64.0
2008-01-03,MCI,MDW,12,81.0
2008-01-03,MDW,MCI,12,79.0
2008-01-03,HOU,MSY,11,58.0
2008-01-03,MHT,BWI,11,79.0
2008-01-03,LAX,SMF,10,71.0
2008-01-03,MDW,STL,10,59.0
2008-01-03,HRL,HOU,9,54.0


### Merge

In [8]:
# Incoming batch for the merge demo:
#   * LAX -> OAK on 2008-01-03 shares its key with a row already in the table,
#   * JFK -> EZE on 2008-01-04 is a key the table does not contain yet.
# The schema is declared instead of inferred from the dicts so the source
# column types match the target table: Python ints would otherwise be
# inferred as bigint, while avg_flight_time was cast to int when the table
# was built.
incoming_df = spark.createDataFrame(
    [
        {"date": date(2008, 1, 3), "airport_origin": "LAX", "airport_destination": "OAK", "count_flights": 21, "avg_flight_time": 83},
        {"date": date(2008, 1, 4), "airport_origin": "JFK", "airport_destination": "EZE", "count_flights": 22, "avg_flight_time": 660},
    ],
    schema=(
        "date DATE, airport_origin STRING, airport_destination STRING, "
        "count_flights BIGINT, avg_flight_time INT"
    ),
)
# Also expose the batch to SQL under a temp view name; the DataFrame-API merge
# below reads incoming_df directly and does not depend on this view.
incoming_df.createOrReplaceTempView("flights_incoming_data")

# Upsert keyed on (date, origin, destination): rows whose key matches are
# overwritten with the incoming values, rows with a new key are inserted.
(
    flights_table
    .alias("existing")
    .merge(
        source=incoming_df.alias("incoming"),
        condition="existing.date = incoming.date "
                  "AND existing.airport_origin = incoming.airport_origin "
                  "AND existing.airport_destination = incoming.airport_destination"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

flights_table.toDF().orderBy("count_flights", ascending=False)

                                                                                

date,airport_origin,airport_destination,count_flights,avg_flight_time
2008-01-04,JFK,EZE,22,660
2008-01-03,LAX,OAK,21,83
2008-01-03,LAX,PHX,13,66
2008-01-03,LAX,SJC,13,64
2008-01-03,MCI,MDW,12,81
2008-01-03,MDW,MCI,12,79
2008-01-03,MHT,BWI,11,79
2008-01-03,HOU,MSY,11,58
2008-01-03,MDW,STL,10,59
2008-01-03,LAX,SMF,10,71


# Time travel

### Query table history

In [9]:
# Commit log of the table: one row per version, newest first. The `operation`
# column names what produced each version (initial write, UPDATE, DELETE, MERGE).
flights_table.history()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2022-12-23 13:56:49,,,MERGE,"{predicate -> (((existing.date = incoming.date) AND (existing.airport_origin = incoming.airport_origin)) AND (existing.airport_destination = incoming.airport_destination)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}]}",,,,2.0,Serializable,False,"{numTargetRowsCopied -> 177, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 1922, numTargetRowsInserted -> 1, scanTimeMs -> 1286, numTargetRowsUpdated -> 1, numOutputRows -> 179, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 601}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0
2,2022-12-23 13:56:24,,,DELETE,"{predicate -> [""((spark_catalog.default.flights.airport_origin = 'LAS') OR (spark_catalog.default.flights.airport_destination = 'LAS'))""]}",,,,1.0,Serializable,False,"{numRemovedFiles -> 1, numCopiedRows -> 178, numAddedChangeFiles -> 0, executionTimeMs -> 681, numDeletedRows -> 65, scanTimeMs -> 453, numAddedFiles -> 1, rewriteTimeMs -> 227}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0
1,2022-12-23 13:56:23,,,UPDATE,{predicate -> (avg_flight_time#1027 = 80)},,,,0.0,Serializable,False,"{numRemovedFiles -> 1, numCopiedRows -> 241, numAddedChangeFiles -> 0, executionTimeMs -> 1055, scanTimeMs -> 826, numAddedFiles -> 1, numUpdatedRows -> 2, rewriteTimeMs -> 228}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0
0,2022-12-23 13:56:10,,,CREATE OR REPLACE TABLE AS SELECT,"{isManaged -> true, description -> null, partitionBy -> [""date""], properties -> {}}",,,,,Serializable,False,"{numFiles -> 1, numOutputRows -> 243, numOutputBytes -> 3206}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0


### Read an older version

In [10]:
# Time travel: read the table as of version 0, i.e. the state right after the
# initial write and before the update, delete and merge.
df = (
    spark.read
    .options(versionAsOf=0)
    .table("flights")
)
df.orderBy(F.col("count_flights").desc())

date,airport_origin,airport_destination,count_flights,avg_flight_time
2008-01-03,LAX,OAK,20,80
2008-01-03,LAS,PHX,18,64
2008-01-03,LAX,LAS,14,63
2008-01-03,LAS,LAX,14,67
2008-01-03,LAS,SAN,14,63
2008-01-03,LAS,BUR,13,62
2008-01-03,LAX,PHX,13,66
2008-01-03,LAX,SJC,13,64
2008-01-03,LAS,RNO,13,74
2008-01-03,LAS,OAK,13,95


### Rollback to a previous version

In [11]:
# Roll the table back to the contents of version 0 (the initial write).
# The call returns a one-row DataFrame of restore metrics, which is what the
# cell displays.
flights_table.restoreToVersion(0)

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
3206,1,2,1,4038,3206


In [12]:
# The restore is recorded as a new version (operation RESTORE) on top of the
# existing log; the earlier versions 0-3 are still listed.
flights_table.history()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2022-12-23 13:57:31,,,RESTORE,"{version -> 0, timestamp -> null}",,,,3.0,Serializable,False,"{numRestoredFiles -> 1, removedFilesSize -> 4038, numRemovedFiles -> 2, restoredFilesSize -> 3206, numOfFilesAfterRestore -> 1, tableSizeAfterRestore -> 3206}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0
3,2022-12-23 13:56:49,,,MERGE,"{predicate -> (((existing.date = incoming.date) AND (existing.airport_origin = incoming.airport_origin)) AND (existing.airport_destination = incoming.airport_destination)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}]}",,,,2.0,Serializable,False,"{numTargetRowsCopied -> 177, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 1922, numTargetRowsInserted -> 1, scanTimeMs -> 1286, numTargetRowsUpdated -> 1, numOutputRows -> 179, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 601}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0
2,2022-12-23 13:56:24,,,DELETE,"{predicate -> [""((spark_catalog.default.flights.airport_origin = 'LAS') OR (spark_catalog.default.flights.airport_destination = 'LAS'))""]}",,,,1.0,Serializable,False,"{numRemovedFiles -> 1, numCopiedRows -> 178, numAddedChangeFiles -> 0, executionTimeMs -> 681, numDeletedRows -> 65, scanTimeMs -> 453, numAddedFiles -> 1, rewriteTimeMs -> 227}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0
1,2022-12-23 13:56:23,,,UPDATE,{predicate -> (avg_flight_time#1027 = 80)},,,,0.0,Serializable,False,"{numRemovedFiles -> 1, numCopiedRows -> 241, numAddedChangeFiles -> 0, executionTimeMs -> 1055, scanTimeMs -> 826, numAddedFiles -> 1, numUpdatedRows -> 2, rewriteTimeMs -> 228}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0
0,2022-12-23 13:56:10,,,CREATE OR REPLACE TABLE AS SELECT,"{isManaged -> true, description -> null, partitionBy -> [""date""], properties -> {}}",,,,,Serializable,False,"{numFiles -> 1, numOutputRows -> 243, numOutputBytes -> 3206}",,Apache-Spark/3.3.1 Delta-Lake/2.2.0


# Vacuum

In [13]:
# Delete data files that the current table version no longer references and
# that are older than the retention window (168 hours = 7 days). Rows in the
# current version are not removed.
# NOTE(review): older versions whose files get deleted can no longer be read
# via time travel — confirm against the Delta Lake VACUUM docs.
flights_table.vacuum(retentionHours=168)

                                                                                

Deleted 0 files and directories in a total of 3 directories.


# Compact small files

In [14]:
# Compact the table's small data files; the call returns a DataFrame with the
# table path and the compaction metrics, which is what the cell displays.
flights_table.optimize().executeCompaction()

path,metrics
file:/Users/gk/Git/jupyter-demo/notebooks/delta-lake/spark-warehouse/flights,"{0, 0, {null, null, 0.0, 0, 0}, {null, null, 0.0, 0, 0}, 0, null, 0, 1, 1, false, 0, 0, 1671814702973, 0, 8, 0, null}"
