# Divvy Bike Data Lakehouse Project
## 1. Ingesting Raw Data `Bronze Layer`
In this step, we read the raw CSV files from DBFS and write them into Delta format.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, BooleanType, FloatType, DateType

# Explicit schemas for the four source CSV files. Every column is declared nullable.

# Rider schema
rider_schema = (
    StructType()
    .add("rider_id", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("address", StringType(), True)
    # birthday is read as a timestamp; the two account dates below are plain dates.
    .add("birthday", TimestampType(), True)
    .add("account_start_date", DateType(), True)
    .add("account_end_date", DateType(), True)
    .add("is_member", BooleanType(), True)
)

# Trip schema
trip_schema = (
    StructType()
    .add("trip_id", StringType(), True)
    .add("rideable_type", StringType(), True)
    .add("started_at", TimestampType(), True)
    .add("ended_at", TimestampType(), True)
    # Station ids are strings, not integers.
    .add("start_station_id", StringType(), True)
    .add("end_station_id", StringType(), True)
    .add("rider_id", IntegerType(), True)
)

# Station schema
station_schema = (
    StructType()
    .add("station_id", StringType(), True)
    .add("name", StringType(), True)
    .add("latitude", FloatType(), True)
    .add("longitude", FloatType(), True)
)

# Payment schema
payment_schema = (
    StructType()
    .add("payment_id", IntegerType(), True)
    .add("date", TimestampType(), True)
    .add("amount", FloatType(), True)
    .add("rider_id", IntegerType(), True)
)

# Entity name -> [source CSV location, schema used to read it]
filesPath = {
    "rider": [
        "dbfs:/FileStore/projectDatalake/riders.csv",
        rider_schema,
    ],
    "trip": [
        "dbfs:/FileStore/projectDatalake/trips.csv",
        trip_schema,
    ],
    "station": [
        "dbfs:/FileStore/projectDatalake/stations.csv",
        station_schema,
    ],
    "payment": [
        "dbfs:/FileStore/projectDatalake/payments.csv",
        payment_schema,
    ],
}

# Ingest each CSV with its explicit schema and persist it as a bronze Delta table.
for name, source in filesPath.items():
    path, schema = source
    bronze_path = f"/deltaLake/bronze/{name}"

    # The files carry a header row; column types come from the schema, not inference.
    csv_reader = spark.read.format("csv").option("header", "true").schema(schema)
    df = csv_reader.load(path)

    # Replace any previous bronze copy of this entity.
    df.write.format("delta").mode("overwrite").save(bronze_path)

    # Read the table back from Delta and show it as a sanity check.
    display(spark.read.format("delta").load(bronze_path))


rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
23344,Curtis,Harris,0309 Cassie Fall Suite 266,1979-12-13T00:00:00Z,2019-06-09,,True
23345,Joshua,Farmer,55226 Brandon Forge Apt. 718,1990-07-25T00:00:00Z,2020-10-16,2021-07-01,True
23346,Jill,Rangel,616 Maurice Forges Suite 888,1988-03-26T00:00:00Z,2021-01-10,,True
23347,Jamie,Evans,33245 Shane Walks,1973-10-10T00:00:00Z,2019-01-16,2021-03-01,True
23348,Shirley,Gomez,57751 English Lake,2004-01-22T00:00:00Z,2020-02-01,,True
23349,Michael,Schultz,976 Tracy Rest,1986-11-05T00:00:00Z,2016-03-06,2017-03-01,True
23350,Nathan,Baker,554 Audrey Glen Apt. 976,1984-10-12T00:00:00Z,2013-05-03,,True
23351,Shaun,Casey,886 Joanna Unions,1990-11-07T00:00:00Z,2014-03-23,,False
23352,Jennifer,Day,3731 Hayes Rest Apt. 371,1994-08-18T00:00:00Z,2019-05-05,2021-10-01,True
23353,Troy,Gross,93349 Joshua Bridge,1962-12-08T00:00:00Z,2018-09-27,,True


trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38Z,2021-02-14T18:12:09Z,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18Z,2021-02-09T19:19:10Z,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41Z,2021-02-02T17:54:06Z,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23Z,2021-02-23T15:22:37Z,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24T15:43:33Z,2021-02-24T15:49:05Z,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01T17:47:42Z,2021-02-01T17:48:33Z,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11T18:33:53Z,2021-02-11T18:35:09Z,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27T15:13:39Z,2021-02-27T15:36:36Z,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20T08:59:42Z,2021-02-20T09:17:04Z,KP1705001026,KP1705001026,57068
A977EB7FE7F5CD3A,classic_bike,2021-02-20T08:58:16Z,2021-02-20T08:58:41Z,KP1705001026,KP1705001026,32712


station_id,name,latitude,longitude
KA1503000012,Clark St & Lake St,41.885796,-87.6311
637,Wood St & Chicago Ave,41.895634,-87.672066
13216,State St & 33rd St,41.834732,-87.625824
18003,Fairbanks St & Superior St,41.89581,-87.620255
KP1705001026,LaSalle Dr & Huron St,41.89488,-87.632324
13253,Lincoln Ave & Waveland Ave,41.948795,-87.67528
KA1503000044,Rush St & Hubbard St,41.890175,-87.62618
KA1504000140,Winchester Ave & Elston Ave,41.924038,-87.676414
TA1305000032,Clinton St & Madison St,41.88224,-87.64107
TA1306000012,Wells St & Huron St,41.894753,-87.6344


payment_id,date,amount,rider_id
1496876,2020-04-01T00:00:00Z,9.0,58830
1496877,2020-05-01T00:00:00Z,9.0,58830
1496878,2020-06-01T00:00:00Z,9.0,58830
1496879,2020-07-01T00:00:00Z,9.0,58830
1496880,2020-08-01T00:00:00Z,9.0,58830
1496881,2020-09-01T00:00:00Z,9.0,58830
1496882,2020-10-01T00:00:00Z,9.0,58830
1496883,2020-11-01T00:00:00Z,9.0,58830
1496884,2020-12-01T00:00:00Z,9.0,58830
1496885,2021-01-01T00:00:00Z,9.0,58830


## 2. Staging Step  `Silver Layer`
The **staging step** is where we prepare the raw ingested data for transformations.  

In [0]:
from pyspark.sql.functions import col, to_date, datediff

# -------- Riders --------
# Bronze -> silver: one row per rider_id, rows without an id removed,
# and "birthday" exposed under the name "birth_date".
df_rider_bronze = spark.read.format("delta").load("/deltaLake/bronze/rider")

riders_deduped = df_rider_bronze.dropDuplicates(subset=["rider_id"])
riders_with_id = riders_deduped.where(col("rider_id").isNotNull())
df_rider_silver = riders_with_id.withColumnRenamed("birthday", "birth_date")

(
    df_rider_silver.write
    .format("delta")
    .mode("overwrite")
    .save("/deltaLake/silver/rider")
)


# -------- Stations --------
# Bronze -> silver: one row per station_id, rows without an id removed.
df_station_bronze = spark.read.format("delta").load("/deltaLake/bronze/station")

stations_deduped = df_station_bronze.dropDuplicates(subset=["station_id"])
df_station_silver = stations_deduped.where(col("station_id").isNotNull())

(
    df_station_silver.write
    .format("delta")
    .mode("overwrite")
    .save("/deltaLake/silver/station")
)


# -------- Trips --------
# Bronze -> silver: one row per trip_id, keeping only trips that reference a rider.
df_trip_bronze = spark.read.format("delta").load("/deltaLake/bronze/trip")

trips_deduped = df_trip_bronze.dropDuplicates(subset=["trip_id"])
df_trip_silver = trips_deduped.where(col("rider_id").isNotNull())

(
    df_trip_silver.write
    .format("delta")
    .mode("overwrite")
    .save("/deltaLake/silver/trip")
)


# -------- Payments --------
# Bronze -> silver: one row per payment_id, rows missing a rider or an amount removed,
# amount widened to double, and "date" exposed under the name "payment_date".
df_payment_bronze = spark.read.format("delta").load("/deltaLake/bronze/payment")

payments_deduped = df_payment_bronze.dropDuplicates(subset=["payment_id"])
payments_complete = (
    payments_deduped
    .where(col("rider_id").isNotNull())
    .where(col("amount").isNotNull())
)
payments_typed = payments_complete.withColumn("amount", col("amount").cast("double"))
df_payment_silver = payments_typed.withColumnRenamed("date", "payment_date")

(
    df_payment_silver.write
    .format("delta")
    .mode("overwrite")
    .save("/deltaLake/silver/payment")
)


## 3. The Star Schema `Gold Layer`
The **gold layer** holds data that has been ingested, cleaned, conformed, and aggregated so that it is reliable and tailored for analytical needs.

In [0]:
from pyspark.sql.functions import *


# ----------------------- silver layer DataFrames loaded -------------------
# Re-read every silver table from Delta so this cell does not depend on
# DataFrames left over from the staging cell.
df_rider_silver, df_trip_silver, df_station_silver, df_payment_silver = (
    spark.read.format("delta").load(f"/deltaLake/silver/{entity}")
    for entity in ("rider", "trip", "station", "payment")
)


# Gather all dates
# Collect every payment, trip-start and trip-end value into a single
# "full_date" column, then drop nulls and duplicates.
payment_stamps = df_payment_silver.select(col("payment_date").alias("full_date"))
trip_start_stamps = df_trip_silver.select(col("started_at").alias("full_date"))
trip_end_stamps = df_trip_silver.select(col("ended_at").alias("full_date"))

all_dates = (
    payment_stamps
    .union(trip_start_stamps)
    .union(trip_end_stamps)
    .where(col("full_date").isNotNull())
    .distinct()
)

# dim_date
# One row per distinct value in all_dates, decorated with calendar attributes.
# NOTE(review): rows are per distinct full_date value, not per calendar day, so
# date_id (yyyyMMdd) repeats for values that fall on the same day — confirm this
# is intended before anything joins to this table on date_id.
dim_date = (
    all_dates
    # Integer surrogate of the calendar day, e.g. 20210214.
    .withColumn("date_id", date_format(col("full_date"), "yyyyMMdd").cast("int"))
    # Calendar day as text. The previous pattern ")" contained no date fields,
    # so the column never held a date; it stays a string to keep the column type.
    .withColumn("date", date_format(col("full_date"), "yyyy-MM-dd"))
    # Join key used by the fact tables below.
    .withColumn("FullDateTime", to_timestamp(col("full_date"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("year", year(col("full_date")))
    .withColumn("quarter", quarter(col("full_date")))
    .withColumn("month", month(col("full_date")))
    .withColumn("month_name", date_format(col("full_date"), "MMMM"))
    .withColumn("day", dayofmonth(col("full_date")))
    .withColumn("day_name", date_format(col("full_date"), "EEEE"))
    .orderBy("full_date")
)

# Save to gold layer
dim_date.write.format("delta").mode("overwrite").save("/deltaLake/gold/dim_date")

# dim_station
# Station dimension: the silver station columns, listed explicitly.
station_columns = ["station_id", "name", "latitude", "longitude"]
dim_station = df_station_silver.select(*station_columns)

(
    dim_station.write
    .format("delta")
    .mode("overwrite")
    .save("/deltaLake/gold/dim_station")
)

# dim_rider
# Rider dimension: the silver rider columns, listed explicitly.
rider_columns = [
    "rider_id",
    "first_name",
    "last_name",
    "address",
    "birth_date",
    "account_start_date",
    "account_end_date",
    "is_member",
]
dim_rider = df_rider_silver.select(*rider_columns)

(
    dim_rider.write
    .format("delta")
    .mode("overwrite")
    .save("/deltaLake/gold/dim_rider")
)

# fact_payment
# Each payment is matched to its dim_date row by exact timestamp and to its
# rider; inner joins drop payments that have no match on either side.
payment_matches_date = df_payment_silver["payment_date"] == dim_date["FullDateTime"]

fact_payment = (
    df_payment_silver
    .join(dim_date, on=payment_matches_date, how="inner")
    .join(df_rider_silver, on="rider_id", how="inner")
    .select(
        df_payment_silver["payment_id"],
        dim_date["date_id"].alias("date_id"),
        df_payment_silver["amount"],
        df_rider_silver["rider_id"],
    )
)

(
    fact_payment.write
    .format("delta")
    .mode("overwrite")
    .save("/deltaLake/gold/fact_payment")
)

#  fact_trip
# Each trip is matched to its start/end dim_date rows (by exact timestamp), its
# start/end stations and its rider. All joins are inner, so a trip missing any
# of those matches is dropped.
start_dates = dim_date.alias("d1")
end_dates = dim_date.alias("d2")
start_stations = dim_station.alias("s1")
end_stations = dim_station.alias("s2")

starts_on = df_trip_silver["started_at"] == col("d1.FullDateTime")
ends_on = df_trip_silver["ended_at"] == col("d2.FullDateTime")
departs_from = df_trip_silver["start_station_id"] == col("s1.station_id")
arrives_at = df_trip_silver["end_station_id"] == col("s2.station_id")

# Trip length in whole minutes (rounded) and rider age in completed years at trip start.
duration_minutes = round(
    (unix_timestamp(col("ended_at")) - unix_timestamp(col("started_at"))) / 60
)
age_at_start = floor(months_between(col("started_at"), col("birth_date")) / 12)

fact_trip = (
    df_trip_silver
    .join(start_dates, starts_on, "inner")
    .join(end_dates, ends_on, "inner")
    .join(start_stations, departs_from, "inner")
    .join(end_stations, arrives_at, "inner")
    .join(df_rider_silver, "rider_id", "inner")
    .select(
        df_trip_silver["trip_id"],
        col("d1.date_id").alias("start_date_id"),
        col("d2.date_id").alias("end_date_id"),
        col("s1.station_id").alias("start_station_id"),
        col("s2.station_id").alias("end_station_id"),
        df_rider_silver["rider_id"],
        df_trip_silver["rideable_type"],
        df_trip_silver["started_at"],
        df_trip_silver["ended_at"],
        df_rider_silver["birth_date"],
    )
    .withColumn("trip_duration", duration_minutes)
    .withColumn("rider_age", age_at_start)
)

(
    fact_trip.write
    .format("delta")
    .mode("overwrite")
    .save("/deltaLake/gold/fact_trip")
)


# Gold Layer - Convert to saveAsTable()
# Register every gold DataFrame as a named Delta table, in addition to the
# path-based copies written above. Dimensions are written before facts.
gold_tables = {
    # Dimension tables
    "dim_rider": dim_rider,
    "dim_station": dim_station,
    "dim_date": dim_date,
    # Fact tables
    "fact_trip": fact_trip,
    "fact_payment": fact_payment,
}

for table_name, frame in gold_tables.items():
    frame.write.format("delta").mode("overwrite").saveAsTable(table_name)

