In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt

In [3]:
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .appName("Spark_practice")\
        .getOrCreate()

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 46.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=63c627c1af9dfa6fbf608e8e77e84e8739f70fa204186c96fdd29f983a24b697
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [4]:
spark

In [5]:
df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/201902-fordgobike-tripdata.csv',\
                    header=True, inferSchema=True)
                    

In [7]:
from pyspark.sql.functions import max, min, datediff, format_number, collect_list,\
struct, mean, when, size, collect_set, count, to_date, countDistinct, current_date,\
sumDistinct, column, year, date_format, month, dayofmonth

from pyspark.sql import Window
import pyspark.sql.functions as F

In [8]:
df.columns

['duration_sec',
 'start_time',
 'end_time',
 'start_station_id',
 'start_station_name',
 'start_station_latitude',
 'start_station_longitude',
 'end_station_id',
 'end_station_name',
 'end_station_latitude',
 'end_station_longitude',
 'bike_id',
 'user_type',
 'member_birth_year',
 'member_gender',
 'bike_share_for_all_trip']

In [60]:
df.show(2)

+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+---------+-----------------+-------------+-----------------------+
|duration_sec|          start_time|            end_time|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id|user_type|member_birth_year|member_gender|bike_share_for_all_trip|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+---------+-----------------+-------------+-----------------------+
|       52185|2019-02-28 17:32:...|2019-03-01 08:01:...|              21|Montgomery St BAR...|            37.789625

In [11]:
print((df.count(),len(df.columns)))

(183412, 16)


### Distribution by Member Gender

In [13]:
df.groupBy('member_gender').agg(F.expr("count(member_gender)").alias("No. of records")).show()

+-------------+--------------+
|member_gender|No. of records|
+-------------+--------------+
|         null|             0|
|       Female|         40844|
|        Other|          3652|
|         Male|        130651|
+-------------+--------------+



### The minimum, maximum and average age of bicycle rentals

In [20]:
df_1 = df.withColumn("Age",2022 - df['member_birth_year'] ).alias('Age')
df_1.groupBy('member_gender').agg(\
                          F.expr("avg(Age)").alias("AVG Age"),\
                          F.expr("min(Age)").alias("MIN Age"),\
                          F.expr("max(Age)").alias("MAX Age"))\
                          .withColumn("AVG Age",F.round(F.col('AVG Age'),0).cast('integer')).show()

+-------------+-------+-------+-------+
|member_gender|AVG Age|MIN Age|MAX Age|
+-------------+-------+-------+-------+
|         null|   null|   null|   null|
|       Female|     36|     21|    144|
|        Other|     39|     22|    112|
|         Male|     37|     21|    122|
+-------------+-------+-------+-------+



### The number of unique bikes

In [21]:
df.select('bike_id').distinct().count()

4646

### Number of unique bike stations

In [38]:
df.dropDuplicates(['start_station_id','end_station_id']).select('start_station_id').distinct().count()

330

### Which bike was rented the longest and which was the shortest during the analyzed period (and for how long)

In [43]:
aggData = df.groupBy('bike_id').agg(F.expr("count(bike_id)").alias("Total Rents"),\
                                    F.expr("avg(duration_sec)").alias("AVG Rent Time"),\
                                    F.expr("min(duration_sec)").alias("MIN Rent Time"),\
                                    F.expr("max(duration_sec)").alias("MAX Rent Time"))\
                                    .orderBy("MAX Rent Time",ascending=False).show(1)

aggData = df.groupBy('bike_id').agg(F.expr("count(bike_id)").alias("Total Rents"),\
                                    F.expr("avg(duration_sec)").alias("AVG Rent Time"),\
                                    F.expr("min(duration_sec)").alias("MIN Rent Time"),\
                                    F.expr("max(duration_sec)").alias("MAX Rent Time"))\
                                    .orderBy("MAX Rent Time",ascending=True).show(1)

+-------+-----------+-------------+-------------+-------------+
|bike_id|Total Rents|AVG Rent Time|MIN Rent Time|MAX Rent Time|
+-------+-----------+-------------+-------------+-------------+
|   6168|         75|      1816.76|          184|        85444|
+-------+-----------+-------------+-------------+-------------+
only showing top 1 row

+-------+-----------+-------------+-------------+-------------+
|bike_id|Total Rents|AVG Rent Time|MIN Rent Time|MAX Rent Time|
+-------+-----------+-------------+-------------+-------------+
|   1679|          1|         76.0|           76|           76|
+-------+-----------+-------------+-------------+-------------+
only showing top 1 row



### Average duration of a single loan

In [46]:
df.select(mean('duration_sec').alias("AVG Duration")).show()

