## Mounting the folder

In [0]:
# Configuration
# Mounts the "bronze" blob container at /mnt/bronze so later cells can read
# the raw files through a stable DBFS path.
#
# The account key is fetched from a Databricks secret scope instead of being
# typed into the notebook, so it never ends up in the notebook source,
# its revision history, or exported copies.
storage_account_name = "STORAGE ACCOUNT NAME" # <--- UPDATE THIS
storage_account_key = dbutils.secrets.get(
    scope="SECRET SCOPE NAME",  # <--- UPDATE THIS (scope holding the key)
    key="SECRET KEY NAME",      # <--- UPDATE THIS (secret containing the account key)
)
container_name = "bronze"

# Create the specific URL for mounting
mount_point = f"/mnt/{container_name}"
source_url = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"
extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}

# Check if already mounted to avoid errors: this guard skips the mount call
# when the mount point is already listed, so the cell can be re-run safely.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print(f"{mount_point} is already mounted.")
else:
    # Mount the drive
    dbutils.fs.mount(
        source=source_url,
        mount_point=mount_point,
        extra_configs=extra_configs
    )
    print(f"Mounted {mount_point} successfully!")

In [0]:
# %fs ls /mnt/bronze/raw_files/

In [0]:
from pyspark.sql.functions import lit
source_path = "/mnt/bronze/raw_files/"

# The vectorized Parquet reader is switched off so that small type
# differences between files can be handled. This is a session-level setting,
# so it is applied once here rather than on every loop iteration.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

# 1. Get the list of file paths
file_paths = [file.path for file in dbutils.fs.ls(source_path)]

# Fail fast on an empty folder: otherwise final_df would stay None and the
# cell would report success while displaying nothing.
if not file_paths:
    raise FileNotFoundError(f"No files found under {source_path}")

# 2. Start with no DataFrame; the first file read becomes the base
final_df = None

# 3. Loop through every file
for path in file_paths:
    # Read the individual file
    current_df = spark.read.parquet(path)

    # If this is the first file, make it the "Master" DataFrame
    if final_df is None:
        final_df = current_df
    else:
        # 4. unionByName matches columns by name, and allowMissingColumns=True
        # lets the union proceed when one side lacks a column the other has.
        final_df = final_df.unionByName(current_df, allowMissingColumns=True)

# 5. Display the result
print("Success! All files merged.")
display(final_df)

In [0]:
# # Disabled sanity check: verify that final_df contains all 12 pickup months.
# # NOTE(review): as written, the snippet collects present_months but only
# # prints all_months; the two are never compared. If this check is re-enabled,
# # add the comparison (e.g. all_months - set(present_months)).
# from pyspark.sql.functions import col, month

# # Get a list of unique months present in the data
# present_months = [row['month'] for row in final_df.select(month(col("tpep_pickup_datetime")).alias("month")).distinct().collect()]

# # Full set of months (1 to 12) that present_months should be compared against
# all_months = set(range(1, 13))
# print(all_months)

## Silver layer transformation

In [0]:
# Configuration for Silver
# Mounts the "silver" blob container at /mnt/silver so the cleaned Delta
# tables can be written through a stable DBFS path.
container_name = "silver"
storage_account_name = "stgadlsurban" # Put your storage account name here

# SECURITY: the account key must not be written into the notebook. It is read
# from a Databricks secret scope at run time; create the scope/secret below
# (or adjust the names to match an existing one) before running this cell.
# NOTE: an access key was previously hardcoded in this cell. Treat that key as
# exposed and rotate it on the storage account.
storage_account_key = dbutils.secrets.get(
    scope="urban-mobility",           # secret scope name - adjust to your workspace
    key="stgadlsurban-access-key",    # secret holding the storage account access key
)

source_url = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"
mount_point = f"/mnt/{container_name}"
extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}

# Mount Silver if not already mounted: the call is skipped when the mount
# point is already listed, so the cell can be re-run safely.
if not any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=source_url,
        mount_point=mount_point,
        extra_configs=extra_configs
    )
    print(f"Mounted {mount_point} successfully!")
else:
    print(f"{mount_point} is already mounted.")

In [0]:
from pyspark.sql.functions import col, to_timestamp, current_timestamp

# Silver transformation: project the merged bronze DataFrame onto a fixed set
# of typed columns, drop rows that fail basic sanity rules, and stamp each
# remaining row with the time of this run.

