In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession\
    .builder\
    .appName("Bikes")\
    .config("spark.some.config.option","some-value")\
    .getOrCreate()

In [5]:
df_main = spark.read.format("csv").option("header",True).load(r'C:\Users\Bibhash Dutta\Documents\Pyspark First Project\nyc_city_bike_trip_data.csv')

df_main.columns

['ride_id',
 'rideable_type',
 'started_at',
 'ended_at',
 'start_station_name',
 'start_station_id',
 'end_station_name',
 'end_station_id',
 'start_lat',
 'start_lng',
 'end_lat',
 'end_lng',
 'member_casual']

In [18]:
df_light = df_main.select('ride_id','started_at','ended_at').orderBy('started_at')
df_light.show(5)

+----------------+----------------+----------------+
|         ride_id|      started_at|        ended_at|
+----------------+----------------+----------------+
|72BD34188D6A5ABF|01-07-2023 00:00|01-07-2023 00:10|
|61A144A6E68330E5|01-07-2023 00:00|01-07-2023 00:44|
|35FA7265F270475B|01-07-2023 00:01|01-07-2023 00:03|
|FB94D9A8B03CCB64|01-07-2023 00:01|01-07-2023 00:04|
|E571057A99230120|01-07-2023 00:01|01-07-2023 00:09|
+----------------+----------------+----------------+
only showing top 5 rows



In [24]:
from pyspark.sql.functions import to_date
from pyspark.sql.types import DateType
df_light = df_light.withColumn('date',to_date(df_light.started_at,'dd-MM-yyyy HH:mm'))
df_light.show(5)

+----------------+----------------+----------------+----------+
|         ride_id|      started_at|        ended_at|      date|
+----------------+----------------+----------------+----------+
|72BD34188D6A5ABF|01-07-2023 00:00|01-07-2023 00:10|2023-07-01|
|61A144A6E68330E5|01-07-2023 00:00|01-07-2023 00:44|2023-07-01|
|E571057A99230120|01-07-2023 00:01|01-07-2023 00:09|2023-07-01|
|35FA7265F270475B|01-07-2023 00:01|01-07-2023 00:03|2023-07-01|
|FB94D9A8B03CCB64|01-07-2023 00:01|01-07-2023 00:04|2023-07-01|
+----------------+----------------+----------------+----------+
only showing top 5 rows



In [31]:
max_value = df_light.agg({'date':'max'}).collect()[0][0]
min_value = df_light.agg({'date':'min'}).collect()[0][0]

In [32]:
print(max_value,"  ",min_value)

2023-07-31    2023-07-01


In [34]:
df_light_agg = df_light.select('date').groupBy('date').count().orderBy('date')
df_light_agg.show()

+----------+-----+
|      date|count|
+----------+-----+
|2023-07-01| 3123|
|2023-07-02| 2331|
|2023-07-03| 2892|
|2023-07-04| 3095|
|2023-07-05| 3589|
|2023-07-06| 3490|
|2023-07-07| 3747|
|2023-07-08| 3817|
|2023-07-09| 2752|
|2023-07-10| 3468|
|2023-07-11| 3837|
|2023-07-12| 3753|
|2023-07-13| 3731|
|2023-07-14| 3203|
|2023-07-15| 3413|
|2023-07-16| 1687|
|2023-07-17| 3443|
|2023-07-18| 3361|
|2023-07-19| 3631|
|2023-07-20| 3836|
+----------+-----+
only showing top 20 rows



In [37]:
import plotly.express as px
import pandas as pd
fig = px.scatter(df_light_agg.toPandas(),x = 'date',y = 'count')
fig.show()

In [39]:
df_main.columns

['ride_id',
 'rideable_type',
 'started_at',
 'ended_at',
 'start_station_name',
 'start_station_id',
 'end_station_name',
 'end_station_id',
 'start_lat',
 'start_lng',
 'end_lat',
 'end_lng',
 'member_casual']

In [41]:
df_stations = df_main.select('ride_id',
 'started_at',
 'start_station_name',
 'end_station_name',
 'start_lat',
 'start_lng',
 'end_lat',
 'end_lng')

