In [36]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf

In [37]:
spark = (
    SparkSession
    .builder
    .master("spark://developer:7077")
    .getOrCreate()
)

In [38]:
df_q4 = spark.read.csv("data/Divvy_Trips_2019_Q4.csv", header=True, inferSchema=True)

                                                                                

In [39]:
df_q4.count()

                                                                                

704054

In [40]:
df_q4.show(4)

+--------+-------------------+-------------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
| trip_id|         start_time|           end_time|bikeid|tripduration|from_station_id|   from_station_name|to_station_id|     to_station_name|  usertype|gender|birthyear|
+--------+-------------------+-------------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|25223640|2019-10-01 00:01:39|2019-10-01 00:17:20|  2215|       940.0|             20|Sheffield Ave & K...|          309|Leavitt St & Armi...|Subscriber|  Male|     1987|
|25223641|2019-10-01 00:02:16|2019-10-01 00:06:34|  6328|       258.0|             19|Throop (Loomis) S...|          241| Morgan St & Polk St|Subscriber|  Male|     1998|
|25223642|2019-10-01 00:04:32|2019-10-01 00:18:43|  3003|       850.0|             84|Milwaukee Ave & G...|          199|Wabash Ave & Gran...|Sub

## 1. Average trip duration per day

### Converting `trip duration` from str to double

In [41]:
df_q4 = df_q4.withColumn("tripduration", sf.regexp_replace(df_q4.tripduration, r",", ""))

df_q4.show()

+--------+-------------------+-------------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
| trip_id|         start_time|           end_time|bikeid|tripduration|from_station_id|   from_station_name|to_station_id|     to_station_name|  usertype|gender|birthyear|
+--------+-------------------+-------------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|25223640|2019-10-01 00:01:39|2019-10-01 00:17:20|  2215|       940.0|             20|Sheffield Ave & K...|          309|Leavitt St & Armi...|Subscriber|  Male|     1987|
|25223641|2019-10-01 00:02:16|2019-10-01 00:06:34|  6328|       258.0|             19|Throop (Loomis) S...|          241| Morgan St & Polk St|Subscriber|  Male|     1998|
|25223642|2019-10-01 00:04:32|2019-10-01 00:18:43|  3003|       850.0|             84|Milwaukee Ave & G...|          199|Wabash Ave & Gran...|Sub

In [42]:
df_q4 = df_q4.withColumn("tripduration", df_q4.tripduration.cast("double"))

### Filter out trip duration greater than 24 hours

In [43]:

df_q4 = df_q4.filter(sf.date_diff(df_q4.end_time, df_q4.start_time) < 1)

In [44]:
average_trip = df_q4.select(sf.avg("tripduration").alias("avg_trip_duration"))

average_trip.show()



+-----------------+
|avg_trip_duration|
+-----------------+
|864.9071331477212|
+-----------------+



                                                                                

### Adding date column

In [45]:
df_q4 = df_q4.withColumn("date", sf.to_date("start_time"))


In [46]:
average_trip_per_day = (
    df_q4
    .groupBy("date")
    .agg(sf.avg("tripduration").alias("average_trip_per_day"))
    .orderBy("date")
).persist()


In [47]:
average_trip_per_day.show()

                                                                                

+----------+--------------------+
|      date|average_trip_per_day|
+----------+--------------------+
|2019-10-01|   971.6630221932115|
|2019-10-02|   778.7126786257221|
|2019-10-03|   861.8459663973323|
|2019-10-04|   880.3960314179413|
|2019-10-05|   1231.907294979281|
|2019-10-06|  1451.1369154676258|
|2019-10-07|   965.3760599372749|
|2019-10-08|   940.9945108353822|
|2019-10-09|   920.9143073517419|
|2019-10-10|   886.9929626577062|
|2019-10-11|   718.4914936202151|
|2019-10-12|  1184.4799769186382|
|2019-10-13|   1418.700571973308|
|2019-10-14|   1042.906599796482|
|2019-10-15|   848.0566606389391|
|2019-10-16|   759.5750077808901|
|2019-10-17|   798.6278659611993|
|2019-10-18|   931.1406706058233|
|2019-10-19|  1396.0116128101558|
|2019-10-20|  1186.6773230621088|
+----------+--------------------+
only showing top 20 rows


