In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
import datetime
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [0]:
#Creat Spark Session
spark=SparkSession.builder.appName("DW_BikeShare").getOrCreate()

In [0]:
#Create Filestore paths
payments_file_path="/FileStore/payments.csv"
rider_file_path="/FileStore/riders.csv"
stations_file_path="/FileStore/stations.csv"
trips_file_path="/FileStore/trips.csv"


In [0]:
#Create payment dataframe from csv file
df_pay=spark.read.csv(payments_file_path,header=False,inferSchema=True)

In [0]:
#Create rider dataframe from csv file
df_rider=spark.read.csv(rider_file_path,header=False,inferSchema=True)

In [0]:
#Create stations dataframe from csv file
df_stations=spark.read.csv(stations_file_path,header=False,inferSchema=True)

In [0]:
#Create trips dataframe from csv file
df_trips=spark.read.csv(trips_file_path,header=False,inferSchema=True)

In [0]:
#Add header record to the dataframe
df_riders=df_rider.toDF("rider_id","first","last","address","birthday","account_start_date","account_end_date","is_member")

In [0]:
df_riders.printSchema()

root
 |-- rider_id: integer (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- account_start_date: date (nullable = true)
 |-- account_end_date: date (nullable = true)
 |-- is_member: boolean (nullable = true)



In [0]:
df_riders.dropDuplicates()

DataFrame[rider_id: int, first: string, last: string, address: string, birthday: date, account_start_date: date, account_end_date: date, is_member: boolean]

In [0]:
#Add header record to the dataframe
df_trip=df_trips.toDF("trip_id","rideable_type","started_at","ended_at","start_station_id","end_station_id","rider_id")

In [0]:
df_trip.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- rider_id: integer (nullable = true)



In [0]:
df_trip.dropDuplicates()

DataFrame[trip_id: string, rideable_type: string, started_at: timestamp, ended_at: timestamp, start_station_id: string, end_station_id: string, rider_id: int]

In [0]:
#Add header record to the dataframe
df_station=df_stations.toDF("station_id","name","latitude","longitude")

In [0]:
#Drop Duplicates
df_station.dropDuplicates()

DataFrame[station_id: string, name: string, latitude: double, longitude: double]

In [0]:
df_station.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [0]:
#Add header record to the dataframe
df_payment=df_pay.toDF("id","date","amount","rider_id")

In [0]:
df_payment.show(5)

+---+----------+------+--------+
| id|      date|amount|rider_id|
+---+----------+------+--------+
|  1|2019-05-01|   9.0|    1000|
|  2|2019-06-01|   9.0|    1000|
|  3|2019-07-01|   9.0|    1000|
|  4|2019-08-01|   9.0|    1000|
|  5|2019-09-01|   9.0|    1000|
+---+----------+------+--------+
only showing top 5 rows



In [0]:
#df_payment.printSchema()
df_payment.dropDuplicates()

DataFrame[id: int, date: date, amount: double, rider_id: int]

In [0]:
#df_payment.count()
#1946607
#Create Delta file for Payment
df_payment.write.format("delta").mode("overwrite").save("/delta/bronze_payment")

In [0]:
df_payment.show(5)

+---+----------+------+--------+
| id|      date|amount|rider_id|
+---+----------+------+--------+
|  1|2019-05-01|   9.0|    1000|
|  2|2019-06-01|   9.0|    1000|
|  3|2019-07-01|   9.0|    1000|
|  4|2019-08-01|   9.0|    1000|
|  5|2019-09-01|   9.0|    1000|
+---+----------+------+--------+
only showing top 5 rows



In [0]:
df_p=spark.read.format("delta").load("/delta/bronze_payment")

In [0]:
df_p.head(5)

[Row(id=1064462, date=datetime.date(2020, 6, 1), amount=9.0, rider_id=42106),
 Row(id=1064463, date=datetime.date(2020, 7, 1), amount=9.0, rider_id=42106),
 Row(id=1064464, date=datetime.date(2020, 8, 1), amount=9.0, rider_id=42106),
 Row(id=1064465, date=datetime.date(2020, 9, 1), amount=9.0, rider_id=42106),
 Row(id=1064466, date=datetime.date(2020, 10, 1), amount=9.0, rider_id=42106)]

In [0]:
##Create Delta table for Payment
spark.sql ("CREATE TABLE IF NOT EXISTS silver_payment USING DELTA LOCATION '/delta/bronze_payment' ")

DataFrame[]

In [0]:
#Create Delta table for rider
df_riders.write.format("delta").mode("overwrite").save("/delta/bronze_rider")
spark.sql ("CREATE TABLE IF NOT EXISTS silver_rider USING DELTA LOCATION '/delta/bronze_rider' ")

DataFrame[]

In [0]:
#Create Delta table for trips data
df_trip.write.format("delta").mode("overwrite").save("/delta/bronze_trip")
spark.sql ("CREATE TABLE IF NOT EXISTS silver_trip USING DELTA LOCATION '/delta/bronze_trip' ")

DataFrame[]

In [0]:
#Create Delta table for Stations data
df_station.write.format("delta").mode("overwrite").save("/delta/bronze_station")
spark.sql ("CREATE TABLE IF NOT EXISTS silver_station USING DELTA LOCATION '/delta/bronze_station' ")

DataFrame[]

In [0]:
import pyspark.sql.functions as F 
min_date=spark.sql("select date(min(date)) from silver_payment").collect()[0][0]
#print(min_date)
max_date=spark.sql("select date(max(date)) from silver_payment").collect()[0][0]
#print(max_date)
date_dim = spark.range(0, (max_date - min_date).days + 1) \
    .withColumn("date", F.date_add(F.lit(min_date), F.col("id").cast("integer"))) \
    .withColumn("day_of_week", F.dayofweek("date").cast("integer")) \
    .withColumn("day_of_month", F.dayofmonth("date").cast("integer")) \
    .withColumn("day_of_year", F.dayofyear("date").cast("integer")) \
    .withColumn("week_of_year", F.weekofyear("date").cast("integer")) \
    .withColumn("month", F.month("date").cast("integer")) \
    .withColumn("month_name", F.monthname("date")) \
    .withColumn("quarter", F.quarter("date").cast("integer")) \
    .withColumn("year", F.year("date").cast("integer"))

In [0]:
date_dim.write.format("delta").mode("overwrite").saveAsTable("date_dim")

  

In [0]:
#Create Payment Fact table 
spark.sql('''CREATE TABLE IF NOT EXISTS payment_fact USING DELTA AS 
          SELECT DISTINCT p.id AS payment_id, dd.id as date_id,amount,rider_id FROM silver_payment p
          join date_dim dd on dd.date=p.date
          ''')

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql("Select * from payment_fact limit 2").show()

+----------+-------+------+--------+
|payment_id|date_id|amount|rider_id|
+----------+-------+------+--------+
|       264|   2038|   9.0|    1011|
|       516|   2922| 17.99|    1021|
+----------+-------+------+--------+



In [0]:
##Create Dim Rider 
spark.sql('''CREATE TABLE IF NOT EXISTS dim_rider USING DELTA AS 
          SELECT DISTINCT * FROM silver_rider''')

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql("select * from dim_rider  limit 2").show()

+--------+-------+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|  first|     last|             address|  birthday|account_start_date|account_end_date|is_member|
+--------+-------+---------+--------------------+----------+------------------+----------------+---------+
|    1040|Matthew|    Watts|   323 Matthew Flats|1996-12-24|        2014-11-26|            NULL|     true|
|    1065|Rebecca|Mccormick|4912 Smith Alley ...|1983-07-18|        2017-07-13|            NULL|    false|
+--------+-------+---------+--------------------+----------+------------------+----------------+---------+



In [0]:
##Create DIM_Station
spark.sql('''CREATE TABLE IF NOT EXISTS dim_station USING DELTA AS 
          SELECT DISTINCT  *  FROM silver_station''')

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql("select * from dim_station limit 2").show()

+------------+--------------------+-----------------+------------------+
|  station_id|                name|         latitude|         longitude|
+------------+--------------------+-----------------+------------------+
|       15550|Canal St & Taylor St|        41.870257|-87.63947399999999|
|TA1307000150|Pine Grove Ave & ...|41.94947274088333|-87.64645278453827|
+------------+--------------------+-----------------+------------------+



In [0]:
#Create Dim Ridetype table
spark.sql ('''
           CREATE TABLE IF NOT EXISTS dim_ride_type
USING DELTA
AS
SELECT
  ROW_NUMBER() OVER (ORDER BY rideable_type) AS ride_type_id,
  rideable_type
FROM
  silver_trip
GROUP BY
  rideable_type
  '''
           
           )

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql ("select * from dim_ride_type").show()

+------------+-------------+
|ride_type_id|rideable_type|
+------------+-------------+
|           1| classic_bike|
|           2|  docked_bike|
|           3|electric_bike|
+------------+-------------+



In [0]:
spark.sql ("select * from payment_fact where rider_id=34062").show()

+----------+-------+------+--------+
|payment_id|date_id|amount|rider_id|
+----------+-------+------+--------+
|    856454|   2891|   9.0|   34062|
|    856453|   2860|   9.0|   34062|
+----------+-------+------+--------+



In [0]:
##Create Trip Fact Table
spark.sql ('''
           CREATE TABLE IF NOT EXISTS trip_fact USING DELTA AS 

          select t.trip_id ,t.rider_id,t.start_station_id,t.end_station_id,rt.ride_type_id,sdd.id as start_date_id,edd.id as end_date_id, started_at,ended_at,
           round(datediff (second, t.started_at, t.ended_at) / 60)  AS trip_duration,
            floor (months_between (t.started_at, r.birthday) / 12) as rider_age,r.is_member
from silver_trip t
join dim_rider r on t.rider_id=r.rider_id
join dim_station s on s.station_id=t.start_station_id
join dim_station se on se.station_id=t.end_station_id
join dim_ride_type rt on rt.rideable_type=t.rideable_type
join date_dim sdd on sdd.date=date(t.started_at)
join date_dim edd on edd.date=date(t.ended_at)

'''
           
           
           )

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql ("select * from trip_fact limit 2").show()

+----------------+--------+----------------+--------------+------------+-------------+-----------+-------------------+-------------------+-------------+---------+---------+
|         trip_id|rider_id|start_station_id|end_station_id|ride_type_id|start_date_id|end_date_id|         started_at|           ended_at|trip_duration|rider_age|is_member|
+----------------+--------+----------------+--------------+------------+-------------+-----------+-------------------+-------------------+-------------+---------+---------+
|222BB8E5059252D7|   34062|    KA1503000064|         13021|           1|         3054|       3054|2021-06-13 09:48:47|2021-06-13 10:07:23|         19.0|       30|     true|
|1826E16CB5486018|    5342|    TA1306000010|         13021|           1|         3062|       3062|2021-06-21 22:59:13|2021-06-21 23:04:29|          5.0|       26|     true|
+----------------+--------+----------------+--------------+------------+-------------+-----------+-------------------+-----------------

In [0]:
spark.sql ("select min(started_at) as min_date,max(ended_at) as max_Date from trip_fact limit 2").show()

+-------------------+-------------------+
|           min_date|           max_Date|
+-------------------+-------------------+
|2021-02-01 01:07:04|2022-02-01 00:12:04|
+-------------------+-------------------+



In [0]:
spark.sql("select * from date_dim limit 5").show()

+----+----------+-----------+------------+-----------+------------+-----+----------+-------+----+
|  id|      date|day_of_week|day_of_month|day_of_year|week_of_year|month|month_name|quarter|year|
+----+----------+-----------+------------+-----------+------------+-----+----------+-------+----+
|2466|2019-11-03|          1|           3|        307|          44|   11|       Nov|      4|2019|
|2467|2019-11-04|          2|           4|        308|          45|   11|       Nov|      4|2019|
|2468|2019-11-05|          3|           5|        309|          45|   11|       Nov|      4|2019|
|2469|2019-11-06|          4|           6|        310|          45|   11|       Nov|      4|2019|
|2470|2019-11-07|          5|           7|        311|          45|   11|       Nov|      4|2019|
+----+----------+-----------+------------+-----------+------------+-----+----------+-------+----+



Data Anaylysis  how much time is spent per ride Based on the following Factors:
- Based on date and time factors such as day of week and time of day
- Based on which station is the starting and / or ending station
- Based on age of the rider at time of the ride
- Based on whether the rider is a member or a casual rider

In [0]:
# Analyze how much time is spent per ride -->  Based on date and time factors such as day of week and time of day

rider_stats_by_day_hour=spark.sql (  '''SELECT rider_id, DAY(started_at) as DAY, extract(HOUR FROM started_at) as HOUR, count(*) as num_rides, AVG( DATEDIFF (second, started_at, ended_at)) * 60 as minutes_spent
    FROM trip_fact
    GROUP by rider_id, DAY(started_at),extract(HOUR FROM started_at)
    Order By rider_id,DAY(started_at),extract(HOUR FROM started_at)'''
    )
rider_stats_by_day_hour.createOrReplaceTempView("rider_stats_by_day_hour")
spark.sql ("Select * from rider_stats_by_day_hour limit 5").show()

+--------+---+----+---------+-------------+
|rider_id|DAY|HOUR|num_rides|minutes_spent|
+--------+---+----+---------+-------------+
|    1000|  2|  10|        1|      95640.0|
|    1000|  2|  17|        1|      53880.0|
|    1000|  2|  23|        1|      31620.0|
|    1000|  3|  14|        1|      84660.0|
|    1000|  6|  19|        1|     170820.0|
+--------+---+----+---------+-------------+



In [0]:
# How Much Time Spent based on which station is the starting and / or ending station
trp_stats_by_stations=spark.sql("select start_station_id,end_station_id,count(distinct trip_id) as trip_count,sum(trip_duration) as timespent,avg(trip_duration) as avg_time from trip_fact group by start_station_id,end_station_id")
trp_stats_by_stations.createOrReplaceTempView("trp_stats_by_stations")
spark.sql("select * from trp_stats_by_stations limit 2").show()

+----------------+--------------+----------+---------+------------------+
|start_station_id|end_station_id|trip_count|timespent|          avg_time|
+----------------+--------------+----------+---------+------------------+
|             600|           600|       114|  37575.0|329.60526315789474|
|    TA1308000022|  TA1308000022|      1294|  51259.0|39.612828438948995|
+----------------+--------------+----------+---------+------------------+



In [0]:
# How Much Time Spent per rider based on which station is the starting and / or ending station

trp_stats_by_stations=spark.sql("select rider_id,start_station_id,end_station_id,count(distinct trip_id) as trip_count,sum(trip_duration) as timespent,avg(trip_duration) as avg_time from trip_fact group by rider_id,start_station_id,end_station_id")
trp_stats_by_stations.createOrReplaceTempView("trp_stats_by_rider_stations")
spark.sql("select * from trp_stats_by_rider_stations order by rider_id limit 5 ").show()

+--------+----------------+--------------+----------+---------+--------+
|rider_id|start_station_id|end_station_id|trip_count|timespent|avg_time|
+--------+----------------+--------------+----------+---------+--------+
|    1000|    KA1504000126|  TA1309000014|         2|     35.0|    17.5|
|    1000|    KA1504000126|  TA1309000030|         1|     43.0|    43.0|
|    1000|    KA1504000126|  KA1504000096|         1|      9.0|     9.0|
|    1000|    KA1504000126|  KA1504000126|         1|     45.0|    45.0|
|    1000|    KA1504000126|         13379|         1|     29.0|    29.0|
+--------+----------------+--------------+----------+---------+--------+



In [0]:
#Analyze how much time is spent per ride
#Based on age of the rider at time of the ride
rider_stats_by_age=spark.sql("select rider_age,count(trip_id) as trp_count,avg(trip_duration) as avg_timespent from trip_fact group by rider_age")

rider_stats_by_age.createOrReplaceTempView("rider_avg_stats_by_age")
spark.sql("select * from rider_avg_stats_by_age ORder By rider_age desc limit 2").show()


+---------+---------+------------------+
|rider_age|trp_count|     avg_timespent|
+---------+---------+------------------+
|       75|       45|              51.4|
|       74|     1768|19.059954751131222|
+---------+---------+------------------+



In [0]:
#Based on whether the rider is a member or a casual rider
rider_stats_by_member_type=spark.sql("select  CASE WHEN is_member=true then 'member' else 'casualrider' end as Member_type,count(trip_id) as trp_count,round(sum(trip_duration)/60 ) as total_timespent_hrs from trip_fact group by  CASE WHEN is_member=true then 'member' else 'casualrider' end ")
rider_stats_by_member_type.createOrReplaceTempView("rider_stats_by_member_type")
spark.sql("select * from rider_stats_by_member_type limit 2").show()

+-----------+---------+-------------------+
|Member_type|trp_count|total_timespent_hrs|
+-----------+---------+-------------------+
|casualrider|   918615|           326582.0|
|     member|  3666306|          1338940.0|
+-----------+---------+-------------------+



In [0]:

# aggregate count of rides and duration by rider, by month
rider_stats_month_year = spark.sql(
    """
    SELECT  MONTH(started_at) as month, YEAR(started_at) as year, count(*) as num_rides, AVG( DATEDIFF (second, started_at, ended_at)) * 60 as minutes_spent
    FROM trip_fact
    GROUP by  MONTH(started_at), YEAR(started_at);
    """
)
rider_stats_month_year.createOrReplaceTempView("rider_stats_month_year")
spark.sql("select * from rider_stats_month_year limit 2").show()

+-----+----+---------+-----------------+
|month|year|num_rides|    minutes_spent|
+-----+----+---------+-----------------+
|    4|2021|   298207|86494.35231231997|
|    5|2021|   450994|95410.06802751256|
+-----+----+---------+-----------------+



Analyze how much money is spent
- Per month, quarter, year
- Per member, based on the age of the rider at account start

In [0]:
#how much money is spent Per month, quarter, year Per Rider

rider_stats_amount=spark.sql ('''
           select r.rider_id,dat.year ,dat.quarter , dat.month ,sum(amount) as Total_Amount, Avg(amount) Avg_Amount
           From
           dim_rider r join payment_fact p on r.rider_Id=p.rider_Id
           join date_dim dat on dat.Id=p.date_id
           
           Group By r.rider_id,dat.year ,dat.quarter , dat.month
           Order By r.rider_id,dat.year ,dat.quarter , dat.month
           ''')
rider_stats_amount.createOrReplaceTempView("rider_stats_amount")
spark.sql("select * from rider_stats_amount limit 5").show()

+--------+----+-------+-----+------------+----------+
|rider_id|year|quarter|month|Total_Amount|Avg_Amount|
+--------+----+-------+-----+------------+----------+
|    1000|2019|      2|    5|         9.0|       9.0|
|    1000|2019|      2|    6|         9.0|       9.0|
|    1000|2019|      3|    7|         9.0|       9.0|
|    1000|2019|      3|    8|         9.0|       9.0|
|    1000|2019|      3|    9|         9.0|       9.0|
+--------+----+-------+-----+------------+----------+



In [0]:
money_spent=spark.sql ('''
           select dat.year ,dat.quarter , dat.month ,sum(amount) as Total_Amount, Avg(amount) Avg_Amount
           From
           dim_rider r join payment_fact p on r.rider_Id=p.rider_Id
           join date_dim dat on dat.id=p.date_id
           
           Group By dat.year ,dat.quarter , dat.month
           Order By dat.year ,dat.quarter , dat.month
           ''')
money_spent.createOrReplaceTempView("money_spent")
spark.sql("select * from money_spent limit 5").show()

+----+-------+-----+------------------+------------------+
|year|quarter|month|      Total_Amount|        Avg_Amount|
+----+-------+-----+------------------+------------------+
|2013|      1|    2|              12.9|              12.9|
|2013|      1|    3| 817.7499999999999|10.095679012345677|
|2013|      2|    4|1672.6500000000003|  9.89733727810651|
|2013|      2|    5|           2716.71|10.290568181818182|
|2013|      2|    6|3775.2999999999997|10.148655913978494|
+----+-------+-----+------------------+------------------+



In [0]:
#Per member, based on the age of the rider at account start

rider_stats_by_age=spark.sql('''select floor (months_between (account_start_Date, birthday) / 12) as rider_age,sum(p.amount) as Total_Amt   From
           dim_rider r join payment_fact p on r.rider_Id=p.rider_Id
           where is_member=true
          Group By floor (months_between (account_start_Date, birthday) / 12)
                     ''')
rider_stats_by_age.createOrReplaceTempView("rider_stats_by_age")
spark.sql("select * from rider_stats_by_age limit 4").show()

+---------+---------+
|rider_age|Total_Amt|
+---------+---------+
|       29| 438219.0|
|       26| 470439.0|
|       65|   8469.0|
|       54|  58365.0|
+---------+---------+



In [0]:
# how many minutes the rider spends on a bike per month
spark.sql ('''
           select rider_id,MONTH(started_at) as Month,count(distinct trip_id) as trip_count, sum(trip_duration) as total_trip_mns 
           ,AVG(COUNT(DISTINCT trip_id)) OVER (PARTITION BY rider_id, MONTH(started_at)) AS avg_monthly_rides
             from trip_fact 
           Group by rider_id,MONTH(started_at)
           limit 4
           ''').show()

+--------+-----+----------+--------------+-----------------+
|rider_id|Month|trip_count|total_trip_mns|avg_monthly_rides|
+--------+-----+----------+--------------+-----------------+
|    1000|    4|         2|          34.0|              2.0|
|    1000|   11|         3|          58.0|              3.0|
|    1003|    6|        18|         296.0|             18.0|
|    1003|   10|        16|         190.0|             16.0|
+--------+-----+----------+--------------+-----------------+



In [0]:
# how many minutes the rider spends on a bike per month
spark.sql ('''
           select t.rider_id,MONTH(started_at) as Month,count(distinct trip_id) as trip_count, sum(trip_duration) as total_trip_mns 
           ,AVG(COUNT(DISTINCT trip_id)) OVER (PARTITION BY t.rider_id, MONTH(started_at)) AS avg_monthly_rides
             from trip_fact t join Dim_rider d on d.rider_id=t.rider_id
           Group by t.rider_id,MONTH(started_at)
           limit 4
           ''').show()

+--------+-----+----------+--------------+-----------------+
|rider_id|Month|trip_count|total_trip_mns|avg_monthly_rides|
+--------+-----+----------+--------------+-----------------+
|    1000|    4|         2|          34.0|              2.0|
|    1000|   11|         3|          58.0|              3.0|
|    1003|    6|        18|         296.0|             18.0|
|    1003|   10|        16|         190.0|             16.0|
+--------+-----+----------+--------------+-----------------+

