# Versioned Data Lakehouse with Nessie, Iceberg, and Spark
This notebook demonstrates how to use Project Nessie as a transactional catalog for Apache Iceberg tables in a data lakehouse. Key features include:

* Versioning: Track changes to your data over time.

* Branching and Merging: Create branches for experimental changes and merge them back into the main branch.

* Tags: Create immutable snapshots of your data for reproducibility and auditing.

# Project Overview
#### We will build an ETL pipeline for IMDb movie data using Nessie's branching and versioning capabilities.

* Raw Data Ingestion: Load raw IMDb data into a raw branch.

* Data Transformation: Clean and transform the data in a dev branch.

* Data Validation: Perform quality checks before promoting data to production.

* Promotion to Main: Merge the validated data into the main branch.

* Versioning and Time Travel: Use tags and commit hashes to track changes and time trave

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Stop existing Spark session if running
if 'spark' in globals():
    spark.stop()

# Initialize Spark with Iceberg and Nessie integrations
spark = SparkSession.builder \
    .appName("NessieIMDbDemo") \
    .config("spark.ui.port", "4041") \
    .getOrCreate()

print("Spark session started with Nessie and Iceberg.")

Spark session started with Nessie and Iceberg.


In [2]:
# Create the namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS imdb")

# Create a raw branch
spark.sql("CREATE BRANCH IF NOT EXISTS raw FROM main")
spark.sql("USE REFERENCE raw")
print("Created and switched to 'raw' branch for raw data ingestion.")

# List references to verify the branch creation
print("List of references:")
spark.sql("LIST REFERENCES").toPandas()

Created and switched to 'raw' branch for raw data ingestion.
List of references:


Unnamed: 0,refType,name,hash
0,Branch,main,3f32c18f6266db12186ddb3a86d994ac7a615709b752c0...
1,Branch,raw,3f32c18f6266db12186ddb3a86d994ac7a615709b752c0...


In [3]:
raw_df = spark.read.option("header", "true").csv("/home/iceberg/data/imdb-movies.csv")
print(" First 5 rows of raw IMDb data:")
raw_df.show(5)
print(" Schema of raw IMDb data:")
raw_df.printSchema()

26/02/13 14:10:19 WARN S3FileIO: Unclosed S3FileIO instance created by:
	org.apache.iceberg.aws.s3.S3FileIO.initialize(S3FileIO.java:359)
	org.apache.iceberg.CatalogUtil.loadFileIO(CatalogUtil.java:350)
	org.apache.iceberg.nessie.NessieCatalog.initialize(NessieCatalog.java:132)
	org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:255)
	org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:309)
	org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:154)
	org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:751)
	org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
	org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
	scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
	org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
	

 First 5 rows of raw IMDb data:
+-------------+--------------------+--------------------+----+--------------+--------------------+--------+--------------------+----------+------------+----------+------------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------+------+--------+----------------+---------------------+------------------+
|imdb_title_id|               title|      original_title|year|date_published|               genre|duration|             country|language_1|  language_2|language_3|          director|              writer|              actors|       actors_1|           actors_f2|         description|              desc35|avg_vote| votes|  budget|usa_gross_income|worlwide_gross_income|reviews_from_users|
+-------------+--------------------+--------------------+----+--------------+--------------------+--------+--------------------+----------+------------+----------+------------------+------------

In [4]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS nessie.imdb.movies (
        imdb_title_id STRING,
        title STRING,
        year INT,
        genre STRING,
        director STRING,
        avg_vote DOUBLE,
        votes INT
    )
    USING iceberg
