In [None]:
# Smoke test: confirms the kernel runs code cells and renders their output.
print("Hello World from Jupyter!", end="\n")

In [None]:
# Do not delete this cell. It sets the environment variables PySpark needs in Jupyter Notebooks.
# Helper to point the Spark driver and its Python workers at one Python interpreter.
import os
import sys


# Define a function to set PySpark environment variables
def set_pyspark_env_vars(python_path=None):
    """Set the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON environment variables.

    Spark launches Python processes for its workers; these variables tell it
    which interpreter to use so that the driver and the workers agree.

    Args:
        python_path: Absolute path to the Python executable to use. When omitted
            (or empty), the interpreter running this kernel (``sys.executable``)
            is used.

    Returns:
        The interpreter path written to both variables.
    """
    # Default to this kernel's interpreter so the cell works without manual edits;
    # a placeholder string here is not an executable and breaks Spark's Python workers.
    interpreter = python_path or sys.executable
    os.environ['PYSPARK_PYTHON'] = interpreter
    os.environ['PYSPARK_DRIVER_PYTHON'] = interpreter
    print("PySpark environment variables set.")
    return interpreter


set_pyspark_env_vars()

In [None]:
# Define a helper function to get the path of the project directory
import os


def get_notebook_path():
    """Return the absolute directory this code is running from.

    Scripts launched from the command line define ``__file__``, so the directory
    containing the script is returned. Jupyter kernels do not define
    ``__file__``; in that case the current working directory is returned.
    """
    try:
        # This works for scripts run from the command line.
        return os.path.dirname(os.path.abspath(__file__))
    except NameError:
        # Jupyter Notebooks have no __file__; fall back to the working directory.
        return os.path.abspath(os.getcwd())


# Get the base path to your Hudi table. The folder name matches the one
# get_hudi_param() uses, so the printed path is where the table is written.
SCRIPT_DIR = get_notebook_path()
HUDI_BASE_PATH = os.path.join(SCRIPT_DIR, "hudi_table_data_cow")
print(f"Notebook path: {SCRIPT_DIR}")
print(f"Hudi base path: {HUDI_BASE_PATH}")
    

In [None]:
# Helpers to create a Spark session with Hudi jars
# This code assumes you have Apache Spark installed and configured on your system.
# It also assumes you have Java installed, as Spark requires Java to run.
# Make sure to set the SPARK_HOME environment variable to point to your Spark installation directory.
# For example, on Windows, you might set it like this:
# os.environ['SPARK_HOME'] = "C:\\path\\to\\your\\spark"
# You also need to add the Spark bin directory to your PATH environment variable.
# For example:
# os.environ['PATH'] += os.pathsep + os.path.join(os.environ['SPARK_HOME'], 'bin')
# You need to have the Hudi jar files downloaded and placed in a known directory.
# You can download the Hudi jars from the Apache Hudi releases page:
# https://hudi.apache.org/releases.html
# Make sure to download the correct version that matches your Spark and Scala versions.
# For example, for Spark 3.4 and Scala 2.12, you would download the hudi-spark3.4-bundle_2.12-0.14.1.jar
# Place the jar files in a directory, e.g., "jars" in the same directory as this notebook.
# Adjust the path in the code below to point to your jar files.
# Define a function to create a Spark session with Hudi jars

def create_spark_session(jars_dir=None):
    """Create (or reuse) a SparkSession configured for Apache Hudi.

    Args:
        jars_dir: Directory containing the Hudi bundle jar. Defaults to the
            ``jars`` folder inside ``SCRIPT_DIR`` (defined in an earlier cell).

    Returns:
        The SparkSession returned by ``SparkSession.builder.getOrCreate()``.
        If a session is already running, getOrCreate() returns it. Static
        settings such as ``spark.jars`` from this call then do not take effect.
    """
    print("Creating Spark session with Hudi jars...")
    # Import necessary libraries
    import os
    import findspark

    # Findspark locates the Spark installation (SPARK_HOME) so that pyspark can be imported.
    findspark.init(spark_home=os.environ.get("SPARK_HOME"))
    from pyspark.sql import SparkSession

    # Path to the directory containing Hudi jar files
    if jars_dir is None:
        jars_dir = os.path.join(SCRIPT_DIR, "jars")
    print(f"Resolved jars path: {jars_dir}")

    hudi_jars = [
        os.path.join(jars_dir, 'hudi-spark3.4-bundle_2.12-0.14.1.jar'),
    ]
    # A missing jar otherwise only shows up later as a ClassNotFoundException
    # (e.g. for HoodieCatalog), so warn up front.
    missing_jars = [jar for jar in hudi_jars if not os.path.isfile(jar)]
    if missing_jars:
        print(f"WARNING: Hudi jar(s) not found: {missing_jars}")
    print(f"Hudi jars will be loaded from: {hudi_jars}")

    # Configure Spark session with Hudi configurations and jars.
    # Adjust Spark configurations as necessary.
    spark = (
        SparkSession.builder
        .appName("HudiExplore-TripApp-CopyonWrite")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
        .config("spark.jars", ",".join(hudi_jars))
        .getOrCreate()
    )
    print("Spark session with Hudi jars has been created.")
    return spark


create_spark_session()


In [None]:
# This code verifies that the Spark session and Hudi dependencies are working
import os, shutil, sys
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Ensure PySpark environment variables are set
set_pyspark_env_vars()


