In [None]:
# Extract step
# Read the raw CSV files from DBFS and ingest them into the bronze layer of
# the Delta file system.

# The CSV files have no header row, so the column names are supplied here
# according to the schema provided in the project description.
trip_columns = ["trip_id", "rideable_type", "start_at", "ended_at", "start_station_id", "end_station_id", "rider_id"]
station_columns = ["station_id", "name", "latitude", "longitude"]
rider_columns = ["rider_id", "first", "last", "address", "birthday", "account_start_date", "account_end_date", "is_member"]
payment_columns = ["payment_id", "date", "amount", "rider_id"]
# csv_files and columns are parallel lists: the i-th file gets the i-th schema.
csv_files = ["trips", "stations", "riders", "payments"]
columns = [trip_columns, station_columns, rider_columns, payment_columns]
for item, item_columns in zip(csv_files, columns):
    df = spark.read.format("csv") \
        .option("inferSchema", "true") \
        .option("header", "false") \
        .option("sep", ",") \
        .load(f"/FileStore/tables/{item}.csv")
    # Assign the column names positionally.
    df = df.toDF(*item_columns)

    # Overwrite so this cell can be re-run: without an explicit mode the
    # write fails when the target path already exists. This also matches the
    # save mode used for the gold tables.
    df.write.format("delta") \
        .mode("overwrite") \
        .save(f"/delta/bronze/{item}")

In [ ]:
# Load and transform step
# Read the bronze data back and save the star-schema tables as gold_* tables in Delta format.
from pyspark.sql.functions import (
    col,
    date_add,
    date_format,
    datediff,
    floor,
    lit,
    months_between,
    unix_timestamp,
)
# Read each bronze Delta dataset written by the extract step into a DataFrame.
df_trips = spark.read.format("delta").load("/delta/bronze/trips")
df_stations = spark.read.format("delta").load("/delta/bronze/stations")
df_rider = spark.read.format("delta").load("/delta/bronze/riders")
df_payments = spark.read.format("delta").load("/delta/bronze/payments")


In [ ]:
# Dim station
# The station dimension is the bronze stations data written without changes.
(
    df_stations.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_station")
)

In [ ]:
# Dim rider
# age_at_account_start is the rider's age in completed years on the day the
# account was opened. It is derived from calendar months instead of
# days / 365: dividing by 365 ignores leap days, so the result could tick
# over to the next age a few days before the actual birthday.
dim_rider = df_rider \
    .withColumn("age_at_account_start", floor(months_between("account_start_date", "birthday") / 12)) \
    .select("rider_id", "address", "first", "last", "birthday", "is_member", "age_at_account_start")

dim_rider.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_dim_rider")



In [ ]:
# Dim date

# One row per calendar day from 2012-01-01 to 2023-12-31:
# 12 years of 365 days plus 3 leap days (2012, 2016, 2020).
date_range_df = (
    spark.range(0, (365 * 12) + 3)
    .selectExpr("CAST(id AS INT) AS id")
    # Offset the fixed start date by the row id to get consecutive dates.
    .select(date_add(lit("2012-01-01"), col("id")).alias("date"))
)

# Derive the dimension attributes; the date itself is not kept, only its
# integer yyyyMMdd key.
dim_date = date_range_df.select(
    date_format("date", "yyyyMMdd").cast("int").alias("date_key"),
    date_format("date", "E").alias("week_day"),
    date_format("date", "M").cast("int").alias("month"),
    date_format("date", "q").cast("int").alias("quarter"),
)

(
    dim_date.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_date")
)



In [ ]:
# fact payments
# date_key is the integer yyyyMMdd form of the payment date, the same format
# used for date_key in the date dimension.
# NOTE(review): payment_id is not carried into the fact table - confirm this
# is intended.
fact_payments = df_payments.select(
    col("rider_id"),
    col("date").alias("payment_date"),
    date_format("date", "yyyyMMdd").cast("int").alias("date_key"),
    col("amount").alias("payment_amount"),
)

(
    fact_payments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_fact_payments")
)

In [ ]:
# fact trip
# Join trips with riders to get each rider's birthday, which is needed for
# rider_age. Only the rider columns actually used are brought into the join.
# The inner join drops trips whose rider_id has no matching rider.
fact_trip = df_trips \
    .join(df_rider.select("rider_id", "birthday"), "rider_id", "inner")
# duration_in_minutes: difference of the epoch-second timestamps divided by
#   60 and truncated to whole minutes by the int cast.
# rider_age: age in completed years at the start of the trip, derived from
#   calendar months instead of days / 365 (which ignores leap days and could
#   report the next age a few days before the actual birthday).
fact_trip = fact_trip \
    .withColumn("date_key", date_format("start_at", "yyyyMMdd").cast("int")) \
    .withColumn("start_timestamp", unix_timestamp("start_at")) \
    .withColumn("end_timestamp", unix_timestamp("ended_at")) \
    .withColumn('duration_in_minutes', ((col('end_timestamp') - col('start_timestamp')) / 60).cast("int")) \
    .withColumn("starting_hour", date_format("start_at", "H").cast("int")) \
    .withColumn('rider_age', floor(months_between("start_at", "birthday") / 12)) \
    .withColumnRenamed('start_at', 'started_at') \
    .select("trip_id","start_station_id", "end_station_id", "rider_id",
            "duration_in_minutes", "rider_age", "started_at", "ended_at", "date_key", "starting_hour")

fact_trip.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_fact_trip")