## 🚀 Mount AWS S3 Bucket to Databricks File System (DBFS)

In [0]:
import urllib.parse  # Used to safely encode the secret key for URL usage

# AWS credentials.
# NOTE(review): these are placeholders; never commit real keys in a notebook.
# Load them from a Databricks secret scope (dbutils.secrets.get) or use an
# instance profile instead.
ACCESS_KEY = "<add access key>"
SECRET_KEY = "<add secret key>"

# URL-encode the secret key so that special characters cannot break the
# s3a URI that embeds it.
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY, safe='')

# Define the S3 bucket name and the DBFS mount point
aws_bucket_name = "taxi-weather-analytics-s3-bucket"
mnt_name = "/mnt"

# Mount the bucket only when the mount point is not already in use.
# dbutils.fs.mount fails when the directory is already mounted, which would
# otherwise break every re-run of the notebook (Restart & Run All).
already_mounted = any(m.mountPoint == mnt_name for m in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(
        source=f"s3a://{ACCESS_KEY}:{ENCODED_SECRET_KEY}@{aws_bucket_name}",  # Authenticated S3 URI
        mount_point=mnt_name,  # Local mount point in DBFS
        extra_configs={
            "fs.s3a.endpoint": "s3.ca-central-1.amazonaws.com"  # Specify AWS region endpoint
        },
    )

In [0]:
# List the top-level contents of the mounted bucket.
# Uses mnt_name (defined in the mount cell) instead of a repeated literal so
# the listing always targets the mount point that was actually configured.
display(dbutils.fs.ls(mnt_name))

# Deal with NYC taxi Data

## 🛠️ Load NYC Taxi Data and Register Bronze Delta Table in Hive Metastore

In [0]:
# Read July 2025 yellow taxi trip data from the mounted S3 landing zone
df_taxi = spark.read.parquet(f"{mnt_name}/landing/nyc/taxi/2025/07/yellow_tripdata_2025-07.parquet")

# Create the bronze_nyc schema in the Hive metastore if it doesn't already exist
spark.sql("CREATE SCHEMA IF NOT EXISTS hive_metastore.bronze_nyc")

# Bronze-zone location for the Delta table. Built from mnt_name, like the
# landing path above, so the read and write paths always share one mount root.
target_path = f"{mnt_name}/bronze/nyc/taxi/2025/07/daily_trip_2025-07"

# Write the DataFrame as a Delta table to the bronze path, overwriting if it exists
(
    df_taxi.write
    .format("delta")
    .mode("overwrite")
    .save(target_path)
)

# Register the Delta table in the Hive metastore for SQL access
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS hive_metastore.bronze_nyc.daily_trip
    USING DELTA
    LOCATION '{target_path}'
""")

In [0]:
%sql
-- Preview the bronze taxi trip table registered in the Hive metastore
SELECT * FROM hive_metastore.bronze_nyc.daily_trip

In [0]:
%sql
-- Disabled cleanup statement: uncomment to drop the bronze_nyc schema.
-- DROP SCHEMA hive_metastore.bronze_nyc

In [0]:
%sql
-- 📊 Query all records from the bronze-level NYC daily taxi trip Delta table
SELECT * FROM hive_metastore.bronze_nyc.daily_trip;

## ✨ Clean and Standardize NYC Taxi Data for Silver Layer Processing

In [0]:
from pyspark.sql.functions import col, to_timestamp, to_date

# Build the silver-level frame step by step, always starting from the raw
# taxi DataFrame so the cell gives the same result every time it is run.
taxi_silver = df_taxi

# Pickup / dropoff columns as timestamps
for ts_name in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    taxi_silver = taxi_silver.withColumn(ts_name, to_timestamp(ts_name))

# Passenger count as integer
taxi_silver = taxi_silver.withColumn("passenger_count", col("passenger_count").cast("int"))

# Distance and monetary columns as double
for numeric_name in ("trip_distance", "fare_amount", "tip_amount", "total_amount"):
    taxi_silver = taxi_silver.withColumn(numeric_name, col(numeric_name).cast("double"))

# Derived columns: the calendar date of the pickup (used later for grouping and
# joining) and a copy of the pickup timestamp under a shorter name.
taxi_silver = (
    taxi_silver
    .withColumn("date", to_date(col("tpep_pickup_datetime")))
    .withColumn("pickup_datetime", col("tpep_pickup_datetime").cast("timestamp"))
)

# Show the transformed silver-level DataFrame for inspection
display(taxi_silver)

In [0]:
# Row count of the silver frame before duplicates are removed
taxi_silver.count()

In [0]:
# Remove rows that are exact duplicates across every column
df_taxi_silver_clean = taxi_silver.dropDuplicates()

# Row count after de-duplication, to compare against the count taken before cleaning
df_taxi_silver_clean.count()

In [0]:
# Display the de-duplicated silver taxi DataFrame for inspection
display(df_taxi_silver_clean)

## 🧼 Persist Cleaned NYC Taxi Data to Silver Layer and Register Delta Table

In [0]:
# Create the silver_nyc schema in the Hive metastore if it doesn't already exist
spark.sql("CREATE SCHEMA IF NOT EXISTS hive_metastore.silver_nyc")

# Silver-zone location for the cleaned Delta table. Built from mnt_name so it
# stays under the same mount root as the landing data read earlier.
target_path = f"{mnt_name}/silver/nyc/taxi/2025/07/cleaned_daily_trip_2025-07"

# Write the cleaned DataFrame to the silver path as a Delta table, overwriting if it exists
(
    df_taxi_silver_clean.write
    .format("delta")
    .mode("overwrite")
    .save(target_path)
)

# Register the cleaned Delta table in the Hive metastore for SQL access
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS hive_metastore.silver_nyc.cleaned_daily_trip
    USING DELTA
    LOCATION '{target_path}'
""")

