#### Data Extraction

Load CSV filed from DBFS

In [0]:
# import necessary libraries and functions
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Delta Tables").getOrCreate()
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, StringType, IntegerType, DoubleType, TimestampType

from datetime import datetime, timedelta

In [0]:
#loading csv files
payments_df = spark.read.format("csv").option("separator", ",").load('dbfs:/FileStore/tables/payments.csv')
ridrs_df = spark.read.format("csv").option("separator", ",").load('dbfs:/FileStore/tables/riders.csv')
staions_df = spark.read.format("csv").option("separator", ",").load('dbfs:/FileStore/tables/stations.csv')
trips_df = spark.read.format("csv").option("separator", ",").load('dbfs:/FileStore/tables/trips.csv')
display(payments_df.head(5))

_c0,_c1,_c2,_c3
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000


In [0]:
payments_df = payments_df \
    .withColumnRenamed("_c0", "payment_id") \
    .withColumnRenamed("_c1", "date") \
    .withColumnRenamed("_c2", "amount") \
    .withColumnRenamed("_c3", "rider_id")


payments_df = payments_df \
    .withColumn("payment_id", F.col("payment_id").cast(IntegerType())) \
    .withColumn("date", F.col("date").cast(DateType())) \
    .withColumn("amount", F.col("amount").cast(DoubleType())) \
    .withColumn("rider_id", F.col("rider_id").cast(IntegerType()))

display(payments_df.head(5))


payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000


In [0]:
display(staions_df.head(5))

_c0,_c1,_c2,_c3
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


In [0]:
riders_df = ridrs_df.withColumnRenamed("_c0", "rider_id")\
                    .withColumnRenamed("_c1", "first_name")\
                    .withColumnRenamed("_c2", "last_name") \
                    .withColumnRenamed("_c3", "address")\
                    .withColumnRenamed("_c4", "birthday") \
                    .withColumnRenamed("_c5", "account_start_date") \
                    .withColumnRenamed("_c6", "account_end_date")\
                    .withColumnRenamed("_c7", "IsMember") 

riders_df = riders_df.withColumn("rider_id", F.col("rider_id").cast(IntegerType()))\
                     .withColumn("birthday", F.col("birthday").cast(DateType()))\
                     .withColumn("account_start_date", F.col("account_start_date").cast(DateType()))\
                     .withColumn("account_end_date", F.col("account_end_date").cast(DateType()))\
                     .withColumn("age", (F.datediff(F.col("account_start_date"), F.col("birthday")) / 365).cast('int'))\
                     .withColumn("IsMember", F.when(F.col("IsMember") == "True","Rider").otherwise("Casual"))
                    

display(riders_df.head(5))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,IsMember,age
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True,30
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True,43
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True,23
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False,20
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True,50


In [0]:
display(riders_df.head(5))


rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,IsMember,age
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,Rider,30
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,Rider,43
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,Rider,23
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,Casual,20
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,Rider,50


In [0]:
staions_df = staions_df.withColumnRenamed("_c0", "station_id")\
                       .withColumnRenamed("_c1", "name")\
                        .withColumnRenamed("_c2", "latitude")\
                        .withColumnRenamed("_c3", "longitude")

staions_df = staions_df.withColumn("latitude", F.col("latitude").cast(DoubleType()))\
                       .withColumn("longitude", F.col("longitude").cast(DoubleType()))

display(staions_df.head(5))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


In [0]:
display(trips_df.head(5))

_c0,_c1,_c2,_c3,_c4,_c5,_c6
89E7AA6C29227EFF,classic_bike,2021-02-12 16:14:56,2021-02-12 16:21:43,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608


In [0]:
trips_df = trips_df.withColumnRenamed("_c0", "trip_id")\
                   .withColumnRenamed("_c1", "bike_type")\
                    .withColumnRenamed("_c2", "start_at")\
                     .withColumnRenamed("_c3", "end_at")\
                      .withColumnRenamed("_c4", "start_station_id")\
                       .withColumnRenamed("_c5", "end_station_id")\
                        .withColumnRenamed("_c6", "rider_id")