""")

print(" Iceberg table 'movies' created in the 'raw' branch.")

 Iceberg table 'movies' created in the 'raw' branch.


In [5]:
raw_df.select(
    col("imdb_title_id"),
    col("title"),
    col("year").cast("int"),
    col("genre"),
    col("director"),
    col("avg_vote").cast("double"),
    col("votes").cast("int")
).writeTo("nessie.imdb.movies").append()
print("Raw data ingested into 'movies' in the 'raw' branch.")

                                                                                

Raw data ingested into 'movies' in the 'raw' branch.


In [6]:
print("Tables in the 'raw' branch:")
spark.sql("SHOW TABLES IN nessie.imdb").show(truncate=False)

print("Raw data in the 'raw' branch:")
spark.sql("SELECT * FROM nessie.imdb.movies").show(5)

Tables in the 'raw' branch:
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|imdb     |movies   |false      |
+---------+---------+-----------+

Raw data in the 'raw' branch:
+-------------+--------------------+----+--------------------+------------------+--------+------+
|imdb_title_id|               title|year|               genre|          director|avg_vote| votes|
+-------------+--------------------+----+--------------------+------------------+--------+------+
|    tt0035423|      Kate & Leopold|2001|Comedy, Fantasy, ...|     James Mangold|     6.4| 77852|
|    tt0118589|             Glitter|2001|Drama, Music, Rom...|Vondie Curtis-Hall|     2.2| 21298|
|    tt0118694|In the Mood for Love|2000|      Drama, Romance|      Kar-Wai Wong|     8.1|119171|
|    tt0120202|  Hollywood, Vermont|2000|       Comedy, Drama|       David Mamet|     6.7| 20220|
|    tt0120263|Canzoni del secon...|2000|       Comedy, Drama|     Roy Andersson|    

Describe the Iceberg table to view its properties and configurations.

In [7]:
print("Table description for 'movies':")
spark.sql("DESCRIBE TABLE EXTENDED nessie.imdb.movies").show(truncate=False)

Table description for 'movies':
+----------------------------+---------------------------------------------------------------+-------+
|col_name                    |data_type                                                      |comment|
+----------------------------+---------------------------------------------------------------+-------+
|imdb_title_id               |string                                                         |NULL   |
|title                       |string                                                         |NULL   |
|year                        |int                                                            |NULL   |
|genre                       |string                                                         |NULL   |
|director                    |string                                                         |NULL   |
|avg_vote                    |double                                                         |NULL   |
|votes                       |int        

Create a dev branch from the raw branch to perform transformations.

In [8]:
# Create a dev branch from raw
spark.sql("CREATE BRANCH dev FROM raw")
spark.sql("USE REFERENCE dev")
print("Created and switched to 'dev' branch for transformations.")

print("List of references:")
spark.sql("LIST REFERENCES").toPandas()

print(" Tables in the 'dev' branch:")
spark.sql("SHOW TABLES IN nessie.imdb").toPandas()

Created and switched to 'dev' branch for transformations.
List of references:
 Tables in the 'dev' branch:


Unnamed: 0,namespace,tableName,isTemporary
0,imdb,movies,False


Before performing transformations, record the commit hash to enable time travel.

In [9]:
commit_hash_before_cleaning=spark.sql("SHOW REFERENCE").filter("name='dev'").select("hash").collect()[0][0]
print(f"Commit hash before cleaning: {commit_hash_before_cleaning}")

Commit hash before cleaning: e22d2c0d115897834d7640e6d1872f71f69f9574d096b072b33d248a919313d5


In [14]:
# Query records that will be cleaned (null directors or avg_vote <= 5)
spark.sql("""
    SELECT * FROM nessie.imdb.movies
    WHERE director IS NULL OR avg_vote <= 5
    LIMIT 5
""").show()

+-------------+-----+----+-----+--------+--------+-----+
|imdb_title_id|title|year|genre|director|avg_vote|votes|
+-------------+-----+----+-----+--------+--------+-----+
+-------------+-----+----+-----+--------+--------+-----+



Perform transformations and cleaning directly in the movies table in the dev branch.

In [15]:
# Transform and clean data
spark.sql("""
    UPDATE nessie.imdb.movies
    SET director = 'Unknown'
    WHERE director IS NULL
""")

spark.sql("""
    DELETE FROM nessie.imdb.movies
    WHERE avg_vote <= 5
""")
print("Data transformed and cleaned directly in 'movies' in the 'dev' branch.")

Data transformed and cleaned directly in 'movies' in the 'dev' branch.


Nessie and Iceberg support time travel, allowing you to query data as it existed at a specific point in time. Let’s use the commit hash recorded earlier to query the data before transformations.

In [16]:
# Current state of dev
print("Data in 'dev' branch after transformations:")
spark.sql("""
    SELECT * FROM nessie.imdb.movies
    WHERE director IS NULL OR avg_vote <= 5
    LIMIT 5
""").show()
# Time Travel Query to confirm cleaned data is retrievable
print("Data before transformations (using commit hash):")
spark.sql(f"""
    -- Note: The `dev` branch is the current reference, so we don't need to specify it explicitly.
    -- However, you can explicitly reference a branch or commit hash like this: `table@branch#commithash`
    SELECT * FROM imdb.`movies@dev#{commit_hash_before_cleaning}`
    WHERE director IS NULL OR avg_vote <= 5
    LIMIT 5