+----------------+
|    AVG Duration|
+----------------+
|726.078435434977|
+----------------+



### Between which stations there was the greatest traffic

In [47]:
df.groupBy('start_station_name','end_station_name').agg(\
    F.expr("count(bike_id)").alias("Total Connections")).orderBy('Total Connections',ascending=False).show(1)

+------------------+--------------------+-----------------+
|start_station_name|    end_station_name|Total Connections|
+------------------+--------------------+-----------------+
|Berry St at 4th St|San Francisco Fer...|              337|
+------------------+--------------------+-----------------+
only showing top 1 row



### At what time during the day were the most bicycles rented

In [49]:
df.withColumn("Time of the day",date_format('start_time','HH:mm')).select("Time of the day")\
.groupBy("Time of the day").count().orderBy('count',ascending=False).show()

+---------------+-----+
|Time of the day|count|
+---------------+-----+
|          17:36|  449|
|          17:11|  447|
|          08:42|  433|
|          08:54|  416|
|          08:31|  416|
|          17:03|  415|
|          17:07|  415|
|          08:59|  410|
|          17:10|  410|
|          08:47|  401|
|          17:38|  401|
|          17:17|  401|
|          08:33|  395|
|          08:27|  394|
|          17:44|  394|
|          17:15|  393|
|          17:35|  391|
|          17:09|  390|
|          08:48|  388|
|          17:13|  386|
+---------------+-----+
only showing top 20 rows



### The average number of rentals for individual days of the week

In [55]:
df.withColumn("Day of Rent",date_format('start_time','EEEE')).select("Day of Rent")\
.groupBy("Day of Rent").count()\
.orderBy('count',ascending=False).show()

+-----------+-----+
|Day of Rent|count|
+-----------+-----+
|   Thursday|35197|
|    Tuesday|31813|
|  Wednesday|29641|
|     Friday|28981|
|     Monday|26852|
|     Sunday|15523|
|   Saturday|15405|
+-----------+-----+



### The average number of rentals for individual months

In [56]:
df.withColumn("AVG Rentals / Month",date_format('start_time','MMMM')).select("AVG Rentals / Month")\
.groupBy("AVG Rentals / Month").count().orderBy('count').show()

+-------------------+------+
|AVG Rentals / Month| count|
+-------------------+------+
|           February|183412|
+-------------------+------+



### RDD dataDaily containing data aggregated down to the level of the day. Each day of the year (element in RDD) is to contain the following information

- 'date' : Date
- 'avg_duration_sec' : Average duration of rentals for the day
- 'n_trips' : number of trips on a given day
- 'n_bikes' : number of rentals on a given day
- 'n_subscriber' : number of rentals made by subscribers on a given day

In [76]:
df.withColumn("date", date_format('start_time', 'D')).groupBy('date')\
.agg(\
                     F.expr("avg(duration_sec)").alias("avg_duration_sec"),\
                     F.expr("count(bike_id)").alias("n_trips"),countDistinct("bike_id").alias("n_bikes"),\
                     count(when(column("user_type") == "Subscriber", True)).alias("n_subscriber"))\
                     .withColumn('avg_duration_sec', F.round(F.col('avg_duration_sec'), 0).cast('integer'))\
                     .orderBy("date",ascending=True).show(5)


+----+----------------+-------+-------+------------+
|date|avg_duration_sec|n_trips|n_bikes|n_subscriber|
+----+----------------+-------+-------+------------+
|  32|             690|   6133|   1873|        5553|
|  33|             795|   3231|   1227|        2731|
|  34|             746|   2841|   1163|        2461|
|  35|             650|   5491|   1781|        5042|
|  36|             663|   8486|   2177|        7799|
+----+----------------+-------+-------+------------+
only showing top 5 rows



### Number of unique combinations of stations (x -> y == y -> x) for the day

In [78]:
df2 = df.withColumn("sl2", when(df['end_station_id'] < df['start_station_id'],  df['end_station_id'])\
.otherwise(df['start_station_id']))\
.withColumn("el2", when(df['end_station_id'] > df['start_station_id'],  df['end_station_id'])\
.otherwise(df['start_station_id']))\
.drop("start_station_id", "end_station_id")

df2_agg = df2.withColumn("date", date_format('start_time', 'D'))\
.groupBy('date').agg(collect_set(struct(column('sl2'), column('el2'))).alias("n_routes")) 

df2_agg.select("date", size("n_routes")).alias("n_routes")\
.orderBy("date",ascending=True).show(5)

+----+--------------+
|date|size(n_routes)|
+----+--------------+
|  32|          3232|
|  33|          2043|
|  34|          1789|
|  35|          2930|
|  36|          4039|
+----+--------------+
only showing top 5 rows