# This function verifies that the Spark session and Hudi dependencies are working
def verify_hudi_setup():
    """Verify the Hudi setup by writing to and reading from a small Hudi table.

    Uses the active SparkSession and creates one with create_spark_session()
    when none is active. The scratch table at ./temp_hudi_test is deleted
    afterwards, whether the check succeeds or fails.

    Returns:
        True if the write/read round trip succeeded, False otherwise. Errors are
        printed rather than raised so that the rest of the notebook keeps running.
    """
    # os.path.abspath() converts the relative path to an absolute path,
    # which is required by Hudi's Java-based file system layer on Windows.
    temp_hudi_path = os.path.abspath("./temp_hudi_test")
    try:
        # Use the existing SparkSession from the notebook
        spark = SparkSession.getActiveSession()
        if spark is None:
            print("No active SparkSession found. Please run your setup cell first.")
            spark = create_spark_session()
            if spark is None:
                raise Exception("Failed to create SparkSession.")
        else:
            print("Calling SparkSession.getActiveSession() succeeded.")
        print("Verification: SparkSession is active.")

        # Clean up any previous test data
        if os.path.exists(temp_hudi_path):
            print(f"Cleaning up previous test data at {temp_hudi_path}")
            shutil.rmtree(temp_hudi_path)

        # Create a simple DataFrame to write to the Hudi table
        print("Verification: Creating a simple DataFrame...")
        test_data = [
            ("1", "item_A", "2023-01-01", "1672531200"),
            ("2", "item_B", "2023-01-01", "1672531200")
        ]
        test_columns = ["uuid", "item_name", "partitionpath", "ts"]
        test_df = spark.createDataFrame(test_data, test_columns)

        # Hudi options for the write operation
        hudi_options = {
            'hoodie.table.name': 'temp_test_table',
            'hoodie.datasource.write.precombine.field': 'ts',
            # The option name ends in ".field"; a bare "...write.recordkey" key is not read by Hudi.
            'hoodie.datasource.write.recordkey.field': 'uuid',
            'hoodie.datasource.write.partitionpath.field': 'partitionpath'
        }

        # Write the DataFrame to the Hudi table
        print("Verification: Writing data to Hudi table...")
        test_df.write.format("hudi") \
            .options(**hudi_options) \
            .mode("overwrite") \
            .save(temp_hudi_path)
        print("Verification: Data successfully written to Hudi.")

        # Read the data back from the Hudi table to verify
        print("Verification: Reading data from Hudi table...")
        hudi_read_df = spark.read.format("hudi").load(temp_hudi_path)
        print("Read data:")
        hudi_read_df.show()

        print("Verification Complete: Hudi setup is working correctly!")
        return True
    except Exception as e:
        print(f"Verification Failed. An error occurred: {e}")
        return False
    finally:
        # Remove the scratch table even if the check failed part-way through.
        if os.path.exists(temp_hudi_path):
            print(f"Cleaning up temporary data at {temp_hudi_path}")
            shutil.rmtree(temp_hudi_path, ignore_errors=True)


# Call the verification function
verify_hudi_setup()


In [None]:
# Define a function to create Spark dataframe from sample data

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import to_date

def create_dataframe(spark, data):
    """Build the ride-sharing DataFrame from rows such as those from create_sample_data().

    Args:
        spark: Active SparkSession.
        data: Rows of 7 values (trip_id, ts, rider_id, driver_id, fare, city,
            trip_date), where ts is a datetime and trip_date a "yyyy-MM-dd" string.

    Returns:
        DataFrame with the schema below, with trip_date converted to DateType.
    """
    # Schema for ride-sharing data
    schema = StructType([
        StructField("trip_id", StringType(), False),
        StructField("ts", TimestampType(), True),
        StructField("rider_id", StringType(), True),
        StructField("driver_id", StringType(), True),
        StructField("fare", DoubleType(), True),
        StructField("city", StringType(), True),
        StructField("trip_date", StringType(), True)
    ])

    df = spark.createDataFrame(data, schema)
    # Store trip_date as a real DateType column instead of a string.
    # Note: the Hudi table is partitioned by "city" (see get_hudi_param), not by trip_date.
    df = df.withColumn("trip_date", to_date(df["trip_date"], "yyyy-MM-dd"))
    print("Sample DataFrame created:")
    df.show(5)
    return df


# Get the Spark session. create_spark_session() uses getOrCreate(), so it returns
# the already-active session when there is one instead of starting a second one.
spark = create_spark_session()
if spark is None:
    raise Exception("Failed to create SparkSession.")

#create_dataframe(spark, create_sample_data(100))

In [None]:
# Define a function to create sample data for this example
# Adjust the schema and data as necessary for your use case

import time
from datetime import datetime


def create_sample_data(number_of_records, preview=5):
    """Generate synthetic ride-sharing rows for the examples in this notebook.

    Args:
        number_of_records: How many rows to generate (0 or less yields []).
        preview: How many generated rows to print. Only a preview is printed,
            not the full list, to keep notebook output small.

    Returns:
        List of tuples (trip_id, ts, rider_id, driver_id, fare, city, trip_date)
        matching the schema in create_dataframe(). ``ts`` is a datetime (as
        TimestampType requires), starting now and spaced one second apart.
    """
    # Read the clock once so the timestamps are exactly i seconds apart.
    base_epoch = time.time()
    cities = ["nyc", "sf", "la"]
    data = [
        (
            f"trip_{i}",
            datetime.fromtimestamp(base_epoch + i),
            f"rider_{i % 1000}",
            f"driver_{i % 500}",
            20.0 + (i % 50),
            cities[i % 3],
            f"2025-09-{(i % 30) + 1:02}",
        )
        for i in range(number_of_records)
    ]
    shown = data[:max(preview, 0)]
    print(f"Generated {len(data)} sample records; showing {len(shown)}:")
    for row in shown:
        print(row)
    return data


#create_sample_data(100)

