In [None]:
!pip3 install pyspark deltalake polars

# Spark Setup

In [None]:
import pyspark
import pyspark.sql.functions as F
import polars as pl
from pyspark.sql.types import IntegerType, DoubleType
# Explicit imports instead of `from delta import *`, so readers can see which
# names come from Delta Lake.
from delta import DeltaTable, configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    # Register Delta Lake's SQL extension and catalog so Spark understands Delta tables.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Render a DataFrame as an HTML table when it is the last expression of a cell.
    .config("spark.sql.repl.eagerEval.enabled", True)
    # Demo only: disabling the retention check is what allows `vacuum(0)` later on.
    # Do not do this on tables that have concurrent readers or writers.
    .config("spark.databricks.delta.retentionDurationCheck.enabled", False)
    # Delete vacuumed files in parallel.
    .config("spark.databricks.delta.vacuum.parallelDelete.enabled", True)
)

# Adds the Delta Lake Spark package matching the installed delta-spark version.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Load data and create table

In [None]:
# Read the raw CSV. Only `header` is set (no schema inference), so every column is a string.
df = spark.read.csv("apple.csv", header=True)
df

In [None]:
# Cast columns: parse `date` as a timestamp, make `volume` an integer, and turn the
# dollar-prefixed price strings into doubles.
df = (
    df.withColumn("date", F.to_timestamp(F.col("date"), "MM/dd/yyyy"))
    .withColumn("volume", F.col("volume").cast(IntegerType()))
)
for c in ["close_last", "open", "high", "low"]:
    # Raw string: a plain "\$" literal is an invalid escape sequence in Python
    # (SyntaxWarning on 3.12+). The regex `\$` matches a literal dollar sign.
    df = df.withColumn(c, F.regexp_replace(F.col(c), r"\$", "").cast(DoubleType()))
df

In [None]:
# Create table
import os
import shutil

table_name = "my_table"
# Keep the table next to the notebook instead of at a machine-specific absolute path.
table_path = os.path.abspath(table_name)

# Reset any table left over from a previous run so "Restart & Run All" works.
# DeltaTable.create() fails if the name is already registered in this session, or if
# the location already holds a table with a different schema (e.g. after 'ticker'
# is added further down). Leftover commits would also shift the version numbers the
# time-travel cells rely on. WARNING: this deletes everything under table_path.
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
shutil.rmtree(table_path, ignore_errors=True)

# `year` and `month` are generated from `date` and used as partition columns.
(
    DeltaTable.create(spark)
    .tableName(table_name)
    .location(table_path)
    .addColumns(df.schema)
    .addColumn("year", "INT", generatedAlwaysAs="YEAR(date)")
    .addColumn("month", "INT", generatedAlwaysAs="MONTH(date)")
    .partitionedBy("year", "month")
    .execute()
)

# Write data to table (Delta fills in the generated year/month columns).
df.write.format("delta").mode("append").saveAsTable(table_name)

# Display table
df2 = spark.table(table_name)
df2

# Update schema and merge

In [None]:
# Add a 'ticker' column, but only for rows in the January 2023 partition.
# NOTE: `filter` shadows the builtin; the name is kept because later cells use it.
filter = "year == 2023 AND month == 1"
df3 = df2.where(filter).withColumn("ticker", F.lit("AAPL"))

# Overwrite only the partitions present in df3 (dynamic partition overwrite) and
# let Delta add the new 'ticker' column to the table schema (mergeSchema).
# Rows in the other partitions read 'ticker' as null.
(
    df3.write.format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .option("partitionOverwriteMode", "dynamic")
    .saveAsTable(table_name)
)

# Display table
spark.table(table_name)

In [None]:
# Re-read the filtered partition from the table, now including 'ticker'.
df3 = spark.read.table(table_name).where(filter)
df3

In [None]:
# Merge: set `volume` to 0 for the selected rows and upsert them, matching on `date`.
df3 = df3.withColumn("volume", F.lit(0))
(
    DeltaTable.forPath(spark, table_path)
    .alias("df")
    .merge(df3.alias("df3"), "df.date = df3.date")
    # Existing rows with the same date get all columns overwritten from df3 ...
    .whenMatchedUpdateAll()
    # ... and dates not yet present in the table are inserted.
    .whenNotMatchedInsertAll()
    .execute()
)

# Display table
spark.table(table_name).where(filter)

# Table history and time travel

In [None]:
# Full commit history of the Delta table, newest version first.
(
    DeltaTable.forPath(spark, table_path)
    .history()
)

In [None]:
# Compact history: only the version, commit time and operation of each commit.
# `truncate=False` keeps the full timestamps visible.
(
    DeltaTable.forPath(spark, table_path)
    .history()
    .select(["version", "timestamp", "operation"])
    .show(truncate=False)
)

In [None]:
# Time travel: read the table as of an earlier commit. If the table was freshly
# created above, version 0 is the CREATE TABLE, 1 the initial append, 2 the 'ticker'
# overwrite and 3 the merge. Version 1 therefore shows the filtered rows before
# 'ticker' was added and before `volume` was zeroed.
# Re-running this cell after the vacuum(0) cell below would fail, because the files
# backing this version of the partition are deleted by it.
# To roll the live table back (rather than just reading a snapshot), use
# `DeltaTable.forPath(spark, table_path).restoreToVersion(<version>)`.
TIME_TRAVEL_VERSION = 1
spark.read.format("delta").option("versionAsOf", TIME_TRAVEL_VERSION).load(table_path).where(filter)

# Vacuum

In [None]:
# Vacuum with 0 hours of retention. This is only permitted because the session
# config disables the retention-duration check. It physically deletes every data file
# that the latest table version no longer references.
# WARNING: afterwards, time travel to older versions whose files were removed fails,
# e.g. the polars `version=2` read at the end of this notebook.
DeltaTable.forPath(spark, table_path).vacuum(retentionHours=0)

# File skipping

In [None]:
# Filter directly on the partition columns; the plan should list them under PartitionFilters.
spark.table(table_name).where(filter).explain(mode="formatted")

In [None]:
# Filter only on `date`. Because year/month are generated from it, Delta can derive
# partition filters from this range; compare PartitionFilters with the previous plan.
spark.table(table_name).where("date BETWEEN '2023-01-01' AND '2023-01-31'").explain(mode="formatted")

# Read without Spark

In [None]:
# Read only the January 2023 partition with polars (via delta-rs, no Spark).
# A lazy scan plus filter lets polars push the predicate down and skip files outside
# the partition. Recent polars versions reject `pyarrow_options` in read_delta unless
# `use_pyarrow=True` is also passed, so the original call is not portable.
(
    pl.scan_delta(table_path)
    .filter((pl.col("year") == 2023) & (pl.col("month") == 1))
    .collect()
)

In [None]:
# Time travel without Spark: load the table as it was at version 2.
# NOTE(review): if the vacuum(0) cell above has already run, files referenced by
# version 2 but not by the latest version have been deleted, so this read will
# likely fail. Run it before vacuuming.
pl.read_delta(table_path, version=2)