In [48]:
average_trip_per_day_1 = average_trip_per_day.select(sf.avg("average_trip_per_day").alias("average_trip_per_day"))

In [49]:
average_trip_per_day_1.show()



+--------------------+
|average_trip_per_day|
+--------------------+
|   846.2012546231474|
+--------------------+



                                                                                

In [50]:
average_trip_per_day.unpersist()

DataFrame[date: date, average_trip_per_day: double]

## 2. No. of trip taken each day

In [51]:
trip_cnt = (
    df_q4
    .groupBy("date")
    .count().alias("trip_count")
    .sort("date")
).persist()

In [52]:
trip_cnt.count()

                                                                                

92

In [53]:
trip_cnt.show()

+----------+-----+
|      date|count|
+----------+-----+
|2019-10-01|18384|
|2019-10-02| 9867|
|2019-10-03|15594|
|2019-10-04|14514|
|2019-10-05|10377|
|2019-10-06|13344|
|2019-10-07|17218|
|2019-10-08|17489|
|2019-10-09|17166|
|2019-10-10|15773|
|2019-10-11| 7994|
|2019-10-12| 8665|
|2019-10-13|10490|
|2019-10-14|13758|
|2019-10-15|13272|
|2019-10-16|12852|
|2019-10-17|13608|
|2019-10-18|14047|
|2019-10-19|13864|
|2019-10-20|10385|
+----------+-----+
only showing top 20 rows


## 3. Most popular starting trip station each month

In [54]:
month_trip_st = (
    df_q4
    .groupBy(sf.date_format("start_time", "yyyy-MM").alias("month"), "from_station_id", "from_station_name")
    .agg(sf.count("*").alias("trip_count"))
)

In [55]:
# window

from pyspark.sql.window import Window
win = Window.orderBy(sf.desc("trip_count")).partitionBy("month")

month_trip_st = (
    month_trip_st
    .withColumn("rank", sf.row_number().over(win))
).persist()

month_trip_st.show()

                                                                                

+-------+---------------+--------------------+----------+----+
|  month|from_station_id|   from_station_name|trip_count|rank|
+-------+---------------+--------------------+----------+----+
|2019-10|            192| Canal St & Adams St|      6558|   1|
|2019-10|             77|Clinton St & Madi...|      5337|   2|
|2019-10|             91|Clinton St & Wash...|      4920|   3|
|2019-10|             35|Streeter Dr & Gra...|      4535|   4|
|2019-10|             76|Lake Shore Dr & M...|      3964|   5|
|2019-10|            195|Columbus Dr & Ran...|      3917|   6|
|2019-10|            287|Franklin St & Mon...|      3736|   7|
|2019-10|            133|Kingsbury St & Ki...|      3491|   8|
|2019-10|             81|  Daley Center Plaza|      3378|   9|
|2019-10|             43|Michigan Ave & Wa...|      3291|  10|
|2019-10|            283|LaSalle St & Jack...|      2853|  11|
|2019-10|            174|Canal St & Madiso...|      2821|  12|
|2019-10|             90|     Millennium Park|      271

In [56]:
result = (
    month_trip_st
    .select("month", "from_station_id", "from_station_name")
    .filter(month_trip_st.rank == 1)
    .orderBy("month")
)

result = result.withColumnsRenamed({"from_station_id":"popular_station_id", "from_station_name": "popular_station_name"})
result.show()



+-------+------------------+--------------------+
|  month|popular_station_id|popular_station_name|
+-------+------------------+--------------------+
|2019-10|               192| Canal St & Adams St|
|2019-11|               192| Canal St & Adams St|
|2019-12|               192| Canal St & Adams St|
+-------+------------------+--------------------+



                                                                                