In [42]:
df_stations_combinations = df_stations.select('start_station_name',
 'end_station_name',
 'start_lat',
 'start_lng',
 'end_lat',
 'end_lng').groupBy('start_station_name',
 'end_station_name',
 'start_lat',
 'start_lng',
 'end_lat',
 'end_lng').count().orderBy('count',ascending = False)


In [44]:
df_stations_combinations.show()

+--------------------+--------------------+-----------+------------+-----------+------------+-----+
|  start_station_name|    end_station_name|  start_lat|   start_lng|    end_lat|     end_lng|count|
+--------------------+--------------------+-----------+------------+-----------+------------+-----+
|South Waterfront ...|South Waterfront ...|40.73698222|-74.02778059|40.73698222|-74.02778059|  430|
|12 St & Sinatra Dr N|South Waterfront ...|40.75060414|-74.02402014|40.73698222|-74.02778059|  395|
|12 St & Sinatra Dr N|12 St & Sinatra Dr N|40.75060414|-74.02402014|40.75060414|-74.02402014|  373|
|       Grove St PATH|    Marin Light Rail|   40.71941|   -74.04309|40.71458404|-74.04281706|  370|
|    Marin Light Rail|       Grove St PATH|40.71458404|-74.04281706|   40.71941|   -74.04309|  348|
|South Waterfront ...|12 St & Sinatra Dr N|40.73698222|-74.02778059|40.75060414|-74.02402014|  329|
|  Liberty Light Rail|  Liberty Light Rail| 40.7112423| -74.0557013| 40.7112423| -74.0557013|  324|


In [50]:
df_stations_combinations = df_stations_combinations.where(df_stations_combinations['start_station_name']
                                                          != df_stations_combinations['end_station_name']).orderBy('count',ascending = False)

df_stations_combinations.select('start_station_name',
 'end_station_name',
 'start_lat',
 'start_lng',
 'end_lat',
 'end_lng','count').show(5)



+--------------------+--------------------+-----------+------------+-----------+------------+-----+
|  start_station_name|    end_station_name|  start_lat|   start_lng|    end_lat|     end_lng|count|
+--------------------+--------------------+-----------+------------+-----------+------------+-----+
|12 St & Sinatra Dr N|South Waterfront ...|40.75060414|-74.02402014|40.73698222|-74.02778059|  395|
|       Grove St PATH|    Marin Light Rail|   40.71941|   -74.04309|40.71458404|-74.04281706|  370|
|    Marin Light Rail|       Grove St PATH|40.71458404|-74.04281706|   40.71941|   -74.04309|  348|
|South Waterfront ...|12 St & Sinatra Dr N|40.73698222|-74.02778059|40.75060414|-74.02402014|  329|
|       Hamilton Park|       Grove St PATH|40.72759597|-74.04424731|   40.71941|   -74.04309|  246|
+--------------------+--------------------+-----------+------------+-----------+------------+-----+
only showing top 5 rows



In [51]:
print('count of distinct journies : ',df_stations_combinations.count())

count of distinct journies :  45865


In [67]:
df_unique_stations = df_main.select('start_station_name').distinct().union(df_main.select('end_station_name').distinct())
df_unique_stations.count()


296

In [68]:
df_unique_stations.show()

+--------------------+
|  start_station_name|
+--------------------+
|      Lafayette Park|
|      Columbus Drive|
|    Marin Light Rail|
|Hoboken Terminal ...|
|      Riverview Park|
|     6 Ave & W 34 St|
|        Jersey & 3rd|
|         Paulus Hook|
|      Jackson Square|
|        Brunswick St|
|     Adams St & 2 St|
|Hoboken Terminal ...|
|        Glenwood Ave|
|         Astor Place|
|  Willow Ave & 12 St|
|         Exchange Pl|
|  Marshall St & 2 St|
|Warren St & W Bro...|
|     2 St & Park Ave|
|Southwest Park - ...|
+--------------------+
only showing top 20 rows

