<a href="https://colab.research.google.com/github/gborn/Taxi-Demand-Prediction/blob/main/Taxi_Demand_Prediction_Setup.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# New York Taxi Trip Data

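In this notebook, we download the New York City yellow taxi trip records published by the NYC Taxi & Limousine Commission (TLC) from a public AWS S3 bucket and copy the raw CSV files to a Google Cloud Storage bucket. We then index the data on the `tpep_pickup_datetime` column and rewrite it as Parquet files in the same bucket. Parquet is an open-source, column-oriented data format that is widely used in the Apache Hadoop ecosystem. Here, switching from CSV to Parquet cut the time to fetch the tail of one monthly partition from about 1.5 minutes to about 6 seconds, roughly 93% faster.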



In [1]:
import requests
import time
import json
import pandas as pd
import numpy as np

In [2]:
from google.colab import auth
auth.authenticate_user()

# Google Cloud project ID
project_id = 'taxi-demand-prediction-333318'

# Reuse the project ID as the bucket name. GCS bucket names share a single
# global namespace, so the name must be unique across all of Cloud Storage.
bucket_name = project_id

# Set the active Google Cloud project for gcloud/gsutil
!gcloud config set project {project_id}

Updated property [core/project].


In [3]:
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp

def download_from_s3(fn):
    """
    Downloads the yellow taxi trip CSV for one month (fn is 'YYYY-MM') from the
    public AWS S3 bucket into the current Colab file system.
    """
    # The pool below uses threads, not processes, so this always prints 'MainProcess'
    print(f'{mp.current_process().name}: downloading {fn}')
    # -c resumes a partially downloaded file instead of starting over
    !wget --header="Host: s3.amazonaws.com" --header="User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36" --header="Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9" --header="Accept-Language: en-US,en;q=0.9" "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{fn}.csv" -c -O 'yellow_tripdata_{fn}.csv'

def download_to_gcs():
    """
    Downloads the monthly files from the AWS S3 bucket in parallel, then
    copies them from Colab to the Google Cloud Storage bucket.
    """
    # Months to fetch: January-December 2020 plus January-March 2021
    filenames = [f'2020-{i:02d}' for i in range(1, 13)]  # 2020 data
    filenames.extend([f'2021-{i:02d}' for i in range(1, 4)])  # 2021 data through March

    # Download in parallel; leaving the with-block waits for all downloads to finish
    with ThreadPoolExecutor() as pool:
        pool.map(download_from_s3, filenames)

    # The bucket already exists; in case it doesn't, create it first with:
    # !gsutil mb gs://{bucket_name}

    # Copy all downloaded CSV files into the bucket's raw_data/ folder
    !gsutil cp yellow_tripdata_*.csv gs://{bucket_name}/raw_data

download_to_gcs()

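`pool.map` above returns an iterator of results that is never read, so an exception raised inside a worker is silently dropped; with `!wget`, a failed download does not raise at all, because the shell's exit status is not checked. The sketch below is a hypothetical alternative, not the notebook's code: `month_keys` generates the same 15 `'YYYY-MM'` keys, and `download_all` submits one task per month and collects any exceptions. `fetch(url, filename)` is an assumed callable you supply; the URL pattern is copied from the `wget` call above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List

# URL pattern copied from the notebook's wget call
URL_TEMPLATE = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{key}.csv"


def month_keys(start_year: int, start_month: int, end_year: int, end_month: int) -> List[str]:
    """Return 'YYYY-MM' keys from the start month to the end month, both inclusive."""
    keys = []
    year, month = start_year, start_month
    while (year, month) <= (end_year, end_month):
        keys.append(f"{year}-{month:02d}")
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return keys


def download_all(
    keys: List[str], fetch: Callable[[str, str], object], max_workers: int = 8
) -> Dict[str, Exception]:
    """Run fetch(url, filename) for every key in a thread pool.

    Every future is inspected, so exceptions are returned keyed by month
    instead of being discarded as with an unread pool.map.
    """
    failures: Dict[str, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(fetch, URL_TEMPLATE.format(key=key), f"yellow_tripdata_{key}.csv"): key
            for key in keys
        }
        for future in as_completed(futures):
            exc = future.exception()
            if exc is not None:
                failures[futures[future]] = exc
    return failures
```

For example, `download_all(month_keys(2020, 1, 2021, 3), urllib.request.urlretrieve)` would download with the standard library, which raises on HTTP errors; whether this S3 URL still serves the files is not verified here.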

In [21]:
# Install Dask with its DataFrame extras, plus gcsfs so Dask can read gs:// paths
!pip install "dask[complete]" gcsfs > /dev/null

from dask import dataframe as dd

# Load all monthly CSV files from the bucket into a single Dask DataFrame,
# pinning the dtypes of these columns instead of relying on inference
trips = dd.read_csv(
    f'gs://{bucket_name}/raw_data/yellow_tripdata_*.csv',
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    dtype={
        'store_and_fwd_flag': 'str',
        'RatecodeID': 'float64',
        'VendorID': 'float64',
        'passenger_count': 'float64',
        'payment_type': 'float64',
    }
)

trips.head()

|  | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 2020-01-01 00:28:15 | 2020-01-01 00:33:03 | 1.0 | 1.2 | 1.0 | N | 238 | 239 | 1.0 | 6.0 | 3.0 | 0.5 | 1.47 | 0.0 | 0.3 | 11.27 | 2.5 |
| 1 | 1.0 | 2020-01-01 00:35:39 | 2020-01-01 00:43:04 | 1.0 | 1.2 | 1.0 | N | 239 | 238 | 1.0 | 7.0 | 3.0 | 0.5 | 1.5 | 0.0 | 0.3 | 12.3 | 2.5 |
| 2 | 1.0 | 2020-01-01 00:47:41 | 2020-01-01 00:53:52 | 1.0 | 0.6 | 1.0 | N | 238 | 238 | 1.0 | 6.0 | 3.0 | 0.5 | 1.0 | 0.0 | 0.3 | 10.8 | 2.5 |
| 3 | 1.0 | 2020-01-01 00:55:23 | 2020-01-01 01:00:14 | 1.0 | 0.8 | 1.0 | N | 238 | 151 | 1.0 | 5.5 | 0.5 | 0.5 | 1.36 | 0.0 | 0.3 | 8.16 | 0.0 |
| 4 | 2.0 | 2020-01-01 00:01:58 | 2020-01-01 00:04:16 | 1.0 | 0.0 | 1.0 | N | 193 | 193 | 2.0 | 3.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 4.8 | 0.0 |
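Passing `dtype=` matters because Dask infers column types from a sample of the data (controlled by the `sample` argument of `dd.read_csv`) rather than from every file, and it raises an error at compute time when a later block does not match. A likely reason for forcing `float64` on columns such as `VendorID` and `passenger_count` is that an integer column with missing values can only be held as `float64` in pandas, since `NaN` is a float. The toy sketch below, in plain Python and not Dask's actual inference code, shows how a clean sample can suggest `int` while the full column needs `float`; `infer_kind` and the column values are hypothetical.

```python
from typing import Iterable


def infer_kind(values: Iterable[str]) -> str:
    """Toy type inference over raw CSV strings: returns 'int', 'float' or 'str'.

    An empty string stands for a missing value. As in pandas, an integer
    column with gaps has to become float, because NaN is a float.
    """
    kind = "int"
    for value in values:
        if value == "":
            kind = "float"  # a missing value forces a float column
            continue
        try:
            int(value)
            continue  # an integer fits both an int and a float column
        except ValueError:
            pass
        try:
            float(value)
            kind = "float"
        except ValueError:
            return "str"  # anything non-numeric makes it a string column
    return kind


# Hypothetical VendorID values: complete in the first rows, with a gap later on
vendor_ids = ["1", "2", "1", "2", "", "1"]
print(infer_kind(vendor_ids[:4]), infer_kind(vendor_ids))  # prints: int float
```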


In [6]:
len(trips)

29315124

In [7]:
trips.dtypes

VendorID                        float64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                    float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

## Store Data in Apache Parquet Format

Dask encourages DataFrame users to store and load data with Parquet rather than CSV. [Apache Parquet](http://parquet.apache.org/) is a columnar binary format that is easy to split into multiple files (which makes parallel loading easier) and is generally much simpler to work with than HDF5, from Dask's perspective. It is also a common format in other big data systems such as [Apache Spark](http://spark.apache.org/), which makes it useful for exchanging data with those systems.
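The benefit of a columnar layout can be illustrated with a toy example in plain Python. It is only an analogy: real Parquet files also split data into row groups and column chunks and apply encodings and compression, none of which is modelled here. The fare, distance and tip values are taken from the first rows of the CSV preview above; `to_columns` is a hypothetical helper.

```python
from typing import Dict, List

# Three trip records laid out row by row, as in a CSV file
rows: List[Dict[str, float]] = [
    {"trip_distance": 1.2, "fare_amount": 6.0, "tip_amount": 1.47},
    {"trip_distance": 1.2, "fare_amount": 7.0, "tip_amount": 1.5},
    {"trip_distance": 0.6, "fare_amount": 6.0, "tip_amount": 1.0},
]


def to_columns(records: List[Dict[str, float]]) -> Dict[str, List[float]]:
    """Pivot row records into one contiguous list per column."""
    columns: Dict[str, List[float]] = {}
    for record in records:
        for name, value in record.items():
            columns.setdefault(name, []).append(value)
    return columns


columns = to_columns(rows)

# Row layout: every record has to be visited to read a single field
total_fare_rows = sum(record["fare_amount"] for record in rows)
# Column layout: only the fare_amount list is read; the other columns are untouched
total_fare_cols = sum(columns["fare_amount"])
```

Both totals are equal; the difference is how much data has to be scanned, which is why a query over a few columns of a wide table reads far less from a columnar file.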

In [27]:
# Index the data on tpep_pickup_datetime using the month boundaries below as
# divisions: 16 boundaries give 15 partitions, one per month (Jan 2020 - Mar 2021)
divisions = [
 '2020-01-01', '2020-02-01', '2020-03-01', '2020-04-01',
 '2020-05-01', '2020-06-01', '2020-07-01', '2020-08-01',
 '2020-09-01', '2020-10-01', '2020-11-01', '2020-12-01',
 '2021-01-01', '2021-02-01', '2021-03-01', '2021-04-01'
 ]

# drop=False keeps tpep_pickup_datetime as a regular column as well as the index
trips = trips.set_index('tpep_pickup_datetime', drop=False, divisions=divisions)

# Save as Parquet for the upcoming EDA and preprocessing notebooks.
# The index is not written (write_index=False) because the pickup time is still a column.
# append=True adds to any dataset already at this path, so re-running this cell may duplicate data.
trips.to_parquet(f'gs://{bucket_name}/nyctaxidata', write_index=False, append=True, ignore_divisions=False)

# Read the Parquet files back
trips = dd.read_parquet(f'gs://{bucket_name}/nyctaxidata/*.parquet')
trips.head()

|  | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 2020-01-01 | 2020-01-01 00:13:03 | 1.0 | 2.2 | 1.0 | N | 68 | 170 | 1.0 | 10.5 | 3.0 | 0.5 | 2.85 | 0.0 | 0.3 | 17.15 | 2.5 |
| 1 | 2.0 | 2020-01-01 | 2020-01-01 03:48:49 | 5.0 | 5.44 | 1.0 | N | 144 | 146 | 2.0 | 18.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 22.3 | 2.5 |
| 2 | 2.0 | 2020-01-01 | 2020-01-01 04:17:14 | 5.0 | 0.96 | 1.0 | N | 68 | 50 | 2.0 | 5.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 9.3 | 2.5 |
| 3 | 2.0 | 2020-01-01 | 2020-01-01 01:08:55 | 5.0 | 1.43 | 1.0 | N | 48 | 239 | 2.0 | 6.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 10.3 | 2.5 |
| 4 | 2.0 | 2020-01-01 | 2020-01-01 01:25:09 | 5.0 | 1.74 | 1.0 | N | 142 | 264 | 2.0 | 7.5 | 3.0 | 0.5 | 0.0 | 0.0 | 0.3 | 11.3 | 0.0 |


In [28]:
trips.npartitions

15
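The 16 dates in `divisions` are partition boundaries, so they yield 16 - 1 = 15 partitions, which matches `trips.npartitions`. The sketch below, using only the standard library, rebuilds the same boundaries and maps a pickup time to its partition with a binary search. It follows Dask's convention that partition `i` covers `[divisions[i], divisions[i + 1])` and that the last partition also includes the final boundary, but it is an illustration, not Dask's implementation; `monthly_divisions` and `partition_for` are hypothetical names, and returning `None` for out-of-range times is an assumption of the sketch.

```python
from bisect import bisect_right
from datetime import datetime
from typing import List, Optional


def monthly_divisions(start: datetime, n_months: int) -> List[datetime]:
    """First-of-month boundaries: n_months partitions need n_months + 1 divisions."""
    divisions = []
    year, month = start.year, start.month
    for _ in range(n_months + 1):
        divisions.append(datetime(year, month, 1))
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return divisions


def partition_for(ts: datetime, divisions: List[datetime]) -> Optional[int]:
    """Index of the partition holding ts, or None if ts lies outside the divisions."""
    if ts < divisions[0] or ts > divisions[-1]:
        return None  # such rows would have to be filtered out or handled separately
    if ts == divisions[-1]:
        return len(divisions) - 2  # the last partition includes its upper boundary
    # bisect_right finds the first boundary strictly greater than ts
    return bisect_right(divisions, ts) - 1


divisions = monthly_divisions(datetime(2020, 1, 1), 15)
```

With these boundaries, `partition_for(datetime(2020, 2, 29, 23, 59, 59), divisions)` returns `1`, consistent with `trips.get_partition(1)` holding the last February 2020 pickups in the next cell.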

In [35]:
# On the CSV-backed DataFrame this call took about 1.5 minutes over 29.3 million rows;
# on the Parquet-backed one it finishes in about 6 s (roughly 93% less time).
# Partition 1 holds February 2020.
trips.get_partition(1).tail()

|  | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 6299468 | 2.0 | 2020-02-29 23:59:58 | 2020-03-01 00:07:28 | 1.0 | 1.03 | 1.0 | N | 113 | 114 | 1.0 | 6.5 | 0.5 | 0.5 | 2.06 | 0.0 | 0.3 | 12.36 | 2.5 |
| 6299469 | 2.0 | 2020-02-29 23:59:58 | 2020-03-01 00:22:25 | 3.0 | 5.37 | 1.0 | N | 163 | 13 | 1.0 | 19.5 | 0.5 | 0.5 | 6.99 | 0.0 | 0.3 | 30.29 | 2.5 |
| 6299470 | 2.0 | 2020-02-29 23:59:59 | 2020-03-01 00:05:34 | 6.0 | 1.73 | 1.0 | N | 137 | 229 | 1.0 | 7.0 | 0.5 | 0.5 | 2.16 | 0.0 | 0.3 | 12.96 | 2.5 |
| 6299471 | 2.0 | 2020-02-29 23:59:59 | 2020-03-01 00:03:42 | 1.0 | 0.71 | 1.0 | N | 255 | 256 | 2.0 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 | 0.0 |
| 6299472 | 2.0 | 2020-02-29 23:59:59 | 2020-03-01 00:13:00 | 1.0 | 2.92 | 1.0 | N | 90 | 13 | 1.0 | 12.0 | 0.5 | 0.5 | 3.95 | 0.0 | 0.3 | 19.75 | 2.5 |
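The timing comment in the cell above can be checked with a line of arithmetic: going from about 90 s to about 6 s removes roughly 93% of the run time, which is the same as a 15x speedup. The helper name `percent_faster` is hypothetical, and the inputs are the approximate timings quoted in the notebook.

```python
def percent_faster(old_seconds: float, new_seconds: float) -> float:
    """Reduction in wall-clock time, as a percentage of the old time."""
    return (old_seconds - new_seconds) / old_seconds * 100


csv_seconds = 90.0     # "around 1:30 minutes" for the CSV-backed DataFrame
parquet_seconds = 6.0  # "within 6s" for the Parquet-backed DataFrame

reduction = percent_faster(csv_seconds, parquet_seconds)
speedup = csv_seconds / parquet_seconds
print(f"{reduction:.1f}% less time, {speedup:.0f}x speedup")  # prints: 93.3% less time, 15x speedup
```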
