In [None]:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
# Load the DF

bike_df = (spark
           .read          
           .option("header", True)
           .option("inferSchema", True)
           .option("delimiter", ",")
            .csv("file:///data/bike/bike-retail.csv"))
bike_df.show(5, truncate=False)

In [None]:
# Persist as delta table

bike_df.write.format("delta").save("file:///data/bike/retail-bike")

In [None]:
# Load DF from delta table

ddf = spark.read.format("delta").load("file:///data/bike/retail-bike")
ddf.show(5, truncate=False)

# Essential Delta Lake Operations

## Create

In [None]:
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Create a database for the examples
spark.sql("create database if not exists exampleDB")

In [None]:
# Make sure the table doesn't already exist
spark.sql("""
DROP TABLE if exists exampleDB.countries
""")

In [None]:
# Create an empty table using sql
spark.sql("""
CREATE TABLE exampleDB.scountries (
  id LONG,
  country STRING,
  capital STRING
) USING DELTA;
""")

In [None]:
# Create a DeltaTable object using python 
from pyspark.sql.types import *
from delta.tables import DeltaTable

delta_table = (
    DeltaTable.create(spark)
    .tableName("exampleDB.pcountries")
    .addColumn("id", dataType=LongType(), nullable=False)
    .addColumn("country", dataType=StringType(), nullable=False)
    .addColumn("capital", dataType=StringType(), nullable=False)
    .execute())

In [None]:
# Load data using insert into

spark.sql("""
    INSERT INTO exampleDB.scountries VALUES
    (1, 'United Kingdom', 'London'),
    (2, "Canada", "Ottawa");
""")

In [None]:
# Load data using python

data = [
(1, "United Kingdom", "London"),
(2, "Canada", "Toronto")
]

# Create a schema
schema = ["id", "country", "capital"]

# Create a dataframe
df = spark.createDataFrame(data, schema=schema)

df.write.format("delta").insertInto("exampleDB.pcountries")


In [None]:
# Append

data = [(3, "United States", "Washington, D.C.")]

schema = ["id", "country", "capital"]

df = spark.createDataFrame(data, schema=schema)

df.write.format("delta").mode("append").saveAsTable("exampleDB.pcountries")

## Read

In [None]:
# Read

from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "exampleDB.pcountries")
delta_table_df = delta_table.toDF()

delta_table_df.show()

In [None]:
delta_table_df.select("id", "capital").show()

In [None]:
# filtering

delta_table_df.filter(delta_table_df.capital == 'London').show()

### Reading with Time Travel

In [49]:
# Supported using one of qualifiers: _VERSION AS OF_ and _TIMESTAMP AS OF_
# Python - Notice that the DeltaTable API doesn't support time travel read yet

# VERSION AS OF 1 show version 1 of the table

df = (spark.read.option("versionAsOf", "1")
         .load("file:///data/spark-warehouse/exampledb.db/pcountries")
         .select("id")
         .distinct())
df.show()

+---+
| id|
+---+
|  3|
|  1|
|  2|
+---+



In [50]:
# Supported using one of qualifiers: _VERSION AS OF_ and _TIMESTAMP AS OF_
# Sql

# VERSION AS OF 1 show version 1 of the table
df = spark.sql("SELECT DISTINCT id FROM exampleDB.scountries VERSION AS OF 1")
df.show()


+---+
| id|
+---+
|  1|
+---+



In [51]:
# Supported using one of qualifiers: _VERSION AS OF_ and _TIMESTAMP AS OF_
# Python - Notice that the DeltaTable API doesn't support time travel read yet

# TIMESTAMP AS OF <date> shows the number of records before the given date

df = (spark.read.option("timestampAsOf", "2025-01-01")
         .load("file:///data/spark-warehouse/exampledb.db/pcountries")
         .select("id")
         .distinct())
df.show()

+---+
| id|
+---+
|  3|
|  1|
|  2|
+---+



In [59]:
# Supported using one of qualifiers: _VERSION AS OF_ and _TIMESTAMP AS OF_
# Sql

# TIMESTAMP AS OF <date> shows the number of records before the given date
df = spark.sql("SELECT DISTINCT id FROM exampleDB.scountries TIMESTAMP AS OF '2025-01-02 04:30'")
df.show()


+---+
| id|
+---+
+---+



## Update

In [63]:
# Update sql

spark.sql("UPDATE exampleDB.scountries SET country = 'U.K.' WHERE id = 1;")

DataFrame[num_affected_rows: bigint]

In [64]:
# update python

delta_table.update(
    condition = "id = 1",
    set = { "country": "'U.K.'"})

## Delete

In [65]:
# Delete sql

spark.sql("DELETE FROM exampleDB.scountries WHERE id = 1;")

DataFrame[num_affected_rows: bigint]

In [68]:
# Delete python

from pyspark.sql.functions import col

delta_table.delete("id = 1") # Using SQL Expression
delta_table.delete(col("id") == 2) # Using PySpark 

## Overwriting Data in a Delta Lake Table

In [69]:
# Using overwrite mode
# python

spark.createDataFrame([(1, 'India', 'New Delhi'),(4, 'Australia', 'Canberra'), (3, 'U.S.', 'Washington, D.C.')], schema=["id", "country", "capital"]) \
.write \
.format("delta") \
.mode("overwrite") \
.saveAsTable("exampleDB.pcountries")

In [71]:
# Using INSERT OVERWRITE
# sql

spark.sql("INSERT OVERWRITE exampleDB.scountries VALUES (3, 'U.S.', 'Washington, D.C.'), (1, 'India', 'New Delhi'), (4, 'Australia', 'Canberra')")

DataFrame[]

## Merge

In [74]:
# Merge
# python
from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "exampleDB.pcountries")

idf = (
    spark
    .createDataFrame([(1, 'India', 'New Delhi'),(4, 'Australia', 'Canberra'), (3, 'U.S.', 'Washington, D.C.')], schema=["id", "country", "capital"])
)

delta_table.alias("target").merge(
    source = idf.alias("source"),
    condition = "source.id = target.id"
).whenMatchedUpdate(set = {"country": "source.country", "capital": "source.capital"}
                   ).whenNotMatchedInsert(values = {"id": "source.id", "country": "source.country", "capital": "source.capital"}
                                         ).execute()

## Delta Lake Metadata and History

In [75]:
delta_table.history()

DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,jobRunId:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string, engineInfo: string]

In [76]:
delta_table.detail()

DataFrame[format: string, id: string, name: string, description: string, location: string, createdAt: timestamp, lastModified: timestamp, partitionColumns: array<string>, clusteringColumns: array<string>, numFiles: bigint, sizeInBytes: bigint, properties: map<string,string>, minReaderVersion: int, minWriterVersion: int, tableFeatures: array<string>]

In [77]:
delta_table.toDF().show()

+---+---------+----------------+
| id|  country|         capital|
+---+---------+----------------+
|  1|    India|       New Delhi|
|  3|     U.S.|Washington, D.C.|
|  4|Australia|        Canberra|
+---+---------+----------------+