In [0]:
%sql
-- Preview the cleaned silver-level taxi trip table
SELECT * FROM hive_metastore.silver_nyc.cleaned_daily_trip

## Prepare silver taxi trip data for gold layer

In [0]:
# Spark's sum is imported under an alias: a bare `sum` import would shadow
# Python's built-in sum() for every cell that runs after this one.
from pyspark.sql.functions import avg, count, sum as spark_sum

# Daily taxi summary: one row per pickup date
df_taxi_silver_agg = (
    df_taxi_silver_clean
    .groupBy("date")
    .agg(
        count("*").alias("trip_count"),                     # Number of trips on that date
        avg("fare_amount").alias("avg_fare"),               # Average fare_amount
        spark_sum("total_amount").alias("total_revenue"),   # Sum of total_amount
    )
)

display(df_taxi_silver_agg)

## Deal with Weather Data

In [0]:
# 📂 List all top-level directories and files under the mounted S3 path (/mnt)
display(dbutils.fs.ls(mnt_name))

# Ingest weather data and persist the object

## 🌦️ Define Nested Schema for Daily Weather JSON Response

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType

# Units for each daily weather metric (e.g., °C, mm) — every field is a string
daily_units_schema = StructType([
    StructField("time", StringType(), True),
    StructField("temperature_2m_max", StringType(), True),
    StructField("temperature_2m_min", StringType(), True),
    StructField("precipitation_sum", StringType(), True),
])

# Actual daily weather data, delivered as one array per metric
daily_values_schema = StructType([
    StructField("time", ArrayType(StringType()), True),                # Dates for each observation
    StructField("temperature_2m_max", ArrayType(DoubleType()), True),  # Daily max temperatures
    StructField("temperature_2m_min", ArrayType(DoubleType()), True),  # Daily min temperatures
    StructField("precipitation_sum", ArrayType(DoubleType()), True),   # Daily total precipitation
])

