In [1]:
%%HTML
<script src="require.js"></script>

In [2]:
from IPython.display import HTML
HTML(
    """
    <script
        src='https://cdnjs.cloudflare.com/ajax/libs/jquery/2.0.3/jquery.min.js'>
    </script>
    <script>
        code_show=true;
        function code_toggle() {
        if (code_show){
        $('div.jp-CodeCell > div.jp-Cell-inputWrapper').hide();
        } else {
        $('div.jp-CodeCell > div.jp-Cell-inputWrapper').show();
        }
        code_show = !code_show
        }
        $( document ).ready(code_toggle);
    </script>
    <form action='javascript:code_toggle()'>
        <input type="submit" value='Click here to toggle on/off the raw code.'>
    </form>
    """
)

<a id='Title'></a>
<h1 style="color:#000000; background-color:#f7b731; padding: 20px 0; text-align: center; font-weight: bold;">Data Preprocessing</h1><a id='ExecSum'></a><a id='Title'></a><a id='Title'></a>

## Summary

This notebook transforms the raw data into a merged, preprocessed data repository that is used downstream for analysis and for training ML models. The process **standardizes** and **transforms** the raw data from the four taxi datasets and **merges** them into strategically **partitioned Parquet files**, which are written to storage. This approach frontloads the computational expense: the heavy processing runs once here, so downstream steps do not have to repeat it. The partitioned, columnar output also lowers memory usage and speeds up reading and querying.

The output is a table stored in Parquet files that collectively contains 32.2 million data points, produced by transforming and aggregating the original 1.9 billion raw data points.
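As a back-of-the-envelope illustration of how much the aggregation compresses the data, dividing the two (rounded) counts above gives the average number of raw data points represented by each output data point:

$$
\frac{1.9 \times 10^{9}}{32.2 \times 10^{6}} \approx 59
$$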

## About the Data

The original data source for this project is the New York City (NYC) Taxi and Limousine Commission (TLC) Trip Record Data for Yellow Taxis, Green Taxis, For-Hire Vehicles (“FHV”), and High Volume For-Hire Vehicles (“FHVHV”). The data was accessed through the AWS Marketplace as part of the AWS Open Data Sponsorship Program.

The original data came from four main datasets:

| Source | Description                                                                                                                                                                        |
|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Yellow   | Traditionally hailed by signaling to a driver who is on duty and seeking a passenger (street hail), but now they may also be hailed using an e-hail app like Curb or Arro. Yellow taxis are the only vehicles permitted to respond to a street hail from a passenger in all five boroughs. |
| Green    | Also known as boro taxis and street-hail liveries. Green taxis may respond to street hails, but only in the areas outside of lower Manhattan.                                        |
| FHV      | These are smaller-volume for-hire vehicle bases (Uber Pool, Lyft Line), community livery bases, luxury limousine bases, and black car bases.                                         |
| FHVHV    | These are high-volume for-hire vehicle bases (bases for companies dispatching 10,000+ trips per day, meaning Uber, Lyft, Via, and Juno), community livery bases, luxury limousine bases, and black car bases.                          |
<center><b>Table 1. </b> Dataset Description</center>


<br>
Each of the datasets was originally partitioned into separate Parquet files, one per month for each source. The files were not unified across sources, so the same month often appeared in four separate files, one per source. Each row of each file represented a unique trip, with columns containing details about that trip. The column names and data types were inconsistent across sources and had to be made consistent. The table below describes the general structure of the relevant columns for this project, using aliases and the casted data types.
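The prefix used by `get_filenames` (`trip data/<vendor>_tripdata_<year>-`) suggests how the raw files are laid out by source and month. The library-free sketch below assumes keys of the form `trip data/<vendor>_tripdata_<YYYY>-<MM>.parquet`, parses each key, and groups the sources available for each month; the example keys are hypothetical.

```python
import re
from collections import defaultdict

# Assumed key layout, inferred from the prefix used in get_filenames
KEY_PATTERN = re.compile(
    r"^trip data/(?P<vendor>[a-z]+)_tripdata_"
    r"(?P<year>\d{4})-(?P<month>\d{2})\.parquet$"
)


def parse_key(key):
    """Split an object key into (vendor, year, month), or None if it does not match."""
    m = KEY_PATTERN.match(key)
    if m is None:
        return None
    return m["vendor"], int(m["year"]), int(m["month"])


def group_by_month(keys):
    """Map each (year, month) to the sorted list of sources that have a file for it."""
    groups = defaultdict(list)
    for key in keys:
        parsed = parse_key(key)
        if parsed is not None:
            vendor, year, month = parsed
            groups[(year, month)].append(vendor)
    return {ym: sorted(vendors) for ym, vendors in groups.items()}


# Hypothetical keys for illustration
keys = [
    "trip data/yellow_tripdata_2015-01.parquet",
    "trip data/green_tripdata_2015-01.parquet",
    "trip data/fhv_tripdata_2015-01.parquet",
    "trip data/fhvhv_tripdata_2019-02.parquet",
]
print(group_by_month(keys))
# {(2015, 1): ['fhv', 'green', 'yellow'], (2019, 2): ['fhvhv']}
```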
<br>
<br>


| Alias        | Data Type (Casted) | Description                                                      |
|--------------|--------------------|------------------------------------------------------------------|
| pu_id        | float64            | The ID of the Pick-Up location.                                  |
| do_id        | float64            | The ID of the Drop-Off location.                                 |
| pu_datetime  | timestamp          | The timestamp of pick-up, containing the precise date and time.  |
| do_datetime  | timestamp          | The timestamp of drop-off, containing the precise date and time. |

<center><b>Table 2. </b> Relevant Column Aliases</center>

## Setup and Imports

This sets up the Spark context and imports the necessary libraries.

In [None]:
sc

In [None]:
sc.install_pypi_package('pandas')
sc.install_pypi_package('numpy')
sc.install_pypi_package('pyspark')
sc.install_pypi_package('pyarrow')
sc.install_pypi_package('boto3')
sc.install_pypi_package('s3fs')
sc.install_pypi_package('holidays')

In [None]:
import numpy as np
import pandas as pd
import boto3
import s3fs

import pyarrow as pa
import pyarrow.parquet as pq

import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, DoubleType,
    StringType, TimestampType, IntegerType
)
from pyspark.sql.window import Window


In [None]:
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.access.key", "****")
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.secret.key", "*****")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
s3 = boto3.client('s3')
s3fs_ = s3fs.S3FileSystem(anon=False)

<a id='Title'></a>
<h1 style="color:#000000; background-color:#f7b731; padding: 20px 0; text-align: center; font-weight: bold;">Functions</h1><a id='ExecSum'></a><a id='Title'></a><a id='Title'></a>

| Function | Description |
|----------|----------------|
| get_filenames | Retrieves all the raw Parquet filenames for a given vendor and time window.|
| is_holiday (UDF) | Accepts a date and returns 1 if it is a US holiday, 0 if not.|
| preproc_df | Filters out missing or invalid records, aggregates trips into hourly pickup counts per zone, and extracts time-related features.|
<center><b>Table 3. </b> Functions Overview</center>

In [None]:
import holidays
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Build the US holiday calendar once, covering all data years (2015-2023),
# rather than rebuilding it inside every UDF call.
US_HOLIDAYS = holidays.US(years=range(2015, 2024))

# Zone IDs dropped as invalid or non-specific locations
EXCLUDED_ZONES = [264, 265, 0]


def get_filenames(vendor, years=range(2015, 2024)):
    """Retrieve the raw Parquet file keys for a vendor across the given years."""
    files = []
    for year in years:
        prefix = f"trip data/{vendor}_tripdata_{year}-"
        response = s3.list_objects_v2(Bucket='nyc-tlc', Prefix=prefix)
        if 'Contents' in response:
            files.extend([obj['Key'] for obj in response['Contents']
                          if obj['Key'].endswith(".parquet")])
    return files


def is_holiday(date):
    """Return 1 if the given date is a US holiday, 0 otherwise."""
    return 1 if date in US_HOLIDAYS else 0


is_holiday_udf = udf(is_holiday, IntegerType())


def preproc_df(df):
    """Filter invalid trips, aggregate hourly pickup counts per zone, and
    add time-related features to a Spark DataFrame."""
    df = (df.filter(
        # Drop rows with missing locations or timestamps
        (F.col("pu_id").isNotNull()) &
        (F.col("do_id").isNotNull()) &
        (F.col("pu_datetime").isNotNull()) &
        (F.col("do_datetime").isNotNull()) &
        # Drop trips that start or end in an excluded zone
        (~F.col("pu_id").isin(EXCLUDED_ZONES)) &
        (~F.col("do_id").isin(EXCLUDED_ZONES))
    )
        # Count trips per pickup zone and pickup hour
        .withColumn("pu_datetime", F.date_trunc("hour", F.col("pu_datetime")))
        .groupBy("pu_id", "pu_datetime")
        .agg(F.count("*").alias("count"))
        # Calendar features derived from the truncated pickup hour
        .withColumn("date_day", F.to_date(F.col("pu_datetime")))
        .withColumn("DayOfWeek", F.date_format(F.col("pu_datetime"), "E"))
        .withColumn("Month", F.month(F.col("pu_datetime")))
        .withColumn("HourOfDay", F.hour(F.col("pu_datetime")))
        .withColumn("Year", F.year(F.col("pu_datetime")))
        .withColumn("is_Holiday", is_holiday_udf(F.col("date_day")))
    )
    return df
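The core of `preproc_df` (dropping incomplete or excluded trips, truncating pickup times to the hour, and counting trips per pickup zone and hour) can be sketched in plain Python to make the aggregation concrete. This is an illustration on hypothetical in-memory tuples, not Spark code; the holiday flag is omitted, and the day-name tuple mirrors the abbreviated names produced by Spark's `"E"` pattern.

```python
from collections import Counter
from datetime import datetime

EXCLUDED_ZONES = {0, 264, 265}  # same exclusion list as preproc_df
DAY_NAMES = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")


def hourly_counts(trips):
    """Count trips per (pickup zone, pickup hour), like preproc_df's groupBy."""
    counts = Counter()
    for pu_id, do_id, pu_dt, do_dt in trips:
        # Drop rows with any missing field
        if None in (pu_id, do_id, pu_dt, do_dt):
            continue
        # Drop trips that start or end in an excluded zone
        if pu_id in EXCLUDED_ZONES or do_id in EXCLUDED_ZONES:
            continue
        # Equivalent of F.date_trunc("hour", ...)
        hour = pu_dt.replace(minute=0, second=0, microsecond=0)
        counts[(pu_id, hour)] += 1
    return counts


def time_features(hour):
    """Calendar features derived from a truncated pickup hour."""
    return {
        "date_day": hour.date(),
        "DayOfWeek": DAY_NAMES[hour.weekday()],
        "Month": hour.month,
        "HourOfDay": hour.hour,
        "Year": hour.year,
    }


# Hypothetical trips: (pu_id, do_id, pickup, dropoff)
trips = [
    (132.0, 236.0, datetime(2023, 1, 2, 8, 15), datetime(2023, 1, 2, 8, 50)),
    (132.0, 48.0, datetime(2023, 1, 2, 8, 45), datetime(2023, 1, 2, 9, 20)),
    (264.0, 48.0, datetime(2023, 1, 2, 8, 5), datetime(2023, 1, 2, 8, 30)),
    (132.0, None, datetime(2023, 1, 2, 9, 5), None),
]
counts = hourly_counts(trips)
print(dict(counts))
print(time_features(datetime(2023, 1, 2, 8)))
```

Only the first two trips survive the filters: the third starts in an excluded zone and the fourth has missing fields, so a single (zone, hour) row with a count of 2 remains.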

<a id='Title'></a>
<h1 style="color:#000000; background-color:#f7b731; padding: 20px 0; text-align: center; font-weight: bold;">Data Cleaning</h1><a id='ExecSum'></a><a id='Title'></a><a id='Title'></a>

This step cleans the raw data by filtering out incomplete records and standardizing the column names and data types. The raw data of each vendor is read separately and then made to conform to a new common schema. Trips are also aggregated into hourly pickup counts per zone (via `preproc_df`) to reduce the output size. Because of their high memory cost, the outputs are written to storage as intermediary tables rather than kept in memory.

PyArrow was used to perform this task for the following reasons:

* **Type Safety and Consistency**: PyArrow validates and converts data types to maintain consistency and reduce runtime errors in downstream processing.

* **Schema Evolution**: PyArrow facilitates schema evolution, allowing updates like adding new columns or modifying existing ones without losing compatibility with older data versions.

* **Efficient Null Handling**: PyArrow efficiently manages null values to prevent errors and ensure data integrity, even in large datasets with missing information.

* **Writing Parquet Files**: PyArrow writes Parquet files with a consistent schema and detailed metadata, enhancing stability and reliability for subsequent data usage.
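The idea behind the type-safety and null-handling points can be seen without PyArrow itself. The library-free sketch below uses a hypothetical `SCHEMA` mirroring Table 2: it coerces each value to its target type and turns anything unparseable into `None`, much like `pd.to_numeric(errors='coerce')` does in the loading loop. It illustrates the concept only and is not PyArrow's API.

```python
from datetime import datetime

# Hypothetical target schema mirroring the casted types in Table 2
SCHEMA = {
    "pu_id": float,
    "do_id": float,
    "pu_datetime": datetime,
    "do_datetime": datetime,
}


def coerce(value, target):
    """Convert value to the target type, returning None (a null) if that fails."""
    if value is None or isinstance(value, target):
        return value
    try:
        if target is datetime:
            # Accepts ISO-like strings such as "2023-01-02 08:15:00"
            return datetime.fromisoformat(str(value))
        return target(value)
    except (TypeError, ValueError):
        return None


def enforce_schema(row):
    """Return a row with exactly the schema columns, each typed or None."""
    return {col: coerce(row.get(col), typ) for col, typ in SCHEMA.items()}


raw = {"pu_id": "132", "do_id": "n/a",
       "pu_datetime": "2023-01-02 08:15:00", "do_datetime": None}
print(enforce_schema(raw))
```

Invalid values become explicit nulls instead of raising, so a later null filter (as in `preproc_df`) can drop them in one place.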

In [2]:
# Vendor-specific source column for each common alias.
# The order (pu_id, do_id, pu_datetime, do_datetime) matters: the loading
# loop unpacks these values positionally.
cols_map = {
    'green': {'pu_id': "PULocationID",
              'do_id': "DOLocationID",
              'pu_datetime': "lpep_pickup_datetime",
              'do_datetime': "lpep_dropoff_datetime"},
    'yellow': {'pu_id': "PULocationID",
               'do_id': "DOLocationID",
               'pu_datetime': "tpep_pickup_datetime",
               'do_datetime': "tpep_dropoff_datetime"},
    'fhv': {'pu_id': "PUlocationID",
            'do_id': "DOlocationID",
            'pu_datetime': "pickup_datetime",
            'do_datetime': "dropOff_datetime"},
    'fhvhv': {'pu_id': "PULocationID",
              'do_id': "DOLocationID",
              'pu_datetime': "pickup_datetime",
              'do_datetime': "dropoff_datetime"},
}
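Because the loading loop unpacks `cols_map[vendor].values()` positionally into `pu_id, do_id, pickup_datetime, dropoff_datetime`, every vendor's mapping has to list the aliases in the same order. The sketch below repeats the mapping so it runs standalone, checks that ordering, and renames a hypothetical raw FHV record to the common aliases while dropping unmapped columns.

```python
# Copy of cols_map, so this sketch runs on its own
COLS_MAP = {
    'green': {'pu_id': "PULocationID", 'do_id': "DOLocationID",
              'pu_datetime': "lpep_pickup_datetime",
              'do_datetime': "lpep_dropoff_datetime"},
    'yellow': {'pu_id': "PULocationID", 'do_id': "DOLocationID",
               'pu_datetime': "tpep_pickup_datetime",
               'do_datetime': "tpep_dropoff_datetime"},
    'fhv': {'pu_id': "PUlocationID", 'do_id': "DOlocationID",
            'pu_datetime': "pickup_datetime",
            'do_datetime': "dropOff_datetime"},
    'fhvhv': {'pu_id': "PULocationID", 'do_id': "DOLocationID",
              'pu_datetime': "pickup_datetime",
              'do_datetime': "dropoff_datetime"},
}
ALIASES = ['pu_id', 'do_id', 'pu_datetime', 'do_datetime']

# Every vendor must map the same aliases in the same order
for vendor, mapping in COLS_MAP.items():
    assert list(mapping) == ALIASES, vendor


def to_aliases(vendor, record):
    """Keep only the mapped columns of a raw record, renamed to the aliases."""
    return {alias: record.get(src) for alias, src in COLS_MAP[vendor].items()}


# Hypothetical raw FHV record; "extra_column" stands in for any unmapped field
raw_fhv = {"PUlocationID": 132.0, "DOlocationID": None,
           "pickup_datetime": "2023-01-02 08:15:00",
           "dropOff_datetime": "2023-01-02 08:50:00",
           "extra_column": "ignored"}
print(to_aliases("fhv", raw_fhv))
```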

In [None]:
vendors = ['yellow', 'green', 'fhv', 'fhvhv']
years = range(2015, 2024)
# Common aliases, in the same order as the values of each cols_map entry
aliases = ['pu_id', 'do_id', 'pu_datetime', 'do_datetime']

for vendor in vendors:
    files = get_filenames(vendor, years)
    cols = list(cols_map[vendor].values())
    pu_id, do_id, pickup_datetime, dropoff_datetime = cols
    # Target (casted) type of each vendor-specific column
    schema_map = {
        pu_id: pa.float64(),
        do_id: pa.float64(),
        pickup_datetime: pa.timestamp('us'),
        dropoff_datetime: pa.timestamp('us')
    }

    for file in files:
        file_path = f"s3://nyc-tlc/{file}"
        # Read only the four relevant columns, in the order given by cols
        table = pq.read_table(
            file_path, columns=cols, filesystem=s3fs_).select(cols)
        new_schema = pa.schema(
            [pa.field(name, schema_map[name]) for name in table.column_names])

        # Coerce location IDs to numbers; unparseable values become NaN/null
        df = table.to_pandas()
        for column in (pu_id, do_id):
            df[column] = pd.to_numeric(df[column], errors='coerce')

        # Enforce the common schema, then rename to the shared aliases
        table = pa.Table.from_pandas(
            df, schema=new_schema, preserve_index=False)
        table = table.rename_columns(aliases)

        # preproc_df operates on Spark DataFrames, so convert before calling it
        sdf = preproc_df(spark.createDataFrame(table.to_pandas()))
        folder = file.replace("trip data/", "").replace(".parquet", "")
        sdf.write.mode('overwrite').parquet(
            f"s3://<personal bucket>/agg/{folder}")

<a id='Title'></a>
<h1 style="color:#000000; background-color:#f7b731; padding: 20px 0; text-align: center; font-weight: bold;">Feature Extraction</h1><a id='ExecSum'></a><a id='Title'></a><a id='Title'></a>

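Using the aggregated tables from the previous step, this step casts the extracted feature columns to a single, consistent set of data types so that the files from all vendors can later be read together. The intermediary tables are again written to storage rather than kept in memory, because of memory limitations.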

In [None]:
def get_filenames_s3(vendor, years=range(2015, 2024)):
    """
    Retrieve a list of Parquet file names from an S3 bucket for a specified 
    vendor and range of years.

    Parameters:
    vendor (str): The vendor name to include in the file search prefix.
    years (iterable, optional): A range or list of years to search for files.
    Defaults to range(2015, 2024).

    Returns:
    list: A list of strings, where each string is the key of a Parquet file
    stored in the S3 bucket.
    """
    files = []
    for year in years:
        prefix = f"agg/{vendor}_tripdata_{year}-"
        response = s3.list_objects_v2(
            Bucket='<personal bucket>', Prefix=prefix)
        if 'Contents' in response:
            files.extend([obj['Key'] for obj in response['Contents']
                         if obj['Key'].endswith(".parquet")])
    return files
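`list_objects_v2` returns at most 1,000 keys per call, and both `get_filenames` and `get_filenames_s3` read only the first response for each prefix. That is likely sufficient for per-year prefixes of monthly files, but a prefix holding more objects would be silently truncated. A minimal sketch of following continuation tokens is below; `FakeS3` is a hypothetical in-memory stand-in that mimics the `Contents`, `IsTruncated` and `NextContinuationToken` fields of the real response, so the loop runs without AWS access. With a real boto3 client, `s3.get_paginator('list_objects_v2')` handles the same loop.

```python
def list_parquet_keys(client, bucket, prefix):
    """List every .parquet key under prefix, following continuation tokens."""
    keys = []
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = client.list_objects_v2(**kwargs)
        keys.extend(obj["Key"] for obj in response.get("Contents", [])
                    if obj["Key"].endswith(".parquet"))
        if not response.get("IsTruncated"):
            return keys
        # Ask for the next page where the previous one stopped
        kwargs["ContinuationToken"] = response["NextContinuationToken"]


class FakeS3:
    """Hypothetical in-memory stand-in for the S3 client, with small pages."""

    def __init__(self, keys, page_size=2):
        self.keys = sorted(keys)
        self.page_size = page_size

    def list_objects_v2(self, Bucket, Prefix, ContinuationToken=None):
        matching = [k for k in self.keys if k.startswith(Prefix)]
        start = int(ContinuationToken or 0)
        end = start + self.page_size
        response = {"Contents": [{"Key": k} for k in matching[start:end]],
                    "IsTruncated": end < len(matching)}
        if response["IsTruncated"]:
            response["NextContinuationToken"] = str(end)
        return response


fake = FakeS3([
    "agg/yellow_tripdata_2015-01/_SUCCESS",
    "agg/yellow_tripdata_2015-01/part-00000.parquet",
    "agg/yellow_tripdata_2015-02/part-00000.parquet",
    "agg/yellow_tripdata_2015-03/part-00000.parquet",
])
print(list_parquet_keys(fake, "example-bucket", "agg/yellow_tripdata_2015-"))
```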

The code below reads each vendor's aggregated files separately, casts each feature column to a fixed data type, and writes the resulting intermediary tables to storage.

In [None]:
vendors = ['yellow', 'green', 'fhvhv', 'fhv']
s3_path = "s3://<personal bucket>/"

# Target type of every column written by preproc_df, in output order
target_schema = pa.schema([
    pa.field('pu_id', pa.float64()),
    pa.field('pu_datetime', pa.timestamp('ms')),
    pa.field('count', pa.int64()),
    pa.field('date_day', pa.date32()),
    pa.field('DayOfWeek', pa.string()),
    pa.field('Month', pa.int32()),
    pa.field('HourOfDay', pa.int32()),
    pa.field('Year', pa.int32()),
    pa.field('is_Holiday', pa.int32()),
])

for vendor in vendors:
    file_names = get_filenames_s3(vendor)
    for file in file_names:
        file_path = s3_path + file
        out_path = file_path.replace("/agg/", "/nyc_trips/")
        table = pq.read_table(file_path)
        # Order the columns to match target_schema, then cast every column
        table = table.select(target_schema.names).cast(target_schema)
        pq.write_table(table, out_path)

## Merging and Writing

The data can now be read collectively into a unified Spark dataframe containing all the necessary information for downstream steps. The resulting table is then written to storage in Parquet files which are partitioned by `pu_id` and `HourOfDay`. 

These partition keys were chosen because downstream reads and queries rely heavily on `pu_id` and `HourOfDay` to select relevant subsets of the data. This was found to be the best balance between two opposing costs. Overly granular partitioning can produce an excessive number of small files, which degrades performance through the overhead of file management and metadata processing. Conversely, overly broad partitioning produces larger files containing more irrelevant data, which increases the I/O and processing time for queries that need only part of the dataset. The trade-off is therefore between file-management efficiency and query performance.
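To illustrate the trade-off, the library-free sketch below models the Hive-style `key=value` directory layout that `partitionBy('pu_id', 'HourOfDay')` writes, and partition pruning, where a query filtering on the partition columns only opens the matching directories. The bucket name is hypothetical, and the sketch assumes the remaining zone IDs are 1 to 263 (after `preproc_df` drops 0, 264 and 265). Because the real `pu_id` column is `float64`, actual directory names may look like `pu_id=132.0` rather than `pu_id=132`.

```python
from itertools import product


def partition_dir(base, pu_id, hour_of_day):
    """Hive-style directory (key=value segments) for one (pu_id, HourOfDay) pair."""
    return f"{base.rstrip('/')}/pu_id={pu_id}/HourOfDay={hour_of_day}/"


def all_partitions(base, zone_ids, hours=range(24)):
    """Every directory a partitionBy('pu_id', 'HourOfDay') write could create."""
    return [partition_dir(base, z, h) for z, h in product(zone_ids, hours)]


def prune(partitions, pu_id, hour_of_day=None):
    """Directories a query filtering on pu_id (and optionally HourOfDay) must read."""
    if hour_of_day is None:
        wanted = f"/pu_id={pu_id}/"
        return [p for p in partitions if wanted in p]
    wanted = f"/pu_id={pu_id}/HourOfDay={hour_of_day}/"
    return [p for p in partitions if p.endswith(wanted)]


# Assumption: zone IDs 1-263 remain after the exclusions in preproc_df
parts = all_partitions("s3://example-bucket/trips0", range(1, 264))
print(len(parts))              # 6312 (upper bound on partition directories)
print(len(prune(parts, 132)))  # 24 (one zone, all hours)
print(prune(parts, 132, 8))    # ['s3://example-bucket/trips0/pu_id=132/HourOfDay=8/']
```

Adding a third partition column (for example a date) would multiply the directory count again and shrink each file, which is the small-files cost described above.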

Once written, the partitioned table serves as the input for analysis, for training a hybrid ML model, and for generating predictions.

In [None]:
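# Read the per-month feature tables written by the previous step
# (files sit at nyc_trips/<vendor>_tripdata_<YYYY>-<MM>/<part>.parquet)
df = spark.read.parquet(
    "s3://<personal bucket>/nyc_trips/*/*.parquet"
)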

In [None]:
out_path = 's3://<personal bucket>/trips0/'
df.write.partitionBy('pu_id', 'HourOfDay') \
    .parquet(out_path, mode='overwrite')

<a id='Title'></a>
<h1 style="color:#000000; background-color:#f7b731; padding: 20px 0; text-align: center; font-weight: bold;">References</h1><a id='ExecSum'></a><a id='Title'></a><a id='Title'></a>

[AWS Marketplace: New York City Taxi and Limousine Commission (TLC) Trip Record Data](https://aws.amazon.com/marketplace/pp/prodview-okyonroqg5b2u#links) (accessed June 1, 2024).