trips_df = trips_df.withColumn("start_at", F.col("start_at").cast(TimestampType()))\
                   .withColumn("end_at", F.col("end_at").cast(TimestampType()))\
                    .withColumn("rider_id", F.col("rider_id").cast(IntegerType()))\
                    .withColumn('duration_mins', ((F.unix_timestamp(F.col('end_at')) - F.unix_timestamp(F.col('start_at'))) / 60))\
                    .withColumn("duration_mins", F.round(F.col("duration_mins"), 2))

display(trips_df.head(5))               
                   

trip_id,bike_type,start_at,end_at,start_station_id,end_station_id,rider_id,duration_mins
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56Z,2021-02-12T16:21:43Z,525,660,71934,6.78
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38Z,2021-02-14T18:12:09Z,525,16806,47854,19.52
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18Z,2021-02-09T19:19:10Z,KA1503000012,TA1305000029,70870,8.87
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41Z,2021-02-02T17:54:06Z,637,TA1305000034,58974,4.42
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23Z,2021-02-23T15:22:37Z,13216,TA1309000055,39608,15.23


Tranformation of Data

## Creating delta tables 


In [0]:

tables = ["fact_trips", "fact_payments", "dim_riders", "dim_stations", "dim_date"]

for table in tables:
    spark.sql(f"DROP TABLE IF EXISTS {table}")

In [0]:
trips_df.write.format("delta").mode("overwrite").saveAsTable("fact_trips")
riders_df.write.format("delta").mode("overwrite").saveAsTable("dim_riders")
staions_df.write.format("delta").mode("overwrite").saveAsTable("dim_stations")
payments_df.write.format("delta").mode("overwrite").saveAsTable("fact_payments")          

In [0]:
row = spark.sql("SELECT MIN(start_at) AS min_ts, MAX(start_at) AS max_ts FROM fact_trips").collect()[0]
min_date = row['min_ts'].date()
max_date = row['max_ts'].date()

num_hours = int(((max_date - min_date).days + 1) * 24)


#date_df = spark.range(0, num_hours).withColumn("full_datetime", F.expr(f"CAST('{min_date}' AS TIMESTAMP) + INTERVAL id HOUR")).drop("id")
date_df = spark.range(0, num_hours).withColumn("full_datetime",F.expr(f"timestamp('{min_date}') + id * interval 1 second * 3600")).drop("id")


dim_date = date_df.withColumn("date", F.date_format("full_datetime", "yyyy-MM-dd").cast(DateType()))\
                   .withColumn("year", F.year(F.col("date")))\
                   .withColumn("month", F.month(F.col("date")))\
                   .withColumn("day", F.dayofmonth(F.col("date")))\
                   .withColumn("qtr", F.quarter(F.col("date")))\
                   .withColumn("week", F.weekofyear(F.col("date")))\
                    .withColumn('day_of_the_week', F.dayofweek(F.col("date")))\
                   .withColumn("hour", F.date_format("full_datetime", "HH"))\
                    .withColumn("time_of_the_day",F.when((F.col("hour") >= 5) & (F.col("hour") <= 11), "morning")\
                        .when((F.col("hour") >= 12) & (F.col("hour") <= 16), "afternoon")\
                            .when((F.col("hour") >= 17) & (F.col("hour") <= 21), "evening").otherwise("night"))

                  
                                            
dim_date.limit(5).display()

full_datetime,date,year,month,day,qtr,week,day_of_the_week,hour,time_of_the_day
2021-02-01T00:00:00Z,2021-02-01,2021,2,1,1,5,2,0,night
2021-02-01T01:00:00Z,2021-02-01,2021,2,1,1,5,2,1,night
2021-02-01T02:00:00Z,2021-02-01,2021,2,1,1,5,2,2,night
2021-02-01T03:00:00Z,2021-02-01,2021,2,1,1,5,2,3,night
2021-02-01T04:00:00Z,2021-02-01,2021,2,1,1,5,2,4,night