# 1. Typed projection. Three columns are renamed; the rest keep their names.
typed_columns = [
    col("VendorID").cast("long").alias("vendor_id"),
    to_timestamp(col("tpep_pickup_datetime")).alias("pickup_time"),
    to_timestamp(col("tpep_dropoff_datetime")).alias("dropoff_time"),
    col("passenger_count").cast("int"),
    col("trip_distance").cast("double"),
    col("fare_amount").cast("double"),
    col("total_amount").cast("double"),
    col("payment_type").cast("long"),
    col("PULocationID").cast("long"),
    col("DOLocationID").cast("long"),
    col("RatecodeID").cast("long"),
]
silver_df = final_df.select(*typed_columns)

# 2. Business rules: keep only trips with a strictly positive distance AND a
# strictly positive total amount.
has_positive_distance = col("trip_distance") > 0
has_positive_total = col("total_amount") > 0

# 3. Audit trail: "ingestion_date" records when this transformation ran.
silver_df_clean = (
    silver_df
    .filter(has_positive_distance & has_positive_total)
    .withColumn("ingestion_date", current_timestamp())
)

# Verify the clean data
print("Silver Data Preview:")
display(silver_df_clean)

In [0]:
# Persist the cleaned trips to the Silver layer as Delta files. The write
# uses overwrite mode with overwriteSchema enabled, so a re-run replaces both
# the data and the schema stored at this path.
silver_path = "/mnt/silver/taxi_trips"

delta_writer = (
    silver_df_clean.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
delta_writer.save(silver_path)

print(f"Successfully wrote Delta Table to {silver_path}")

In [0]:
%fs ls /mnt/silver/taxi_trips

In [0]:
# Builds the taxi zone lookup table: download the CSV, load it into Spark,
# store it as Delta in the Silver layer, and register it for SQL access.

# Step 1: Download the CSV file from the web to the driver's temp folder
import os
import subprocess

# Define the URL and where we want to save it locally on the cluster
url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
local_path = "/tmp/taxi_zone_lookup.csv"

# Use wget to download (Linux command). It is invoked without a shell and with
# check=True so a failed download raises CalledProcessError here instead of
# surfacing later as a confusing read error.
subprocess.run(["wget", url, "-O", local_path], check=True)

# Guard against a zero-byte file being left behind by the download.
if os.path.getsize(local_path) == 0:
    raise RuntimeError(f"Downloaded file {local_path} is empty")

# Step 2: Read the downloaded CSV into Spark
# We use "file://" to tell Spark to look at the local hard drive.
# NOTE(review): this path is local to the driver; on a multi-node cluster the
# executors may not be able to see it - confirm the cluster setup, or copy the
# file to DBFS before reading.
zone_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"file://{local_path}")
)

# Step 3: Save it as a Delta Table in your Silver layer
(
    zone_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/silver/taxi_zone_lookup")
)

# Step 4: Register it as a SQL table
# The database is created here (idempotently) because this cell runs before
# the cell that otherwise creates it; without this, a fresh top-to-bottom run
# fails on the CREATE TABLE below.
spark.sql("CREATE DATABASE IF NOT EXISTS urban_mobility")
spark.sql("""
    CREATE TABLE IF NOT EXISTS urban_mobility.taxi_zone_lookup
    USING DELTA
    LOCATION '/mnt/silver/taxi_zone_lookup'
""")

print("Success! Taxi Zone Lookup table created.")
display(zone_df)

In [0]:
# Register the Silver trips so they can be queried by name from SQL.
# Both statements use IF NOT EXISTS, so re-running the cell is harmless.

# 1. Database (schema) that groups the project's tables
create_database_sql = "CREATE DATABASE IF NOT EXISTS urban_mobility"

# 2. Table defined by LOCATION only: it points at the existing Silver folder
# and does not move any data.
create_silver_table_sql = """
    CREATE TABLE IF NOT EXISTS urban_mobility.silver_taxi
    USING DELTA
    LOCATION '/mnt/silver/taxi_trips'
"""

# Order matters: the database has to exist before the table is created in it.
for statement in (create_database_sql, create_silver_table_sql):
    spark.sql(statement)

print("Table 'urban_mobility.silver_taxi' created successfully!")

In [0]:
%sql
SELECT * FROM urban_mobility.silver_taxi LIMIT 10