In [None]:
!pip3 install dask
!pip install "dask[complete]" 
!pip install tqdm

import dask.dataframe as dd#similar to pandas
import datetime #Convert to unix time

import pandas as pd
from tqdm import tqdm

import time #Convert to unix time

# if numpy is not installed already : pip3 install numpy
import numpy as np#Do aritmetic operations on arrays
import math
import requests
import os

In [None]:
# The following lines of code are inspired from https://github.com/Swetadas-1718/Taxi_demand_prediction/blob/main/New%20York%20Taxi%20demand%20forecasting%20.ipynb


# Timestamps are converted to Unix time so that trip duration (trip time) and speed can be computed; the Unix pickup times are also used later for binning.

# In our data, times are in the format "YYYY-MM-DD HH:MM:SS"; we convert this string to a Python datetime and then to a Unix timestamp.
# https://stackoverflow.com/a/27914405
def convert_to_unix(s):
    """Convert a "YYYY-MM-DD HH:MM:SS" timestamp to seconds since the Unix epoch.

    Naive timestamps are interpreted as UTC. The previous implementation used
    ``time.mktime``, which interprets them in the *machine's local* time zone.
    That made the result depend on where the notebook runs, and it disagreed with
    ``unix2datetime``, which decodes timestamps as UTC. On a machine whose local
    zone is UTC, the results are identical.

    Parameters
    ----------
    s : str or datetime.datetime
        ISO-8601 timestamp string, or an already-parsed datetime (e.g. a
        ``pandas.Timestamp``). Timezone-aware datetimes keep their own offset.

    Returns
    -------
    float
        Whole seconds since 1970-01-01T00:00:00Z (sub-second precision is
        dropped, as ``timetuple()``/``mktime`` did before).

    Raises
    ------
    TypeError
        If ``s`` is neither a string nor a datetime (e.g. a float NaN coming
        from a missing or misaligned CSV field).
    """
    if isinstance(s, datetime.datetime):
        dt = s
    else:
        dt = datetime.datetime.fromisoformat(s)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt.replace(microsecond=0).timestamp()



# we return a data frame which contains the columns
# 1.'passenger_count' : self explanatory
# 2.'trip_distance' : self explanatory
# 3.'pickup_longitude' : self explanatory
# 4.'pickup_latitude' : self explanatory
# 5.'dropoff_longitude' : self explanatory
# 6.'dropoff_latitude' : self explanatory
# 7.'total_amount' : total fare that was paid
# 8.'trip_times' : duration of each trip, in minutes
# 9.'pickup_times' : pickup time converted into unix time
# 10.'Speed' : average speed of each trip (miles/hr)
def return_with_trip_times(month):
    """Add trip duration, Unix pickup time and average speed to a month of trips.

    Parameters
    ----------
    month : dask.dataframe.DataFrame
        Raw trip records. Must contain 'tpep_pickup_datetime',
        'tpep_dropoff_datetime' and the seven value columns listed below.
        Only column selection and ``.compute()`` are used on it.

    Returns
    -------
    pandas.DataFrame
        Columns 'passenger_count', 'trip_distance', 'pickup_longitude',
        'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
        'total_amount', plus:
          'trip_times'   -- dropoff minus pickup, in minutes;
          'pickup_times' -- pickup time as Unix seconds;
          'Speed'        -- 60 * trip_distance / trip_times (miles/hr when
                            trip_distance is in miles).
        Zero-duration trips yield inf/NaN in 'Speed'; callers filter them.
    """
    value_columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude',
                     'dropoff_longitude', 'dropoff_latitude', 'total_amount']
    time_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

    # Materialise the dask frame once. Calling .compute() twice re-reads and
    # re-parses the whole CSV.
    frame = month[time_columns + value_columns].compute()

    # Convert every timestamp exactly once. Previously each column was
    # converted twice (one pickup result was thrown away), doubling the cost of
    # this pure-Python loop.
    pickup_unix = []
    dropoff_unix = []
    for pickup, dropoff in tqdm(zip(frame['tpep_pickup_datetime'].values,
                                    frame['tpep_dropoff_datetime'].values),
                                total=len(frame)):
        pickup_unix.append(convert_to_unix(pickup))
        dropoff_unix.append(convert_to_unix(dropoff))
    pickup_unix = np.asarray(pickup_unix, dtype=float)
    dropoff_unix = np.asarray(dropoff_unix, dtype=float)

    new_frame = frame[value_columns].copy()
    # Seconds -> minutes.
    new_frame['trip_times'] = (dropoff_unix - pickup_unix) / 60.0
    new_frame['pickup_times'] = pickup_unix
    # trip_times is in minutes, so scale by 60 to get distance per hour.
    new_frame['Speed'] = 60 * (new_frame['trip_distance'] / new_frame['trip_times'])
    return new_frame

