# Exploratory Data Analysis - NYC Green Taxi Commission

## Execution Time breakup for 80 Million Rows

1. Average, Monthly, Weekly and Hourly Pickups ~ 15-20 minutes each
2. Hourly Pickups for every month ~ 2.5 hours
3. HeatMap ~ 15 minutes

Total time of execution ~ 4 hours

## Objective

- As a startup who wants to launch a new taxi service in New York City, we plan to do Exploratory Data Analysis on the New York City Green Taxi Commision dataset to analyze the taxi pickups based on monthly, weekly and hourly basis to gain insights related to deployments of taxis.
- This dataset has 80 million rows in parquet format (around 2GB size) and is present on Microsoft Azure. So we have extracted it directly from cloud instead of storing the dataset localy in our system.

In [None]:
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import datetime
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from collections import defaultdict, OrderedDict
import folium
from folium.plugins import HeatMapWithTime
from datetime import datetime
import plotly.figure_factory as ff
import datashader as ds, colorcet as cc
import pandas as pd

In [None]:
spark = SparkSession.builder.master("local[*]")\
        .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.2')\
        .appName('NYC Taxi Data')\
        .getOrCreate()

In [None]:
ps.set_option('compute.ops_on_diff_frames', True)

## Reading data from azure server

In [None]:
# Azure storage access info
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "green"
blob_sas_token = r""

# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
print('Remote blob path: ' + wasbs_path)

In [None]:
ny_trips = spark.read.parquet(wasbs_path)

In [None]:
def dropEmptyColumns(dataframe):
    cols = ('puLocationId','doLocationId', 'ehailFee')
    dataframe = dataframe.drop(*cols)
    return dataframe

In [None]:
trips_in_18 = dropEmptyColumns(ny_trips)

## Conversion to Pyspark Pandas dataframe

In [None]:
pickups_data = trips_in_18.to_pandas_on_spark()

In [None]:
def setDataRecords(dataFrame):
    dataFrame['Weekday'] = dataFrame['lpepPickupDatetime'].dt.day_name()
    dataFrame['Hour'] = dataFrame['lpepPickupDatetime'].dt.hour
    dataFrame['Month'] = dataFrame['lpepPickupDatetime'].dt.month_name()
    dataFrame['Day'] = dataFrame['lpepPickupDatetime'].dt.day
    dataFrame['Year'] = dataFrame['lpepPickupDatetime'].dt.year
    return dataFrame

In [None]:
pickups_data = setDataRecords(pickups_data)

In [None]:
# Mapping the Months of the Year
month_map = {
    1: 'January',
    2: 'February',
    3: 'March',
    4: 'April',
    5: 'May',
    6: 'June', 
    7: 'July',
    8: 'August',
    9: 'September',
    10: 'October',
    11: 'November',
    12: 'December'
}

pickups_data['Month'] = pickups_data['Month'].replace(month_map)

# Mapping the Days of the Week

weekday_map = {
    0: 'Monday',
    1: 'Tuesday',
    2: 'Wednesday',
    3: 'Thursday', 
    4: 'Friday', 
    5: 'Saturday',
    6: 'Sunday'
}
pickups_data['Weekday'] = pickups_data['Weekday'].replace(weekday_map)

# Exploratory Data Analysis

## Average Pickups per Day

In [None]:
num_pickups = pickups_data.shape[0]
num_day = len(pickups_data[['Year','Month','Day']].drop_duplicates())
daily_avg = np.round(num_pickups/num_day, 0)

In [None]:
stats_raw = 'Number of Pickups: {} \nAvg Daily Pickups: {}'
print(stats_raw.format(num_pickups, daily_avg))

Above values determines the daily average pickup from the entire data. With the help of this, we can easily figure out the number of taxi deployments on a daily basis.

## Monthly Taxi Pickups

In [None]:
month = list(month_map.values())

In [None]:
monthly_pickups = pickups_data.groupby('Month').sum().reindex(month)
fig = px.bar(monthly_pickups, x = monthly_pickups.index.to_numpy(), y=monthly_pickups['passengerCount'].to_numpy())

# Setting Graph Details
fig.update_layout(
    title="Monthly Pickups",
    xaxis_title="Months",
    yaxis_title="No of Passengers",
    legend_title="Legend Title",
    font=dict(
        size=14,
        color="RebeccaPurple"
    )
)

From the monthly pickup graphs it has been observed that pickup remains constant throughout the year. However during the summer we can see that there’s a downward trend until september. The downward trend is attributed to the fact that people move home during summers and hence the necessity for commuting is lower than other months.

## Weekday Taxi Pickups

In [None]:
day = list(weekday_map.values())


