In [0]:
# Notebook parameters, exposed as Databricks widgets.
# Environment defaults to "" so it must be chosen explicitly; it is prefixed
# directly onto CatalogName (hence the leading "_" in the catalog default).
dbutils.widgets.combobox("Environment", "", ["dev", "prod"])
dbutils.widgets.text("CatalogName", "_nyctaxi_catalog")
dbutils.widgets.combobox("SchemaName", "", ["bronze", "silver", "gold"])
dbutils.widgets.text("Table_name", "bronze_nyc_taxi")
dbutils.widgets.combobox(
    "StoragePath",
    "",
    ["landing_path", "bronze_path", "silver_path", "gold_path"],
)

# Read the current widget values (same order as declared above).
# NOTE(review): storage_path is not used anywhere else in this notebook.
Environment, catalog_name, schema_name, table_name, storage_path = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("Environment", "CatalogName", "SchemaName", "Table_name", "StoragePath")
)



In [0]:
#dbutils.widgets.removeAll()

In [0]:
def _external_location_url(location_name):
    """Return the ``url`` value reported by DESCRIBE EXTERNAL LOCATION for ``location_name``."""
    description = spark.sql(f"DESCRIBE EXTERNAL LOCATION {location_name}")
    first_row = description.select("url").collect()[0]
    return first_row[0]


# Resolve the storage root of each medallion layer from its external location.
landing_path = _external_location_url("landingzone_external_location")
bronze_path = _external_location_url("bronze_external_location")
silver_path = _external_location_url("silver_external_location")
gold_path = _external_location_url("gold_external_location")

for resolved_path in (bronze_path, silver_path, gold_path, landing_path):
    print(resolved_path)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import *      

# Obtain the Spark session. On Databricks a session already exists and
# getOrCreate() returns it; elsewhere it builds a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)
print("Spark session created successfully")

def read_landingzone_data(landing_path):
    """Read the raw NYC taxi Parquet data from the landing zone.

    The files are loaded with an explicit schema, and an ``ingestion_timestamp``
    column holding ``current_timestamp()`` is appended to every row.

    Args:
        landing_path: Location of the landing-zone Parquet files (in this
            notebook, the URL of ``landingzone_external_location``).

    Returns:
        DataFrame with the source columns plus ``ingestion_timestamp``.

    Note:
        Every call reads the *whole* landing zone. The former "incremental"
        branch never ran: its tracker was reset to None on each call, and it
        called the undefined ``current_timeStamp``. It also could not have
        worked, because ``ingestion_timestamp`` is the read time rather than a
        source property. For incremental loads, track processed files instead,
        e.g. with a checkpointed stream or a stored file-modification watermark.
    """
    # Explicit schema: pins the column types instead of taking them from the files.
    manual_schema = StructType([
        StructField("VendorID", IntegerType(), True),
        StructField("tpep_pickup_datetime", TimestampType(), True),
        StructField("tpep_dropoff_datetime", TimestampType(), True),
        StructField("passenger_count", LongType(), True),
        StructField("trip_distance", DoubleType(), True),
        StructField("RatecodeID", LongType(), True),
        StructField("store_and_fwd_flag", StringType(), True),
        StructField("PULocationID", IntegerType(), True),
        StructField("DOLocationID", IntegerType(), True),
        StructField("payment_type", LongType(), True),
        StructField("fare_amount", DoubleType(), True),
        StructField("extra", DoubleType(), True),
        StructField("mta_tax", DoubleType(), True),
        StructField("tip_amount", DoubleType(), True),
        StructField("tolls_amount", DoubleType(), True),
        StructField("improvement_surcharge", DoubleType(), True),
        StructField("total_amount", DoubleType(), True),
        StructField("congestion_surcharge", DoubleType(), True),
        StructField("Airport_fee", DoubleType(), True)
    ])

    # Parquet has no header row, so no "header" option is needed here.
    raw_df = spark.read.format("parquet").schema(manual_schema).load(landing_path)

    # Stamp each row with the time of this read.
    with_ingestion_ts = raw_df.withColumn("ingestion_timestamp", current_timestamp())

    print("Schema of data after adding ingestion_timestamp:")
    with_ingestion_ts.printSchema()

    print("Returning full landing-zone data with ingestion_timestamp.")
    return with_ingestion_ts