In [0]:
dim_date.write.format("delta").mode("overwrite").saveAsTable("dim_date")

## Business Outcomes Analysis 

#  The business outcomes you are designing for:

##  Analyze how much time is spent per ride
- 
- Based on date and time factors such as day of week and time of day
- Based on which station is the starting and / or ending station
- Based on age of the rider at time of the ride
- Based on whether the rider is a member or a casual rider

In [0]:
trips_df = spark.table('fact_trips')
riders_df = spark.table('dim_riders')
stations_df = spark.table('dim_stations')
date_df = spark.table('dim_date')
payments_df = spark.table('fact_payments')

**Based on date and time factors such as day of week and time of day**

In [0]:
#how much time is spent per ride Based on date and time factors such as day of week and time of day

trips_df = trips_df.withColumn("start_date", F.to_date(F.col("start_at")))

riders_time_df = trips_df.join(date_df, [F.col('start_date') == F.col('date')], how = 'left') \
                         .groupBy('rider_id', 'day_of_the_week', 'time_of_the_day') \
                         .agg(F.sum('duration_mins').alias('total_time_spent_mins')).orderBy('rider_id')
display(riders_time_df)

rider_id,day_of_the_week,time_of_the_day,total_time_spent_mins
1000,1,night,448.0
1000,5,night,467.2499999999999
1000,7,evening,235.05
1000,1,morning,448.0
1000,6,night,436.94
1000,3,evening,193.05
1000,1,evening,320.0
1000,4,morning,602.5600000000001
1000,1,afternoon,320.0
1000,5,morning,467.2499999999999


In [0]:
trips_df.join(date_df, (F.col('start_date') == F.col('date')), how = 'left').groupBy('day_of_the_week').agg(F.sum('duration_mins').alias('total_duration_mins'),
                                                                                                            F.avg('duration_mins').alias('avg_duration_mins')).orderBy('day_of_the_week').display()

day_of_the_week,total_duration_mins,avg_duration_mins
1,476492948.39996505,27.8250166777364
2,285143343.84002113,20.62292925768893
3,263829192.96004564,18.176754473139525
4,266780969.04003492,18.03541203175128
5,261941803.6799556,18.24242434675909
6,326276658.9600244,20.805764135669687
7,517055242.3199687,26.194230099576515


In [0]:
trips_df.join(date_df, (F.col('start_date') == F.col('date')), how = 'left').groupBy('time_of_the_day').agg(F.sum('duration_mins').alias('total_duration_mins'),
                                                                                                            F.count('*').alias('total_trips')).display()

time_of_the_day,total_duration_mins,total_trips
afternoon,499483366.500005,22924605
night,699276713.0999637,32094447
morning,699276713.0999637,32094447
evening,499483366.500005,22924605


In [0]:
trips_df.join(date_df, (F.col('start_date') == F.col('date')), how = 'left').groupBy('day_of_the_week', 'time_of_the_day').agg(F.sum('duration_mins').alias('total_duration_mins'),
                                                                                                            F.avg('duration_mins').alias('avg_duration_mins'),
                                                                                                            F.count('*').alias('total_trips')).orderBy('day_of_the_week').display()

day_of_the_week,time_of_the_day,total_duration_mins,avg_duration_mins,total_trips
1,morning,138977109.9500025,27.82501667773894,4994682
1,evening,99269364.2500038,27.825016677739512,3567630
1,night,138977109.9500025,27.82501667773894,4994682
1,afternoon,99269364.2500038,27.825016677739512,3567630
2,night,83166808.61999758,20.6229292576868,4032735
2,afternoon,59404863.29999884,20.622929257687,2880525
2,evening,59404863.29999884,20.622929257687,2880525
2,morning,83166808.61999756,20.6229292576868,4032735
3,morning,76950181.28000076,18.17675447313656,4233439
3,evening,54964415.19999855,18.1767544731359,3023885


**Based on which station is the starting and / or ending station**