In [None]:
daily_pickups = pickups_data.groupby('Weekday').sum().reindex(day)
fig = px.bar(daily_pickups, x = daily_pickups.index.to_numpy(), y=daily_pickups['passengerCount'].to_numpy())
fig.update_layout(
    title="Weekday Pickups",
    xaxis_title="Days of the week",
    yaxis_title="No of Passengers",
    legend_title="Legend Title",
    font=dict(
        size=14,
        color="RebeccaPurple"
    )
)

From the weekly pickup graphs we can see that there’s a linear increase in the demand for cabs from Monday to Saturday with the peaks being on Friday and Saturdays, and deep in demand on Sundays. 

## Hourly Taxi Pickups

In [None]:
hour = pickups_data.groupby('Hour').sum().sort_index()
fig = px.bar(hour, x = hour.index.to_numpy(), y=hour['passengerCount'].to_numpy())
fig.update_layout(
    title="Hourly Pickups",
    xaxis_title="Hours",
    yaxis_title="No of Passengers",
    legend_title="Legend Title",
    font=dict(
        size=14,
        color="RebeccaPurple"
    )
)

From the Hourly Pickup graphs it is evident that there’s an increase in the demands for taxis in the working hours viz. From 8am to 11am. 
Hence we conclude that we need more deployments of taxis in the peak office hours of the day and slowly reduce it as the day progresses. 


## Weekday Pickups every Month

In [None]:
monthly_weekdays = pickups_data.groupby('Month')['Weekday'].value_counts().unstack()
monthly_weekdays = monthly_weekdays.reindex(month)
monthly_weekdays.loc[month_map.values(),weekday_map.values()].plot(kind='bar')

It is evident that, for all months the weekly pickups trend remain the same

## Comparing Hourly Pickups for each months

In [None]:
var_name = 'Month'
var_values = list(month_map.values())
fig = make_subplots(rows=4, cols=3, subplot_titles=var_values)

In [None]:
r = 1
c = 1
for i in range(0,12):
    value_df = pickups_data[pickups_data[var_name] == var_values[i]].groupby('Hour').sum().sort_index()
    fig.add_trace(go.Bar(x = value_df.index.to_numpy(), y = value_df['passengerCount'].to_numpy()), row=r, col=c, )
    fig.update_xaxes(title_text="Hour", row=r, col=c)
    fig.update_yaxes(title_text="Proportion of Pickups", row=r, col=c)
    if(c%3 == 0):
        c = 1
        r += 1
    else:
        c += 1

fig.update_layout(title_text="Hourly Pickups Every Month wise", height=800, width=1500)

From the Hourly Pickup graphs it is evident that there’s an increase in the demands for taxis in the working hours viz. From 8am to 11am. This trend remains the same across all the 12 months. Hence we conclude that we need more deployments of taxis in the peak office hours of the day and slowly reduce it as the day progresses. 

## Animated Heat Map for the busiest day

In [None]:
daily_pickup = pickups_data.groupby(['Month', 'Day','Year']).count()
maxCount_of_busiestDay = daily_pickup['passengerCount'].idxmax()

In [None]:
maxCount_of_busiestDay

We observe that from our data set, the busiest day we obtain is as above.

In [None]:
month_map_keyList = list(month_map.keys())

In [None]:
# Calculating the key for the obtained Month
val = month_map_keyList[month.index(str(maxCount_of_busiestDay[0]))]

In [None]:
busiest_day = pickups_data[(pickups_data['Month'] == maxCount_of_busiestDay[0]) & (pickups_data['Day'] == maxCount_of_busiestDay[1]) & (pickups_data['Year'] == maxCount_of_busiestDay[2])]


In [None]:
# Creating string format of busiest day
strDate = str(maxCount_of_busiestDay[2])+"-"+str(val)+"-"+str(maxCount_of_busiestDay[1])+"  {}"

In [None]:
hourly = defaultdict(list)
for pickup in busiest_day.itertuples():
    pickup_time = datetime.strptime(strDate.format(pickup.Hour), '%Y-%m-%d %H')
    hourly[str(pickup_time)].append([pickup.pickupLatitude, pickup.pickupLongitude])   
hourly = OrderedDict(sorted(hourly.items(), key=lambda t: t[0]))

In [None]:
# Feeding the dictionary data to the Map for plotting the HeatMap
mean_location = [pickups_data['pickupLatitude'].mean(),pickups_data['pickupLongitude'].mean()]
pickup_map = folium.Map(mean_location, zoom_start=12, tiles="Stamen Terrain")

hourly_pickups = HeatMapWithTime(
    data=list(hourly.values()),
    index=list(hourly.keys()), 
    radius=10,
    auto_play=True,
    max_opacity=0.4
)
hourly_pickups.add_to(pickup_map)
pickup_map

We have visualized the busiest day from the entire dataset and found that Manhattan, Brooklyn , Queens are the busiest parts of New york.