""").show()

Data in 'dev' branch after transformations:
+-------------+-----+----+-----+--------+--------+-----+
|imdb_title_id|title|year|genre|director|avg_vote|votes|
+-------------+-----+----+-----+--------+--------+-----+
+-------------+-----+----+-----+--------+--------+-----+

Data before transformations (using commit hash):
+-------------+--------------------+----+--------------------+------------------+--------+-----+
|imdb_title_id|               title|year|               genre|          director|avg_vote|votes|
+-------------+--------------------+----+--------------------+------------------+--------+-----+
|    tt0118589|             Glitter|2001|Drama, Music, Rom...|Vondie Curtis-Hall|     2.2|21298|
|    tt0131704|Le avventure di R...|2000|Animation, Advent...|       Des McAnuff|     4.2|19101|
|    tt0132245|              Driven|2001|Action, Drama, Sport|      Renny Harlin|     4.6|38891|
|    tt0132910|Il corvo 3 - Salv...|2000|Action, Crime, Fa...|    Bharat Nalluri|     4.9|10705|

Perform data quality checks before promoting the data to main.

In [19]:
# Validate data: Check for data quality issues
validation_df = spark.sql("""
    SELECT COUNT(*) AS total_movies,
           SUM(CASE WHEN director = 'Unknown' THEN 1 ELSE 0 END) AS unknown_director,
           SUM(CASE WHEN avg_vote <= 5 THEN 1 ELSE 0 END) AS low_rated_movies
    FROM nessie.imdb.movies
""")

validation_df.show()

# Check for validation errors
if validation_df.filter(col("unknown_director") > 0).count() > 0:
    print("Validation failed: Some movies have unknown directors.")
elif validation_df.filter(col("low_rated_movies") > 0).count() > 0:
    print("Validation failed: Some movies have low ratings.")
else:
    print("Validation passed: Data is clean and ready for promotion.")
    

+------------+----------------+----------------+
|total_movies|unknown_director|low_rated_movies|
+------------+----------------+----------------+
|        5119|               0|               0|
+------------+----------------+----------------+

Validation passed: Data is clean and ready for promotion.


In [22]:
# Merge the dev branch into main
spark.sql("MERGE BRANCH dev INTO main")
print("'dev' branch merged into 'main'. Transformed data is now in production.")
print(" Data in the 'main' branch after merge:")
spark.sql("SELECT * FROM nessie.imdb.movies").show(10)

'dev' branch merged into 'main'. Transformed data is now in production.
 Data in the 'main' branch after merge:
+-------------+--------------------+----+--------------------+--------------------+--------+-------+
|imdb_title_id|               title|year|               genre|            director|avg_vote|  votes|
+-------------+--------------------+----+--------------------+--------------------+--------+-------+
|    tt0035423|      Kate & Leopold|2001|Comedy, Fantasy, ...|       James Mangold|     6.4|  77852|
|    tt0118694|In the Mood for Love|2000|      Drama, Romance|        Kar-Wai Wong|     8.1| 119171|
|    tt0120202|  Hollywood, Vermont|2000|       Comedy, Drama|         David Mamet|     6.7|  20220|
|    tt0120263|Canzoni del secon...|2000|       Comedy, Drama|       Roy Andersson|     7.6|  17188|
|    tt0120630|     Galline in fuga|2000|Animation, Advent...|Peter Lord, Nick ...|     7.0| 173987|
|    tt0120667|      I Fantastici 4|2005|Action, Adventure...|           Tim Sto

Create a tag to mark this version of the data as stable for reporting or auditing.

In [23]:
# Create a tag for reporting
spark.sql(""" 
    -- Create a tag named 'report_202501' at the current state of 'main'.
    -- This tag can be used to query the data as it exists at this point in time.
    -- Example: SELECT * FROM imdb.`movies@report_202501`
    CREATE TAG IF NOT EXISTS report_202501
    """)
print("Tag 'report_202501' created at the current state of 'main'.")
# List references to verify the tag creation
print(" List of references:")
spark.sql("LIST REFERENCES").toPandas()

Tag 'report_202501' created at the current state of 'main'.
 List of references:


Unnamed: 0,refType,name,hash
0,Branch,dev,096979b58f36a5bda718afd68d1e2d30bebaf743f6d8cf...
1,Branch,main,f278c812916df79c8e43518f340b23d61aae73204ac61f...
2,Branch,raw,e22d2c0d115897834d7640e6d1872f71f69f9574d096b0...
3,Tag,report_202501,096979b58f36a5bda718afd68d1e2d30bebaf743f6d8cf...


In [24]:
# Clean up: Delete the raw and dev branches
spark.sql("DROP BRANCH IF EXISTS raw")
spark.sql("DROP BRANCH IF EXISTS dev")
print(" 'raw' and 'dev' branches deleted.")

# Clean up: Delete the tag
spark.sql("DROP TAG IF EXISTS report_202501")
print(" Tag 'report_202501' deleted.")

# Cleanup Spark
spark.stop()
print(" Spark session stopped and resources cleaned up.")

 'raw' and 'dev' branches deleted.
 Tag 'report_202501' deleted.
 Spark session stopped and resources cleaned up.