In [None]:
# The following lines of code are inspired from https://github.com/Swetadas-1718/Taxi_demand_prediction/blob/main/New%20York%20Taxi%20demand%20forecasting%20.ipynb

# Remove outliers using the thresholds from the univariate analysis in the notebook referenced above.
def remove_outliers(new_frame):
    """Drop trips whose location, duration, distance, speed or fare is implausible.

    A row is kept only if all of the following hold:
      * pickup and dropoff lie inside the box lon [-74.15, -73.7004],
        lat [40.5774, 40.9176];
      * 0 < trip_times < 720;
      * 0 < trip_distance < 23;
      * 0 < Speed < 45.31;
      * 0 < total_amount < 1000.
    Rows with NaN in a tested column are rejected, because NaN comparisons are False.

    For each rule, prints how many records that rule alone rejects, then prints
    the total number removed.

    Parameters
    ----------
    new_frame : pandas.DataFrame
        Frame with the columns produced by ``return_with_trip_times``.

    Returns
    -------
    pandas.DataFrame
        The rows of ``new_frame`` that pass every rule, in original order with
        the original index.
    """
    total = new_frame.shape[0]
    print("Number of pickup records = ", total)

    f = new_frame
    inside_ny = ((f.dropoff_longitude >= -74.15) & (f.dropoff_longitude <= -73.7004) &
                 (f.dropoff_latitude >= 40.5774) & (f.dropoff_latitude <= 40.9176) &
                 (f.pickup_longitude >= -74.15) & (f.pickup_longitude <= -73.7004) &
                 (f.pickup_latitude >= 40.5774) & (f.pickup_latitude <= 40.9176))
    valid_time = (f.trip_times > 0) & (f.trip_times < 720)
    valid_distance = (f.trip_distance > 0) & (f.trip_distance < 23)
    # Previously the *reported* speed-outlier count used 0 <= Speed <= 65 while
    # rows were *removed* with 0 < Speed < 45.31, so the printout did not match
    # what was dropped. One mask now serves both purposes.
    valid_speed = (f.Speed > 0) & (f.Speed < 45.31)
    valid_fare = (f.total_amount > 0) & (f.total_amount < 1000)

    checks = [
        ("Number of outlier coordinates lying outside NY boundaries:", inside_ny),
        ("Number of outliers from trip times analysis:", valid_time),
        ("Number of outliers from trip distance analysis:", valid_distance),
        ("Number of outliers from speed analysis:", valid_speed),
        ("Number of outliers from fare analysis:", valid_fare),
    ]
    for message, mask in checks:
        print(message, total - int(mask.sum()))

    # All masks are row-wise predicates on the same frame, so AND-ing them
    # keeps exactly the rows that sequential filtering would keep.
    keep = inside_ny & valid_time & valid_distance & valid_speed & valid_fare
    cleaned = new_frame[keep]

    print("Total outliers removed", total - cleaned.shape[0])
    print("---")
    return cleaned

In [None]:
# we return a data frame which contains the columns
# 1.'pickup_longitude' : self explanatory
# 2.'pickup_latitude' : self explanatory
# 3.'pickup_times' : pickup time converted into unix time
def clean(frame):
    """Compute trip durations, drop outliers and keep only pickup location/time.

    Parameters
    ----------
    frame : dask.dataframe.DataFrame
        Raw monthly trip records, as accepted by ``return_with_trip_times``.

    Returns
    -------
    pandas.DataFrame
        Columns 'pickup_longitude', 'pickup_latitude' and 'pickup_times'
        (Unix seconds) for the rows that survived ``remove_outliers``.
    """
    print("Calculating durations...")
    frame_with_durations = return_with_trip_times(frame)
    print("Removing outliers")
    frame_with_durations_outliers_removed = remove_outliers(frame_with_durations)

    n_before = len(frame_with_durations)
    # An empty month previously raised ZeroDivisionError here.
    if n_before:
        kept_fraction = float(len(frame_with_durations_outliers_removed)) / n_before
    else:
        kept_fraction = float("nan")
    print("fraction of data points that remain after removing outliers", kept_fraction)

    return frame_with_durations_outliers_removed[['pickup_longitude', 'pickup_latitude', 'pickup_times']]

