#Read in csv files and turn into delta

#To Payments delta


In [0]:
payments_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("/FileStore/payments.csv")

columns = ['payment_id','date','amount','account_number']
payments_df = payments_df.toDF(*columns)

payments_df.write.format("delta").mode("overwrite").save("/delta/payments_delta")

payments_df.printSchema()

root
 |-- payment_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- account_number: string (nullable = true)



#To Trips delta


In [0]:
trips_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("/FileStore/trips.csv")

columns = ['trip_id','rideable_type','started_at','ended_at','start_station_id','end_station_id','rider_id']
trips_df = trips_df.toDF(*columns)

trips_df.write.format("delta").mode("overwrite").save("/delta/trips_delta")
trips_df.printSchema()



root
 |-- trip_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- ended_at: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- rider_id: string (nullable = true)



#To Riders delta


In [0]:
riders_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("/FileStore/riders.csv")
    
columns = ['rider_id','first','last','address','birthday','account_start_date','account_end_date','is_member']
riders_df = riders_df.toDF(*columns)

riders_df.write.format("delta").mode("overwrite").save("/delta/riders_delta")
riders_df.printSchema()


root
 |-- rider_id: string (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- account_start_date: string (nullable = true)
 |-- account_end_date: string (nullable = true)
 |-- is_member: string (nullable = true)



#To Stations delta


In [0]:

stations_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("/FileStore/stations.csv")

columns = ['station_id','name','latitude','longitude']
stations_df = stations_df.toDF(*columns)

stations_df.write.format("delta").mode("overwrite").save("/delta/stations_delta")

In [0]:
#delete tables
dbutils.fs.rm("/delta/trips_delta", recurse=True)

#show
dbutils.fs.ls("/delta/")

True

#Riders and Stations to dim

In [0]:
riders_df = spark.read.format("delta").load("/delta/riders_delta")
riders_df.write.format("delta").mode("overwrite").save("/delta/riders_dim")

stations_df = spark.read.format("delta").load("/delta/stations_delta")
stations_df.write.format("delta").mode("overwrite").save("/delta/stations_dim")

#Create Time Table, dim

In [0]:
#creates time table
# 
# 
# 
from pyspark.sql.functions import col, to_date, explode, sequence, year, month, dayofweek, quarter

# Generate a date range
start_date = "2020-01-01"
end_date = "2022-12-31"

dates_df = spark.sql(f"""
    SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) AS date
""")

# Add date attributes properly using functions
time_dim_df = dates_df \
    .withColumn("year", year(col("date"))) \
    .withColumn("month", month(col("date"))) \
    .withColumn("day_of_week", dayofweek(col("date"))) \
    .withColumn("quarter", quarter(col("date")))

# Save to delta
time_dim_df.write.format("delta").mode("overwrite").save("/delta/time_dim")

#Trip delta to dim, with trip duration

In [0]:
#Creates column of trip with minutes duration and adds to trips
# 
# 
from pyspark.sql.functions import to_timestamp, unix_timestamp, col

trips_df = spark.read.format("delta").load("/delta/trips_delta")

# Calculate duration in minutes
from pyspark.sql.functions import unix_timestamp

trips_df = trips_df.withColumn(
    "trip_duration_minutes",
    (unix_timestamp("ended_at") - unix_timestamp("started_at")) / 60
)

# Save trips fact table
trips_df.write.format("delta").mode("overwrite").save("/delta/trips_dim")

#Joins payments dim on date table

In [0]:
# 1. Read payments_delta table
payments_df = spark.read.format("delta").load("/delta/payments_delta")

# 2. Load your time_dim table
time_dim = spark.read.format("delta").load("/delta/time_dim")

# 3. Prepare for join with payments date. For some reason GenAI suggested to copy the date column
time_dim_payments_dim = time_dim.withColumnRenamed("date", "date_for_payments") 

# 4. Join for the new time table with the payments table
payments_df = payments_df.join(
    time_dim_payments_dim,
    payments_df.date == time_dim_payments_dim.date_for_payments,
    "left"
)

# Save payments fact table
payments_df.write.format("delta").mode("overwrite").save("/delta/payments_dim")

#Joins Trips dim on date table


In [0]:
#Based on date and time factors such as day of week and time of day
# 
# 

from pyspark.sql.functions import to_date, col
trips_df = spark.read.format("delta").load("/delta/trips_dim")

# 1. Extract date from start_time 
#trips_df = trips_df.withColumn("start_date", to_date("started_at")) 

# 2. Load your time_dim table
time_dim = spark.read.format("delta").load("/delta/time_dim")

# 3.  Prepare for join with trips start date. For some reason GenAI suggested to copy the date column
start_time_dim = time_dim.withColumnRenamed("date", "start_date_temp")  

# 4. Join for start_time_id
trips_df = trips_df.join(
    start_time_dim,
    trips_df.started_at == start_time_dim.start_date_temp,
    "left"
)

# 5. Prepare for join with end_time
#end_time_dim = time_dim.withColumnRenamed("date", "end_date_temp") \
 #                      .withColumnRenamed("time_id", "end_time_id") 

# 6. Join for end_time_id
#trips_df = trips_df.join(
  #  end_time_dim,
 #   trips_df.end_date == end_time_dim.end_date_temp,
  #  "left"
#)

#Create trips view for SQL

In [0]:
trips_df.createOrReplaceTempView("trips_view")

In [0]:
%sql
DESCRIBE trips_view;

col_name,data_type,comment
trip_id,string,
rideable_type,string,
started_at,string,
ended_at,string,
start_station_id,string,
end_station_id,string,
rider_id,string,
trip_duration_minutes,double,
start_date_temp,date,
year,int,


#Query to group by day of week


In [0]:
%sql
--Query to group by hour of day

SELECT  COUNT(*) AS trip_count, AVG(trip_duration_minutes) AS avg_duration
FROM trips_view
GROUP BY day_of_week
ORDER BY day_of_week;


trip_count,avg_duration
713526,27.825023334818823
576105,20.622928632801287
604777,18.176753332219565
616336,18.035415828379502
598289,18.24242280904365
653418,20.80576866569332
822470,26.19423184634876


#Query to group by hour of day

In [0]:

%sql


SELECT  HOUR(started_at) AS hour_of_day, COUNT(*) AS trip_count, AVG(trip_duration_minutes) AS avg_duration
FROM trips_view
GROUP BY hour_of_day
ORDER BY hour_of_day;

hour_of_day,trip_count,avg_duration
0,67744,26.179483545898247
1,47310,29.96362396956245
2,28751,33.69535900200584
3,15372,34.422843481654965
4,12719,31.05885944911809
5,34706,14.04947031253002
6,90930,13.521314747608075
7,166642,13.863389281613737
8,199719,14.77647661631257
9,169069,18.42869351172209


#Query to group by start station

In [0]:


%sql
--Based on which station is the starting and / or ending station


SELECT start_station_id, COUNT(*) AS trip_count, AVG(trip_duration_minutes) AS avg_duration
FROM trips_view
GROUP BY start_station_id
ORDER BY start_station_id;

start_station_id,trip_count,avg_duration
13001,24369,32.94646135117016
13006,9856,22.37495941558442
13008,40505,48.27992346623867
13011,16776,18.8961631696074
13016,28497,23.801775625504444
13017,14247,19.44874476497977
13021,17212,12.894366333565731
13022,80344,38.98157837963413
13028,8196,15.07985602733038
13029,8359,33.619882761095816


#Query to group by end station

In [0]:

%sql


SELECT end_station_id, COUNT(*) AS trip_count, AVG(trip_duration_minutes) AS avg_duration
FROM trips_view
GROUP BY end_station_id
ORDER BY end_station_id;

end_station_id,trip_count,avg_duration
13001,24160,27.92953228476822
13006,9812,17.163595597227893
13008,41766,37.46783867260459
13011,16043,15.40573978266742
13016,29630,21.88758521768478
13017,13740,13.590043668122275
13021,17173,13.212423571886104
13022,81840,38.4206698028674
13028,8233,13.49276084051987
13029,6896,35.002992072699165


#Based on age of the rider at time of the ride

In [0]:


from pyspark.sql.functions import datediff, floor
from pyspark.sql.functions import count
#selects, where it only creates a table of rider id and birthday (to join on trips later)
riders_birthdays_df = spark.read.format("delta").load("/delta/riders_dim").select("rider_id", "birthday")

# Load trips
trips_df = spark.read.format("delta").load("/delta/trips_dim")

# Join only with rider_id and birthday included
trips_with_birthday = trips_df.join(riders_birthdays_df, "rider_id", "left")

# Assuming 'birthday' and 'start_time' are timestamps/dates
trips_with_ages = trips_with_birthday.withColumn("age_at_ride",
               (datediff("started_at", "birthday") / 365.25).cast("int"))

from pyspark.sql.functions import when, count, avg

#creates age buckets
trips_with_age_groups = trips_with_ages.withColumn(
    "age_group",
    when(col("age_at_ride") < 20, "Under 20")
    .when(col("age_at_ride").between(20, 29), "20s")
    .when(col("age_at_ride").between(30, 39), "30s")
    .when(col("age_at_ride").between(40, 49), "40s")
    .when(col("age_at_ride").between(50, 59), "50s")
    .otherwise("60+")
)

#query for count and duration average by age group
trips_with_age_groups.groupBy("age_group").agg(
    count("*").alias("trips"),
    avg("trip_duration_minutes").alias("avg_duration")
).orderBy("trips", ascending=False).show()



+---------+-------+------------------+
|age_group|  trips|      avg_duration|
+---------+-------+------------------+
|      20s|1541344| 21.86783813347302|
|      30s|1353770| 21.94270623271809|
|      40s| 728898|21.413515471300634|
| Under 20| 595304| 21.95564171134957|
|      50s| 284199|21.321180287990707|
|      60+|  81406| 21.46569356067118|
+---------+-------+------------------+



#Based on whether the rider is a member or a casual rider

In [0]:

# # Assuming 'is_member' is a boolean in riders
#selects, where it only creates a table of rider id and birthday (to join on trips later)
riders_memberships_df = spark.read.format("delta").load("/delta/riders_dim").select("rider_id", "is_member")

# Load trips
trips_df = spark.read.format("delta").load("/delta/trips_dim")

# Join only with rider_id and birthdate included
trips_with_memberships = trips_df.join(riders_memberships_df, "rider_id", "left")

trips_with_memberships.groupBy("is_member").agg(
    count("*").alias("trips"),
    avg("trip_duration_minutes").alias("avg_duration")
).show()

+---------+-------+------------------+
|is_member|  trips|      avg_duration|
+---------+-------+------------------+
|    False| 918615|21.323787622308135|
|     True|3666306| 21.90442686726103|
+---------+-------+------------------+



In [0]:
payments_df = spark.read.format("delta").load("/delta/payments_dim")
payments_df.createOrReplaceTempView("payments_view")

#Analyze how much money is spent
#Per month

In [0]:
%sql
SELECT  month,SUM(amount) AS payment_amt
FROM payments_view
GROUP BY month
ORDER BY month

month,payment_amt
,7870587.150000028
1.0,1329161.9999999988
2.0,1360419.1400000025
3.0,781150.5399999997
4.0,803825.5499999997
5.0,826727.5200000003
6.0,851521.8599999989
7.0,875447.1600000017
8.0,901677.31
9.0,926569.8700000012


#Per quarter

Analyze how much money is spent
Per month, quarter, year

In [0]:
%sql
SELECT  quarter,SUM(amount) AS payment_amt
FROM payments_view
GROUP BY quarter
ORDER BY quarter

quarter,payment_amt
,7870587.149999959
1.0,3470731.6800000146
2.0,2482074.9300000016
3.0,2703694.339999998
4.0,2930017.1499999957


#per year

In [0]:
%sql
SELECT  year,SUM(amount) AS payment_amt
FROM payments_view
GROUP BY year
ORDER BY year


year,payment_amt
,7870587.150000028
2020.0,4315449.400000006
2021.0,6081098.249999958
2022.0,1189970.4499999986


#By Age group, can total up total amount spent

In [0]:
from pyspark.sql.functions import datediff, floor
from pyspark.sql.functions import count, sum, round
#selects, where it only creates a table of rider id and birthday (to join on trips later)
riders_birthdays_df = spark.read.format("delta").load("/delta/riders_dim").select("rider_id", "birthday")

# Load payments
payments_df = spark.read.format("delta").load("/delta/payments_dim")

# Join only with rider_id and birthday included
payments_with_birthday = riders_birthdays_df.join(payments_df, \
                riders_birthdays_df.rider_id  == payments_df.account_number , "left")

# Assuming 'birthday' and 'start_time' are timestamps/dates
payments_with_ages = payments_with_birthday.withColumn("age_at_ride",
               (datediff("date", "birthday") / 365.25).cast("int"))



from pyspark.sql.functions import when, count, avg

#creates age buckets
payments_with_age_groups = payments_with_ages.withColumn(
    "age_group",
    when(col("age_at_ride") < 20, "Under 20")
    .when(col("age_at_ride").between(20, 29), "20s")
    .when(col("age_at_ride").between(30, 39), "30s")
    .when(col("age_at_ride").between(40, 49), "40s")
    .when(col("age_at_ride").between(50, 59), "50s")
    .otherwise("60+")
)



#in case they were asking for total amount spent per member, ordered by age


In [0]:

#query for sum 
payments_with_by_member_ordered_by_age = payments_with_ages.groupBy("age_at_ride").agg(
    sum("amount").alias("amount_spent")
).orderBy("age_at_ride", ascending=True).show()

+-----------+------------------+
|age_at_ride|      amount_spent|
+-----------+------------------+
|       NULL|              NULL|
|          7|            946.84|
|          8| 5601.849999999999|
|          9|16281.889999999998|
|         10|35732.130000000005|
|         11| 67595.26000000001|
|         12|         117477.69|
|         13|196032.96000000002|
|         14|         309318.64|
|         15|464579.09999999986|
|         16|         569652.99|
|         17| 569586.3000000009|
|         18|         587365.28|
|         19| 607406.4500000005|
|         20| 627883.9600000002|
|         21| 642158.6200000006|
|         22|         658136.13|
|         23| 658877.2400000015|
|         24| 664448.7700000001|
|         25| 656714.3499999999|
+-----------+------------------+
only showing top 20 rows


#EXTRA CREDIT - Analyze how much money is spent per member
Based on how many rides the rider averages per month (average amount spent per member, on a ride)


In [0]:
#query for average per rider 
payments_with_by_member_ordered_by_age = payments_with_ages.groupBy("account_number").agg(
    avg("amount").alias("avg_ride_spend_amount")
).orderBy("avg_ride_spend_amount", ascending=True).show()

+--------------+---------------------+
|account_number|avg_ride_spend_amount|
+--------------+---------------------+
|          NULL|                 NULL|
|         43637|                 3.02|
|         63104|                 3.02|
|         26436|                 3.04|
|          9438|                 3.07|
|         59753|                 3.13|
|         68947|                 3.15|
|         18703|                 3.17|
|         26633|                 3.18|
|         70563|                 3.19|
|         14096|                 3.25|
|          9000|                 3.25|
|         61615|                 3.25|
|         62159|                 3.28|
|         21040|                 3.29|
|         64084|                  3.3|
|         59169|                 3.31|
|          2133|                 3.39|
|         22753|                 3.46|
|         28443|                 3.48|
+--------------+---------------------+
only showing top 20 rows


#Based on how many minutes the rider spends on a bike per month (average amount a member spends per month, and include average minutes per month)

In [0]:
# select out the trips
trips_with_minutes_df = spark.read.format("delta").load("/delta/trips_dim").select("rider_id", "trip_duration_minutes")

# Join the new payments table on trips to get the minutes
payments_with_trip_minutes = payments_df.join(trips_with_minutes_df, \
                payments_df.account_number  == trips_with_minutes_df.rider_id , "left")
                
                
#query for sum per month for members
payments_with_monthly_sum = payments_with_trip_minutes.groupBy("account_number","year","month").agg(
    sum("amount").alias("monthly_spend"), sum("trip_duration_minutes").alias("monthly_minutes")
)

#query for average per month per member
payments_with_monthly_average = payments_with_monthly_sum.groupBy("account_number").agg(
    round(avg("monthly_spend"),2).alias("monthly_avg_spend"), round(avg("monthly_minutes")).alias("monthly_avg_minutes")
).show()



+--------------+-----------------+-------------------+
|account_number|monthly_avg_spend|monthly_avg_minutes|
+--------------+-----------------+-------------------+
|         45273|            801.0|             1287.0|
|         31518|           2509.0|             3859.0|
|          4821|          3791.67|             5810.0|
|         13610|           3528.0|            10604.0|
|         55321|          1551.84|             1321.0|
|         19132|          10205.1|            41800.0|
|         70962|           1098.0|             2808.0|
|         16504|            915.0|             1362.0|
|         39581|          5072.52|             6013.0|
|         23097|          1246.42|             1552.0|
|         53616|          2714.58|             4188.0|
|         75007|           9716.0|            16795.0|
|         54816|           9882.0|            26385.0|
|         55445|           2772.0|             4619.0|
|         23459|           3357.0|             7723.0|
|         