# Top-level schema used to parse the weather JSON: location metadata plus the
# two nested structs defined above.
w_schema = StructType([
    StructField("latitude", DoubleType(), True),                # Geographic latitude of the location
    StructField("longitude", DoubleType(), True),               # Geographic longitude of the location
    StructField("generationtime_ms", DoubleType(), True),       # Time taken to generate the response (ms)
    StructField("utc_offset_seconds", IntegerType(), True),     # UTC offset in seconds
    StructField("timezone", StringType(), True),                # Full timezone name (e.g., America/New_York)
    StructField("timezone_abbreviation", StringType(), True),   # Abbreviated timezone (e.g., EST)
    StructField("elevation", DoubleType(), True),               # Elevation of the location in meters
    StructField("daily_units", daily_units_schema, True),
    StructField("daily", daily_values_schema, True),
])

### 🌤️ Load Raw Daily Weather JSON into DataFrame Using Defined Schema

In [0]:
# Read weather data from the mounted S3 landing path using the nested schema for proper parsing
weather_df = (
    spark.read
         .schema(w_schema)
         .json(f"{mnt_name}/landing/nyc/weather/2025/07/")
)

# Display the parsed weather DataFrame for inspection and validation
display(weather_df)

### 🌧️ Flatten and Categorize Daily Weather Data for Silver Layer Enrichment

In [0]:
from pyspark.sql.functions import explode, arrays_zip, col, when

# Step 1: Zip the daily weather arrays into a single array of structs for row-wise expansion
zipped_df = weather_df.select(
    "latitude", "longitude", "generationtime_ms", "utc_offset_seconds",
    "timezone", "timezone_abbreviation", "elevation",
    col("daily_units.*"),  # Flatten the daily_units struct into individual columns
    explode(arrays_zip(  # Combine daily arrays into a single array of structs and explode into rows
        col("daily.time"),
        col("daily.temperature_2m_max"),
        col("daily.temperature_2m_min"),
        col("daily.precipitation_sum")
    )).alias("daily")  # Alias the exploded struct as 'daily'
)

# Step 2: Flatten the zipped struct into individual columns for analysis and joining.
# The un-prefixed columns (time, temperature_2m_max, ...) come from daily_units.*
# above and carry the units; the daily.* fields carry the observed values.
bronze_flat_df = zipped_df.select(
    "latitude", "longitude", "generationtime_ms", "utc_offset_seconds",
    "timezone", "timezone_abbreviation", "elevation",
    col("time").alias("unit_time_format"),  # Units metadata for time
    col("temperature_2m_max").alias("unit_temp_max"),  # Units metadata for max temp
    col("temperature_2m_min").alias("unit_temp_min"),  # Units metadata for min temp
    col("precipitation_sum").alias("unit_precip_mm"),  # Units metadata for precipitation
    col("daily.time").alias("date"),  # Actual date of observation
    col("daily.temperature_2m_max").alias("temperature_max"),  # Max temperature value
    col("daily.temperature_2m_min").alias("temperature_min"),  # Min temperature value
    col("daily.precipitation_sum").alias("precipitation_mm")  # Precipitation value
)

# Step 3: Add a rainfall category column based on precipitation thresholds.
# The top bucket is an explicit `>= 100` branch instead of an otherwise():
# a missing (null) precipitation value matches no branch and therefore yields
# a null category rather than being mislabelled as "Extreme Rainfall".
silver_cleaned_df = bronze_flat_df.withColumn(
    "rainfall_category",
    when(col("precipitation_mm") < 2, "Dry/No rain")
    .when(col("precipitation_mm") < 10, "Light Rain")
    .when(col("precipitation_mm") < 30, "Moderate Rain")
    .when(col("precipitation_mm") < 60, "Heavy Rain")
    .when(col("precipitation_mm") < 100, "Very Heavy Rain")
    .when(col("precipitation_mm") >= 100, "Extreme Rainfall")
)

# Preview the first 5 rows of the cleaned and categorized silver-level weather
# DataFrame; limit() makes the row cap explicit.
display(silver_cleaned_df.limit(5))

### 🌟 Enrich Aggregated Taxi Data with Daily Weather Metrics and Rainfall Classification

In [0]:
# Select relevant weather columns for joining with taxi data by date
flat_df_selection = silver_cleaned_df.select(
    "date",  # Date of weather observation
    "temperature_max",  # Daily maximum temperature
    "temperature_min",  # Daily minimum temperature
    "precipitation_mm",  # Total daily precipitation in mm
    "rainfall_category"  # Categorized rainfall intensity
)