In [None]:
# returns the name of the csv file given year and month values
def get_name(year, month):
    """Return the yellow-taxi CSV file name for the given year and month.

    The month is zero-padded to two characters when below 10, e.g.
    (2016, 1) -> "yellow_tripdata_2016-01.csv".

    Raises AssertionError when year is outside 2009-2020 or month outside 1-12.
    """
    assert 2009 <= year <= 2020, "year should be in the range 2009-2020"
    assert 1 <= month <= 12, "month should be in the range 1-12"
    month_part = str(month) if month >= 10 else "0" + str(month)
    return "yellow_tripdata_{}-{}.csv".format(year, month_part)


# downloads NYC taxi data given year and month values
def download_data(year, month, dw_path="./data"):
    """Download one month of NYC yellow-taxi CSV data unless it is already on disk.

    Parameters
    ----------
    year, month : int
        Passed to ``get_name`` (2009-2020, 1-12).
    dw_path : str
        Directory to save into; it is created if missing.

    Returns
    -------
    str
        Path of the CSV file, ``os.path.join(dw_path, get_name(year, month))``.

    Raises
    ------
    RuntimeError
        If ``wget`` exits with a non-zero status (e.g. the URL is unavailable).
    FileNotFoundError
        If the ``wget`` executable cannot be found.
    """
    import subprocess

    name = get_name(year, month)
    save_path = os.path.join(dw_path, name)
    if os.path.exists(save_path):
        print(name, "already exists, skipping the download...")
        return save_path

    print("Downloading {} ...".format(name))
    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/" + name
    os.makedirs(dw_path, exist_ok=True)

    # Download to a temporary name and rename only on success. Then a failed or
    # interrupted download never leaves a partial file that the existence check
    # above would later treat as complete. The argument list avoids the shell
    # entirely (no quoting issues with dw_path).
    partial_path = save_path + ".part"
    result = subprocess.run(["wget", "-O", partial_path, url])
    if result.returncode != 0:
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise RuntimeError(
            "wget failed with exit code {} for {}".format(result.returncode, url))
    os.replace(partial_path, save_path)

    print("Download finished. File saved to {} ...".format(save_path))
    return save_path

In [None]:
# downloads NYC taxi data, cleans, and saves it given year and month values
def download_clean_save(year, month, save_dir="./clean"):
    """Download a month of taxi data, clean it and save the result as CSV.

    If ``save_dir/clean_<get_name(year, month)>`` already exists, nothing is
    done. Otherwise the raw file is fetched with ``download_data``, cleaned
    with ``clean``, written without the index, and the raw file is deleted.

    Parameters
    ----------
    year, month : int
        Passed to ``get_name``.
    save_dir : str
        Folder for the cleaned CSV; it is created if missing.

    Returns
    -------
    str
        Path of the cleaned CSV.
    """
    file_name = "clean_" + get_name(year, month)
    save_path = os.path.join(save_dir, file_name)
    if os.path.exists(save_path):
        print(file_name, "already exists, skipping cleaning...")
        return save_path

    os.makedirs(save_dir, exist_ok=True)
    file_path = download_data(year, month)
    # `error_bad_lines` was deprecated in pandas 1.3 and removed in 2.0 (dask
    # forwards these keywords to pandas.read_csv); `on_bad_lines="skip"` is
    # the supported equivalent.
    frame = dd.read_csv(file_path, on_bad_lines="skip", assume_missing=True)
    clean_frame = clean(frame)
    if "Unnamed: 0" in clean_frame.columns:
        clean_frame = clean_frame.drop("Unnamed: 0", axis=1)
    clean_frame.to_csv(save_path, index=False)
    del clean_frame

    # Delete the raw download now that the cleaned copy exists (portable, no shell).
    os.remove(file_path)
    return save_path

In [None]:
# Clean Feb 2016 into the Drive folder the rest of the notebook reads from.
# Every other cell (the !cp, save_weekdays' default, the later save_dir) uses
# /content/drive/...; the previous /content/gdrive/... path wrote the cleaned
# file somewhere save_weekdays' default directory never looks.
year = 2016
month = 2
save_dir = "/content/drive/MyDrive/New_York_Data/clean"
download_clean_save(year, month, save_dir)

In [None]:
# Inspect the raw 2020-01 file: list its columns and show the first rows.
# NOTE(review): per the recorded output, the 2020 schema has PULocationID /
# DOLocationID and no pickup/dropoff longitude/latitude columns, so this file
# cannot go through return_with_trip_times / clean as written.
path = download_data(2020, 1)
df = dd.read_csv(path, assume_missing=True)
print(df.columns)
df.head()