In [0]:
def standardize_column_names(df):
    """Rename the raw landing-zone columns to the standardized snake_case names.

    Renames are applied one at a time, in the order listed. Entries whose source
    and target names are equal are kept, so the set of renames is identical to
    the original mapping.

    Args:
        df: DataFrame produced by ``read_landingzone_data``.

    Returns:
        DataFrame with the renamed columns.
    """
    rename_pairs = (
        ("VendorID", "vendor_id"),
        ("tpep_pickup_datetime", "pickup_datetime"),
        ("tpep_dropoff_datetime", "dropoff_datetime"),
        ("passenger_count", "passenger_count"),
        ("trip_distance", "trip_distance"),
        ("RatecodeID", "ratecode_id"),
        ("store_and_fwd_flag", "store_and_fwd_flag"),
        ("PULocationID", "pu_location_id"),
        ("DOLocationID", "do_location_id"),
        ("payment_type", "payment_type"),
        ("fare_amount", "fare_amount"),
        ("extra", "extra"),
        ("mta_tax", "mta_tax"),
        ("tip_amount", "tip_amount"),
        ("tolls_amount", "tolls_amount"),
        ("improvement_surcharge", "improvement_surcharge"),
        ("total_amount", "total_amount"),
        ("congestion_surcharge", "congestion_surcharge"),
        ("Airport_fee", "airport_fee"),
        ("ingestion_timestamp", "ingestion_timestamp"),
    )

    renamed_df = df
    for source_name, target_name in rename_pairs:
        renamed_df = renamed_df.withColumnRenamed(source_name, target_name)
    return renamed_df
    

In [0]:
def standardize_payment_type(df):
    """Replace numeric ``payment_type`` codes with descriptive string labels.

    Codes 1-6 map to the labels below. Any other value, including null, becomes
    "Other", because a null comparison matches no ``when`` branch.

    Args:
        df: DataFrame with a numeric ``payment_type`` column.

    Returns:
        DataFrame whose ``payment_type`` column holds the string labels.
    """
    payment_labels = {
        1: "Credit Card",
        2: "Cash",
        3: "No Charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided Trip",
    }

    # Compare the numeric codes directly. The old version first cast the column
    # to string, so every comparison relied on implicit string-to-number coercion.
    label_expr = None
    for code, label in payment_labels.items():
        is_code = col("payment_type") == code
        label_expr = when(is_code, label) if label_expr is None else label_expr.when(is_code, label)

    return df.withColumn("payment_type", label_expr.otherwise("Other"))

In [0]:
def clean_and_handle_null_values(df):
    """Drop rows missing critical fields, zero-fill optional charges, drop invalid trips.

    Steps:
      1. Print the null count of each critical column, computed in one aggregation pass.
      2. Drop rows where any critical column is null.
      3. Fill nulls in the optional fee/charge columns with 0.0.
      4. Keep only rows with passenger_count > 0, trip_distance > 0,
         fare_amount >= 0 and total_amount >= 0. Rows where any of these is
         null are dropped too, because a null comparison is not true.

    Args:
        df: DataFrame with the standardized (snake_case) column names.

    Returns:
        The cleaned DataFrame.
    """
    from pyspark.sql import functions as F

    critical_columns = ["pickup_datetime", "dropoff_datetime", "total_amount"]

    # One Spark job for all null counts instead of one filter().count() per column.
    null_counts = df.select(
        [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in critical_columns]
    ).collect()[0]
    for col_name in critical_columns:
        print(f"Null count in column '{col_name}': {null_counts[col_name]}")

    df_cleaned = df.dropna(subset=critical_columns)

    # Keys must match the standardized names: the column is "airport_fee" after
    # standardize_column_names ("Airport_fee" only resolved case-insensitively).
    fill_values = {
        "airport_fee": 0.0,
        "congestion_surcharge": 0.0,
        "tip_amount": 0.0,
        "tolls_amount": 0.0,
        "improvement_surcharge": 0.0,
        "mta_tax": 0.0,
        "extra": 0.0,
        "fare_amount": 0.0
    }
    df_cleaned = df_cleaned.fillna(fill_values)

    return df_cleaned.filter(
        (F.col("passenger_count") > 0) &
        (F.col("trip_distance") > 0) &
        (F.col("fare_amount") >= 0) &
        (F.col("total_amount") >= 0)
    )
    