# Perform a left join to enrich taxi aggregates with weather data based on matching date
df_taxi_gold_updated = df_taxi_silver_agg.join(flat_df_selection, "date", "left")

# Display the enriched gold-level DataFrame for inspection and downstream analysis
display(df_taxi_gold_updated)

## 🏆 Persist Final Gold-Level Taxi + Weather Dataset and Register Delta Table in Hive Metastore

### 🧩 Reusable Function to Save DataFrame as Delta Table and Register in Hive Metastore

In [0]:
def save_and_register_delta_table(
    df: "DataFrame",
    target_path: str,
    schema_name: str,
    table_name: str,
    mode: str = "overwrite",
) -> None:
    """
    Save a DataFrame as a Delta table at a path and register it in the Hive metastore.

    Three steps run in order: create the schema if missing, write the data in
    Delta format, then create the table over the written location if it is not
    already registered.

    Parameters:
    - df (DataFrame): The Spark DataFrame to persist.
    - target_path (str): The DBFS or S3 path where the Delta table will be saved.
    - schema_name (str): Schema under hive_metastore to create/use.
    - table_name (str): Name of the table to register.
    - mode (str): Save mode passed to the DataFrame writer. Defaults to
      "overwrite", which is what every existing caller relies on.

    Returns:
    - None
    """
    qualified_schema = f"hive_metastore.{schema_name}"

    # Create schema if it doesn't exist
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {qualified_schema}")

    # Write DataFrame as Delta table using the requested save mode
    df.write.format("delta").mode(mode).save(target_path)

    # Register table in Hive metastore; IF NOT EXISTS keeps re-runs from failing
    # once the table has been registered.
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {qualified_schema}.{table_name}
        USING DELTA
        LOCATION '{target_path}'
    """)

In [0]:
# Persist the flattened weather rows (before rainfall categorization) as the bronze table
save_and_register_delta_table(
    bronze_flat_df,
    "/mnt/bronze/nyc/weather/2025/07/daily_weather",
    schema_name="bronze_nyc",
    table_name="daily_weather",
)

In [0]:
# Persist the categorized weather rows as the silver table
save_and_register_delta_table(
    silver_cleaned_df,
    "/mnt/silver/nyc/weather/2025/07/cleaned_daily_weather",
    schema_name="silver_nyc",
    table_name="cleaned_daily_weather",
)

In [0]:
# Persist the taxi aggregates joined with weather as the gold table
save_and_register_delta_table(
    df_taxi_gold_updated,
    "/mnt/gold/nyc/trip_weather/2025/07/final_daily_trip_weather",
    schema_name="gold_nyc",
    table_name="final_daily_trip_weather",
)

In [0]:
# Show the in-memory gold DataFrame that was just persisted
display(df_taxi_gold_updated)

In [0]:
%sql
-- View the final gold-level taxi + weather table registered in the Hive metastore
SELECT * FROM hive_metastore.gold_nyc.final_daily_trip_weather

# Write data to Redshift

### 🔗 Test Redshift JDBC Connection from Databricks

In [0]:
# # JDBC connection parameters
# jdbc_url = "jdbc:redshift://<add path>"
# user = "<add user>"
# password = "<add password>"

# # Use a lightweight system table for testing the connection
# table_name = "pg_catalog.pg_tables"  # Redshift system catalog table

# # -----------------------------------------------
# # 🚀 Attempt to connect and read sample data
# # -----------------------------------------------
# try:
#     # Read the system table using Spark JDBC
#     test_df = spark.read \
#         .format("jdbc") \
#         .option("url", jdbc_url) \
#         .option("dbtable", table_name) \
#         .option("user", user) \
#         .option("password", password) \
#         .option("driver", "com.amazon.redshift.jdbc42.Driver") \
#         .load()

#     # Display a few rows to confirm success
#     print("✅ Redshift connection successful. Sample rows:")
#     test_df.show(5)

# except Exception as e:
#     # Handle connection or read errors
#     print("❌ Redshift connection failed.")
#     print(e)

In [0]:
# redshift_url = "jdbc:redshift://<add path>"
# redshift_table = "gold_nyc_2.final_daily_trip_weather"
# redshift_properties = {
#     "user": "<add user>",
#     "password": "<add password>",
#     "driver": "com.amazon.redshift.jdbc.Driver"
# }

In [0]:
# Dataframe
# df_taxi_gold_updated.write \
#   .mode("append") \
#   .jdbc(url=redshift_url, table=redshift_table, properties=redshift_properties)

In [0]:
# # table
# df_table = spark.table("nyc_weather_table")

# df_table.write \
#   .mode("append") \
#   .jdbc(url=redshift_url, table=redshift_table, properties=redshift_properties)

## Create Schema and Table via JDBC

In [0]:
# from pyspark.sql import SparkSession

# # SQL to create a new database and schema
# create_schema_sql = "CREATE SCHEMA IF NOT EXISTS gold_nyc_2;"
# create_table_sql = """
# CREATE TABLE gold_nyc_2.final_daily_trip_weather (
#     date DATE,
#     trip_count BIGINT,
#     avg_fare DOUBLE PRECISION,
#     total_revenue DOUBLE PRECISION,
#     temperature_max DOUBLE PRECISION,
#     temperature_min DOUBLE PRECISION,
#     precipitation_mm DOUBLE PRECISION,
#     rainfall_category VARCHAR(50)
# );
# """

