# Feature Store and Data Versioning with Delta Lake

This notebook demonstrates building a simple feature store on Delta Lake, using Delta's table versioning to track how features change. It covers ingesting raw CSV data, computing derived features, and querying earlier table versions with time travel.

In [None]:
from pyspark.sql import SparkSession
from delta import *

# Configure a Spark session with the Delta Lake SQL extension and catalog.
# configure_spark_with_delta_pip (from the delta-spark pip package) adds the
# matching Delta Lake jars to the builder; without it the extension/catalog
# classes named below are only available if the jars were supplied some other
# way (e.g. spark-submit --packages or a runtime that bundles Delta).
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
    .appName("FeatureStore")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("Spark session with Delta Lake configured.")

In [None]:
# Load the raw CSV file and persist it as a Delta table.
raw_data_path = "../data/raw_data.csv"
delta_raw_path = "../delta/raw_data"

# First row supplies column names; Spark infers each column's type from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(raw_data_path)
)
df.show()

# Replace whatever the Delta table currently holds with the freshly read rows.
(
    df.write
    .mode("overwrite")
    .format("delta")
    .save(delta_raw_path)
)
print(f"Raw data ingested into Delta table at {delta_raw_path}")

In [None]:
# Compute derived per-product features from the raw Delta table.
raw_df = spark.read.format("delta").load(delta_raw_path)

# NOTE: this import shadows Python's built-in `sum` for every later cell in the
# notebook; it is left as-is because later cells may call these names unqualified.
from pyspark.sql.functions import sum, avg, month, year

# One row per product: total and average sales across all of that product's rows.
# Only `product` and the aggregates survive the groupBy, so date-derived columns
# (month/year) cannot be added to the result afterwards -- group by them instead
# if monthly features are needed. This schema (product, total_sales, avg_sales)
# is the same one the later recompute step produces.
features_df = raw_df.groupBy("product").agg(
    sum("sales").alias("total_sales"),
    avg("sales").alias("avg_sales"),
)

features_df.show()

In [None]:
# Write the features to a Delta table, then append new raw data and rewrite the
# features so the feature table gains a second version.
from pyspark.sql import functions as F

delta_features_path = "../delta/features"

features_df.write.format("delta").mode("overwrite").save(delta_features_path)
print(f"Features written to Delta table at {delta_features_path}")

# To demonstrate versioning, simulate a batch of new raw rows.
new_data = spark.createDataFrame([
    ("2023-03-01", "ProductA", 140),
    ("2023-03-02", "ProductB", 280)
], ["date", "product", "sales"])

# createDataFrame types these columns from the Python values (str, str, int ->
# string, string, bigint), while the raw table's column types were inferred from
# the CSV (date is presumably a date/timestamp and sales an int -- depends on the
# CSV contents and Spark version). Delta rejects appends whose column types differ
# from the table's, so cast each column the table knows to the table's type.
# Columns the table lacks pass through unchanged so Delta still reports them.
raw_types = {
    field.name: field.dataType
    for field in spark.read.format("delta").load(delta_raw_path).schema.fields
}
new_data = new_data.select([
    F.col(name).cast(raw_types[name]).alias(name) if name in raw_types else F.col(name)
    for name in new_data.columns
])

# Append to raw
new_data.write.format("delta").mode("append").save(delta_raw_path)

# Recompute the same per-product features over the updated raw table.
raw_df_updated = spark.read.format("delta").load(delta_raw_path)
features_df_updated = raw_df_updated.groupBy("product").agg(
    F.sum("sales").alias("total_sales"),
    F.avg("sales").alias("avg_sales"),
)

# Overwriting commits a new table version; earlier versions stay queryable.
features_df_updated.write.format("delta").mode("overwrite").save(delta_features_path)
print("Features updated and versioned.")

In [None]:
# Demonstrate time travel queries on the features table.
from delta.tables import DeltaTable
from pyspark.sql import functions as F

delta_table = DeltaTable.forPath(spark, delta_features_path)
history = delta_table.history()
history.show()

# The features table is written twice per run (initial, then updated), so this
# run's snapshots are the last two versions. They are derived from the history
# rather than hard-coded as 0 and 1 because the table path persists across
# notebook runs and every overwrite adds another version.
updated_version = history.agg(F.max("version")).first()[0]
initial_version = max(updated_version - 1, 0)

# Query the initial features of this run
version_0 = spark.read.format("delta").option("versionAsOf", initial_version).load(delta_features_path)
print(f"Version {initial_version} (initial features):")
version_0.show()

# Query the updated features of this run
version_1 = spark.read.format("delta").option("versionAsOf", updated_version).load(delta_features_path)
print(f"Version {updated_version} (updated features):")
version_1.show()

# Query latest (no version option reads the current snapshot)
latest = spark.read.format("delta").load(delta_features_path)
print("Latest:")
latest.show()