## Create dimension/fact tables

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.functions import substring
from pyspark.sql.functions import col, unix_timestamp, months_between, round
from pyspark.sql.functions import year, quarter, month, weekofyear, dayofmonth

In [0]:
# Load the raw stations data from its Delta location
stations = spark.read.format("delta").load("/delta/stations")

# Assign the dimension's column names (applied positionally by toDF)
columns = ['station_id', 'name', 'latitude', 'longitude']
dim_stations = stations.toDF(*columns)

# Preview the first five rows
dim_stations.show(5)

# Save the station dimension as a Delta table, replacing any existing contents
(
    dim_stations.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_stations")
)

In [0]:
# Load the raw riders data from its Delta location
riders = spark.read.format("delta").load("/delta/riders")

# Assign the dimension's column names (applied positionally by toDF)
columns = [
    'rider_id',
    'first_name',
    'last_name',
    'address',
    'birthday',
    'account_start_date',
    'account_end_date',
    'is_member',
]
dim_riders = riders.toDF(*columns)

# Preview the first five rows
dim_riders.show(5)

# Save the rider dimension as a Delta table, replacing any existing contents
(
    dim_riders.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_riders")
)

In [0]:
# Load the raw payments data from its Delta location
payments = spark.read.format("delta").load("/delta/payments")

# Assign column names (applied positionally by toDF)
columns = ['payment_id', 'date', 'amount', 'rider_id']
payments = payments.toDF(*columns)

# Turn "date" into a numeric key: render it as text, strip the '-' separators,
# keep the first 8 characters and cast them to long.
# NOTE(review): this yields yyyymmdd only if the source value starts with
# yyyy-MM-dd -- confirm against the source data.
fact_payments = payments.withColumn(
    "date",
    substring(
        regexp_replace(payments["date"].cast("string"), "-", ""),
        1,
        8,
    ).cast("long"),
)

# Expose the key as date_id and put the columns in their final order
fact_payments = (
    fact_payments
    .withColumnRenamed("date", "date_id")
    .select("payment_id", "amount", "rider_id", "date_id")
)

# Preview the first five rows
fact_payments.show(5)

# Save the payment facts as a Delta table, replacing any existing contents
(
    fact_payments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_payment")
)

In [0]:
# Build the trip fact table: raw trips plus rider age at trip start,
# trip duration in minutes and a numeric date key.
# Relies on dim_riders having been created by the riders cell earlier in the notebook.
from pyspark.sql.functions import floor

# Load the raw trips data from its Delta location
trips = spark.read.format("delta") \
    .load("/delta/trips")

# Assign column names (applied positionally by toDF)
columns = ['trip_id', 'rideable_type', 'start_at', 'end_at', 'start_station_id', 'end_station_id', 'rider_id']
trips = trips.toDF(*columns)

# Attach each rider's birthday; the left join keeps trips with no matching rider
trips = trips.join(dim_riders[["rider_id", "birthday"]], on="rider_id", how="left")

# Rider age at trip start, in completed years.
# floor() is used instead of round(): rounding to the nearest year would report
# a rider who is 25.6 years old as 26. The result is cast back to double so the
# column keeps the type the previous round()-based version wrote to the table.
trips = trips.withColumn(
    "age",
    floor(months_between(trips["start_at"], trips["birthday"]) / 12).cast("double"),
)

# Trip duration in minutes, rounded to 3 decimal places
# (round here is pyspark.sql.functions.round, imported at the top of the notebook)
trips = trips.withColumn(
    "duration",
    round((unix_timestamp("end_at") - unix_timestamp("start_at")) / 60, 3),
)

# Derive date_id from start_at: render as text, strip the '-' separators,
# keep the first 8 characters and cast them to long.
# NOTE(review): this yields yyyymmdd only if start_at renders as
# yyyy-MM-dd... -- confirm against the source data.
trips = trips.withColumn(
    "date_id",
    substring(
        regexp_replace(trips["start_at"].cast("string"), "-", ""),
        1,
        8,
    ).cast("long"),
)

# Keep only the fact columns, in their final order
fact_trips = trips.select(
    "trip_id",
    "rideable_type",
    "start_at",
    "end_at",
    "duration",
    "age",
    "start_station_id",
    "end_station_id",
    "rider_id",
    "date_id",
)
fact_trips.show(5)

# Save the trip facts as a Delta table, replacing any existing contents
fact_trips.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("fact_trips")

In [0]:
# Build the date dimension from every distinct calendar date found in the
# trip start times (fact_trips) and the payment dates (payments); both frames
# are created by earlier cells of this notebook.
# union() pairs columns by position, so the combined column keeps the name "start_at".
date_df = fact_trips.select('start_at').union(payments.select('date'))
date_df = date_df.withColumn("start_at", col("start_at").cast("date"))

# Discard NULL dates (missing source values): they would otherwise produce a
# dimension row whose date_id key is NULL, which no fact row can ever join to.
date_df = date_df.where(col("start_at").isNotNull())

# One row per calendar date
date_df = date_df.dropDuplicates(["start_at"])
date_df = date_df.withColumnRenamed("start_at", "date")

# Extract year, quarter, month, week of year and day of month into separate columns
date_df = (
    date_df
    .withColumn("year", year(col("date")))
    .withColumn("quarter", quarter(col("date")))
    .withColumn("month", month(col("date")))
    .withColumn("week", weekofyear(col("date")))
    .withColumn("day", dayofmonth(col("date")))
)

# Derive date_id the same way the fact tables do: render the date as text,
# strip the '-' separators, keep the first 8 characters and cast them to long.
date_df = date_df.withColumn(
    "date_id",
    substring(
        regexp_replace(col("date").cast("string"), "-", ""),
        1,
        8,
    ).cast("long"),
)

# Put the key first, followed by the date and its attributes
dim_date = date_df.select("date_id", "date", "year", "quarter", "month", "week", "day")
dim_date.show(5)

# Write dim_date to table, replacing any existing contents
dim_date.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("dim_date")