# # Execute SQL via JVM bridge
# conn = SparkSession.getActiveSession()._sc._jvm.java.sql.DriverManager.getConnection(
#     redshift_url,
#     redshift_properties["user"],
#     redshift_properties["password"]
# )
# stmt = conn.createStatement()
# stmt.execute(create_schema_sql)
# stmt.execute(create_table_sql)
# stmt.close()
# conn.close()

In [0]:
# Redshift JDBC connection settings used by the DDL and write cells below.
# NOTE(review): the user/password values are placeholders; supply real values
# from a secret store rather than typing them into the notebook.
redshift_url = "jdbc:redshift://<add path>"
redshift_table = "gold_nyc.final_daily_trip_weather"
redshift_properties = dict(
    user="<add user>",
    password="<add password>",
    driver="com.amazon.redshift.jdbc.Driver",
)

In [0]:
from pyspark.sql import SparkSession

# Connection settings (redshift_url, redshift_properties) are defined in the
# Redshift configuration cell directly above this one.

# DDL for the Redshift target schema and table. Both statements use
# IF NOT EXISTS so this cell can be re-run without failing once they exist.
create_schema_sql = "CREATE SCHEMA IF NOT EXISTS gold_nyc;"
create_table_sql = """
CREATE TABLE IF NOT EXISTS gold_nyc.final_daily_trip_weather (
    date DATE,
    trip_count BIGINT,
    avg_fare DOUBLE PRECISION,
    total_revenue DOUBLE PRECISION,
    temperature_max DOUBLE PRECISION,
    temperature_min DOUBLE PRECISION,
    precipitation_mm DOUBLE PRECISION,
    rainfall_category VARCHAR(50)
);
"""

# Execute SQL via JVM bridge: open a plain java.sql connection through the
# active Spark session's JVM gateway.
conn = SparkSession.getActiveSession()._sc._jvm.java.sql.DriverManager.getConnection(
    redshift_url,
    redshift_properties["user"],
    redshift_properties["password"]
)
try:
    stmt = conn.createStatement()
    try:
        stmt.execute(create_schema_sql)
        stmt.execute(create_table_sql)
    finally:
        # Release the statement even when one of the DDL calls raises
        stmt.close()
finally:
    # Always release the connection, including on failure
    conn.close()

In [0]:
# Append the gold DataFrame to the Redshift table over JDBC.
# NOTE(review): with mode("append") every re-run of this cell adds the same
# rows again — confirm that is intended.
(
    df_taxi_gold_updated.write
    .mode("append")
    .jdbc(url=redshift_url, table=redshift_table, properties=redshift_properties)
)