yellow_tripdata_2020-01.csv already exists, skipping...
Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
       'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
       'total_amount', 'congestion_surcharge'],
      dtype='object')


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1.0,2020-01-01 00:28:15,2020-01-01 00:33:03,1.0,1.2,1.0,N,238.0,239.0,1.0,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5
1,1.0,2020-01-01 00:35:39,2020-01-01 00:43:04,1.0,1.2,1.0,N,239.0,238.0,1.0,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5
2,1.0,2020-01-01 00:47:41,2020-01-01 00:53:52,1.0,0.6,1.0,N,238.0,238.0,1.0,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5
3,1.0,2020-01-01 00:55:23,2020-01-01 01:00:14,1.0,0.8,1.0,N,238.0,151.0,1.0,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0
4,2.0,2020-01-01 00:01:58,2020-01-01 00:04:16,1.0,0.0,1.0,N,193.0,193.0,2.0,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0


In [None]:
# Copy the 2020-01 CSV from /content/data (presumably the raw file fetched by
# download_data into ./data) into the Drive "clean" folder. Note the copy keeps
# the raw file name, without the "clean_" prefix that download_clean_save uses.
!cp /content/data/yellow_tripdata_2020-01.csv /content/drive/MyDrive/New_York_Data/clean/yellow_tripdata_2020-01.csv

# Daily

In [None]:
from collections import OrderedDict

In [None]:
def unix2datetime(unix):
  """Convert Unix seconds to a naive ``datetime`` expressed in UTC.

  The input is truncated with ``int`` first. The result equals that of
  ``datetime.utcfromtimestamp``, which is deprecated since Python 3.12; building
  an aware UTC datetime and stripping tzinfo avoids the deprecation.
  """
  return datetime.datetime.fromtimestamp(int(unix), tz=datetime.timezone.utc).replace(tzinfo=None)

def print_unix(unix):
  """Print a Unix timestamp as 'YYYY-MM-DD HH:MM:SS' (UTC)."""
  print(unix2datetime(unix).strftime('%Y-%m-%d %H:%M:%S'))

In [None]:
# calculates weekdays for each entry in the data
def get_weekdays_list(df):
  """Return the pickup weekday of every row of ``df`` as a list of ints.

  Each 'pickup_times' value is decoded with ``unix2datetime`` and mapped
  through ``datetime.weekday()`` (Monday == 0 ... Sunday == 6).
  """
  print("Getting weekdays list...")
  return [unix2datetime(stamp).weekday() for stamp in tqdm(df['pickup_times'].values)]

# saves data for each weekday separately
def save_weekdays(year, month, dir="/content/drive/MyDrive/New_York_Data/clean"):
  """Split a cleaned month CSV into one CSV per pickup weekday.

  Reads ``dir/clean_<get_name(year, month)>``. For each weekday d in 0..6
  (Monday == 0), writes ``dir/clean_<name>_<d><ext>`` holding that day's
  rows, without the helper 'weekday' column. Existing per-day files are left
  untouched, and if all seven already exist the source CSV is not read at all.

  Parameters
  ----------
  year, month : int
      Passed to ``get_name``.
  dir : str
      Folder holding the cleaned CSV and receiving the per-day files. The
      name is kept for keyword-argument compatibility, although it shadows
      the builtin inside this function.
  """
  full_name = "clean_" + get_name(year, month)
  path = os.path.join(dir, full_name)
  name, ext = os.path.splitext(full_name)
  # Build each output path once (previously it was rebuilt twice per loop iteration).
  day_paths = [os.path.join(dir, name + "_" + str(i) + ext) for i in range(7)]

  # Skip the expensive read + per-row weekday computation when nothing is missing.
  if all(os.path.exists(p) for p in day_paths):
    for p in day_paths:
      print("{} already exists, skipping...".format(p))
    return

  df = pd.read_csv(path)
  df['weekday'] = get_weekdays_list(df)

  print("Saving each weekday separately...")
  for i in tqdm(range(7)):
    new_path = day_paths[i]
    if os.path.exists(new_path):
      print("{} already exists, skipping...".format(new_path))
      continue
    day = df[df['weekday'] == i].drop("weekday", axis=1)
    day.to_csv(new_path, index=False)

In [None]:
# Split the cleaned January 2016 file into per-weekday CSVs in save_weekdays' default Drive folder.
save_weekdays(2016, 1)

Getting weekdays list...


100%|██████████| 10609074/10609074 [00:10<00:00, 970123.13it/s]


Saving each weekday separately...


100%|██████████| 7/7 [01:02<00:00,  8.87s/it]