In [None]:
# Clean up any previous Hudi table data
import os
import shutil


def cleanup_hudi_table(base_path=None):
    """Delete the local Hudi table directory so the notebook can start fresh.

    Args:
        base_path: Table directory to remove. Defaults to
            "<script or notebook dir>/hudi_table_data_cow", the same path
            get_hudi_param() returns.

    Returns:
        True if a directory was removed, False if there was nothing to remove.
    """
    if base_path is None:
        try:
            script_dir = os.path.dirname(os.path.abspath(__file__))
        except NameError:
            # Jupyter has no __file__; use the working directory so cleanup still runs.
            script_dir = os.path.abspath(os.getcwd())
        base_path = os.path.join(script_dir, "hudi_table_data_cow")

    if os.path.exists(base_path):
        print(f"Cleaning up existing data at {base_path}")
        shutil.rmtree(base_path)
        return True
    print(f"Directory {base_path} does not exist, no cleanup needed.")
    return False


#cleanup_hudi_table()



In [None]:
# Define a function to get Hudi parameters
# This function assumes a SparkSession has already been created with Hudi jars.


def get_hudi_param():
    """Return the shared Hudi write options and the table's base directory.

    Returns:
        A tuple (hudi_options, HUDI_BASE_PATH): a dict of Hudi write options
        keyed by config name, and the absolute path of the table directory.
    """
    print("Initializing Hudi parameters...")

    # __file__ exists for scripts but not inside a Jupyter kernel; use the CWD there.
    try:
        base_dir = os.path.dirname(os.path.abspath(__file__))
    except NameError:
        base_dir = os.path.abspath(os.getcwd())

    table_name = "hudi_trips_table_cow"
    table_path = os.path.join(base_dir, "hudi_table_data_cow")

    # Record key, partition field and precombine (ordering) field for the trips table.
    shuffle_parallelism = 2
    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.recordkey.field': "trip_id",
        'hoodie.datasource.write.partitionpath.field': "city",
        'hoodie.datasource.write.precombine.field': "ts",
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.upsert.shuffle.parallelism': shuffle_parallelism,
        'hoodie.insert.shuffle.parallelism': shuffle_parallelism,
    }

    print(f"Hudi parameters initialized: {hudi_options}")
    return hudi_options, table_path

# Call this function
get_hudi_param()

In [None]:
# Define a function to perform the initial bulk insert to Hudi table with Copy-on-Write storage
# Note: Hudi supports two table types, COPY_ON_WRITE and MERGE_ON_READ; this notebook uses COPY_ON_WRITE.
import sys
from pyspark.sql.functions import monotonically_increasing_id

def bulk_insert_hudi(spark, hudi_options_base, HUDI_BASE_PATH, save_mode="overwrite"):
    """Load 50 generated trips into a Copy-on-Write Hudi table using bulk_insert.

    Args:
        spark: Active SparkSession.
        hudi_options_base: Base write options (from get_hudi_param()); copied, not modified.
        HUDI_BASE_PATH: Directory of the Hudi table.
        save_mode: Spark save mode. Defaults to "overwrite" so that re-running
            this initial load recreates the table. bulk_insert skips Hudi's
            index lookup, so appending the same trips again would duplicate
            them. Pass "append" to add to an existing table deliberately.

    Returns:
        Snapshot DataFrame of the table after the write.
    """
    print("Performing bulk insert to Hudi table...")
    # Create a DataFrame with new data for bulk insert
    bulk_data = create_sample_data(50)  # Create 50 new sample records
    bulk_df = create_dataframe(spark, bulk_data)
    # Map the generated columns to the Hudi schema
    bulk_df = bulk_df.withColumn("uuid", monotonically_increasing_id().cast(StringType()))
    bulk_df = bulk_df.withColumnRenamed("rider_id", "rider")

    # Set the table type to Copy-on-Write for the bulk insert
    hudi_options_bulk = hudi_options_base.copy()
    hudi_options_bulk['hoodie.datasource.write.table.type'] = 'COPY_ON_WRITE'
    hudi_options_bulk['hoodie.datasource.write.operation'] = 'bulk_insert'

    print("Writing bulk inserted data to Hudi table...")
    bulk_df.write.format("hudi") \
        .options(**hudi_options_bulk) \
        .mode(save_mode) \
        .save(HUDI_BASE_PATH)
    print("Hudi table bulk inserted successfully.")
    hudi_read_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    hudi_read_df.show()
    return hudi_read_df


# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()

# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()

# Perform the bulk insert
bulk_insert_hudi(spark, hudi_options_base, hudi_base_path)


The Hudi table on disk carries the schema from its first creation. When you perform an upsert with new data, Hudi performs the following steps:

Reads the Existing Schema: It checks the schema of the Hudi table already saved on disk.

Aligns the New Data: It compares the schema of your new upsert_df with the existing table schema.

Applies Schema Evolution: If the incoming data is missing a column that exists in the table, Hudi recognizes this and fills that column with null values so the incoming data matches the existing table schema.

This schema evolution behavior is by design. It allows you to add or drop columns over time without rebuilding the entire table, which makes schema changes much more flexible.

In [None]:

# Define a function to do incremental updates to Hudi table with Copy-on-Write storage in Upsert mode
# Note The available options for hoodie.datasource.write.operation are:
    #upsert: Inserts new records and updates existing ones.
    #insert: Only inserts new records.
    #bulk_insert: A more efficient way to insert a large volume of new data.
    #delete: Deletes records based on the record key.
# "append" is a Spark DataFrameWriter mode, which simply tells Spark to add new data without overwriting the table.
# When performing an incremental update that includes both new and updated records, the correct Hudi operation to use is upsert.