In [0]:
def remove_duplicates(df):
    """Drop rows that are exact duplicates across all columns and report the counts.

    Args:
        df: Input DataFrame.

    Returns:
        The de-duplicated DataFrame.
    """
    rows_before = df.count()
    unique_df = df.dropDuplicates()
    rows_after = unique_df.count()

    report_lines = (
        f"Original row count: {rows_before}",
        f"Row count after removing duplicates: {rows_after}",
        f"Number of duplicate rows removed: {rows_before - rows_after}",
    )
    for line in report_lines:
        print(line)

    return unique_df

In [0]:
def write_to_bronze(rep_values, Environment, catalog_name, schema_name, table_name):
    """Append a DataFrame to the bronze Delta table.

    The table name is ``f"{Environment}{catalog_name}.{schema_name}.{table_name}"``.
    No separator goes between Environment and catalog_name, so the catalog
    value is expected to carry its own leading "_" (e.g. "dev" + "_nyctaxi_catalog"
    gives "dev_nyctaxi_catalog").

    Args:
        rep_values: DataFrame to write.
        Environment: Environment prefix for the catalog (e.g. "dev" or "prod").
        catalog_name: Catalog suffix appended to Environment.
        schema_name: Target schema.
        table_name: Target table.

    Returns:
        The fully qualified table name that was written.

    Raises:
        ValueError: If any name part is empty. This happens, for example, when
            the Environment widget is left at its "" default, which would
            otherwise silently target a catalog such as "_nyctaxi_catalog".
    """
    name_parts = {
        "Environment": Environment,
        "catalog_name": catalog_name,
        "schema_name": schema_name,
        "table_name": table_name,
    }
    missing = [label for label, value in name_parts.items() if not value]
    if missing:
        raise ValueError(
            f"Cannot build bronze table name; empty value(s) for: {', '.join(missing)}"
        )

    full_bronze_path = f"{Environment}{catalog_name}.{schema_name}.{table_name}"
    print("Saving data to table:", full_bronze_path)

    # Append as Delta; mergeSchema lets new source columns be added to the table schema.
    (
        rep_values.write.mode("append")
        .format("delta")
        .option("mergeSchema", "true")
        .saveAsTable(full_bronze_path)
    )
    return full_bronze_path



In [0]:
# Landing zone (Parquet) -> bronze Delta table, one transformation per step.

# 1. Raw landing-zone data with an ingestion_timestamp column
read_df = read_landingzone_data(landing_path)

# 2. snake_case column names
renamed_df = standardize_column_names(read_df)

# 3. Numeric payment_type codes -> descriptive labels
payment_df = standardize_payment_type(renamed_df)

# 4. Drop rows missing critical fields, zero-fill optional charges, drop invalid trips
null_df = clean_and_handle_null_values(payment_df)

# 5. Remove exact duplicate rows (prints before/after counts)
deduped_df = remove_duplicates(null_df)

# 6. Append to <Environment><CatalogName>.<SchemaName>.<Table_name>
write_to_bronze(deduped_df, Environment, catalog_name, schema_name, table_name)

print("Landing zone data written to bronze table successfully")


In [0]:
%sql
-- Sanity check: total rows currently in the bronze table.
-- NOTE: the catalog is hardcoded to dev_nyctaxi_catalog and does not follow
-- the Environment / CatalogName widgets used by write_to_bronze.
SELECT count(*) 
FROM dev_nyctaxi_catalog.bronze.bronze_nyc_taxi

In [0]:
#spark.sql("DELETE FROM dev_nyctaxi_catalog.bronze.bronze_nyc_taxi")