In [None]:
# Peek at the cleaned January 2016 file.
# Uses `clean_dir` rather than `dir`: assigning `dir` at notebook top level
# shadows the builtin dir() for the rest of the kernel session.
# NOTE(review): the recorded output shows an 'Unnamed: 0' column, i.e. this
# file was written with its index (download_clean_save now uses index=False);
# presumably it came from an earlier version of the pipeline.
clean_dir = "/content/drive/MyDrive/New_York_Data/clean"
full_name = "clean_yellow_tripdata_2016-01.csv"
path = os.path.join(clean_dir, full_name)
df = pd.read_csv(path)
df.head()

Unnamed: 0,pickup_longitude,pickup_latitude,pickup_times
0,-73.980118,40.74305,1451606000.0
1,-73.994057,40.71999,1451606000.0
2,-73.979424,40.744614,1451606000.0
3,-73.947151,40.791046,1451606000.0
4,-73.998344,40.723896,1451606000.0


In [None]:
save_dir = "/content/drive/MyDrive/New_York_Data/clean"

def download_clean_save_weekday(year, month, target_dir=None):
  """Download and clean one month, then split it into per-weekday CSVs.

  Parameters
  ----------
  year, month : int
      Passed to ``download_clean_save`` and ``save_weekdays``.
  target_dir : str, optional
      Output folder. Defaults to the notebook-level ``save_dir``, read at call
      time (the same lookup the function always did).

  Returns
  -------
  str
      Path of the cleaned month CSV.
  """
  if target_dir is None:
    target_dir = save_dir
  save_path = download_clean_save(year, month, save_dir=target_dir)
  save_weekdays(year, month, dir=target_dir)
  return save_path

In [None]:
# Process September-November 2016. A failure in one month is reported and the
# loop moves on to the next month.
# NOTE(review): the recorded output shows all three months failing with
# "fromisoformat: argument must be str", i.e. the pickup datetime column did
# not hold strings for these files. Investigate their layout/schema before
# relying on this loop.
for i in (9, 10, 11):
  try:
    download_clean_save_weekday(2016, i)
  except Exception as e:
    print("Problem with month", i, ":", e)

yellow_tripdata_2016-09.csv already exists, skipping the download...
Calculating durations...


0it [00:00, ?it/s]


Problem with month 9 : fromisoformat: argument must be str
Downloading yellow_tripdata_2016-10.csv ...
Download finished. File saved to ./data/yellow_tripdata_2016-10.csv ...
Calculating durations...


0it [00:00, ?it/s]


Problem with month 10 : fromisoformat: argument must be str
Downloading yellow_tripdata_2016-11.csv ...
Download finished. File saved to ./data/yellow_tripdata_2016-11.csv ...
Calculating durations...


0it [00:00, ?it/s]

Problem with month 11 : fromisoformat: argument must be str





In [None]:
# Load the raw January 2016 file for inspection.
# `error_bad_lines` was deprecated in pandas 1.3 and removed in 2.0;
# `on_bad_lines="skip"` is the supported equivalent.
# NOTE(review): download_clean_save deletes the raw CSV after cleaning, so this
# file may no longer exist if that month was cleaned in this session.
df = pd.read_csv("/content/data/yellow_tripdata_2016-01.csv", on_bad_lines="skip")

In [None]:
# Show the first rows of the raw January 2016 data (pre-2016-07 schema with lat/lon columns).
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2016-01-01 00:00:00,2016-01-01 00:00:00,2,1.1,-73.990372,40.734695,1,N,-73.981842,40.732407,2,7.5,0.5,0.5,0.0,0.0,0.3,8.8
1,2,2016-01-01 00:00:00,2016-01-01 00:00:00,5,4.9,-73.980782,40.729912,1,N,-73.944473,40.716679,1,18.0,0.5,0.5,0.0,0.0,0.3,19.3
2,2,2016-01-01 00:00:00,2016-01-01 00:00:00,1,10.54,-73.98455,40.679565,1,N,-73.950272,40.788925,1,33.0,0.5,0.5,0.0,0.0,0.3,34.3
3,2,2016-01-01 00:00:00,2016-01-01 00:00:00,1,4.75,-73.993469,40.71899,1,N,-73.962242,40.657333,2,16.5,0.0,0.5,0.0,0.0,0.3,17.3
4,2,2016-01-01 00:00:00,2016-01-01 00:00:00,3,1.76,-73.960625,40.78133,1,N,-73.977264,40.758514,2,8.0,0.0,0.5,0.0,0.0,0.3,8.8


In [None]:
# Drop the reference to the raw frame so its memory can be reclaimed.
del df