import time, datetime
from datetime import datetime

def upsert_hudi(spark, hudi_options_base, HUDI_BASE_PATH):
    """Upsert one changed trip and one new trip into the Copy-on-Write table.

    trip_2 is one of the trips written by bulk_insert_hudi(), so it is updated.
    trip_101 is not, so it is inserted. Hudi identifies records by the record
    key (and partition path) configured in ``hudi_options_base``.

    Args:
        spark: Active SparkSession.
        hudi_options_base: Base write options from get_hudi_param(); copied, not modified.
        HUDI_BASE_PATH: Directory of the Hudi table.

    Returns:
        Snapshot DataFrame of the table after the upsert.
    """
    from pyspark.sql.functions import to_date

    print("Performing upsert update to Hudi table...")
    # One updated record (trip_2) and one new record (trip_101)
    upsert_data = [
        ("trip_2", datetime.fromtimestamp(time.time() + 101), "rider_2_updated", "driver_2", 100.0, "la", "2025-09-03"),
        ("trip_101", datetime.fromtimestamp(time.time() + 102), "rider_101", "driver_101", 200.0, "nyc", "2025-09-01")
    ]
    upsert_schema = StructType([
        StructField("trip_id", StringType(), False),
        StructField("ts", TimestampType(), True),
        StructField("rider", StringType(), True),
        StructField("driver_id", StringType(), True),
        StructField("fare", DoubleType(), True),
        StructField("city", StringType(), True),
        StructField("trip_date", StringType(), True)
    ])
    upsert_df = spark.createDataFrame(upsert_data, upsert_schema)
    # create_dataframe() stores trip_date as DateType in the table, so convert the
    # incoming string the same way instead of writing a conflicting column type.
    upsert_df = upsert_df.withColumn("trip_date", to_date(upsert_df["trip_date"], "yyyy-MM-dd"))
    # Hudi does not generate values for ordinary columns: uuid is simply written as null.
    # NOTE(review): with the default payload the upsert replaces the whole row, so
    # trip_2's previous uuid is presumably overwritten with null as well.
    upsert_df = upsert_df.withColumn("uuid", lit(None).cast(StringType()))

    # Set the table type to Copy-on-Write for the upsert
    hudi_options_upsert = hudi_options_base.copy()
    hudi_options_upsert['hoodie.datasource.write.table.type'] = 'COPY_ON_WRITE'
    hudi_options_upsert['hoodie.datasource.write.operation'] = 'upsert'
    print("Writing upserted data to Hudi table...")
    upsert_df.write.format("hudi") \
        .options(**hudi_options_upsert) \
        .mode("append") \
        .save(HUDI_BASE_PATH)
    print("Hudi table upserted successfully.")
    hudi_read_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    hudi_read_df.show()
    return hudi_read_df

# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()
# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()


# Call the upsert update function
upsert_hudi(spark, hudi_options_base, hudi_base_path)


Now that the initial bulk load is done using Copy-on-Write storage, let's get to the heart of Apache Hudi.
The distinction between snapshot and read-optimized queries is crucial, especially when deciding how to access the upsert data.

Snapshot Query
A snapshot query provides the most up-to-date view of a Hudi table. It reads the latest committed state of the data, ensuring you get a complete and accurate "snapshot" of the table at a specific point in time.

How it works (for a Copy on Write table): With a Copy on Write (CoW) table, every update or insert rewrites the affected Parquet base files as new versions. A snapshot query simply reads the latest version of each base file. There are no log files to merge, so the query is straightforward and efficient.

When to choose it: This is the default and recommended choice for most use cases with CoW tables. It's the simplest way to read the current state of your data. The code spark.read.format("hudi").load(HUDI_BASE_PATH) performs a snapshot query by default.

Read-Optimized Query
A read-optimized query is designed to be as fast as possible by reading only the Parquet base files, completely ignoring any recent incremental data.

How it works (for a Copy on Write table): Since a CoW table already stores all its data in Parquet base files, a read-optimized query behaves exactly the same as a snapshot query. It reads the same set of files and provides the identical result. You will not see a performance difference between the two for a CoW table.

When to choose it: This query type is primarily useful for Merge on Read (MoR) tables, not Copy on Write. With a MoR table, a read-optimized query is faster than a snapshot query because it skips merging the Parquet base files with the row-based delta log files (typically Avro). However, this speed comes at the cost of potentially stale data, because it won't include recent updates that exist only in the delta logs.




In [None]:
# This function will do a snapshot query on the Hudi table to show the latest state of the data
# Every commit in a CoW table implicitly compacts the data into the columnar base file format. 
# This ensures that the data being read is always in its most optimized form for analytical queries, 
# without requiring any on-the-fly merging or processing. 
# This makes snapshot queries on CoW tables very efficient, delivering high performance for analytical workloads where data changes are less frequent.
def snapshot_query_hudi(spark, HUDI_BASE_PATH):
    """Run a snapshot query (the default Hudi read) and show sample results.

    Registers the table as the temporary view ``hudi_trips_snapshot`` and shows
    the newest trips plus a per-city trip count.

    Args:
        spark: Active SparkSession.
        HUDI_BASE_PATH: Directory of the Hudi table.

    Returns:
        The snapshot DataFrame (unsorted).
    """
    print("Performing snapshot query on Hudi table...")
    trip_snapshot_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    print("Snapshot DataFrame loaded from Hudi table:")
    trip_snapshot_df.show(5)
    print("Creating temporary view for SQL queries...")
    trip_snapshot_df.createOrReplaceTempView("hudi_trips_snapshot")
    print("Snapshot query result:")
    # Newest trips first; run this query once rather than repeating it.
    latest_first_df = spark.sql("SELECT * FROM hudi_trips_snapshot ORDER BY ts DESC")
    latest_first_df.show(10)
    spark.sql("SELECT city, COUNT(*) as trip_count FROM hudi_trips_snapshot GROUP BY city ORDER BY trip_count DESC").show()
    return trip_snapshot_df

# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()

# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()

# Call the snapshot query function
snapshot_query_hudi(spark, hudi_base_path)

In [None]:
# This function runs a read-optimized query on the Hudi table to show the latest state of the data.
# The output is exactly the same as the default spark.read.format("hudi").load(HUDI_BASE_PATH)
def read_optimized_query_hudi(spark, HUDI_BASE_PATH):
    """Run a read-optimized query and show sample results.

    Sets ``hoodie.datasource.query.type`` to ``read_optimized`` and registers the
    result as the temporary view ``hudi_trips_ro``.

    Args:
        spark: Active SparkSession.
        HUDI_BASE_PATH: Directory of the Hudi table.

    Returns:
        The read-optimized DataFrame (unsorted).
    """
    print("Performing read-optimized query on Hudi table...")
    trip_ro_df = spark.read.format("hudi").option("hoodie.datasource.query.type", "read_optimized").load(HUDI_BASE_PATH)
    print("Read-Optimized DataFrame loaded from Hudi table:")
    trip_ro_df.show(5)
    print("Creating temporary view for SQL queries...")
    trip_ro_df.createOrReplaceTempView("hudi_trips_ro")
    print("Read-Optimized query result:")
    # Newest trips first; run this query once rather than repeating it.
    latest_first_df = spark.sql("SELECT * FROM hudi_trips_ro ORDER BY ts DESC")
    latest_first_df.show(10)
    spark.sql("SELECT city, COUNT(*) as trip_count FROM hudi_trips_ro GROUP BY city ORDER BY trip_count DESC").show()
    return trip_ro_df

# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()

# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()

# Call the read optimized function
read_optimized_query_hudi(spark, hudi_base_path)

In [None]:
# This function will get all the commits and select the earliest one
def read_earliest_commit_incremental(spark, base_path):
    """Run an incremental query that begins at the earliest commit found in the table.

    Hudi's incremental query returns records written by commits whose instant
    time is strictly greater than ``hoodie.datasource.read.begin.instanttime``.
    Using the earliest commit as the begin time therefore returns the changes
    made after that commit (e.g. the upserts after the initial bulk load).
    It does not return the records written by the earliest commit itself.

    Args:
        spark (SparkSession): The active Spark session.
        base_path (str): The base path to the Hudi table.

    Returns:
        The incremental DataFrame, or None if the table could not be read or has no commits.
    """
    print("Performing incremental query from the earliest commit...")

    # Collect the commit times present in the current snapshot. This scans the
    # _hoodie_commit_time column of the table's data files (not just metadata),
    # and it only sees commits that still own at least one live record.
    try:
        commit_rows = spark.read.format("hudi").load(base_path) \
            .select("_hoodie_commit_time").distinct().collect()
    except Exception as e:
        # A missing or unreadable table raises a Spark/Py4J error, not NameError.
        print(f"Error reading commits. Make sure the Hudi table exists at: {base_path}")
        print(e)
        return None

    if not commit_rows:
        print("No commits found in the table. Cannot perform incremental query.")
        return None

    begin_time = min(row["_hoodie_commit_time"] for row in commit_rows)
    print(f"Earliest commit found: {begin_time}")

    # Perform the incremental query using the earliest commit time
    inc_query_df = spark.read.format("hudi") \
        .option("hoodie.datasource.query.type", "incremental") \
        .option("hoodie.datasource.read.begin.instanttime", begin_time) \
        .load(base_path)

    print("Incremental query DataFrame loaded.")
    inc_query_df.show(5)

    # Run a sample SQL query on the incremental data
    inc_query_df.createOrReplaceTempView("hudi_trips_incremental")
    print("Incremental query result (fare > 20.0):")
    spark.sql("SELECT trip_id, fare, ts FROM hudi_trips_incremental WHERE fare > 20.0").show(10)
    print("Incremental query completed.")
    return inc_query_df

# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()

# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()

# call this function
read_earliest_commit_incremental(spark, hudi_base_path)


In [None]:
# Define a function to hard delete records from Hudi table
def hard_delete_hudi(spark, hudi_options_base, HUDI_BASE_PATH, trip_ids=("trip_3", "trip_50")):
    """Hard-delete trips by record key from the Hudi table.

    Hudi's key generator builds each record key from both the record key field
    (trip_id) and the partition path field (city, set in get_hudi_param()).
    The delete input must therefore contain the partition column, and a
    DataFrame with only trip_id lacks it. The matching rows are read back from
    the table and written again with operation "delete".

    Args:
        spark: Active SparkSession.
        hudi_options_base: Base write options from get_hudi_param(); copied, not modified.
        HUDI_BASE_PATH: Directory of the Hudi table.
        trip_ids: Record keys to delete. Keys not present in the table are
            ignored. bulk_insert_hudi() creates trip_0..trip_49, so the default
            "trip_50" matches nothing.

    Returns:
        Snapshot DataFrame of the table after the delete.
    """
    from pyspark.sql.functions import col

    print("Performing hard delete on Hudi table...")
    current_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    # Keep only the data columns; Hudi adds its own _hoodie_* metadata columns on write.
    data_columns = [c for c in current_df.columns if not c.startswith("_hoodie_")]
    delete_df = current_df.filter(col("trip_id").isin(list(trip_ids))).select(*data_columns)

    if not delete_df.head(1):
        print(f"No records found for {list(trip_ids)}; nothing to delete.")
        return current_df

    print("Records to delete:")
    delete_df.show()

    # Set the operation to 'delete'
    hudi_options_delete = hudi_options_base.copy()
    hudi_options_delete['hoodie.datasource.write.operation'] = 'delete'

    print("Writing delete records to Hudi table...")
    delete_df.write.format("hudi") \
        .options(**hudi_options_delete) \
        .mode("append") \
        .save(HUDI_BASE_PATH)
    print("Hudi table hard delete completed.")
    hudi_read_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    hudi_read_df.show()
    return hudi_read_df

# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()
# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()
# Call the hard delete function
hard_delete_hudi(spark, hudi_options_base, hudi_base_path)

In [None]:
# Define a function to create a DataFrame with schema evolution and upsert to Hudi table
# Schema Evolution in Hudi allows you to add or drop columns over time without having to rebuild the entire table.

import time, datetime
from datetime import datetime

def schema_evolution_upsert_hudi(spark, hudi_options_base, HUDI_BASE_PATH):
    """Upsert two trips that carry a new nullable column, payment_method.

    Adding a nullable column at the end of the schema is a backward-compatible
    change that Hudi applies on write without extra configuration. Rows written
    before this change read back with payment_method = null.
    (``hoodie.schema.on.read.enable`` is only needed for changes such as
    renaming or dropping columns or changing column types.)

    Args:
        spark: Active SparkSession.
        hudi_options_base: Base write options from get_hudi_param(); copied, not modified.
        HUDI_BASE_PATH: Directory of the Hudi table.

    Returns:
        Snapshot DataFrame of the table after the upsert.
    """
    from pyspark.sql.functions import to_date

    print("Performing schema evolution upsert to Hudi table...")
    # Rows with an additional column 'payment_method'.
    # NOTE(review): trip_4 and trip_5 already exist from bulk_insert_hudi(), which assigns
    # city by i % 3 (trip_4 -> "sf", trip_5 -> "la"). trip_4 is therefore an update. trip_5 is
    # sent with city "nyc", so with a non-global index it is presumably written as a second
    # record in the nyc partition rather than updating the la row — confirm intent.
    schema_evolution_data = [
        ("trip_4", datetime.fromtimestamp(time.time() + 201), "rider_4", "driver_4", 150.0, "sf", "2025-09-04", "credit_card"),
        ("trip_5", datetime.fromtimestamp(time.time() + 202), "rider_5", "driver_5", 80.0, "nyc", "2025-09-05", "paypal")
    ]
    schema_evolution_schema = StructType([
        StructField("trip_id", StringType(), False),
        StructField("ts", TimestampType(), True),
        StructField("rider", StringType(), True),
        StructField("driver_id", StringType(), True),
        StructField("fare", DoubleType(), True),
        StructField("city", StringType(), True),
        StructField("trip_date", StringType(), True),
        StructField("payment_method", StringType(), True)  # New column added
    ])
    schema_evolution_df = spark.createDataFrame(schema_evolution_data, schema_evolution_schema)
    # create_dataframe() stores trip_date as DateType in the table; convert to match it.
    schema_evolution_df = schema_evolution_df.withColumn(
        "trip_date", to_date(schema_evolution_df["trip_date"], "yyyy-MM-dd"))
    # Hudi does not generate values for ordinary columns: uuid is simply written as null.
    schema_evolution_df = schema_evolution_df.withColumn("uuid", lit(None).cast(StringType()))

    # Upsert into the Copy-on-Write table; the new column needs no extra option.
    hudi_options_se = hudi_options_base.copy()
    hudi_options_se['hoodie.datasource.write.operation'] = 'upsert'
    hudi_options_se['hoodie.datasource.write.table.type'] = 'COPY_ON_WRITE'

    print("Writing schema-evolved data to Hudi table...")
    schema_evolution_df.write.format("hudi") \
        .options(**hudi_options_se) \
        .mode("append") \
        .save(HUDI_BASE_PATH)
    print("Hudi table schema evolution upsert completed.")

    hudi_read_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    hudi_read_df.show()
    return hudi_read_df

# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()
# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()

# Call the schema evolution upsert function
schema_evolution_upsert_hudi(spark, hudi_options_base, hudi_base_path)


In [None]:
# Define a function to do a new indexed read query on the Hudi table
def indexed_read_query_hudi(spark, HUDI_BASE_PATH):
    """Read the table with the query type explicitly set to "snapshot" and show results.

    Despite the name, this only sets ``hoodie.datasource.query.type`` to
    ``snapshot``, which is the same as the default Hudi read. The result is
    registered as the temporary view ``hudi_trips_indexed``.

    Args:
        spark: Active SparkSession.
        HUDI_BASE_PATH: Directory of the Hudi table.

    Returns:
        The snapshot DataFrame (unsorted).
    """
    print("Performing indexed read query on Hudi table...")
    trip_indexed_df = spark.read.format("hudi").option("hoodie.datasource.query.type", "snapshot").load(HUDI_BASE_PATH)
    print("Indexed Read DataFrame loaded from Hudi table:")
    trip_indexed_df.show(5)
    print("Creating temporary view for SQL queries...")
    trip_indexed_df.createOrReplaceTempView("hudi_trips_indexed")
    print("Indexed read query result:")
    # Newest trips first; run this query once rather than repeating it.
    latest_first_df = spark.sql("SELECT * FROM hudi_trips_indexed ORDER BY ts DESC")
    latest_first_df.show(10)
    spark.sql("SELECT city, COUNT(*) as trip_count FROM hudi_trips_indexed GROUP BY city ORDER BY trip_count DESC").show()
    return trip_indexed_df

# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()

# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()
# Call the indexed read query function
indexed_read_query_hudi(spark, hudi_base_path)

In [None]:
# Define a function to show time travel query on the Hudi table
def time_travel_query_hudi(spark, HUDI_BASE_PATH):
    """Read the table as of its second-oldest visible commit (time travel) and show it.

    Commit times are taken from the ``_hoodie_commit_time`` values in the
    current snapshot, so only commits that still own at least one live record
    are considered.

    Args:
        spark: Active SparkSession.
        HUDI_BASE_PATH: Directory of the Hudi table.

    Returns:
        The time-travel DataFrame, or None if the table cannot be read or fewer
        than two commits are visible.
    """
    print("Performing time travel query on Hudi table...")

    # Gather commit times from the current snapshot. This scans the
    # _hoodie_commit_time column of the table's data files, not just metadata.
    try:
        commit_rows = spark.read.format("hudi").load(HUDI_BASE_PATH) \
            .select("_hoodie_commit_time").distinct().collect()
    except Exception as e:
        # A missing or unreadable table raises a Spark/Py4J error, not NameError.
        print(f"Error reading commits. Make sure the Hudi table exists at: {HUDI_BASE_PATH}")
        print(e)
        return None

    if not commit_rows:
        print("No commits found in the table. Cannot perform time travel query.")
        return None

    # Hudi instant times are fixed-width timestamp strings, so sorting them as strings
    # orders them chronologically (oldest first).
    commit_times = sorted(row["_hoodie_commit_time"] for row in commit_rows)
    print(f"Available commits: {commit_times}")

    # For demonstration, pick the second commit if available
    if len(commit_times) < 2:
        print("Not enough commits to perform time travel. Need at least 2 commits.")
        return None

    travel_time = commit_times[1]
    print(f"Time travel to commit: {travel_time}")

    # Perform the time travel query using the selected commit time
    time_travel_df = spark.read.format("hudi") \
        .option("as.of.instant", travel_time) \
        .load(HUDI_BASE_PATH)
    print("Time travel query DataFrame loaded.")
    time_travel_df.show(5)
    # Run a sample SQL query on the time-traveled data
    time_travel_df.createOrReplaceTempView("hudi_trips_time_travel")
    print("Time travel query result:")
    spark.sql("SELECT trip_id, fare, ts FROM hudi_trips_time_travel ORDER BY ts DESC").show(10)
    print("Time travel query completed.")
    return time_travel_df

# get or create Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    print("No active SparkSession found. Please run your setup cell first.")
    spark = create_spark_session()
# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()
# Call the time travel query function
time_travel_query_hudi(spark, hudi_base_path)


In [None]:
# Define a function to create an index using Bloom filter on the Hudi table
# This shows an example of using Bloom filter index for upsert operations with the matching schema
# So, this function will create a new DataFrame with the same schema as the existing Hudi table
# and perform an upsert operation using the Bloom filter index.
import time, datetime
from datetime import datetime
def create_bloom_index_hudi(spark, hudi_options_base, HUDI_BASE_PATH):
    """
    Upsert two rows into the Hudi table with ``hoodie.index.type=BLOOM``.

    The incoming rows use the same columns as the existing table: one row
    updates ``trip_2`` and one inserts ``trip_101``. The table is shown
    before and after the write.

    Args:
        spark: Active SparkSession.
        hudi_options_base: Base Hudi write options. A copy is extended with the
            upsert/Bloom settings; the passed-in mapping is left untouched.
        HUDI_BASE_PATH: Filesystem path of the Hudi table.

    Returns:
        DataFrame read back from HUDI_BASE_PATH after the upsert.
    """
    print("Creating Bloom filter index on Hudi table...")
    print("Performing upsert with Bloom filter index enabled...")

    # Snapshot of the table before the upsert, for comparison with the result.
    before_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    print("Current Hudi table data before Bloom index upsert:")
    before_df.show(5)

    # Same column layout as the existing table (including trip_date,
    # payment_method and uuid) so the upsert is schema-compatible.
    upsert_schema = StructType([
        StructField("trip_id", StringType(), False),
        StructField("ts", TimestampType(), True),
        StructField("rider", StringType(), True),
        StructField("driver_id", StringType(), True),
        StructField("fare", DoubleType(), True),
        StructField("city", StringType(), True),
        StructField("trip_date", StringType(), True),
        StructField("payment_method", StringType(), True),
        StructField("uuid", StringType(), True),
    ])

    def current_ts():
        # Local wall-clock time as a naive datetime, used as the row timestamp.
        return datetime.fromtimestamp(time.time())

    upsert_rows = [
        # Update of an existing record
        ("trip_2", current_ts(), "rider_2_updated", "driver_2", 100.0, "la", "2025-09-03", "credit", "some-uuid"),
        # Brand-new record
        ("trip_101", current_ts(), "rider_101", "driver_101", 200.0, "nyc", "2025-09-15", "paypal", "another-uuid"),
    ]
    upsert_df = spark.createDataFrame(upsert_rows, upsert_schema)

    # Copy-on-Write upsert routed through the Bloom filter index.
    bloom_write_options = {
        **hudi_options_base,
        'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.index.type': 'BLOOM',
    }

    print("Writing data with Bloom filter index to Hudi table...")
    (
        upsert_df.write.format("hudi")
        .options(**bloom_write_options)
        .mode("append")
        .save(HUDI_BASE_PATH)
    )
    print("Hudi table with Bloom filter index created successfully.")

    after_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    after_df.show()
    return after_df