## 4. What were the top 3 trip stations each day for the last two weeks?

In [57]:
x = df_q4.select(sf.max("date").alias("max_date"))

In [67]:
x.first()['max_date']

                                                                                

datetime.date(2019, 12, 31)

In [58]:
max_date = df_q4.select(sf.max("date").alias("max_date")).head()
max_date

                                                                                

Row(max_date=datetime.date(2019, 12, 31))

In [59]:
max_date = max_date['max_date']

In [60]:
from datetime import timedelta
two_week_b_date = max_date - timedelta(weeks=2)

In [61]:
top_3_trip_st = (
    df_q4
    .filter((two_week_b_date < df_q4.date) & (df_q4.date <= max_date))
    .groupBy("date", "from_station_id", "from_station_name")
    .agg(sf.count("*").alias("count"))
    .orderBy(sf.desc("count")) # <-- is this right to use new created col 
)

top_3_trip_st.show()



+----------+---------------+--------------------+-----+
|      date|from_station_id|   from_station_name|count|
+----------+---------------+--------------------+-----+
|2019-12-26|             35|Streeter Dr & Gra...|  165|
|2019-12-25|             35|Streeter Dr & Gra...|  133|
|2019-12-19|            192| Canal St & Adams St|  133|
|2019-12-20|            192| Canal St & Adams St|  131|
|2019-12-29|             76|Lake Shore Dr & M...|  129|
|2019-12-18|            192| Canal St & Adams St|  123|
|2019-12-19|             77|Clinton St & Madi...|  123|
|2019-12-18|             77|Clinton St & Madi...|  115|
|2019-12-23|            192| Canal St & Adams St|  109|
|2019-12-20|             91|Clinton St & Wash...|  109|
|2019-12-26|             76|Lake Shore Dr & M...|  104|
|2019-12-29|             35|Streeter Dr & Gra...|  100|
|2019-12-19|             91|Clinton St & Wash...|   95|
|2019-12-20|             77|Clinton St & Madi...|   94|
|2019-12-18|             91|Clinton St & Wash...

                                                                                

In [63]:
ranked_top3_trip_each_day = (
    top_3_trip_st
    .withColumn("rank", sf.row_number().over(Window.orderBy(sf.desc("count")).partitionBy("date")))
    .filter(sf.col("rank") <= 3)
    .orderBy("date", "rank")
).persist()

ranked_top3_trip_each_day.show()

25/11/05 16:09:10 WARN CacheManager: Asked to cache already cached data.
                                                                                

+----------+---------------+--------------------+-----+----+
|      date|from_station_id|   from_station_name|count|rank|
+----------+---------------+--------------------+-----+----+
|2019-12-18|            192| Canal St & Adams St|  123|   1|
|2019-12-18|             77|Clinton St & Madi...|  115|   2|
|2019-12-18|             91|Clinton St & Wash...|   94|   3|
|2019-12-19|            192| Canal St & Adams St|  133|   1|
|2019-12-19|             77|Clinton St & Madi...|  123|   2|
|2019-12-19|             91|Clinton St & Wash...|   95|   3|
|2019-12-20|            192| Canal St & Adams St|  131|   1|
|2019-12-20|             91|Clinton St & Wash...|  109|   2|
|2019-12-20|             77|Clinton St & Madi...|   94|   3|
|2019-12-21|             35|Streeter Dr & Gra...|   62|   1|
|2019-12-21|            133|Kingsbury St & Ki...|   47|   2|
|2019-12-21|            289|Wells St & Concor...|   46|   3|
|2019-12-22|              3|      Shedd Aquarium|   87|   1|
|2019-12-22|            

In [64]:
ranked_top3_trip_each_day.count()

42

## 5. Do Males or Females take longer trips on average?

