In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


In [2]:
# Set Up Spark Session
spark = SparkSession.builder.appName("Joy Ride").master("local[6]").getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/08 23:34:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [19]:
# Read data
path = "./data/202407-citibike-tripdata_5.csv"
df = spark.read.csv(path, header=True)
df.persist() # Only used for testing purposes

25/01/08 23:38:46 WARN CacheManager: Asked to cache already cached data.


DataFrame[Unnamed: 0: string, ride_id: string, rideable_type: string, started_at: string, ended_at: string, start_station_name: string, start_station_id: string, end_station_name: string, end_station_id: string, start_lat: string, start_lng: string, end_lat: string, end_lng: string, member_casual: string]

In [20]:
df.take(5)

[Row(Unnamed: 0='4000000', ride_id='557426B1CE207702', rideable_type='electric_bike', started_at='2024-07-17 19:30:15.471', ended_at='2024-07-17 19:34:22.646', start_station_name='Sullivan St & Washington Sq', start_station_id='5721.01', end_station_name='E 12 St & 3 Ave', end_station_id='5788.12', start_lat='40.73047747', start_lng='-73.99906065', end_lat='40.73223272', end_lng='-73.98889957', member_casual='member'),
 Row(Unnamed: 0='4000001', ride_id='A60E366FEE8B51E8', rideable_type='electric_bike', started_at='2024-07-15 19:17:52.523', ended_at='2024-07-15 19:25:24.399', start_station_name='Sullivan St & Washington Sq', start_station_id='5721.01', end_station_name='Forsyth St & Grand St', end_station_id='5382.07', start_lat='40.73047747', start_lng='-73.99906065', end_lat='40.71779817737835', end_lng='-73.99316132068634', member_casual='member'),
 Row(Unnamed: 0='4000002', ride_id='ACA30207F80B5DF9', rideable_type='classic_bike', started_at='2024-07-27 13:34:48.531', ended_at='202

# BikeRides Table Attributes
- __ride_id__  - unique ID for ride `PRIMARY KEY` `CHAR`
- __bike_type__ - type of bike used (E for electric, M for mechanical) `CHAR(1)`
- __start_date__ - Date ride started `DATE`
- __start_time__ - Time ride started `TIME`
- __end_date__ - Date ride ended `DATE`
- __end_time__ - Time ride ended `TIME`
- __strt_station_id__ - ID for start station `FOREIGN KEY` `CHAR`
- __end_station_id__ - ID for end station `FOREIGN KEY`
- __member_type__ - Membership type (Member or Casual) `CHAR`

In [34]:
# Set bike_type to be E or M
bike_type_mapper = F.udf(lambda val: 'E' if val=='electric_bike' else ('M' if val=='classic_bike' else None))
df = df.withColumn('bike_type', bike_type_mapper('rideable_type'))
# Get date frome start and end at fields
df = df.withColumn('start_date', F.regexp_extract('started_at', '(\\d{2}-\\d{2}-\\d{2})', 1))
df = df.withColumn('end_date', F.regexp_extract('ended_at', '(\\d{2}-\\d{2}-\\d{2})', 1))
# Get time from start and end at fields
df = df.withColumn('start_time', F.regexp_extract('started_at', '(\\d{2}:\\d{2}:\\d{2})', 1))
df = df.withColumn('end_time', F.regexp_extract('ended_at', '(\\d{2}:\\d{2}:\\d{2})', 1))

In [35]:
df.take(5)

[Row(Unnamed: 0='4000000', ride_id='557426B1CE207702', rideable_type='electric_bike', started_at='2024-07-17 19:30:15.471', ended_at='2024-07-17 19:34:22.646', start_station_name='Sullivan St & Washington Sq', start_station_id='5721.01', end_station_name='E 12 St & 3 Ave', end_station_id='5788.12', start_lat='40.73047747', start_lng='-73.99906065', end_lat='40.73223272', end_lng='-73.98889957', member_casual='member', bike_type='E', start_date='24-07-17', end_date='24-07-17', start_time='19:30:15', end_time='19:34:22'),
 Row(Unnamed: 0='4000001', ride_id='A60E366FEE8B51E8', rideable_type='electric_bike', started_at='2024-07-15 19:17:52.523', ended_at='2024-07-15 19:25:24.399', start_station_name='Sullivan St & Washington Sq', start_station_id='5721.01', end_station_name='Forsyth St & Grand St', end_station_id='5382.07', start_lat='40.73047747', start_lng='-73.99906065', end_lat='40.71779817737835', end_lng='-73.99316132068634', member_casual='member', bike_type='E', start_date='24-07-1