In [0]:
trips_df.join(stations_df, (F.col('start_station_id') == F.col('station_id')), how='left').groupBy('name').agg(F.count('*').alias('total_trips'), F.sum('duration_mins').alias
                                                                                                               ('total_duration_mins'), F.avg('duration_mins').alias('avg_duration_mins')).orderBy(F.desc('total_trips')).display()



name,total_trips,total_duration_mins,avg_duration_mins
Streeter Dr & Grand Ave,80344,3131935.610000004,38.98157435527237
Lake Shore Dr & North Blvd,46380,1353331.0300000014,29.17919426476933
Lake Shore Dr & Monroe St,44672,1782985.4800000037,39.91281966332386
Michigan Ave & Oak St,42722,1551938.8199999968,36.32645522213372
Wells St & Concord Ln,41604,659284.11,15.84665200461494
Millennium Park,40505,1955578.3599999973,48.27992494753728
Clark St & Elm St,39346,686051.9300000005,17.43638311391248
Wells St & Elm St,35955,499983.4299999996,13.90581087470448
Theater on the Lake,35704,1044941.9500000008,29.266803439390564
Kingsbury St & Kinzie St,32422,385063.9899999993,11.87662667324654


In [0]:
trips_df.join(stations_df, (F.col('end_station_id') == F.col('station_id')), how='left').groupBy('name').agg(F.count('*').alias('total_trips'), F.sum('duration_mins').alias
                                                                                                               ('total_duration_mins'), F.avg('duration_mins').alias('avg_duration_mins')).orderBy(F.desc('total_trips')).display()

name,total_trips,total_duration_mins,avg_duration_mins
Streeter Dr & Grand Ave,81840,3144348.2600000054,38.42067766373418
Lake Shore Dr & North Blvd,52641,1488567.5700000012,28.277722117740947
Michigan Ave & Oak St,43435,1504248.359999997,34.63217129043392
Lake Shore Dr & Monroe St,43151,1651356.7000000016,38.2692567959028
Wells St & Concord Ln,41947,647914.3300000014,15.446023076739728
Millennium Park,41766,1564881.5100000026,37.46783292630376
Clark St & Elm St,38767,645040.0900000018,16.63889622617179
Theater on the Lake,36192,1164563.0300000026,32.17736046640149
Wells St & Elm St,35843,515079.8300000004,14.370444159250075
Kingsbury St & Kinzie St,31960,354071.70999999985,11.078589173967456


**Based on age of the rider at time of the ride**

In [0]:
trips = trips_df.join(riders_df.select('rider_id', 'birthday'), on='rider_id', how='left')
trips = trips.withColumn("age_at_ride", (F.datediff(F.col('start_at'), F.col("birthday")) / 365).cast('int'))

trips.groupBy('rider_id',"age_at_ride").agg(F.count('trip_id').alias('total_trips'),
                                             F.sum('duration_mins').alias('total_duration_mins'), 
                                             F.avg('duration_mins').alias('avg_duration_mins'))\
                                                 .orderBy(F.desc('total_trips')).display()



rider_id,age_at_ride,total_trips,total_duration_mins,avg_duration_mins
53044,38,1620,56901.78000000003,35.124555555555574
37388,16,1529,56368.34000000003,36.86614780902553
33748,25,1525,51332.12000000002,33.66040655737706
21973,32,1490,45344.83,30.43277181208054
5211,66,1422,51593.280000000006,36.282194092827005
66814,43,1396,53527.72999999998,38.34364613180514
4193,19,1367,48226.41000000001,35.27901243599123
19330,17,1359,50279.11999999998,36.99714495952905
43849,34,1346,45688.3,33.94375928677563
55633,43,1329,48944.76999999999,36.82826937547027


**Based on whether the rider is a member or a casual rider**

In [0]:
trips_df_alias = trips_df.alias("t")
riders_df_alias = riders_df.alias("r")


result_df = trips_df_alias.join(
    riders_df_alias,
    on=F.col("t.rider_id") == F.col("r.rider_id"),
    how="left"
).groupBy("r.IsMember") \
 .agg(
     F.count("t.trip_id").alias("total_trips"),
     F.sum("t.duration_mins").alias("total_duration_mins"),
     F.avg("t.duration_mins").alias("avg_duration_mins")
 ).orderBy(F.desc("total_trips"))