In [None]:
avg_trip_gender = (
    df_q4
    .groupBy("gender")
    .agg(sf.avg("tripduration").alias("avg_trip_duration"))
    .filter(df_q4.gender.isNotNull())
)

avg_trip_gender.show()

[Stage 173:>                                                        (0 + 4) / 4]

+------+-----------------+
|gender|avg_trip_duration|
+------+-----------------+
|Female|854.9775964440906|
|  Male|722.3914975861337|
+------+-----------------+



                                                                                

## 6. What is the top 10 ages of those that take the longest trips, and shortest?

In [None]:
trip_dur_by_age= (
    df_q4
    .groupBy("birthyear")
    .agg(sf.avg("tripduration").alias("avg_trip_dur"))
    .filter(df_q4.birthyear.isNotNull())
).persist()

In [None]:
trip_dur_by_age.show()

                                                                                

+---------+------------------+
|birthyear|      avg_trip_dur|
+---------+------------------+
|     1959| 736.0991064175467|
|     1990| 722.0950422599126|
|     1975|  798.899387462531|
|     1977| 750.4236185383244|
|     2003|1270.6935483870968|
|     1974| 756.5957397545728|
|     1955| 763.8329772079773|
|     1978| 734.6820239062318|
|     1961| 695.1775406681826|
|     1942|  574.969696969697|
|     1944| 946.4939759036145|
|     1939|           4122.75|
|     1899| 813.8524590163935|
|     1952|  781.208829712684|
|     1956| 757.5216712221788|
|     1934|             774.0|
|     1988| 755.7820942408378|
|     1997|1065.6947127352385|
|     1994| 720.3880443782083|
|     1968| 725.0322211318286|
+---------+------------------+
only showing top 20 rows


In [None]:
from datetime import date
curr_year = int(date.strftime(date.today(), "%Y"))

trip_dur_by_age = (
    trip_dur_by_age
    .withColumn("age", curr_year - trip_dur_by_age.birthyear )
).persist()

trip_dur_by_age.count()

                                                                                

77

In [None]:
top_10_longest_trip = (
    trip_dur_by_age
    .select("age", "avg_trip_dur")
    .limit(10)
    .orderBy(sf.desc("avg_trip_dur"))
)
top_10_shorted_trip = (
    trip_dur_by_age
    .select("age", "avg_trip_dur")
    .limit(10)
    .orderBy(sf.asc("avg_trip_dur"))
    .filter(trip_dur_by_age.age.isNotNull())
)

In [None]:
top_10_longest_trip.show(10)



+---+------------------+
|age|      avg_trip_dur|
+---+------------------+
| 22|1270.6935483870968|
| 73|  781.208829712684|
| 91|             774.0|
| 69| 757.5216712221788|
| 51| 756.5957397545728|
| 48| 750.4236185383244|
| 57| 725.0322211318286|
| 35| 722.0950422599126|
| 64| 695.1775406681826|
| 83|  574.969696969697|
+---+------------------+



                                                                                

In [None]:
top_10_shorted_trip.show(10)



+---+------------------+
|age|      avg_trip_dur|
+---+------------------+
| 83|  574.969696969697|
| 64| 695.1775406681826|
| 35| 722.0950422599126|
| 57| 725.0322211318286|
| 48| 750.4236185383244|
| 51| 756.5957397545728|
| 69| 757.5216712221788|
| 91|             774.0|
| 73|  781.208829712684|
| 22|1270.6935483870968|
+---+------------------+



                                                                                

In [None]:
trip_dur_by_age.select(sf.max("avg_trip_dur")).show()



+-----------------+
|max(avg_trip_dur)|
+-----------------+
|          4122.75|
+-----------------+



                                                                                

In [None]:
trip_dur_by_age.select(sf.min("avg_trip_dur")).show()




+-----------------+
|min(avg_trip_dur)|
+-----------------+
|            282.0|
+-----------------+



                                                                                

In [None]:
# spark.stop()