In [0]:
# List the files sitting in the raw landing zone for the DC bikeshare data.
display(dbutils.fs.ls("/Volumes/bikeshare-dc-data/bikeshare_schema/raw_landing_zone/dc_share_data"))

In [0]:
# Load the bronze ride table; the cleaning cells below start from this DataFrame.
bikeshare_bronze_df = spark.table("`bikeshare-dc-data`.bronze.dc_rideshare_bt")


In [0]:
%sql
-- Baseline row count of the bronze table, taken before any cleaning.
-- The USE statements also set the session's current catalog and schema.
USE CATALOG `bikeshare-dc-data`;
USE SCHEMA bronze;

SELECT count(*) AS num_rows
FROM `bikeshare-dc-data`.bronze.dc_rideshare_bt;

In [0]:
# Remove Duplicates
#
# - Deduplicate based on ride_id (primary key)
# - Check for duplicate rides with identical start/end times and locations
#   TODO: only the ride_id rule is implemented below; the start/end time +
#   location check is still outstanding.
#
# NOTE: dropDuplicates does not specify which row is kept for a repeated ride_id.
bikeshare_bronze_df = bikeshare_bronze_df.dropDuplicates(subset=["ride_id"])

# Rows remaining after deduplication (compare with the bronze baseline count).
bikeshare_bronze_df.count()


In [0]:
from pyspark.sql.functions import col, when
import pyspark.pandas as ps

def station_validation(df):
    """Return ``df`` with an added boolean ``is_valid`` column.

    ``is_valid`` is True only when both ``start_station_id`` and
    ``end_station_id`` are non-null, and False otherwise (never null).
    Rows are flagged, not removed; callers filter on the flag.
    """
    both_stations_present = (
        col("start_station_id").isNotNull()
        & col("end_station_id").isNotNull()
    )
    return df.withColumn("is_valid", both_stations_present)

In [0]:
"""
Handle Null Values
- Drop records where critical fields are null: ride_id, started_at, ended_at
- For member_casual, set a default value of "unknown" when it is null
- Keep only rides whose start_station_id and end_station_id are both present
  (flagged by the is_valid column that station_validation adds)
"""

check_valid_rides_df = station_validation(bikeshare_bronze_df)
check_valid_rides_df = (
    check_valid_rides_df
    # is_valid is already a boolean column; filter on it directly instead of
    # comparing it with the string 'True' (an implicit string->boolean cast).
    .where(col("is_valid"))
    .dropna(subset=["ride_id", "started_at", "ended_at"])
    .fillna({"member_casual": "unknown"})
)
display(check_valid_rides_df)



In [0]:
"""
Data Type Validation & Casting
Ensure started_at and ended_at are proper timestamp types
Ensure start_station_id and end_station_id are consistent integer types
"""
# Target Spark SQL type per column; withColumn replaces each column in place,
# so the column order of the DataFrame is unchanged.
# NOTE(review): a station id that is not a valid integer string becomes null
# after the cast (or raises if ANSI mode is on), so ids validated as non-null
# earlier could become null here — confirm source ids are always numeric.
target_types = {
    "started_at": "timestamp",
    "ended_at": "timestamp",
    "start_station_id": "integer",
    "end_station_id": "integer",
}
for column_name, spark_type in target_types.items():
    check_valid_rides_df = check_valid_rides_df.withColumn(column_name, col(column_name).cast(spark_type))



In [0]:
"""
Validate start_lat, start_lng, end_lat, end_lng are within valid ranges (DC coordinates)
"""
# Inclusive (min, max) bounding box, in decimal degrees, that both the start
# and end point of a ride must fall inside. Rows with a null coordinate are
# also dropped, because `between` on null yields null and filter discards it.
# NOTE(review): the latitude upper bound 39.9 is far north of DC (the District's
# northern tip is near 38.995) and may be a typo for 38.99/39.0 — confirm
# against the service area before tightening it.
DC_LAT_RANGE = (38.8, 39.9)
DC_LNG_RANGE = (-77.2, -76.9)

check_valid_rides_df = check_valid_rides_df.filter(
    col("start_lat").between(*DC_LAT_RANGE)
    & col("end_lat").between(*DC_LAT_RANGE)
    & col("start_lng").between(*DC_LNG_RANGE)
    & col("end_lng").between(*DC_LNG_RANGE)
)

In [0]:
%sql
-- Create the silver schema (if it is missing) in the project catalog.
-- USE CATALOG also sets the session's current catalog.
USE CATALOG `bikeshare-dc-data`;
CREATE SCHEMA IF NOT EXISTS `bikeshare-dc-data`.silver;

In [0]:
# Rows that survived the cleaning steps above, before writing to silver.
silver_row_count = check_valid_rides_df.count()
silver_row_count

In [0]:
# Write the cleaned rides to the silver Delta table, replacing its previous contents.
# No checkpointLocation here: that is a Structured Streaming option and is not
# used by a batch DataFrameWriter.saveAsTable call.
(
    check_valid_rides_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("`bikeshare-dc-data`.silver.dc_rideshare_st")
)


In [0]:
%sql
-- Inspect the schema and detailed table information of the silver table.
-- The USE statements also set the session's current catalog and schema.
USE CATALOG `bikeshare-dc-data`;
USE SCHEMA silver;

DESCRIBE TABLE EXTENDED `bikeshare-dc-data`.silver.dc_rideshare_st;

# SILVER SECTION

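### Data Enrichment

Derive per-ride metrics (duration, day of week, hour, weekend flag, month and year) from the cleaned silver table.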

In [0]:
from pyspark.sql import functions as F

In [0]:
"""
Calculate Derived Metrics
ride_duration (minutes): whole minutes between started_at and ended_at
ride_distance: Haversine distance (km) between start/end coordinates
day_of_week: Extract from started_at (1 = Sunday ... 7 = Saturday)
hour_of_day: Extract hour from started_at
is_weekend: Boolean flag (Saturday or Sunday)
ride_month, ride_year: For partitioning
"""
EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1, lng1, lat2, lng2):
    """Return a Column with the great-circle distance in km between two points.

    All four arguments are Spark Columns holding coordinates in decimal degrees.
    """
    lat1_rad, lat2_rad = F.radians(lat1), F.radians(lat2)
    dlat = lat2_rad - lat1_rad
    dlng = F.radians(lng2) - F.radians(lng1)
    a = F.pow(F.sin(dlat / 2), 2) + F.cos(lat1_rad) * F.cos(lat2_rad) * F.pow(F.sin(dlng / 2), 2)
    return 2 * EARTH_RADIUS_KM * F.asin(F.sqrt(a))


bikeshare_silver_df = spark.read.table("`bikeshare-dc-data`.silver.dc_rideshare_st")

# Duration in whole minutes.
# NOTE(review): the space and parentheses in this column name are not allowed in a
# Delta table unless column mapping is enabled; rename it before persisting.
bikeshare_silver_df = bikeshare_silver_df.withColumn(
    "ride_duration (minutes)",
    F.timestamp_diff("MINUTE", F.col("started_at"), F.col("ended_at")),
)

bikeshare_silver_df = bikeshare_silver_df.withColumn(
    "ride_distance",
    haversine_km(F.col("start_lat"), F.col("start_lng"), F.col("end_lat"), F.col("end_lng")),
)

# Spark's dayofweek numbers days 1 = Sunday through 7 = Saturday.
bikeshare_silver_df = bikeshare_silver_df.withColumn("day_of_week", F.dayofweek(F.col("started_at")))
bikeshare_silver_df = bikeshare_silver_df.withColumn("hour_of_day", F.hour(F.col("started_at")))

# Weekend = Sunday (1) or Saturday (7) under dayofweek's numbering.
bikeshare_silver_df = bikeshare_silver_df.withColumn(
    "is_weekend", F.when(F.col("day_of_week").isin([1, 7]), True).otherwise(False)
)

# Calendar fields used for partitioning.
bikeshare_silver_df = bikeshare_silver_df.withColumn("ride_month", F.month(F.col("started_at")))
bikeshare_silver_df = bikeshare_silver_df.withColumn("ride_year", F.year(F.col("started_at")))

display(bikeshare_silver_df)