**Dimension Table: Station**

In [0]:
# dim_station: the station dimension is a straight copy of staging_station.
dim_station = spark.read.table("staging_station")

# Persist the dimension as the Delta table 'dim_station', replacing prior contents.
(
    dim_station.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('dim_station')
)

**Dimension Table: Rider**

In [0]:
from pyspark.sql.functions import col, when, year, to_date, floor, months_between

# Load the staging_rider table
df_rider = spark.read.table("staging_rider")

# Parse the two date columns once so the stored columns and the derived age
# are computed from exactly the same values.
rider_birthday = to_date(col("birthday"), "yyyy-MM-dd")
rider_account_start = to_date(col("account_start_date"), "yyyy-MM-dd")

# Transform the data for dim_rider
dim_rider = df_rider.select(
    col("rider_id"),
    col("first"),
    col("last"),
    rider_birthday.alias("birthday"),
    rider_account_start.alias("account_start_date"),
    # Age in completed years at account start. Subtracting calendar years alone
    # overstates the age by one whenever the birthday falls later in the year
    # than the account start date, so count whole months and divide by 12.
    # The cast keeps the column an integer, as the year subtraction produced.
    floor(months_between(rider_account_start, rider_birthday) / 12)
    .cast("int")
    .alias("age_at_account_start"),
    # NOTE(review): assumes is_member is stored as 1/0 (or a type Spark can
    # compare with 1) - confirm against the staging schema.
    when(col("is_member") == 1, "Member").otherwise("Casual").alias("membership_status")
)

# Write dim_rider to the Delta table 'dim_rider', replacing prior contents
dim_rider.write.format('delta').mode('overwrite').saveAsTable('dim_rider')

**Dimension Table: Date**

In [0]:
from pyspark.sql.functions import col, date_format, date_add, date_sub, row_number, to_date
from pyspark.sql.functions import max as spark_max, min as spark_min
from pyspark.sql.window import Window

# Load the staging_payment table
df_payment = spark.read.table("staging_payment")
# Drop any rows where date is null
df_payment = df_payment.dropna(subset=["date"])

# Fetch the earliest and latest payment dates in a single Spark job.
# to_date() normalises the column first, so the Python arithmetic below
# always works on datetime.date values regardless of how the staging column
# is typed (a string would break the subtraction; a timestamp with a
# time-of-day part could truncate the final day of the range).
date_bounds = df_payment.agg(
    spark_min(to_date(col("date"))).alias("min_date"),
    spark_max(to_date(col("date"))).alias("max_date"),
).first()
min_date = date_bounds["min_date"]
max_date = date_bounds["max_date"]
if min_date is None or max_date is None:
    # Without this guard an empty staging table fails later with an opaque
    # TypeError on None - None.
    raise ValueError("staging_payment has no usable payment dates; cannot build dim_date")

# Generate one row per calendar day from min_date to max_date (inclusive)
day_count = (max_date - min_date).days + 1
date_range_df = spark.range(0, day_count).selectExpr(
    f"date_add('{min_date.isoformat()}', CAST(id AS INT)) as date"
)

# Create the dim_date DataFrame with date_id running from 1 to n in date order.
# The window has no partitioning, so all rows are ordered together; that is
# acceptable here because there is only one row per calendar day.
# NOTE(review): the range is derived from payment dates only, so rows joined
# to dim_date on a date outside this range are dropped by an inner join.
dim_date = date_range_df.withColumn("date_id", row_number().over(Window.orderBy("date"))) \
    .withColumn("day_of_week", date_format(col("date"), "EEEE")) \
    .withColumn("month", date_format(col("date"), "MM").cast("int")) \
    .withColumn("quarter", date_format(col("date"), "Q").cast("int")) \
    .withColumn("year", date_format(col("date"), "yyyy").cast("int"))

# Write dim_date to the Delta table 'dim_date', replacing prior contents
dim_date.write.format('delta').mode('overwrite').saveAsTable('dim_date')

**Fact Table: Payment**

In [0]:
# Re-read the raw payments from staging and parse the payment date
# (pattern yyyy-MM-dd) into a date-typed column in one step.
payment_date_parsed = to_date(col("date"), "yyyy-MM-dd")
df_payment = (
    spark.read.table("staging_payment")
    .withColumn("date", payment_date_parsed)
)