result_df.display()

IsMember,total_trips,total_duration_mins,avg_duration_mins
Rider,3666306,80308322.90000041,21.904424480662662
Casual,918615,19588350.399999835,21.323786787718287


# Analyze how much money is spent
- Per month, quarter, year
- Per member, based on the age of the rider at account start

In [0]:
#Spent per rider
payments_df.groupBy("rider_id").agg(F.sum("amount").alias("total_amount")).orderBy(F.desc("total_amount")).display()



rider_id,total_amount
65089,1658.8299999999997
63569,1608.2999999999995
11368,1600.9
19270,1594.4699999999996
39136,1586.0099999999986
51831,1584.7500000000002
38365,1580.88
7243,1571.3000000000002
20017,1568.9899999999996
60682,1567.5500000000006


In [0]:
payments_df.groupBy(F.year("date"). alias('year')).agg(F.sum("amount").alias("total_amount")).orderBy(F.desc("total_amount")).display()

year,total_amount
2021,6081098.249999979
2020,4315449.400000015
2019,2978658.790000004
2018,2000105.500000001
2017,1308372.5399999977
2022,1189970.4499999983
2016,825120.8099999988
2015,477233.2199999999
2014,227402.94999999995
2013,53693.34000000001


In [0]:
payments_df.groupBy(F.quarter("date"). alias('quarter')).agg(F.sum("amount").alias("total_amount")).orderBy(F.asc("quarter")).display()

quarter,total_amount
1,5112376.490000002
2,4328269.550000012
3,4773199.860000011
4,5243259.349999996


In [0]:
payments_df.groupBy(F.month("date"). alias('month')).agg(F.sum("amount").alias("total_amount")).orderBy(F.asc("month")).display()

month,total_amount
1,1855786.829999999
2,1907807.2700000012
3,1348782.389999997
4,1395762.7800000007
5,1441279.1899999976
6,1491227.5799999991
7,1538960.88
8,1592322.5800000012
9,1641916.4000000013
10,1696207.3200000029


In [0]:
payments_df.groupBy(F.year("date"). alias('year'), F.quarter("date"). alias('quarter'), F.month("date"). alias('month')).agg(F.sum("amount").alias("total_amount")).orderBy(F.asc("year"), F.asc('quarter')).display()

year,quarter,month,total_amount
2013,1,2,12.9
2013,1,3,817.7499999999999
2013,2,5,2716.71
2013,2,4,1672.65
2013,2,6,3775.3
2013,3,9,6672.100000000001
2013,3,8,5834.3
2013,3,7,4760.959999999999
2013,4,11,9195.45
2013,4,12,10349.1


### EXTRA CREDIT - Analyze how much money is spent per member
- Based on how many rides the rider averages per month
- Based on how many minutes the rider spends on a bike per month

In [0]:
payments_df.groupBy('rider_id').agg(F.avg("amount").alias("average_amount"), F.sum("amount").alias("total_amount")).orderBy(F.desc("total_amount")).display()

rider_id,average_amount,total_amount
65089,15.503084112149528,1658.8299999999997
63569,15.317142857142857,1608.2999999999995
11368,14.961682242990651,1600.9
19270,15.480291262135918,1594.4699999999996
39136,14.962358490566023,1586.0099999999986
51831,14.810747663551403,1584.7500000000002
38365,15.056,1580.88
7243,14.549074074074076,1571.3000000000002
20017,15.086442307692304,1568.9899999999996
60682,14.788207547169815,1567.5500000000006


In [0]:
payments_df.groupBy('rider_id', F.month("date"). alias('month')).agg(F.avg("amount").alias("average_amount")).orderBy(("rider_id")).display()

rider_id,month,average_amount
1000,2,9.0
1000,12,9.0
1000,7,9.0
1000,10,9.0
1000,5,9.0
1000,3,9.0
1000,9,9.0
1000,8,9.0
1000,6,9.0
1000,4,9.0
