### 🎬 **Versioned Data Lakehouse with Apache Iceberg and Spark**

This notebook demonstrates how to use **Project Nessie** as a transactional catalog for **Apache Iceberg** tables in a data lakehouse. Key features include:

-   **Versioning**: Track changes to your data over time.

-   **Branching and Merging**: Create branches for experimental changes and merge them back into the main branch.

-   **Tags**: Create named, immutable references to a specific commit for reproducibility and auditing.

* * * * *

### 🎯 **Project Overview**

We will build an **ETL pipeline** for IMDb movie data using Nessie's branching and versioning capabilities.

1.  **Raw Data Ingestion**: Load raw IMDb data into a `raw` branch.

2.  **Data Transformation**: Clean and transform the data in a `dev` branch.

3.  **Data Validation**: Perform quality checks before promoting data to production.

4.  **Promotion to Main**: Merge the validated data into the `main` branch.

5.  **Versioning and Time Travel**: Use tags and commit hashes to track changes and time travel.
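
The five steps rest on one idea: in Nessie, a branch or tag is a name that points at a commit, and commits form a history that can be branched, merged, and revisited. A toy, in-memory model in plain Python can make that concrete before any Spark code runs. It is a hypothetical sketch for intuition only: `ToyCatalog`, its methods, and the snapshot labels are invented here, and merges are simplified to fast-forwards rather than modelling how Nessie actually records a merge.

```python
import hashlib
from typing import Dict, Optional, Tuple


class ToyCatalog:
    """Toy, in-memory model of Nessie-style references (hypothetical; not Nessie's code).

    Branches and tags are names that point at commit hashes. Each commit records
    its parent and the table state (table name -> snapshot label) after the commit.
    """

    def __init__(self) -> None:
        self.commits: Dict[str, Tuple[Optional[str], Dict[str, str]]] = {}
        self.branches: Dict[str, str] = {"main": self._commit(None, {})}
        self.tags: Dict[str, str] = {}

    def _commit(self, parent: Optional[str], tables: Dict[str, str]) -> str:
        payload = f"{parent}|{sorted(tables.items())}".encode()
        commit_hash = hashlib.sha256(payload).hexdigest()[:12]
        self.commits[commit_hash] = (parent, dict(tables))
        return commit_hash

    def _resolve(self, ref: str) -> str:
        # A ref can be a branch name, a tag name, or a raw commit hash (time travel)
        return self.branches.get(ref) or self.tags.get(ref) or ref

    def create_branch(self, name: str, source: str) -> None:
        # A new branch is just a new pointer at the source's current commit
        self.branches[name] = self._resolve(source)

    def create_tag(self, name: str, source: str) -> None:
        # A tag is a pointer that no operation in this model ever moves
        self.tags[name] = self._resolve(source)

    def tables(self, ref: str) -> Dict[str, str]:
        return dict(self.commits[self._resolve(ref)][1])

    def write(self, branch: str, table: str, snapshot: str) -> None:
        # Only branches accept commits; writing to a tag name raises KeyError
        head = self.branches[branch]
        new_state = {**self.commits[head][1], table: snapshot}
        self.branches[branch] = self._commit(head, new_state)

    def merge(self, source: str, target: str) -> None:
        # Simplification: only fast-forward merges are modelled, i.e. the target's
        # head must be an ancestor of the source's head.
        source_head, target_head = self.branches[source], self.branches[target]
        cursor: Optional[str] = source_head
        while cursor is not None and cursor != target_head:
            cursor = self.commits[cursor][0]
        if cursor != target_head:
            raise ValueError("non-fast-forward merges are not modelled in this sketch")
        self.branches[target] = source_head


# Walk through the notebook's workflow on the toy model (snapshot labels are made up)
catalog = ToyCatalog()
catalog.create_branch("raw", "main")
catalog.write("raw", "imdb.movies", "snap-raw")
catalog.create_branch("dev", "raw")
before_cleaning = catalog.branches["dev"]
catalog.write("dev", "imdb.movies", "snap-clean")
catalog.merge("dev", "main")
catalog.create_tag("report_202501", "main")

print(catalog.tables("main"))           # {'imdb.movies': 'snap-clean'}
print(catalog.tables(before_cleaning))  # {'imdb.movies': 'snap-raw'}
print(catalog.tables("raw"))            # {'imdb.movies': 'snap-raw'}
```

After the merge, `raw` still points at the pre-cleaning commit, and the recorded `before_cleaning` hash keeps the old table state reachable; the notebook relies on the same properties for time travel.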

Configure Spark to use Nessie as the catalog and Iceberg as the table format.

In [None]:

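from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Stop an existing Spark session, if any, so the builder starts a fresh one
if 'spark' in globals():
    spark.stop()

# Initialize Spark. The Iceberg and Nessie settings (SQL extensions, the `nessie`
# catalog, and `nessie` as the default catalog) are assumed to be supplied by the
# environment, e.g. spark-defaults.conf; this builder does not set them itself.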
spark = SparkSession.builder \
    .appName("NessieIMDbDemo") \
    .getOrCreate()

print("✅ Spark session started with Nessie and Iceberg.")
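
The cell above assumes the catalog settings already exist in the environment. If they do not, they can be passed to the builder explicitly. The sketch below collects them in a dictionary built by a hypothetical helper, `nessie_iceberg_conf`; the Nessie URI and warehouse location are placeholder assumptions, and the Iceberg Spark runtime and Nessie Spark extensions jars (versions not specified here) must already be on Spark's classpath. Setting `spark.sql.defaultCatalog` to `nessie` is what lets unqualified names such as `imdb.movies` resolve to the Nessie catalog.

```python
from typing import Dict


def nessie_iceberg_conf(
    uri: str = "http://nessie:19120/api/v1",  # assumption: Nessie server address
    warehouse: str = "s3://warehouse/",        # assumption: Iceberg warehouse location
    catalog: str = "nessie",
    ref: str = "main",
) -> Dict[str, str]:
    """Spark settings for an Iceberg catalog backed by Nessie (a sketch)."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Iceberg's and Nessie's SQL extensions; the latter adds CREATE BRANCH, USE REFERENCE, etc.
        "spark.sql.extensions": ",".join([
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
        ]),
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
        f"{prefix}.uri": uri,
        f"{prefix}.ref": ref,
        f"{prefix}.warehouse": warehouse,
        # Lets unqualified names such as `imdb.movies` resolve to this catalog
        "spark.sql.defaultCatalog": catalog,
    }


# Applying it (requires pyspark plus the Iceberg and Nessie runtime jars):
# builder = SparkSession.builder.appName("NessieIMDbDemo")
# for key, value in nessie_iceberg_conf().items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```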

Create the `imdb` namespace and a `raw` branch from `main` for ingesting the raw data.

In [None]:
# Create the namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS imdb")

# Create a raw branch
spark.sql("CREATE BRANCH IF NOT EXISTS raw FROM main")
spark.sql("USE REFERENCE raw")
print("✅ Created and switched to 'raw' branch for raw data ingestion.")

# List references to verify the branch creation
print("📋 List of references:")
spark.sql("LIST REFERENCES").toPandas()

Load the raw IMDb data into a Spark DataFrame.

In [None]:
raw_df = spark.read.option("header", "true").csv("/home/iceberg/data/imdb-movies.csv")

# Explore the raw dataset
print("📋 First 5 rows of raw IMDb data:")
raw_df.show(5)

print("📋 Schema of raw IMDb data:")
raw_df.printSchema()

Create an Iceberg table in the `raw` branch to store the raw data.

In [None]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS nessie.imdb.movies (
        imdb_title_id STRING,
        title STRING,
        year INT,
        genre STRING,
        director STRING,
        avg_vote DOUBLE,
        votes INT
    )
    USING iceberg
""")

print("✅ Iceberg table 'movies' created in the 'raw' branch.")

Insert the raw data into the `movies` table.

In [None]:
raw_df.select(
    col("imdb_title_id"),
    col("title"),
    col("year").cast("int"),
    col("genre"),
    col("director"),
    col("avg_vote").cast("double"),
    col("votes").cast("int")
).writeTo("nessie.imdb.movies").append()

print("✅ Raw data ingested into 'movies' in the 'raw' branch.")
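
The column names and types appear twice, once in the `CREATE TABLE` statement and once in the casts above, so a change to one can silently drift from the other. One option is to keep a single schema list and derive both from it. `MOVIES_SCHEMA`, `create_table_sql`, and `cast_exprs` are hypothetical names; the helpers only build strings, so they can be checked without a Spark session.

```python
from typing import List, Tuple

# Hypothetical single source of truth for the `movies` columns (mirrors the DDL above)
MOVIES_SCHEMA: List[Tuple[str, str]] = [
    ("imdb_title_id", "STRING"),
    ("title", "STRING"),
    ("year", "INT"),
    ("genre", "STRING"),
    ("director", "STRING"),
    ("avg_vote", "DOUBLE"),
    ("votes", "INT"),
]


def create_table_sql(table: str, schema: List[Tuple[str, str]]) -> str:
    """Render the CREATE TABLE statement for an Iceberg table."""
    columns = ",\n    ".join(f"`{name}` {dtype}" for name, dtype in schema)
    return f"CREATE TABLE IF NOT EXISTS {table} (\n    {columns}\n) USING iceberg"


def cast_exprs(schema: List[Tuple[str, str]]) -> List[str]:
    """One Spark SQL expression per column, cast to the declared type."""
    return [f"CAST(`{name}` AS {dtype}) AS `{name}`" for name, dtype in schema]


# Usage inside the notebook (needs `spark` and `raw_df` from earlier cells):
# spark.sql(create_table_sql("nessie.imdb.movies", MOVIES_SCHEMA))
# raw_df.selectExpr(*cast_exprs(MOVIES_SCHEMA)).writeTo("nessie.imdb.movies").append()
```

`DataFrame.selectExpr` accepts SQL expression strings, which is why the casts are rendered as `CAST(... AS ...)` text. As with the original `.cast(...)` calls, a value that cannot be parsed becomes NULL when `spark.sql.ansi.enabled` is off (the Spark 3.x default) and raises an error when it is on.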

Verify that the `movies` table exists and that the raw data has been ingested.

In [None]:
print("📋 Tables in the 'raw' branch:")
spark.sql("SHOW TABLES IN nessie.imdb").show(truncate=False)

print("📋 Raw data in the 'raw' branch:")
spark.sql("SELECT * FROM nessie.imdb.movies").show(5)

Describe the Iceberg table to view its properties and configurations.

In [None]:
print("📋 Table description for 'movies':")
spark.sql("DESCRIBE TABLE EXTENDED nessie.imdb.movies").show(truncate=False)

Create a `dev` branch from the `raw` branch to perform transformations.

In [None]:
# Create a dev branch from raw (IF NOT EXISTS keeps the cell re-runnable)
spark.sql("CREATE BRANCH IF NOT EXISTS dev FROM raw")
spark.sql("USE REFERENCE dev")
print("✅ Created and switched to 'dev' branch for transformations.")

print("📋 List of references:")
# Only a cell's last expression is rendered, so print this result explicitly
spark.sql("LIST REFERENCES").show(truncate=False)

print("📋 Tables in the 'dev' branch:")
spark.sql("SHOW TABLES IN nessie.imdb").toPandas()

Before performing transformations, record the commit hash to enable time travel.

In [None]:
commit_hash_before_cleaning = spark.sql("SHOW REFERENCE").filter("name = 'dev'").select("hash").collect()[0][0]
print(f"Commit hash before cleaning: {commit_hash_before_cleaning}")
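
`SHOW REFERENCE` returns only the session's current reference, so the `filter("name = 'dev'")` above mainly guards against being on the wrong branch (in that case the result is empty and `[0]` raises an `IndexError`). To look up any branch's head by name regardless of the current reference, one option is to scan the rows of `LIST REFERENCES`, assuming they expose `name` and `hash` columns as `SHOW REFERENCE` does. `Ref` and `head_hash` are hypothetical names; `Ref` only stands in for Spark's `Row` so the function can be tried without a session.

```python
from collections import namedtuple
from typing import Iterable

# Hypothetical stand-in for the Row objects that LIST REFERENCES returns
Ref = namedtuple("Ref", ["refType", "name", "hash"])


def head_hash(refs: Iterable, name: str) -> str:
    """Return the current commit hash of the reference called `name`."""
    for ref in refs:
        # Works for namedtuples and for pyspark Rows (attribute-style field access)
        if ref.name == name:
            return ref.hash
    raise LookupError(f"reference {name!r} not found")


# In the notebook: head_hash(spark.sql("LIST REFERENCES").collect(), "dev")
```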

In [None]:
# Query records that will be cleaned (null directors or avg_vote <= 5)
spark.sql("""
    SELECT * FROM nessie.imdb.movies
    WHERE director IS NULL OR avg_vote <= 5
    LIMIT 5
""").show()

Clean the data in place on the `dev` branch: replace missing directors with `'Unknown'` and delete movies with an `avg_vote` of 5 or lower.

In [None]:
# Transform and clean data
spark.sql("""
    UPDATE nessie.imdb.movies
    SET director = 'Unknown'
    WHERE director IS NULL
""")

spark.sql("""
    DELETE FROM nessie.imdb.movies
    WHERE avg_vote <= 5
""")

print("✅ Data transformed and cleaned directly in 'movies' in the 'dev' branch.")
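
The two statements hide a subtlety worth spelling out: under SQL's three-valued logic, `avg_vote <= 5` evaluates to NULL (not true) when `avg_vote` is NULL, so the `DELETE` keeps those rows. The plain-Python mirror below, run on hypothetical sample rows, makes that explicit. `clean_movies` is an illustrative name and is not used elsewhere in the notebook.

```python
from typing import Any, Dict, List

MovieRow = Dict[str, Any]


def clean_movies(rows: List[MovieRow]) -> List[MovieRow]:
    """Pure-Python mirror of the UPDATE + DELETE above (for illustration only)."""
    cleaned: List[MovieRow] = []
    for row in rows:
        avg_vote = row.get("avg_vote")
        # DELETE ... WHERE avg_vote <= 5: a NULL avg_vote makes the predicate NULL,
        # so SQL keeps that row; mirror that by dropping only real low scores.
        if avg_vote is not None and avg_vote <= 5:
            continue
        # UPDATE ... SET director = 'Unknown' WHERE director IS NULL
        if row.get("director") is None:
            row = {**row, "director": "Unknown"}
        cleaned.append(row)
    return cleaned


sample = [  # hypothetical rows, not taken from the IMDb file
    {"title": "A", "director": None, "avg_vote": 7.1},
    {"title": "B", "director": "X", "avg_vote": 4.2},
    {"title": "C", "director": "Y", "avg_vote": None},
]
result = clean_movies(sample)
print(result)
# [{'title': 'A', 'director': 'Unknown', 'avg_vote': 7.1}, {'title': 'C', 'director': 'Y', 'avg_vote': None}]
```

If rows with a missing rating should also be removed, the SQL predicate would need an explicit `OR avg_vote IS NULL`.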

Nessie and Iceberg support time travel, allowing you to query data as it existed at a specific point in time. Let’s use the commit hash recorded earlier to query the data before transformations.

In [None]:
# Current state of dev: no rows should match the cleaning criteria any more
print("📋 Matching rows in the 'dev' branch after transformations (expected: none):")
spark.sql("""
    SELECT * FROM nessie.imdb.movies
    WHERE director IS NULL OR avg_vote <= 5
    LIMIT 5
""").show()

# Time travel: read the same table at the commit recorded before cleaning
print("📋 Data before transformations (using commit hash):")
spark.sql(f"""
    -- `dev` is already the current reference; the `table@branch#commithash` form
    -- reads the table as of that commit on `dev`, whatever the session's reference.
    SELECT * FROM nessie.imdb.`movies@dev#{commit_hash_before_cleaning}`
    WHERE director IS NULL OR avg_vote <= 5
    LIMIT 5
""").show()

Perform data quality checks before promoting the data to `main`.

In [None]:
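# Validate data: cleaning should leave no NULL directors and no movies with
# avg_vote <= 5. 'Unknown' directors are expected (they replaced NULLs),
# so they are reported but do not fail validation.
validation_df = spark.sql("""
    SELECT COUNT(*) AS total_movies,
           SUM(CASE WHEN director IS NULL THEN 1 ELSE 0 END) AS null_director,
           SUM(CASE WHEN director = 'Unknown' THEN 1 ELSE 0 END) AS unknown_director,
           SUM(CASE WHEN avg_vote <= 5 THEN 1 ELSE 0 END) AS low_rated_movies
    FROM nessie.imdb.movies
""")

validation_df.show()

# Collect the single summary row once; SUM over an empty table is NULL (None), so default to 0
stats = validation_df.collect()[0]

if (stats["null_director"] or 0) > 0:
    print("❌ Validation failed: Some movies still have no director.")
elif (stats["low_rated_movies"] or 0) > 0:
    print("❌ Validation failed: Some movies have low ratings (avg_vote <= 5).")
else:
    print(f"✅ Validation passed: Data is clean and ready for promotion "
          f"({stats['unknown_director'] or 0} movies have an 'Unknown' director).")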
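
The validation cell only prints a message, so the merge that follows runs whether or not the checks pass. To stop a bad promotion, one option is to raise an exception instead and run the merge only if nothing was raised. `ValidationError` and `assert_valid` are hypothetical names; the function takes the summary row as a plain mapping and checks `total_movies`, `low_rated_movies`, and, when the query reports it, a `null_director` count.

```python
from typing import Mapping, Optional


class ValidationError(Exception):
    """Raised when the data on the dev branch is not fit to promote."""


def assert_valid(stats: Mapping[str, Optional[int]]) -> None:
    """Raise ValidationError if the summary counts show data-quality problems."""
    # SUM(...) over an empty table comes back as NULL (None), so treat None as 0
    problems = []
    if (stats.get("total_movies") or 0) == 0:
        problems.append("the table is empty")
    if (stats.get("null_director") or 0) > 0:
        problems.append(f"{stats['null_director']} movies have no director")
    if (stats.get("low_rated_movies") or 0) > 0:
        problems.append(f"{stats['low_rated_movies']} movies have avg_vote <= 5")
    if problems:
        raise ValidationError("; ".join(problems))


# In the notebook, before merging (Row.asDict() turns the Spark row into a dict):
# assert_valid(validation_df.collect()[0].asDict())
```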

Merge the `dev` branch into the `main` branch to promote the validated data.

In [None]:
# Merge the dev branch into main
spark.sql("MERGE BRANCH dev INTO main")
print("✅ 'dev' branch merged into 'main'. Transformed data is now in production.")

# The session is still on 'dev'; switch to 'main' so the query below reads 'main'
spark.sql("USE REFERENCE main")
print("📋 Data in the 'main' branch after merge:")
spark.sql("SELECT * FROM nessie.imdb.movies").show(5)

Create a tag to mark this version of the data as stable for reporting or auditing.

In [None]:
# Create a tag named 'report_202501' pointing at the current head of 'main'.
# FROM main pins the tag to 'main' even if the session is on another reference.
# Query the tagged data later with: SELECT * FROM nessie.imdb.`movies@report_202501`
spark.sql("CREATE TAG IF NOT EXISTS report_202501 FROM main")
print("✅ Tag 'report_202501' created at the current state of 'main'.")

# List references to verify the tag creation
print("📋 List of references:")
spark.sql("LIST REFERENCES").toPandas()
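
The tag name `report_202501` appears to encode a year and month (January 2025); that reading is an assumption. If reporting tags are cut on a schedule, the name can be derived from a date rather than typed by hand. `report_tag_sql` is a hypothetical helper; the explicit `FROM` clause pins the tag to the given branch rather than relying on whichever reference the statement would default to.

```python
from datetime import date


def report_tag_sql(as_of: date, branch: str = "main", prefix: str = "report") -> str:
    """CREATE TAG statement for a month-stamped reporting tag, pinned to `branch`."""
    tag = f"{prefix}_{as_of:%Y%m}"  # e.g. report_202501 for any day in January 2025
    return f"CREATE TAG IF NOT EXISTS {tag} FROM {branch}"


# In the notebook: spark.sql(report_tag_sql(date.today()))
print(report_tag_sql(date(2025, 1, 31)))  # CREATE TAG IF NOT EXISTS report_202501 FROM main
```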

In [None]:
# Clean up: Delete the raw and dev branches
spark.sql("DROP BRANCH IF EXISTS raw")
spark.sql("DROP BRANCH IF EXISTS dev")
print("✅ 'raw' and 'dev' branches deleted.")

# Clean up: Delete the tag
spark.sql("DROP TAG IF EXISTS report_202501")
print("✅ Tag 'report_202501' deleted.")

# Cleanup Spark
spark.stop()
print("✅ Spark session stopped and resources cleaned up.")