# Reuse the active Spark session (e.g. from the setup cell); if there is none,
# build one with create_spark_session() instead of failing.
spark = SparkSession.getActiveSession()
if spark is None:
    # The message reflects what actually happens next: a new session is created.
    print("No active SparkSession found. Creating a new one with create_spark_session().")
    spark = create_spark_session()
# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()
# Call the Bloom filter index creation function
create_bloom_index_hudi(spark, hudi_options_base, hudi_base_path)

In [None]:
# This function will show using a Bloom filter index for upsert operations with a mismatched schema
# In this example, the new DataFrame has a different schema than the existing Hudi table
import time, datetime
from datetime import datetime
def bloom_index_mismatched_schema_hudi(spark, hudi_options_base, HUDI_BASE_PATH):
    """
    Upsert rows whose schema differs from the existing Hudi table, using the Bloom index.

    The incoming batch adds a new column (``vehicle_type``) and omits an existing
    one (``payment_method``). Schema reconciliation is enabled so the writer keeps
    the table's existing columns (the missing one is written as null) and adds the
    new column. Without it, the batch schema would be used as the writer schema.

    Args:
        spark: Active SparkSession used to build the batch and read the table back.
        hudi_options_base: Base Hudi write options (presumably a dict). It is copied,
            not modified.
        HUDI_BASE_PATH: Filesystem path of the Hudi table.

    Returns:
        DataFrame read back from HUDI_BASE_PATH after the upsert.
    """
    print("Performing upsert with Bloom filter index and mismatched schema on Hudi table...")
    # New rows: an extra 'vehicle_type' column, and no 'payment_method' column.
    mismatched_data = [
        ("trip_6", datetime.fromtimestamp(time.time() + 301), "rider_6", "driver_6", 120.0, "sf", "2025-09-06", "car"),
        ("trip_7", datetime.fromtimestamp(time.time() + 302), "rider_7", "driver_7", 90.0, "nyc", "2025-09-07", "bike")
    ]
    mismatched_schema = StructType([
        StructField("trip_id", StringType(), False),
        StructField("ts", TimestampType(), True),
        StructField("rider", StringType(), True),
        StructField("driver_id", StringType(), True),
        StructField("fare", DoubleType(), True),
        StructField("city", StringType(), True),
        StructField("trip_date", StringType(), True),
        StructField("vehicle_type", StringType(), True)  # New column added
        # Note: 'payment_method' column is missing
    ])
    mismatched_df = spark.createDataFrame(mismatched_data, mismatched_schema)
    # Add uuid as an all-null column so the batch carries the table's uuid field.
    # Hudi does NOT generate values for data columns; these rows keep uuid = null.
    # NOTE(review): if get_hudi_param() uses uuid as the record key or partition
    # path, null values will make this write fail -- confirm against the base options.
    mismatched_df = mismatched_df.withColumn("uuid", lit(None).cast(StringType()))

    # Copy-on-Write upsert through the Bloom filter index.
    hudi_options_bloom_mismatch = hudi_options_base.copy()
    hudi_options_bloom_mismatch['hoodie.datasource.write.operation'] = 'upsert'
    hudi_options_bloom_mismatch['hoodie.datasource.write.table.type'] = 'COPY_ON_WRITE'
    hudi_options_bloom_mismatch['hoodie.index.type'] = 'BLOOM'
    # NOTE(review): confirm this key is recognized by the Hudi version in use.
    hudi_options_bloom_mismatch['hoodie.datasource.write.schema.evolution.enable'] = 'true'
    # Reconcile the batch schema with the table schema. Existing columns are kept
    # (the missing payment_method is filled with null) and new ones (vehicle_type)
    # are added. Without this, the batch schema, which drops payment_method,
    # becomes the writer schema; recent Hudi releases reject that as a column drop
    # by default.
    hudi_options_bloom_mismatch['hoodie.datasource.write.reconcile.schema'] = 'true'

    print("Writing data with Bloom filter index and mismatched schema to Hudi table...")
    mismatched_df.write.format("hudi") \
        .options(**hudi_options_bloom_mismatch) \
        .mode("append") \
        .save(HUDI_BASE_PATH)
    print("Hudi table with Bloom filter index and mismatched schema upsert completed.")

    # Read the table back so the evolved schema (vehicle_type added,
    # payment_method null for the new rows) is visible, matching create_bloom_index_hudi.
    hudi_read_df = spark.read.format("hudi").load(HUDI_BASE_PATH)
    hudi_read_df.show()
    return hudi_read_df

# Reuse the active Spark session (e.g. from the setup cell); if there is none,
# build one with create_spark_session() instead of failing.
spark = SparkSession.getActiveSession()
if spark is None:
    # The message reflects what actually happens next: a new session is created.
    print("No active SparkSession found. Creating a new one with create_spark_session().")
    spark = create_spark_session()
# Get Hudi parameters and the base path
hudi_options_base, hudi_base_path = get_hudi_param()
# Run the Bloom-index upsert with a mismatched incoming schema
bloom_index_mismatched_schema_hudi(spark, hudi_options_base, hudi_base_path)

In [None]:
# This function will stop and clean up the Spark session
# --- IGNORE ---
def stop_spark_session(spark):
    """
    Stop ``spark`` if it is set (truthy), printing progress messages.

    When ``spark`` is None/falsy, nothing is stopped and a short notice is printed.
    Always returns None.
    """
    if not spark:
        print("No Spark session to stop.")
        return
    print("Stopping Spark session...")
    spark.stop()
    print("Spark session stopped.")
# --- IGNORE ---
# Stop the Spark session used by the cells above. Run this as the final cell
# of the notebook (or the last step of a script).
# --- IGNORE ---

stop_spark_session(spark)