# MAST30034_Applied Data Science_Project1

## Import Libraries

In [3]:
from pyspark.sql.functions import col, sum
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sbs
import geopandas as gpd
import folium
import os

In [4]:
# Create a spark session
spark = (
SparkSession.builder.appName("ADS project 1")
.config("spark.sql.repl.eagerEval.enabled", True)
.config("spark.driver.memory","4G")
.config("spark.executor.memory","4G")
.config("spark.sql.parquet.cacheMetadata", "true")
.config("spark.sql.session.timeZone", "Etc/UTC")
.getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/17 13:13:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Read data

Read the raw data of yellow taxi and green taxi

In [5]:
taxi_yellow_raw = spark.read.parquet('../data/raw/NYCTLC_data/yellow_taxi/2023/*.parquet')
taxi_green_raw = spark.read.parquet('../data/raw/NYCTLC_data/green_taxi/2023/*.parquet')

                                                                                

Read the raw data of cityBike

In [6]:
cityBike_data_dir = '../data/raw/external_data/cityBike_data/citybike_raw_data'
cityBike_file = []

# Read all the csv file in the cityBike_data_dir
for root, dirs, files in os.walk(cityBike_data_dir):
    for file in files:
        if file.endswith('.csv'):
            file_path = os.path.join(root, file)
            df_spark = spark.read.csv(file_path, header=True, inferSchema=True)
            cityBike_file.append(df_spark)

# Merge all the data together
citybike_raw_data = cityBike_file[0]
for data in cityBike_file[1:]:
    citybike_raw_data = citybike_raw_data.union(data)

                                                                                

Read the raw weather data of New York

In [7]:
weather_raw = spark.read.csv('../data/raw/external_data/weather_raw_data.csv',header=True)

## Data Exploration

### Cursory look at the data

In [8]:
taxi_yellow_raw.show()

[Stage 55:>                                                         (0 + 1) / 1]

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2023-10-01 00:16:44|  2023-10-01 00:16:49|              1|          0.0|         1|                 N|         168|         168|           2|        3.0|  1.0|    0.5|       0.

                                                                                

In [9]:
taxi_green_raw.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2023-10-01 00:57:33|  2023-10-01 01:07:58|                 N|         1|         166|          74|              1|         1.45|       12.1|  1.0|    0.

In [10]:
citybike_raw_data.show()

+----------------+-------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------+-----------------+------------------+------------------+------------------+-------------+
|         ride_id|rideable_type|          started_at|            ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|        start_lat|         start_lng|           end_lat|           end_lng|member_casual|
+----------------+-------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------+-----------------+------------------+------------------+------------------+-------------+
|5772778972800B87|electric_bike|2023-12-20 06:18:...|2023-12-20 06:24:...|     W 41 St & 8 Ave|         6602.03|Madison Ave & E 2...|       6131.12|      40.75633049|     -73.989260077|40.742684734875695|-73.98671329021454|       member|
|32DD834BEA2ABA5B| classic_bike|2023-12-21 16:15

In [11]:
weather_raw.show()

+---------+-------------------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------+-------+----------+----------+-----------+--------------------+
|     name|           datetime|temp|feelslike| dew|humidity|precip|precipprob|preciptype|snow|snowdepth|windgust|windspeed|winddir|sealevelpressure|cloudcover|visibility|solarradiation|solarenergy|uvindex|severerisk|conditions|       icon|            stations|
+---------+-------------------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------+-------+----------+----------+-----------+--------------------+
| new York|2023-06-01T00:00:00|16.2|     16.2|12.9|   80.58|     0|         0|      NULL|   0|        0|     4.7|      3.1|    264|          1021.3|       0.4|       9.9|             0|          0|      0|        10| 

### view the shape of data

In [12]:
print(taxi_yellow_raw.count())
print(len(taxi_yellow_raw.columns))



22123840
19


                                                                                

In [13]:
print(taxi_green_raw.count())
print(len(taxi_green_raw.columns))

447430
20


In [14]:
print(citybike_raw_data.count())
print(len(citybike_raw_data.columns))



23293647
13


                                                                                

In [15]:
print(weather_raw.count())
print(len(weather_raw.columns))

5137
24


### view the sturcture of data

In [16]:
taxi_yellow_raw.describe()

24/08/17 13:15:18 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
count,22123840.0,21243149.0,22123840.0,21243149.0,21243149,22123840.0,22123840.0,22123840.0,22123840.0,22123840.0,22123840.0,22123840.0,22123840.0,22123840.0,22123840.0,21243149.0,21243149.0
mean,1.744783681313913,1.3768554746756236,4.138134361846366,1.6973556039172912,,164.74984622922602,163.72607680221878,1.1811978842732545,19.91599391425425,1.535119246478009,0.4843583252274474,3.55351801314934,0.6113420671984153,0.9780966956911208,28.91803135752539,2.2573809090168315,0.1557659106943137
stddev,0.4408131219072949,0.8934144716666255,231.6132794170355,7.753205957530598,,63.95315301157141,69.83725967712695,0.5705591326111771,98.40060717874314,2.818654054245507,0.1140290722447579,4.2634282098855785,2.2607831981883404,0.206485378866258,99.5766922890574,0.8124008194586704,0.5077763506757014
min,1.0,0.0,0.0,1.0,N,1.0,1.0,0.0,-1087.3,-39.17,-0.5,-330.88,-91.3,-1.0,-1094.05,-2.5,-1.75
max,6.0,9.0,345729.44,99.0,Y,265.0,265.0,5.0,386983.63,10002.5,52.09,4174.0,665.56,1.0,386987.63,2.75,1.75


In [17]:
taxi_green_raw.describe()

                                                                                

summary,VendorID,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
count,447430.0,414713,414713.0,447430.0,447430.0,414713.0,447430.0,447430.0,447430.0,447430.0,447430.0,447430.0,0.0,447430.0,447430.0,414713.0,414696.0,414713.0
mean,1.8715463871443576,,1.1847639210731278,97.84006660259703,139.69795051739936,1.2975841123861562,23.48522942136199,19.170112151621293,0.884228259169032,0.5645509912165031,2.4059156069105376,0.2752651364459159,,0.9884958541000868,24.82819296873185,1.34397523106341,1.040195709628258,0.7511785258721091
stddev,0.3345945791769592,,1.0326078083461323,59.34216491647937,76.35085281014177,0.933896788145398,1159.2878509104833,21.248198466366237,1.3596291108231942,0.3860758614158047,3.4055874778163733,1.4417650143872411,,0.1293566613648961,22.981226155832992,0.5059270541796154,0.1964181956642563,1.2251731263569072
min,1.0,N,1.0,1.0,1.0,0.0,0.0,-500.0,-5.0,-0.5,-22.2,0.0,,-1.0,-501.0,1.0,1.0,-2.75
max,2.0,Y,99.0,265.0,265.0,9.0,278990.28,4003.0,10.0,4.25,224.09,51.0,,1.0,4004.5,5.0,2.0,2.75


In [18]:
citybike_raw_data.describe()

                                                                                

summary,ride_id,rideable_type,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
count,23293647,23293647,23276461,23276461,23221842,23221842,23293647.0,23293647.0,23276442.0,23276442.0,23293647
mean,Infinity,,,5993.860117316656,,5988.479185889627,40.73961409901128,-73.97228230846285,40.73941025445844,-73.97232270144664,
stddev,,,,1135.8469612211168,,1134.8848996025342,0.0409484765336578,0.0283107943494001,0.0507777387879373,0.0578245194499352,
min,00000210330DABDE,classic_bike,1 Ave & E 110 St,2733.03,1 Ave & E 110 St,190 Morgan,40.57541226666667,-74.072216153,-37.26,-173.37,casual
max,FFFFFEDA1AFF4882,electric_bike,York St & Marin Blvd,SYS039,York St & Marin Blvd,SYS039,40.894158244,-73.79639436666666,41.17,0.0,member


In [55]:
weather_raw.describe()

                                                                                

summary,name,datetime,temp,feelslike,dew,humidity,precip,precipprob,preciptype,snow,snowdepth,windgust,windspeed,winddir,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,severerisk,conditions,icon,stations
count,5137,5137,5137.0,5137.0,5137.0,5137.0,5137.0,5137.0,535,5137.0,5137.0,5137.0,5137.0,5137.0,5137.0,5137.0,5137.0,5137.0,5137.0,5137.0,5137.0,5137,5137,5137
mean,,,17.932003114658606,17.662468366751295,11.330309519174564,67.13228927389541,0.0194527934592174,10.414638894296282,,0.0,0.0,15.043955616118437,6.177165660891565,185.7264940626825,1015.7618649016936,41.3217636752964,9.365115826358489,133.6642008954643,0.4812536499902619,1.3243138018298617,12.046330543118552,,,
stddev,,,7.70665042698376,8.66876828589003,8.025821727999638,15.939477311695102,0.1972492092395847,30.54800885247808,,0.0,0.0,9.47125934945806,3.8918691585807705,116.79751456648928,7.817804079025535,44.39835105152495,1.5059480172640691,234.7926679291073,0.8454206779821207,2.3562112688071486,7.893308383516578,,,
min,new York,2023-06-01 00:00:00,-0.1,-0.1,-0.1,28.32,0.0,0.0,rain,0.0,0.0,1.1,0.0,0.0,1000.0,0.0,1.2,0.0,0.0,0.0,10.0,Clear,clear-day,"72055399999,KJRB,..."
max,new York,2023-12-31 23:00:00,9.9,9.9,9.9,99.71,9.126,100.0,rain,0.0,0.0,9.2,9.9,99.0,999.9,99.9,9.9,992.0,4.2,9.0,8.0,"Rain, Partially c...",rain,"72505394728,KLGA,..."


### Checking for missing values

In [20]:
taxi_yellow_raw.select([sum(col(c).isNull().cast("int")).alias(c) for c in taxi_yellow_raw.columns]).show()



+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|         880691|            0|    880691|            880691|           0|           0|           0|          0|    0|      0|         

                                                                                

In [21]:
taxi_green_raw.select([sum(col(c).isNull().cast("int")).alias(c) for c in taxi_green_raw.columns]).show()



+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       0|                   0|                    0|             32717|     32717|           0|           0|          32717|            0|          0|    0|      

                                                                                

In [22]:
citybike_raw_data.select([sum(col(c).isNull().cast("int")).alias(c) for c in citybike_raw_data.columns]).show()



+-------+-------------+----------+--------+------------------+----------------+----------------+--------------+---------+---------+-------+-------+-------------+
|ride_id|rideable_type|started_at|ended_at|start_station_name|start_station_id|end_station_name|end_station_id|start_lat|start_lng|end_lat|end_lng|member_casual|
+-------+-------------+----------+--------+------------------+----------------+----------------+--------------+---------+---------+-------+-------+-------------+
|      0|            0|         0|       0|             17186|           17186|           71805|         71805|        0|        0|  17205|  17205|            0|
+-------+-------------+----------+--------+------------------+----------------+----------------+--------------+---------+---------+-------+-------+-------------+



                                                                                

In [23]:
weather_raw.select([sum(col(c).isNull().cast("int")).alias(c) for c in weather_raw.columns]).show()

+----+--------+----+---------+---+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------+-------+----------+----------+----+--------+
|name|datetime|temp|feelslike|dew|humidity|precip|precipprob|preciptype|snow|snowdepth|windgust|windspeed|winddir|sealevelpressure|cloudcover|visibility|solarradiation|solarenergy|uvindex|severerisk|conditions|icon|stations|
+----+--------+----+---------+---+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------+-------+----------+----------+----+--------+
|   0|       0|   0|        0|  0|       0|     0|         0|      4602|   0|        0|       0|        0|      0|               0|         0|         0|             0|          0|      0|         0|         0|   0|       0|
+----+--------+----+---------+---+--------+------+----------+----------+----+---------+--------+----

## Pre-Processing

### combine the data of Yellow taxi and Green Taxi

In [24]:
taxi_yellow_raw.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [25]:
taxi_green_raw.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [26]:
# Unified Yellow taxi and Green taxi features
taxi_green_raw = taxi_green_raw.withColumnRenamed("lpep_pickup_datetime" ,"tpep_pickup_datetime")
taxi_green_raw = taxi_green_raw.withColumnRenamed("lpep_dropoff_datetime" ,"tpep_dropoff_datetime")
taxi_green_raw = taxi_green_raw.drop('ehail_fee').drop('trip_type')
taxi_yellow_raw = taxi_yellow_raw.drop('Airport_fee')

In [27]:
# Reorder the columns and join the two data sets
common_columns = ['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

taxi_yellow_raw = taxi_yellow_raw.select(common_columns)
taxi_green_raw = taxi_green_raw.select(common_columns)
taxi_raw_data = taxi_yellow_raw.union(taxi_green_raw)

In [28]:
taxi_raw_data.count()

22571270

In [29]:
# Remove the duplicated data of taxi
taxi_raw_data.dropDuplicates()

24/08/17 13:34:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/08/17 13:34:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/08/17 13:34:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/08/17 13:34:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/08/17 13:35:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/08/17 13:35:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/08/17 13:35:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/08/17 13:35:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2023-10-01 00:29:26,2023-10-01 00:46:19,2,6.1,1,N,161,160,2,26.8,3.5,0.5,0.0,6.94,1.0,38.74,2.5
1,2023-10-01 00:48:56,2023-10-01 00:56:21,2,1.4,1,N,186,158,2,8.6,3.5,0.5,0.0,0.0,1.0,13.6,2.5
2,2023-10-01 00:53:01,2023-10-01 01:04:07,3,2.61,1,N,79,170,1,14.9,1.0,0.5,3.98,0.0,1.0,23.88,2.5
1,2023-10-01 00:25:11,2023-10-01 00:33:43,1,1.6,1,N,79,233,1,9.3,3.5,0.5,2.85,0.0,1.0,17.15,2.5
2,2023-10-01 00:06:02,2023-10-01 00:11:19,1,0.63,1,N,234,107,1,6.5,1.0,0.5,3.45,0.0,1.0,14.95,2.5
2,2023-10-01 00:16:59,2023-10-01 00:32:28,1,2.72,1,N,79,162,4,-17.0,-1.0,-0.5,0.0,0.0,-1.0,-22.0,-2.5
1,2023-10-01 00:36:26,2023-10-01 00:52:33,0,2.6,1,N,68,164,2,15.6,3.5,0.5,0.0,0.0,1.0,20.6,2.5
1,2023-10-01 00:56:37,2023-10-01 01:18:54,3,4.8,1,N,79,236,1,24.7,3.5,0.5,5.9,0.0,1.0,35.6,2.5
2,2023-10-01 00:24:28,2023-10-01 00:35:48,1,2.07,1,N,249,170,1,12.8,1.0,0.5,3.56,0.0,1.0,21.36,2.5
2,2023-10-01 00:42:07,2023-10-01 00:46:11,3,0.71,1,N,90,186,1,5.8,1.0,0.5,2.7,0.0,1.0,13.5,2.5


In [32]:
print(taxi_raw_data.count())
print(len(taxi_raw_data.columns))

22571270
18


In [31]:
# save the parquet file which is combination of yellow taxi data and green taxi data
taxi_raw_data.write.mode('overwrite').parquet('../data/raw/NYCTLC_data/taxi_combined_raw')

                                                                                

### Unify weather time data with taxi time data and Citybike time data

In [33]:
weather_raw = weather_raw.withColumn("datetime", date_format(weather_raw['datetime'], 'yyyy-MM-dd HH:mm:ss'))

### Data Wrangling for taxi_raw_data

In [41]:
# Remove lines with passenger_count less than 0 or more than 6
taxi_data = taxi_raw_data.filter((col('passenger_count') > 0) & (col('passenger_count') <= 6))
# Remove lines with Rate Code ID greater than 6
taxi_data = taxi_data.filter((col('RateCodeID') <= 6))
# Remove lines with trip diatance smaller than 0
taxi_data = taxi_data.filter(col('trip_distance') > 0)
# Remove lines with payment type is 0
taxi_data = taxi_data.filter(col('payment_type') > 0)
# Remove the fees-related columns smaller than 0 
columns_to_check = ['fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 
                    'improvement_surcharge', 'total_amount']
for column in columns_to_check:
    taxi_data = taxi_data.filter(col(column) >= 0)

In [47]:
print(taxi_data.count())
print(len(taxi_data.columns))



20713300
18


                                                                                

In [45]:
print((taxi_raw_data.count()-taxi_data.count())/taxi_raw_data.count()*100)

                                                                                

8.231570487615452


### Data Wrangling for cityBike_raw_data

In [53]:
# remove the data which not in New York
lat_min, lat_max = 40.4774, 40.9176
lng_min, lng_max = -74.2591, -73.7004

citybike_data = citybike_raw_data.filter(
    (col('start_lat').between(lat_min, lat_max)) &
    (col('start_lng').between(lng_min, lng_max)) &
    (col('end_lat').between(lat_min, lat_max)) &
    (col('end_lng').between(lng_min, lng_max))
)

In [54]:
(citybike_raw_data.count()-citybike_data.count())/citybike_raw_data.count()

                                                                                

0.0007407169860520338

## Save data Which after preprocessing

In [59]:
taxi_data.write.parquet('../data/data_after_preprocessing/taxi')
weather_raw.write.parquet('../data/data_after_preprocessing/weather')
citybike_data.write.parquet('../data/data_after_preprocessing/citybike')

                                                                                