In [0]:
# Resolve each payment's date to its surrogate key (date_id) in dim_date.
# The inner join keeps only payments whose date has a matching dim_date row.
payment_date_match = df_payment["date"] == dim_date["date"]
fact_payment_df = (
    df_payment
    .join(dim_date, on=payment_date_match, how="inner")
    .select(
        "payment_id",
        col("date_id").alias("payment_date_id"),
        "amount",
        "rider_id",
    )
)

In [0]:
# Keep only the payments whose rider exists in dim_rider.
# No dim_rider column is needed in the output, so this is a pure existence
# filter: a left-semi join returns each left-hand row at most once, whereas an
# inner join would multiply payment rows if dim_rider ever contained a repeated
# rider_id. With unique rider_ids both joins give the same result.
fact_payment = fact_payment_df.join(
    dim_rider,
    fact_payment_df["rider_id"] == dim_rider["rider_id"],
    "left_semi"
).select(
    # Only fact_payment_df columns survive a left-semi join, so plain names
    # are unambiguous here.
    "payment_id",
    "payment_date_id",
    "amount",
    "rider_id"
)


# Write the fact_payment DataFrame to the Delta table 'fact_payment',
# replacing prior contents
fact_payment.write.format("delta").mode("overwrite").saveAsTable("fact_payment")

**Fact Table: Trip**

In [0]:
from pyspark.sql.functions import unix_timestamp, to_date, expr, substring, year

# Read the raw trips from staging.
df_trip = spark.read.table("staging_trip")

# Normalise both datetime columns the same way: keep the first 19 characters
# ('YYYY-MM-DD HH:MM:SS') of the string and cast the result to a timestamp.
for ts_column in ("started_at", "ended_at"):
    df_trip = df_trip.withColumn(ts_column, substring(ts_column, 1, 19).cast("timestamp"))

# Look up the dim_date surrogate key for the calendar day each trip started on.
trip_start_day = to_date(df_trip["started_at"])
# Trip length in minutes: difference of the two epoch-second values, divided by 60.
trip_minutes = (unix_timestamp("ended_at") - unix_timestamp("started_at")) / 60

# The inner join keeps only trips whose start day has a matching dim_date row.
fact_trip_df = (
    df_trip
    .join(dim_date, on=trip_start_day == dim_date["date"], how="inner")
    .select(
        "trip_id",
        "rider_id",
        "start_station_id",
        "end_station_id",
        "started_at",  # carried along so the rider's age can be derived later
        dim_date["date_id"].alias("trip_date_id"),
        trip_minutes.alias("duration"),
    )
)

from pyspark.sql.functions import floor, months_between, to_date

# Join with dim_rider: this keeps only trips whose rider exists in dim_rider
# and brings in the rider's birthday for the age calculation.
fact_trip = fact_trip_df.join(
    dim_rider,
    fact_trip_df["rider_id"] == dim_rider["rider_id"],
    "inner"
).select(
    "trip_id",
    fact_trip_df["rider_id"],
    "start_station_id",
    "end_station_id",
    "trip_date_id",
    "duration",
    # Rider age in completed years on the day the trip started. Subtracting
    # calendar years alone overstates the age by one for every trip taken
    # before the rider's birthday in that year, so count whole months between
    # the two dates and divide by 12. The cast keeps the column an integer,
    # as the year subtraction produced.
    floor(
        months_between(to_date(fact_trip_df["started_at"]), dim_rider["birthday"]) / 12
    ).cast("int").alias("rider_age")
)

# Show the first 3 records to verify the data
fact_trip.show(3)

+----------------+--------+----------------+--------------+------------+-----------------+---------+
|         trip_id|rider_id|start_station_id|end_station_id|trip_date_id|         duration|rider_age|
+----------------+--------+----------------+--------------+------------+-----------------+---------+
|222BB8E5059252D7|   34062|    KA1503000064|         13021|        3055|             18.6|       30|
|1826E16CB5486018|    5342|    TA1306000010|         13021|        3063|5.266666666666667|       26|
|3D9B6A0A5330B04D|    3714|    TA1305000030|         13021|        3060|5.333333333333333|       26|
+----------------+--------+----------------+--------------+------------+-----------------+---------+
only showing top 3 rows



In [0]:
# Persist fact_trip as the Delta table 'fact_trip', replacing prior contents.
(
    fact_trip.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_trip")
)