# Understanding Hired Rides in NYC

_[Project prompt](https://docs.google.com/document/d/1VERPjEZcC1XSs4-02aM-DbkNr_yaJVbFjLJxaYQswqA/edit#)_

_This scaffolding notebook may be used to help set up your final project. It's **totally optional** whether you make use of this or not._

_If you do use this notebook, everything provided is optional as well - you may remove or add prose and code as you wish._

_Anything in italics (prose) or comments (in code) is meant to provide you with guidance. **Remove the italic lines and provided comments** before submitting the project, if you choose to use this scaffolding. We don't need the guidance when grading._

_**All code below should be considered "pseudo-code" - not functional by itself, and only a suggestion at the approach.**_

## Project Setup

In [1]:
# all import statements needed for the project, for example:
import os
import re
import requests
import bs4
from bs4 import BeautifulSoup
import math
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import geopandas as gpd
import sqlalchemy as db
import pyarrow.parquet as pq
from datetime import datetime
import sqlite3
import matplotlib.pyplot as plt
import seaborn as sns
from sqlalchemy import text
import folium
from folium.plugins import HeatMap
from IPython.display import display
from IPython.display import HTML
from matplotlib.animation import FuncAnimation
from typing import List, Optional, Tuple, Dict, Any, Union

In [2]:
# Project-wide constants: data-source URL, local file locations, map settings,
# and database configuration.

TLC_URL = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

# NOTE(review): absolute paths on the author's machine; adjust them before
# running the notebook anywhere else.
TAXI_ZONES_DIR = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
TAXI_ZONES_SHAPEFILE = f"{TAXI_ZONES_DIR}/taxi_zones.shp"
WEATHER_CSV_DIR = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"

CRS = 4326  # coordinate reference system: EPSG code for WGS84 longitude/latitude

# Bounding boxes written as ((lat_min, lon_min), (lat_max, lon_max)); each corner is (lat, lon).
# The city as a whole, then the LGA, JFK and EWR airports.
NEW_YORK_BOX_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))
LGA_BOX_COORDS = ((40.763589, -73.891745), (40.778865, -73.854838))
JFK_BOX_COORDS = ((40.639263, -73.795642), (40.651376, -73.766264))
EWR_BOX_COORDS = ((40.686794, -74.194028), (40.699680, -74.165205))

# SQLite database URL, schema file name, and the directory (created in the next
# cell) that presumably holds the SQL query files -- names suggest this; verify.
DATABASE_URL = "sqlite:///project.db"
DATABASE_SCHEMA_FILE = "schema.sql"
QUERY_DIRECTORY = "queries"

In [None]:
# Make sure the QUERY_DIRECTORY exists. exist_ok=True makes re-running this cell
# a no-op. The previous version inspected `e.errno` on a generic Exception, which
# raises AttributeError for exceptions that carry no errno.
os.makedirs(QUERY_DIRECTORY, exist_ok=True)

## Part 1: Data Preprocessing

### 1.1 Downloading data

#### 1.1.1 Define the function ***filter_links_by_date*** to keep only the data files from January 2020 through August 2024

In [None]:
def filter_links_by_date(
    links: List[str], 
    start_year: int, 
    start_month: int, 
    end_year: int, 
    end_month: int
) -> List[str]:
    """
    Filter links to those whose embedded ``YYYY-MM`` date lies in an inclusive range.

    The first ``YYYY-MM`` occurrence in a link is taken as the file's year and
    month (e.g. ``yellow_tripdata_2020-01.parquet``). Links without such a date
    are dropped.

    Args:
        links (list): List of URLs to filter.
        start_year (int): Starting year of the range (inclusive).
        start_month (int): Starting month of the range (inclusive).
        end_year (int): Ending year of the range (inclusive).
        end_month (int): Ending month of the range (inclusive).

    Returns:
        list: The links inside the range, in their original order.
    """
    date_pattern = re.compile(r"(\d{4})-(\d{2})")
    start = (start_year, start_month)
    end = (end_year, end_month)

    filtered_links = []
    for link in links:
        match = date_pattern.search(link)
        if match is None:
            continue
        # Lexicographic (year, month) tuple comparison is correct for every range,
        # including one that starts and ends in the same year.
        year_month = (int(match.group(1)), int(match.group(2)))
        if start <= year_month <= end:
            filtered_links.append(link)
    return filtered_links

# Inclusive date window of TLC files to fetch: January 2020 through August 2024.
start_year = 2020
start_month = 1
end_year = 2024
end_month = 8

#### 1.1.2 Extract the file links for Yellow Taxi & Uber trip data from the TLC Trip Record Data web pages and then download the filtered dated files to save them to a local directory.

In [None]:
# URL of the NYC Trip Data page
url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

# Define function to download Parquet files
def download_parquet_files(links: List[str], save_dir: str, timeout: float = 120.0) -> None:
    """
    Download each URL in ``links`` into ``save_dir``.

    Each file is named after the last path segment of its URL. The response body
    is streamed in 1 MiB chunks to a temporary ``<name>.part`` file. That file is
    renamed into place only once the transfer completes, so an interrupted
    download never leaves a truncated ``.parquet`` file for later cells to read.
    A failed link (non-200 status, network error, timeout) is reported and
    skipped; the remaining links are still attempted.

    Parameters:
        links: Absolute URLs of the files to download.
        save_dir: Destination directory; created (with parents) if missing.
        timeout: Seconds to wait on the server for each request.
    """
    os.makedirs(save_dir, exist_ok=True)
    # Browser-like User-Agent sent with every request; built once, outside the loop.
    headers = {'User-Agent': 'Mozilla/5.0'}

    for link in links:
        file_name = link.split("/")[-1]
        file_path = os.path.join(save_dir, file_name)
        partial_path = file_path + ".part"
        print(f"Downloading {file_name}...")

        try:
            with requests.get(link, headers=headers, stream=True, timeout=timeout) as response:
                if response.status_code != 200:
                    print(f"Failed to download {file_name}: HTTP {response.status_code}")
                    continue
                with open(partial_path, "wb") as file:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        file.write(chunk)
            os.replace(partial_path, file_path)
        except (requests.RequestException, OSError) as exc:
            # Remove any half-written file so it is never mistaken for a complete download.
            if os.path.exists(partial_path):
                os.remove(partial_path)
            print(f"Failed to download {file_name}: {exc}")
            continue

        print(f"Saved to {file_path}")

from urllib.parse import urljoin

# Scrape the TLC page. Time out instead of hanging, and fail loudly rather than
# parsing an error page and finding no links.
response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=60)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")

# Find all anchor tags with href attributes
all_links = [a["href"].strip() for a in soup.find_all("a", href=True)]

# Include only Yellow Taxi and HVFHV trip data using re module
yellow_taxi_pattern = r".*yellow_tripdata.*\.parquet"
hvfhv_pattern = r".*fhvhv_tripdata.*\.parquet"

yellow_taxi_links = [link for link in all_links if re.search(yellow_taxi_pattern, link)]
filtered_yellow_taxi_links = filter_links_by_date(yellow_taxi_links, start_year, start_month, end_year, end_month)
hvfhv_links = [link for link in all_links if re.search(hvfhv_pattern, link)]
filtered_hvfhv_links = filter_links_by_date(hvfhv_links, start_year, start_month, end_year, end_month)

# Resolve relative hrefs against the page URL (absolute hrefs pass through
# unchanged). Only the date-filtered links are resolved, and these resolved URLs
# are the ones downloaded below.
yellow_taxi_links = [urljoin(url, link) for link in filtered_yellow_taxi_links]
hvfhv_links = [urljoin(url, link) for link in filtered_hvfhv_links]

# Debugging: Print the links that will be downloaded
print("Yellow Taxi Links:", yellow_taxi_links)
print("HVFHV Links:", hvfhv_links)

download_dir = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"

# Download Parquet files
print("Downloading Yellow Taxi Parquet files...")
download_parquet_files(yellow_taxi_links, save_dir=download_dir)

print("\nDownloading HVFHV Parquet files...")
download_parquet_files(hvfhv_links, save_dir=download_dir)

#### 1.1.3 Loading the Taxi Zones shapefile

In [None]:
# Relative path: resolved against the notebook's current working directory.
TAXI_ZONES_SHAPEFILE = "taxi_zones.shp"

# Read the zone polygons and reproject them to WGS84 longitude/latitude (EPSG:4326).
taxi_zones_gdf = gpd.read_file(TAXI_ZONES_SHAPEFILE).to_crs(epsg=4326)

# Quick look at the first rows of the zone table.
print(taxi_zones_gdf.head())

### 1.2 Helper functions

#### 1.2.1 Load Taxi Zones

In [None]:
def load_taxi_zones(shapefile: str) -> Optional[gpd.GeoDataFrame]:
    """
    Load the taxi zones shapefile in WGS84 and add each zone's centroid as coordinates.

    Parameters:
        shapefile (str): Path to the taxi zones shapefile.

    Returns:
        gpd.GeoDataFrame | None: The zones in EPSG:4326 with added ``longitude`` and
        ``latitude`` columns holding each zone polygon's centroid. Returns None if
        the file cannot be read or processed; the error is printed.
    """
    try:
        # Load the Taxi Zones GeoDataFrame
        taxi_zones_gdf = gpd.read_file(shapefile)

        # Reproject to WGS84 (EPSG:4326) only when the file uses another CRS
        if taxi_zones_gdf.crs != "EPSG:4326":
            taxi_zones_gdf = taxi_zones_gdf.to_crs(epsg=4326)

        # Compute the centroids once and reuse them for both coordinate columns
        centroids = taxi_zones_gdf.geometry.centroid
        taxi_zones_gdf["longitude"] = centroids.x
        taxi_zones_gdf["latitude"] = centroids.y

        return taxi_zones_gdf

    except Exception as e:
        # Best-effort loader: report the problem and let the caller check for None.
        print(f"Error loading and processing shapefile: {e}")
        return None

In [None]:
def lookup_coords_for_taxi_zone_id(
    zone_loc_id: int, 
    loaded_taxi_zones: gpd.GeoDataFrame
) -> Tuple[Optional[float], Optional[float]]:
    """
    Return the centroid coordinates of a taxi zone as ``(x, y)``.

    Parameters:
    - zone_loc_id (int): LocationID of the zone to look up.
    - loaded_taxi_zones (gpd.GeoDataFrame): Zone table with a "LocationID" column
      and polygon geometry.

    Returns:
    - tuple: ``(x, y)`` of the centroid of the first row whose LocationID matches,
      in the table's own CRS (i.e. ``(longitude, latitude)`` for EPSG:4326 data).
      Returns ``(None, None)`` when the ID is unknown or the lookup fails; the
      error is printed.
    """
    try:
        is_match = loaded_taxi_zones["LocationID"] == zone_loc_id
        matching_zones = loaded_taxi_zones[is_match]

        if matching_zones.empty:
            raise ValueError(f"Location ID {zone_loc_id} not found in the Taxi Zones dataset.")

        centroid_points = matching_zones.geometry.centroid
        return centroid_points.x.values[0], centroid_points.y.values[0]

    except Exception as e:
        print(f"Error looking up coordinates for Location ID {zone_loc_id}: {e}")
        return None, None


#### 1.2.2 Calculate Sample Size

In [None]:
def calculate_taxi_sample_size(
    population: Union[int, float],
    margin_of_error: float = 0.05,
    z_score: float = 1.96,
    proportion: float = 0.5,
) -> int:
    '''
    Calculate the sample size needed to estimate a proportion in a finite population.

    Uses Cochran's formula ``n0 = z^2 * p * (1 - p) / e^2`` with the finite
    population correction ``n = n0 / (1 + (n0 - 1) / N)``. The defaults give a
    95% confidence level (z = 1.96), a 5% margin of error and maximum variance
    (p = 0.5). That works out to 385 for populations in the millions.

    Parameters:
    - population: total population size N (e.g. rows in a monthly file)
    - margin_of_error: half-width e of the confidence interval
    - z_score: critical value for the desired confidence level
    - proportion: expected proportion p with the attribute (0.5 = max variance)

    Returns:
    - the sample size needed: never more than the population, and 0 when the
      population is empty
    '''
    if population <= 0:
        return 0

    # Cochran's formula for an effectively infinite population
    n0 = (z_score**2 * proportion * (1 - proportion)) / (margin_of_error**2)
    # Finite population correction: shrinks the sample for small populations
    adjusted = n0 / (1 + (n0 - 1) / population)
    # Guard against float round-off pushing the ceiling past the population size,
    # which would make DataFrame.sample(n=...) raise.
    return min(math.ceil(adjusted), math.floor(population))

In [None]:
def calculate_fhvhv_sample_size(
    population: Union[int, float],
    margin_of_error: float = 0.05,
    z_score: float = 2.58,
    proportion: float = 0.5,
) -> int:
    '''
    Calculate the HVFHV sample size needed to estimate a proportion in a finite population.

    Uses Cochran's formula ``n0 = z^2 * p * (1 - p) / e^2`` with the finite
    population correction ``n = n0 / (1 + (n0 - 1) / N)``. The defaults give a
    99% confidence level (z = 2.58), a 5% margin of error and maximum variance
    (p = 0.5). That works out to 666 for populations in the millions.

    Parameters:
    - population: total population size N (e.g. rows in a monthly file)
    - margin_of_error: half-width e of the confidence interval
    - z_score: critical value for the desired confidence level (2.58 = 99%)
    - proportion: expected proportion p with the attribute (0.5 = max variance)

    Returns:
    - the sample size needed: never more than the population, and 0 when the
      population is empty
    '''
    if population <= 0:
        return 0

    # Cochran's formula for an effectively infinite population
    n0 = (z_score**2 * proportion * (1 - proportion)) / (margin_of_error**2)
    # Finite population correction: shrinks the sample for small populations
    adjusted = n0 / (1 + (n0 - 1) / population)
    # Guard against float round-off pushing the ceiling past the population size,
    # which would make DataFrame.sample(n=...) raise.
    return min(math.ceil(adjusted), math.floor(population))

#### 1.2.3 Common Functions

In [None]:
def get_all_urls_from_tlc_page(taxi_page: str, timeout: float = 60.0) -> List[str]:
    '''
    Return the href of every anchor tag on the TLC trip-record page.

    Parameters:
    - taxi_page: the URL of the TLC page
    - timeout: seconds to wait on the server before giving up

    Returns:
    - a list of all hrefs on the page, with surrounding whitespace stripped

    Raises:
    - requests.HTTPError if the page answers with an error status
    - requests.RequestException on network failures or timeouts
    '''
    response = requests.get(taxi_page, timeout=timeout)
    # Fail loudly instead of parsing an error page and silently returning no links.
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "html.parser")
    # Strip whitespace, matching the `.strip()` applied to hrefs in the download cell.
    return [a["href"].strip() for a in soup.find_all("a", href=True)]

In [None]:
def filter_parquet_urls(all_urls: List[str]) -> Dict[str, List[str]]:
    '''
    Split a list of URLs into Yellow Taxi and HVFHV Parquet file URLs.

    Parameters:
    - all_urls: URLs to classify (e.g. every link scraped from the TLC page)

    Returns:
    - a dict with keys "yellow_taxi" and "hvfhv". Each maps to the URLs, in input
      order, that contain "yellow_tripdata" / "fhvhv_tripdata" followed later by
      ".parquet".
    '''
    dataset_patterns = {
        "yellow_taxi": re.compile(r".*yellow_tripdata.*\.parquet"),
        "hvfhv": re.compile(r".*fhvhv_tripdata.*\.parquet"),
    }
    return {
        dataset: [candidate for candidate in all_urls if pattern.search(candidate)]
        for dataset, pattern in dataset_patterns.items()
    }

### 1.3 Process Taxi Data

#### 1.3.1 Generate random samples of the Yellow Taxi data

In [None]:
# Directory holding the downloaded monthly Yellow Taxi files; the samples are
# written to the same folder.
directory = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
output_directory = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"

# Fixed seed so re-running the notebook draws the same sample rows.
RANDOM_SEED = 42

# Create output directory if it doesn't exist
os.makedirs(output_directory, exist_ok=True)

# Sample only the raw downloads ("yellow_tripdata_*.parquet"). A bare "yellow"
# substring test would also match the "sampled_yellow_*" and
# "cleaned_sampled_yellow_*" files this notebook writes into the same folder,
# and would re-sample them on every re-run.
for file_name in sorted(os.listdir(directory)):
    if file_name.startswith("yellow_tripdata") and file_name.endswith(".parquet"):
        file_path = os.path.join(directory, file_name)

        # Read the dataset
        data = pd.read_parquet(file_path)
        population_size = len(data)  # Get total rows in the dataset

        # Never request more rows than the file has; DataFrame.sample raises otherwise.
        sample_size = min(calculate_taxi_sample_size(population_size), population_size)

        # Create a reproducible random sample
        sampled_data = data.sample(n=sample_size, random_state=RANDOM_SEED)

        # Save the sampled data to a new file
        output_path = os.path.join(output_directory, f"sampled_{file_name}")
        sampled_data.to_parquet(output_path)

        # Print information for debugging
        print(f"Processed {file_name}: Population = {population_size}, Sample = {sample_size}")


#### 1.3.2 Cleaning and processing taxi sample data

In [None]:
# NYC bounding box written as ((lat_min, lon_min), (lat_max, lon_max)),
# unpacked into the scalar limits used by the cleaning filters.
NEW_YORK_BOX_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))
(LAT_MIN, LON_MIN), (LAT_MAX, LON_MAX) = NEW_YORK_BOX_COORDS
CRS = 4326

def get_and_clean_taxi_data(
    file_path: str, 
    save_dir: str, 
    taxi_zones_gdf: gpd.GeoDataFrame
) -> Optional[pd.DataFrame]:
    """
    Load, clean and save a single month's Yellow Taxi sample from a local Parquet file.

    Cleaning steps:
      1. strip and lower-case the column names;
      2. keep trips whose pickup and dropoff IDs both appear in
         ``taxi_zones_gdf["LocationID"]``, and drop trips that start and end in
         the same zone;
      3. attach pickup/dropoff longitude and latitude from each zone's centroid;
      4. keep the analysis columns, parse the datetime columns and fill a
         missing ``airport_fee`` with 0;
      5. keep trips inside the NYC bounding box (LAT_MIN..LAT_MAX,
         LON_MIN..LON_MAX) with a positive ``trip_distance``;
      6. rename the columns and write ``cleaned_<file name>`` into ``save_dir``.

    Parameters:
        file_path (str): Path to the Parquet file.
        save_dir (str): Directory where the cleaned file is written.
        taxi_zones_gdf (gpd.GeoDataFrame): Taxi zones with ``LocationID`` and polygon
            geometry. It should be in EPSG:4326 so the centroids are
            longitude/latitude comparable with the bounding box.

    Returns:
        pd.DataFrame | None: The cleaned DataFrame. Returns None when the
        location-ID columns are missing or any step fails; the error is printed.
    """
    file_name = os.path.basename(file_path)

    try:
        # Step 1: Load the Parquet file
        print(f"Loading file: {file_name}...")
        df = pd.read_parquet(file_path)

        # Step 2: Normalize column names
        df.columns = df.columns.str.strip().str.lower()

        # Step 3: Drop rows whose pickup/dropoff IDs are not known taxi zones
        if "pulocationid" not in df.columns or "dolocationid" not in df.columns:
            print(f"Columns 'pulocationid' or 'dolocationid' not found in {file_name}. Skipping location ID filtering.")
            return None
        valid_location_ids = set(taxi_zones_gdf["LocationID"])
        initial_rows = len(df)
        df = df[df["pulocationid"].isin(valid_location_ids) & df["dolocationid"].isin(valid_location_ids)]
        dropped_rows = initial_rows - len(df)
        print(f"Dropped {dropped_rows} rows with invalid Location IDs.")

        # Step 4: Drop rows with the same pickup and dropoff location.
        # .copy() gives an independent frame, so the column assignments below do
        # not write into a slice of the loaded data (SettingWithCopyWarning).
        initial_rows = len(df)
        df = df[df["pulocationid"] != df["dolocationid"]].copy()
        filtered_rows = initial_rows - len(df)
        print(f"Filtered out {filtered_rows} rows with the same pickup and dropoff location.")

        # Step 5: Attach zone-centroid coordinates. The centroids are computed once
        # per zone and mapped onto the trips. This replaces filtering the zone
        # table and recomputing a centroid for every individual trip row.
        print("Looking up coordinates for location IDs...")
        centroids = taxi_zones_gdf.geometry.centroid
        zone_coords = (
            pd.DataFrame({
                "LocationID": taxi_zones_gdf["LocationID"].to_numpy(),
                "longitude": centroids.x.to_numpy(),
                "latitude": centroids.y.to_numpy(),
            })
            # If a LocationID appears on several rows, use the first one, as
            # lookup_coords_for_taxi_zone_id does with `.values[0]`.
            .drop_duplicates(subset="LocationID", keep="first")
            .set_index("LocationID")
        )
        df["pickup_longitude"] = df["pulocationid"].map(zone_coords["longitude"])
        df["pickup_latitude"] = df["pulocationid"].map(zone_coords["latitude"])
        df["dropoff_longitude"] = df["dolocationid"].map(zone_coords["longitude"])
        df["dropoff_latitude"] = df["dolocationid"].map(zone_coords["latitude"])

        # Step 6: Keep only the columns used in the analysis
        columns_to_keep = [
            'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count',
            'trip_distance', 'pulocationid', 'dolocationid', 'fare_amount', 'extra',
            'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 
            'congestion_surcharge', 'airport_fee', 'pickup_latitude', 'pickup_longitude',
            'dropoff_latitude', 'dropoff_longitude'
        ]
        df = df[[col for col in columns_to_keep if col in df.columns]].copy()

        # Step 7: Normalize datetime columns (unparseable values become NaT)
        for time_col in ['tpep_pickup_datetime', 'tpep_dropoff_datetime']:
            if time_col in df.columns:
                df[time_col] = pd.to_datetime(df[time_col], errors='coerce')

        if "airport_fee" in df.columns:
            df["airport_fee"] = df["airport_fee"].fillna(0)

        # Step 8: Filter trips within NYC bounding box (inclusive on both ends)
        if {"pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"}.issubset(df.columns):
            df = df[
                df['pickup_latitude'].between(LAT_MIN, LAT_MAX)
                & df['pickup_longitude'].between(LON_MIN, LON_MAX)
                & df['dropoff_latitude'].between(LAT_MIN, LAT_MAX)
                & df['dropoff_longitude'].between(LON_MIN, LON_MAX)
            ]

        # Step 9: Remove zero-distance rides (same-zone rides were removed in Step 4)
        if "trip_distance" in df.columns:
            df = df[df["trip_distance"] > 0]

        # Step 10: Rename columns for clarity
        df = df.rename(columns={
            "tpep_pickup_datetime": "pickup_time",
            "tpep_dropoff_datetime": "dropoff_time",
            "pulocationid": "pickup_location_id",
            "dolocationid": "dropoff_location_id",
            "extra": "miscellaneous_extra_charges"
        })

        # Save the cleaned data
        output_path = os.path.join(save_dir, f"cleaned_{file_name}")
        df.to_parquet(output_path)
        print(f"Cleaned {len(df)} rows and saved to {output_path}.")
        return df

    except Exception as e:
        # Best-effort per file: report and let the caller continue with other months.
        print(f"Error processing file {file_name}: {e}")
        return None


# Main Script
directory = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
output_directory = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
os.makedirs(output_directory, exist_ok=True)

# Load the Taxi Zones GeoDataFrame; without it every file below would fail.
TAXI_ZONES_SHAPEFILE = os.path.join(directory, "taxi_zones.shp")
taxi_zones_gdf = load_taxi_zones(TAXI_ZONES_SHAPEFILE)
if taxi_zones_gdf is None:
    raise RuntimeError(f"Could not load taxi zones from {TAXI_ZONES_SHAPEFILE}")

# Clean only the sampled files ("sampled_yellow_*.parquet"). A substring test
# would also match earlier "cleaned_sampled_yellow_*" outputs in the same
# folder and clean them a second time on every re-run.
for file_name in sorted(os.listdir(directory)):
    if file_name.lower().startswith("sampled_yellow") and file_name.endswith(".parquet"):
        file_path = os.path.join(directory, file_name)
        get_and_clean_taxi_data(file_path, output_directory, taxi_zones_gdf)

#### 1.3.3 Merge the cleaned .parquet files into a Pandas DataFrame

In [None]:
def get_taxi_data(
    cleaned_files_dir: str,
    prefix: str = "cleaned_sampled_yellow",
    file_extension: str = ".parquet",
) -> pd.DataFrame:
    """
    Load and combine cleaned Yellow Taxi data from local cleaned Parquet files.

    Only files whose names start with ``prefix`` are read. The folder is shared
    with the Uber pipeline, which writes ``cleaned_sampled_fhvhv_*`` files, so a
    bare ``cleaned_`` prefix would mix Uber rows into the taxi frame. Files are
    read in sorted-name order because ``os.listdir`` order is arbitrary, so the
    combined row order is the same on every run.

    Parameters:
        cleaned_files_dir (str): Directory where cleaned Parquet files are stored.
        prefix (str): Filename prefix identifying cleaned taxi files.
        file_extension (str): Filename suffix identifying Parquet files.

    Returns:
        pd.DataFrame: All matching files concatenated with a fresh index. Returns
        an empty DataFrame when the directory is missing or nothing matches.
    """
    if not os.path.isdir(cleaned_files_dir):
        print(f"Directory does not exist: {cleaned_files_dir}")
        return pd.DataFrame()

    all_taxi_dataframes = []
    for file_name in sorted(os.listdir(cleaned_files_dir)):
        if not (file_name.startswith(prefix) and file_name.endswith(file_extension)):
            continue
        file_path = os.path.join(cleaned_files_dir, file_name)
        print(f"Loading cleaned file: {file_path}...")
        all_taxi_dataframes.append(pd.read_parquet(file_path))

    # Combine all cleaned data into a single DataFrame
    if not all_taxi_dataframes:
        print("No cleaned files found to process.")
        return pd.DataFrame()

    taxi_data = pd.concat(all_taxi_dataframes, ignore_index=True)
    print(f"Combined {len(taxi_data)} rows from all cleaned files.")
    return taxi_data


In [None]:
def get_all_taxi_data(cleaned_files_dir: str) -> pd.DataFrame:
    """
    Build the combined taxi DataFrame from the cleaned Parquet files in a directory.

    Thin wrapper around ``get_taxi_data`` that also reports the final row count.

    Parameters:
        cleaned_files_dir (str): Directory where cleaned Parquet files are stored.

    Returns:
        pd.DataFrame: Whatever ``get_taxi_data`` returns for that directory.
    """
    print("Combining all cleaned taxi data...")
    combined = get_taxi_data(cleaned_files_dir=cleaned_files_dir)
    row_count = len(combined)
    print(f"Final combined data contains {row_count} rows.")
    return combined


In [None]:
# Folder holding the cleaned monthly taxi files written by the cleaning step.
CLEANED_FILES_DIR = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
taxi_data = get_all_taxi_data(CLEANED_FILES_DIR)

In [None]:
# Preview the first rows of the combined taxi data.
taxi_data.head()

In [None]:
# Column dtypes and non-null counts of the combined taxi data.
taxi_data.info()

In [None]:
# Summary statistics of the combined taxi data.
taxi_data.describe()

### 1.4 Processing Uber Data

#### 1.4.1 Generate random samples of the Uber (HVFHV) data

In [None]:
# Directory holding the downloaded monthly HVFHV files; the samples are
# written to the same folder.
directory = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
output_directory = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"

# Fixed seed so re-running the notebook draws the same sample rows.
RANDOM_SEED = 42

# Create output directory if it doesn't exist
os.makedirs(output_directory, exist_ok=True)

# Sample only the raw downloads ("fhvhv_tripdata_*.parquet"). A bare "fhvhv"
# substring test would also match the "sampled_fhvhv_*" and
# "cleaned_sampled_fhvhv_*" files this notebook writes into the same folder.
for file_name in sorted(os.listdir(directory)):
    if file_name.startswith("fhvhv_tripdata") and file_name.endswith(".parquet"):
        file_path = os.path.join(directory, file_name)

        # Read the dataset
        data = pd.read_parquet(file_path)
        population_size = len(data)  # Get total rows in the dataset

        # Never request more rows than the file has; DataFrame.sample raises otherwise.
        sample_size = min(calculate_fhvhv_sample_size(population_size), population_size)

        # Create a reproducible random sample
        sampled_data = data.sample(n=sample_size, random_state=RANDOM_SEED)

        # Save the sampled data to a new file
        output_path = os.path.join(output_directory, f"sampled_{file_name}")
        sampled_data.to_parquet(output_path)

        # Print information for debugging
        print(f"Processed {file_name}: Population = {population_size}, Sample = {sample_size}")


#### 1.4.2 Cleaning and processing Uber sample data

In [None]:
def get_and_clean_uber_data(
    file_path: str, 
    save_dir: str, 
    taxi_zones_gdf: gpd.GeoDataFrame
) -> Optional[pd.DataFrame]:
    """
    Load, clean and save a single month's Uber (HVFHV) sample from a local Parquet file.

    Cleaning steps:
      1. strip and lower-case the column names;
      2. keep Uber trips only (``hvfhs_license_num`` containing "HV0003");
      3. keep trips whose pickup and dropoff IDs both appear in
         ``taxi_zones_gdf["LocationID"]``, and drop trips that start and end in
         the same zone;
      4. attach pickup/dropoff longitude and latitude from each zone's centroid;
      5. keep the analysis columns, parse the datetime columns and fill a
         missing ``airport_fee`` with 0;
      6. keep trips inside the NYC bounding box (LAT_MIN..LAT_MAX, LON_MIN..LON_MAX);
      7. rename the columns and write ``cleaned_<file name>`` into ``save_dir``.

    Parameters:
        file_path (str): Path to the Parquet file.
        save_dir (str): Directory where the cleaned file is written.
        taxi_zones_gdf (gpd.GeoDataFrame): Taxi zones with ``LocationID`` and polygon
            geometry. It should be in EPSG:4326 so the centroids are
            longitude/latitude comparable with the bounding box.

    Returns:
        pd.DataFrame | None: The cleaned DataFrame. Returns None when a required
        column is missing or any step fails; the error is printed.
    """
    file_name = os.path.basename(file_path)

    try:
        # Step 1: Load the Parquet file
        print(f"Loading file: {file_name}...")
        df = pd.read_parquet(file_path)

        # Step 2: Normalize column names
        df.columns = df.columns.str.strip().str.lower()

        # Step 3: Filter out non-Uber rides
        if "hvfhs_license_num" not in df.columns:
            print(f"'hvfhs_license_num' column not found in {file_name}. Skipping Uber filtering.")
            return None
        df = df[df["hvfhs_license_num"].str.contains("HV0003", na=False)]
        print(f"Filtered to Uber rides only: {len(df)} rows remaining.")

        # Step 4: Drop rows with invalid Location IDs
        if "pulocationid" not in df.columns or "dolocationid" not in df.columns:
            print(f"Columns 'pulocationid' or 'dolocationid' not found in {file_name}. Skipping location ID filtering.")
            return None
        valid_location_ids = set(taxi_zones_gdf["LocationID"])
        initial_rows = len(df)
        df = df[df["pulocationid"].isin(valid_location_ids) & df["dolocationid"].isin(valid_location_ids)]
        dropped_rows = initial_rows - len(df)
        print(f"Dropped {dropped_rows} rows with invalid Location IDs.")

        # Step 5: Drop rows with the same pickup and dropoff location.
        # .copy() gives an independent frame, so the column assignments below do
        # not write into a slice of the loaded data (SettingWithCopyWarning).
        initial_rows = len(df)
        df = df[df["pulocationid"] != df["dolocationid"]].copy()
        filtered_rows = initial_rows - len(df)
        print(f"Filtered out {filtered_rows} rows with the same pickup and dropoff location.")

        # Step 6: Attach zone-centroid coordinates. The centroids are computed once
        # per zone and mapped onto the trips. This replaces filtering the zone
        # table and recomputing a centroid for every individual trip row.
        print("Looking up coordinates for location IDs...")
        centroids = taxi_zones_gdf.geometry.centroid
        zone_coords = (
            pd.DataFrame({
                "LocationID": taxi_zones_gdf["LocationID"].to_numpy(),
                "longitude": centroids.x.to_numpy(),
                "latitude": centroids.y.to_numpy(),
            })
            # If a LocationID appears on several rows, use the first one, as
            # lookup_coords_for_taxi_zone_id does with `.values[0]`.
            .drop_duplicates(subset="LocationID", keep="first")
            .set_index("LocationID")
        )
        df["pickup_longitude"] = df["pulocationid"].map(zone_coords["longitude"])
        df["pickup_latitude"] = df["pulocationid"].map(zone_coords["latitude"])
        df["dropoff_longitude"] = df["dolocationid"].map(zone_coords["longitude"])
        df["dropoff_latitude"] = df["dolocationid"].map(zone_coords["latitude"])

        # Step 7: Keep only the columns used in the analysis
        columns_to_keep = [
            'pickup_datetime', 'dropoff_datetime', 'pulocationid', 'dolocationid',
            'trip_miles', 'base_passenger_fare', 'tolls', 'sales_tax', 
            'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay',
            'pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'
        ]
        df = df[[col for col in columns_to_keep if col in df.columns]].copy()

        # Step 8: Normalize datetime columns (unparseable values become NaT).
        # 'trip_time' is not among the kept columns, so there is nothing else to convert.
        for time_col in ['pickup_datetime', 'dropoff_datetime']:
            if time_col in df.columns:
                df[time_col] = pd.to_datetime(df[time_col], errors='coerce')

        if "airport_fee" in df.columns:
            df["airport_fee"] = df["airport_fee"].fillna(0)

        # Step 9: Filter trips within NYC bounding box (inclusive on both ends)
        if {"pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"}.issubset(df.columns):
            df = df[
                df['pickup_latitude'].between(LAT_MIN, LAT_MAX)
                & df['pickup_longitude'].between(LON_MIN, LON_MAX)
                & df['dropoff_latitude'].between(LAT_MIN, LAT_MAX)
                & df['dropoff_longitude'].between(LON_MIN, LON_MAX)
            ]

        # Step 10: Rename columns to match the cleaned taxi data's names
        df = df.rename(columns={
            "pickup_datetime": "pickup_time",
            "dropoff_datetime": "dropoff_time",
            "pulocationid": "pickup_location_id",
            "dolocationid": "dropoff_location_id",
            "trip_miles": "trip_distance",
        })

        # Save the cleaned data
        output_path = os.path.join(save_dir, f"cleaned_{file_name}")
        df.to_parquet(output_path)
        print(f"Cleaned {len(df)} rows and saved to {output_path}.")
        return df

    except Exception as e:
        # Best-effort per file: report and let the caller continue with other months.
        print(f"Error processing file {file_name}: {e}")
        return None

# Main Script
directory = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
output_directory = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
os.makedirs(output_directory, exist_ok=True)

# Load the Taxi Zones GeoDataFrame; without it every file below would fail.
TAXI_ZONES_SHAPEFILE = os.path.join(directory, "taxi_zones.shp")
taxi_zones_gdf = load_taxi_zones(TAXI_ZONES_SHAPEFILE)
if taxi_zones_gdf is None:
    raise RuntimeError(f"Could not load taxi zones from {TAXI_ZONES_SHAPEFILE}")

# Clean only the sampled HVFHV files ("sampled_fhvhv_*.parquet"). A substring
# test would also match earlier "cleaned_sampled_fhvhv_*" outputs in the same
# folder and clean them a second time on every re-run.
for file_name in sorted(os.listdir(directory)):
    if file_name.lower().startswith("sampled_fhvhv") and file_name.endswith(".parquet"):
        file_path = os.path.join(directory, file_name)
        get_and_clean_uber_data(file_path, output_directory, taxi_zones_gdf)

#### 1.4.3 Merge the cleaned .parquet files into a Pandas DataFrame

In [None]:
def get_uber_data(
    cleaned_files_dir: str, 
    prefix: str = "cleaned_sampled_fhvhv", 
    file_extension: str = ".parquet"
) -> pd.DataFrame:
    """
    Load and combine cleaned Uber data from local cleaned Parquet files.

    Files are read in sorted-name order because ``os.listdir`` order is
    arbitrary, so the combined row order is the same on every run. A file that
    fails to load is reported and skipped.

    Parameters:
        cleaned_files_dir (str): Directory where cleaned Parquet files are stored.
        prefix (str): Filename prefix identifying cleaned Uber files
            (default "cleaned_sampled_fhvhv").
        file_extension (str): Filename suffix identifying Parquet files
            (default ".parquet").

    Returns:
        pd.DataFrame: All matching files concatenated with a fresh index. Returns
        an empty DataFrame when the directory is missing or no file matches or loads.
    """
    # Verify that the directory exists
    if not os.path.exists(cleaned_files_dir):
        print(f"Directory does not exist: {cleaned_files_dir}")
        return pd.DataFrame()

    all_uber_dataframes = []
    for file_name in sorted(os.listdir(cleaned_files_dir)):
        # Filter files by prefix and file extension
        if not (file_name.startswith(prefix) and file_name.endswith(file_extension)):
            continue
        file_path = os.path.join(cleaned_files_dir, file_name)
        print(f"Loading cleaned Uber file: {file_path}...")
        try:
            all_uber_dataframes.append(pd.read_parquet(file_path))
        except Exception as e:
            print(f"Error loading file {file_path}: {e}")

    # Combine all cleaned data into a single DataFrame
    if not all_uber_dataframes:
        print("No cleaned Uber files found to process.")
        return pd.DataFrame()

    uber_data = pd.concat(all_uber_dataframes, ignore_index=True)
    print(f"Combined {len(uber_data)} rows from all cleaned Uber files.")
    return uber_data


In [None]:
def get_all_uber_data(
    cleaned_files_dir: str, 
    prefix: str = "cleaned_sampled_fhvhv", 
    file_extension: str = ".parquet"
) -> pd.DataFrame:
    """
    Build the combined Uber DataFrame from the cleaned Parquet files in a directory.

    Checks that the directory exists, delegates to ``get_uber_data`` and prints a
    summary of the result.

    Parameters:
        cleaned_files_dir (str): Directory where cleaned Parquet files are stored.
        prefix (str): Filename prefix identifying cleaned Uber files
            (default "cleaned_sampled_fhvhv").
        file_extension (str): Filename suffix identifying Parquet files
            (default ".parquet").

    Returns:
        pd.DataFrame: The combined Uber data, or an empty DataFrame when the
        directory is missing or holds no matching data.
    """
    if not os.path.exists(cleaned_files_dir):
        print(f"Directory does not exist: {cleaned_files_dir}")
        return pd.DataFrame()

    print(f"Combining all cleaned Uber data from directory: {cleaned_files_dir}...")
    combined = get_uber_data(
        cleaned_files_dir=cleaned_files_dir,
        prefix=prefix,
        file_extension=file_extension,
    )

    summary = (
        "No Uber data found or combined. Returning an empty DataFrame."
        if combined.empty
        else f"Final combined Uber data contains {len(combined)} rows."
    )
    print(summary)
    return combined


In [None]:
# Main Script for Uber Data: the cleaned monthly Uber files live in the project folder.
CLEANED_UBER_FILES_DIR = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
uber_data = get_all_uber_data(CLEANED_UBER_FILES_DIR)

In [None]:
# Preview the first rows of the combined Uber data.
uber_data.head()

In [None]:
# Column dtypes and non-null counts of the combined Uber data.
uber_data.info()

In [None]:
# Summary statistics of the combined Uber data.
uber_data.describe()

### 1.5 Processing Weather Data

In [None]:
def get_all_weather_csvs(directory: str) -> List[str]:
    """
    Return the names of all weather-related CSV files in ``directory``.

    A file qualifies when it is a regular file whose name ends with ``.csv``
    and contains the keyword ``weather``. Both checks are case-insensitive.

    Args:
        directory (str): Path to the directory to search for files.

    Returns:
        List[str]: Matching file *names* (not full paths), sorted
        alphabetically so callers process them in a reproducible order.
        Join each name with ``directory`` before opening it.

    Raises:
        ValueError: If ``directory`` is not an existing directory.
    """
    if not os.path.isdir(directory):
        raise ValueError(f"Invalid directory: {directory}")

    weather_csvs = [
        name
        for name in os.listdir(directory)
        if name.lower().endswith(".csv")
        and "weather" in name.lower()
        # skip sub-directories that happen to match the name pattern
        and os.path.isfile(os.path.join(directory, name))
    ]

    # os.listdir returns entries in arbitrary order; sort for determinism
    return sorted(weather_csvs)

#### 1.5.1 Cleaning and processing weather data

##### Hourly

In [None]:
def clean_month_weather_data_hourly(csv_file: str) -> pd.DataFrame:
    """
    Clean one weather CSV into exactly one row per hour.

    Steps:
    - Normalize column names (e.g. ``HourlyDryBulbTemperature`` ->
      ``hourly_dry_bulb_temperature``), rename ``date`` to ``hourly_time``
      and the dry-bulb temperature to ``hourly_temperature``, and keep only
      the hourly columns of interest that exist in the file.
    - When an hour has several observations, keep the one closest to half
      past the hour.
    - Translate AU present-weather codes and sky-condition codes into
      descriptive text; an observed blank becomes "Unknown".
    - Insert rows for hours with no observation at all, then fill:
      temperature/visibility by time interpolation, wind speed by carrying
      the last known value forward, the descriptive columns from the
      previous hour (or the next hour at the very start), and precipitation
      with 0.

    Parameters:
        csv_file (str): Path to the CSV file containing the weather data.

    Returns:
        pd.DataFrame: One row per hour between the first and last observed
        hour, with an ``hourly_time`` column followed by the extracted
        columns. Empty if the file contains no parseable timestamps.
    """
    # AU code mapping for HourlyPresentWeatherType
    au_code_mapping = {
        "DZ": "Drizzle",
        "RA": "Rain",
        "SN": "Snow",
        "SG": "Snow Grains",
        "IC": "Ice Crystals",
        "PL": "Ice Pellets",
        "GR": "Hail",
        "GS": "Small Hail",
        "UP": "Unknown Precipitation",
        "BR": "Mist",
        "FG": "Fog",
        "FU": "Smoke",
        "VA": "Volcanic Ash",
        "DU": "Dust",
        "SA": "Sand",
        "HZ": "Haze",
        "PY": "Spray",
        "PO": "Sand Whirls",
        "SQ": "Squalls",
        "FC": "Funnel Cloud",
        "SS": "Sandstorm",
        "DS": "Duststorm"
    }

    # Sky condition mapping for HourlySkyConditions
    sky_condition_mapping = {
        "CLR": "Clear",
        "FEW": "Few Clouds",
        "SCT": "Scattered Clouds",
        "BKN": "Broken Clouds",
        "OVC": "Overcast",
        "VV": "Obscured Sky"
    }

    def interpret_weather_type(weather_string):
        """Turn 'XX:nn' AU codes into a comma-separated description."""
        if pd.isnull(weather_string):
            return "Unknown"
        matches = re.findall(r"([A-Z]{2}):\d+", weather_string)
        descriptions = [au_code_mapping.get(code, "Unknown") for code in matches]
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # text is stable across runs (set iteration order depends on hashing)
        return ", ".join(dict.fromkeys(descriptions)) if descriptions else "Unknown"

    def interpret_sky_conditions(sky_string):
        """Turn 'CCC:oo [elev]' sky layers into '; '-separated descriptions."""
        if pd.isnull(sky_string):
            return "Unknown"
        pattern = r"(\w{3}):(\d{2})(?:\s(\d+))?"
        matches = re.findall(pattern, sky_string)
        interpreted_conditions = []
        for condition, octa, elevation in matches:
            description = sky_condition_mapping.get(condition, "Unknown")
            detail = f"{description}, Octas: {int(octa)}"
            if elevation:
                detail += f", Elevation: {int(elevation)} feet"
            interpreted_conditions.append(detail)
        return "; ".join(interpreted_conditions) if interpreted_conditions else "Unknown"

    def select_ideal_row(group):
        """Pick the observation closest to hh:30 (None for an empty hour)."""
        if group.empty:
            return None
        if len(group) == 1:
            return group.iloc[0]
        middle_of_hour = group.index[0].replace(minute=30, second=0, microsecond=0)
        time_diffs = abs((group.index - middle_of_hour).total_seconds())
        return group.iloc[time_diffs.argmin()]

    df = pd.read_csv(csv_file, low_memory=False)

    # Normalize column names: CamelCase -> snake_case, spaces -> underscores
    df.columns = (
        df.columns.str.replace(r"([a-z])([A-Z])", r"\1_\2", regex=True)
        .str.lower()
        .str.replace(" ", "_")
    )
    df = df.rename(columns={
        "date": "hourly_time",
        "hourly_dry_bulb_temperature": "hourly_temperature"
    })

    columns_to_extract = [
        "hourly_time", "hourly_temperature", "hourly_present_weather_type",
        "hourly_sky_conditions", "hourly_visibility", "hourly_precipitation", "hourly_wind_speed"
    ]
    # .copy() so the assignments below act on an independent frame, not a view
    df = df[[col for col in columns_to_extract if col in df.columns]].copy()

    df["hourly_time"] = pd.to_datetime(df["hourly_time"], errors="coerce")
    df = df.dropna(subset=["hourly_time"]).sort_values("hourly_time").set_index("hourly_time")
    if df.empty:
        # Nothing to bucket; date_range below would fail on NaT bounds
        return df.reset_index()

    # One representative observation per clock hour ("h" is accepted by old
    # and new pandas; the upper-case "H" alias is deprecated)
    df_hourly = df.groupby(pd.Grouper(freq="h")).apply(select_ideal_row)
    # Hours with no observation come back as all-NaN rows. Drop only those:
    # a plain dropna() would also discard real observations that lack a
    # single field, e.g. a blank present-weather type.
    df_hourly = df_hourly.dropna(how="all")
    if df_hourly.empty:
        return df_hourly.reset_index()

    # Translate codes while only observed rows exist, so an observed blank
    # becomes "Unknown" instead of inheriting an earlier hour's weather in
    # the fill step further down.
    if "hourly_present_weather_type" in df_hourly.columns:
        df_hourly["hourly_present_weather_type"] = (
            df_hourly["hourly_present_weather_type"].apply(interpret_weather_type)
        )
    if "hourly_sky_conditions" in df_hourly.columns:
        df_hourly["hourly_sky_conditions"] = (
            df_hourly["hourly_sky_conditions"].apply(interpret_sky_conditions)
        )

    # Insert a row for every hour missing between the first and last observation
    all_hours = pd.date_range(start=df_hourly.index.min(), end=df_hourly.index.max(), freq="h")
    df_hourly = df_hourly.reindex(all_hours)
    df_hourly.index.name = "hourly_time"

    if "hourly_precipitation" in df_hourly.columns:
        # "T" -> 0.01; astype(str) lets .str work even when pandas parsed the
        # column as floats; stripping keeps only digits and dots (drops
        # suffixes such as "s"); blank/unparseable values become 0
        precipitation = (
            df_hourly["hourly_precipitation"]
            .replace("T", "0.01")
            .astype(str)
            .str.replace(r"[^\d.]", "", regex=True)
        )
        df_hourly["hourly_precipitation"] = pd.to_numeric(precipitation, errors="coerce").fillna(0)

    if "hourly_wind_speed" in df_hourly.columns:
        # Numeric wind speed; gaps take the last known value
        df_hourly["hourly_wind_speed"] = pd.to_numeric(
            df_hourly["hourly_wind_speed"], errors="coerce"
        ).ffill()

    # Coerce text/flagged values to NaN, then interpolate over time
    numerical_cols = [
        col for col in ("hourly_temperature", "hourly_visibility") if col in df_hourly.columns
    ]
    for col in numerical_cols:
        df_hourly[col] = pd.to_numeric(df_hourly[col], errors="coerce")
    if numerical_cols:
        df_hourly[numerical_cols] = df_hourly[numerical_cols].interpolate(method="time")

    # Only inserted hours are still NaN here: copy the neighbouring description
    categorical_cols = [
        col for col in ("hourly_present_weather_type", "hourly_sky_conditions")
        if col in df_hourly.columns
    ]
    if categorical_cols:
        df_hourly[categorical_cols] = df_hourly[categorical_cols].ffill().bfill()

    return df_hourly.reset_index()


##### Daily

In [None]:
def clean_month_weather_data_daily(csv_file: str) -> pd.DataFrame:
    """
    Clean one weather CSV down to its daily-summary rows.

    Steps:
    - Keep only the daily columns of interest that exist in the file.
    - Convert ``Sunrise``/``Sunset`` integers (e.g. 720) to ``datetime.time``
      and fill gaps with the previous row's value.
    - Translate ``DailyWeather`` AU codes into descriptive text
      (blank -> "Unknown").
    - Convert ``DailySnowDepth`` to numbers ("T" -> 0.01, unparseable -> NaN).
    - Drop every row that still has a missing value.
    - Rename columns to snake_case, with ``dry_bulb_temperature`` shortened
      to ``temperature`` and ``DATE`` renamed to ``date``.

    Parameters:
        csv_file (str): Path to the CSV file containing the weather data.

    Returns:
        pd.DataFrame: A cleaned DataFrame with selected and transformed columns.
    """
    # AU code mapping for DailyWeather
    au_code_mapping = {
        "DZ": "Drizzle",
        "RA": "Rain",
        "SN": "Snow",
        "SG": "Snow Grains",
        "IC": "Ice Crystals",
        "PL": "Ice Pellets",
        "GR": "Hail",
        "GS": "Small Hail",
        "UP": "Unknown Precipitation",
        "BR": "Mist",
        "FG": "Fog",
        "FU": "Smoke",
        "VA": "Volcanic Ash",
        "DU": "Dust",
        "SA": "Sand",
        "HZ": "Haze",
        "PY": "Spray",
        "PO": "Sand Whirls",
        "SQ": "Squalls",
        "FC": "Funnel Cloud",
        "SS": "Sandstorm",
        "DS": "Duststorm"
    }

    def interpret_daily_weather(weather_string):
        """Turn two-letter AU codes into a comma-separated description."""
        if pd.isnull(weather_string):
            return "Unknown"
        matches = re.findall(r"([A-Z]{2})[:\d]*", weather_string)
        descriptions = [au_code_mapping.get(code, "Unknown") for code in matches]
        # dict.fromkeys de-duplicates in first-seen order -> stable output,
        # unlike set(), whose order depends on hash seeding
        return ", ".join(dict.fromkeys(descriptions)) if descriptions else "Unknown"

    def convert_to_time_format(time_int):
        """Convert an HHMM integer (e.g. 720 -> 07:20) to datetime.time, else None."""
        if pd.isnull(time_int):
            return None
        try:
            time_str = f"{int(time_int):04d}"  # zero-pad: 720 -> "0720"
            return datetime.strptime(time_str, "%H%M").time()
        except ValueError:
            return None

    df = pd.read_csv(csv_file, low_memory=False)

    columns_to_extract = [
        "DATE", "Sunrise", "Sunset", "DailyAverageDryBulbTemperature",
        "DailyAverageWindSpeed", "DailyMaximumDryBulbTemperature",
        "DailyMinimumDryBulbTemperature", "DailyWeather", "DailySnowDepth"
    ]
    # .copy() so the assignments below act on an independent frame, not a view
    df = df[[col for col in columns_to_extract if col in df.columns]].copy()

    df["DATE"] = pd.to_datetime(df["DATE"], errors="coerce")
    df = df.dropna(subset=["DATE"]).sort_values("DATE")

    # Guarded per column: a file without Sunrise/Sunset must not raise KeyError
    for time_col in ("Sunrise", "Sunset"):
        if time_col in df.columns:
            df[time_col] = df[time_col].apply(convert_to_time_format).ffill()

    if "DailyWeather" in df.columns:
        df["DailyWeather"] = df["DailyWeather"].apply(interpret_daily_weather)

    if "DailySnowDepth" in df.columns:
        # "T" -> 0.01; other non-numeric entries become NaN instead of crashing
        df["DailySnowDepth"] = pd.to_numeric(
            df["DailySnowDepth"].replace("T", "0.01"), errors="coerce"
        )

    # Keep only fully populated rows (presumably the once-a-day summary rows;
    # confirm against the raw CSV layout)
    df = df.dropna()

    # CamelCase -> snake_case; DATE is renamed first so it is not split
    df = df.rename(columns={"DATE": "date"})
    df.columns = [
        re.sub(r"(?<!^)(?=[A-Z])", "_", col).lower().replace("dry_bulb_temperature", "temperature")
        for col in df.columns
    ]
    return df


In [None]:
WEATHER_CSV_DIR = "/Users/victorzhang/Desktop/Columbia S1/IEOR E4501 Tools for Analytics/Final Project"
def load_and_clean_weather_data(
    weather_dir: str = WEATHER_CSV_DIR,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Clean every weather CSV in ``weather_dir`` and stack the results.

    Parameters:
        weather_dir (str): Directory scanned by ``get_all_weather_csvs``
            (defaults to WEATHER_CSV_DIR).

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: ``(hourly_data, daily_data)``, each
        with a fresh 0..n-1 index. Both are empty DataFrames when no weather
        CSV is found.
    """
    hourly_dataframes = []
    daily_dataframes = []

    # get_all_weather_csvs returns bare file names; join them with the
    # directory so pandas can open them regardless of the working directory.
    # Sorting gives a reproducible month order.
    for csv_name in sorted(get_all_weather_csvs(weather_dir)):
        csv_path = os.path.join(weather_dir, csv_name)
        hourly_dataframes.append(clean_month_weather_data_hourly(csv_path))
        daily_dataframes.append(clean_month_weather_data_daily(csv_path))

    if not hourly_dataframes:
        # pd.concat([]) would raise "No objects to concatenate"
        print(f"No weather CSV files found in: {weather_dir}")
        return pd.DataFrame(), pd.DataFrame()

    # ignore_index avoids duplicate index labels across months
    hourly_data = pd.concat(hourly_dataframes, ignore_index=True)
    daily_data = pd.concat(daily_dataframes, ignore_index=True)

    return hourly_data, daily_data

In [None]:
# Clean every weather CSV and unpack the combined hourly and daily frames
hourly_weather_data, daily_weather_data = load_and_clean_weather_data()

In [None]:
# Preview the first rows of the hourly weather data
hourly_weather_data.head()

In [None]:
# Column dtypes and non-null counts of the hourly weather data
hourly_weather_data.info()

In [None]:
# Summary statistics of the hourly weather data
hourly_weather_data.describe()

In [None]:
# Preview the first rows of the daily weather data
daily_weather_data.head()

In [None]:
# Column dtypes and non-null counts of the daily weather data
daily_weather_data.info()

In [None]:
# Summary statistics of the daily weather data
daily_weather_data.describe()

## Part 2: Storing Cleaned Data

In [None]:
# SQLAlchemy engine used by the database cells below.
# NOTE(review): DATABASE_URL is not defined in this part of the notebook —
# confirm it is set in the constants cell before running.
engine = db.create_engine(DATABASE_URL)

In [None]:
# Raw SQL table definitions for the four tables (hourly weather, daily
# weather, taxi trips, Uber trips), used when creating tables with SQL
# rather than through SQLAlchemy.
# TODO: every schema below is still a placeholder.
HOURLY_WEATHER_SCHEMA = """
TODO
"""

DAILY_WEATHER_SCHEMA = """
TODO
"""

TAXI_TRIPS_SCHEMA = """
TODO
"""

UBER_TRIPS_SCHEMA = """
TODO
"""

In [None]:
# Write the four schema strings, in table order, into the required schema file
with open(DATABASE_SCHEMA_FILE, "w") as f:
    f.writelines(
        (
            HOURLY_WEATHER_SCHEMA,
            DAILY_WEATHER_SCHEMA,
            TAXI_TRIPS_SCHEMA,
            UBER_TRIPS_SCHEMA,
        )
    )

In [None]:
# Create the database tables from the schema statements.
# TODO: still a placeholder — the body is only `pass`, so nothing runs yet
# (e.g. read DATABASE_SCHEMA_FILE and execute each statement on `connection`).
with engine.connect() as connection:
    pass

### Add Data to Database

In [None]:
def write_dataframes_to_table(
    table_to_df_dict: Dict[str, pd.DataFrame],
    con: Any = None,
    if_exists: str = "append",
) -> None:
    """
    Write each DataFrame into the database table named by its key.

    Replaces the previous stub, whose ``raise NotImplemented()`` itself failed
    with a TypeError (``NotImplemented`` is not callable).

    Parameters:
        table_to_df_dict (Dict[str, pd.DataFrame]): Mapping of table name to
            the DataFrame whose rows go into that table.
        con: SQLAlchemy engine/connection or ``sqlite3`` connection. When
            omitted, the notebook-level ``engine`` is used.
        if_exists (str): Passed to ``DataFrame.to_sql``. "append" (default)
            keeps tables created from the schema file; "replace" drops and
            recreates them.
    """
    # Conditional expression: `engine` is only looked up when con is omitted
    connectable = engine if con is None else con
    for table_name, dataframe in table_to_df_dict.items():
        # index=False: the DataFrame index is a row counter, not data
        dataframe.to_sql(table_name, con=connectable, if_exists=if_exists, index=False)

In [None]:
# Destination table name -> DataFrame that fills it. The weather frames are
# the notebook-level names bound from load_and_clean_weather_data()
# (`hourly_weather_data`, `daily_weather_data`); `hourly_data`/`daily_data`
# exist only inside that function, so referencing them raised NameError.
map_table_name_to_dataframe = {
    "taxi_trips": taxi_data,
    "uber_trips": uber_data,
    "hourly_weather": hourly_weather_data,
    "daily_weather": daily_weather_data,
}

In [None]:
# Load each DataFrame into its database table
write_dataframes_to_table(map_table_name_to_dataframe)

## Part 3: Understanding the Data

In [None]:
# Helper function to write the queries to file
def write_query_to_file(query, outfile):
    """
    Save a SQL query's text to ``outfile``, overwriting any existing file.

    Parameters:
        query (str): SQL text. Leading/trailing whitespace (e.g. the newlines
            of a triple-quoted literal) is stripped, and exactly one trailing
            newline is written.
        outfile (str): Destination path.
    """
    with open(outfile, "w", encoding="utf-8") as f:
        f.write(query.strip() + "\n")

### Query 1

In [None]:
# Output path for Query 1's SQL file.
# TODO: set a real filename — an empty path cannot be opened for writing.
QUERY_1_FILENAME = ""

# SQL text for Query 1 (placeholder until written).
QUERY_1 = """
TODO
"""

In [None]:
# Option 1: run QUERY_1 through SQLAlchemy and fetch all result rows.
with engine.connect() as con:
    results = con.execute(db.text(QUERY_1)).fetchall()
# NOTE: Jupyter only echoes a cell's last expression, so this bare name is not displayed.
results

# Option 2: let pandas run the same query and return a DataFrame
# (being the last expression, this is what the cell displays).
pd.read_sql(QUERY_1, con=engine)

In [None]:
# Persist Query 1's SQL text to QUERY_1_FILENAME via the helper
write_query_to_file(QUERY_1, QUERY_1_FILENAME)

## Part 4: Visualizing the Data

### Visualization 1

In [None]:
# TODO: rename to describe what the plot shows
def plot_visual_1(dataframe):
    """
    Draw placeholder visualization #1 on a 20x10 figure and display it.

    Template only: the plotted values are the placeholder string "...",
    and ``dataframe`` is not read yet.
    """
    fig, ax = plt.subplots(figsize=(20, 10))

    # TODO: derive the series to plot from `dataframe`
    placeholder = "..."
    ax.plot(placeholder, "...")

    # TODO: add axis labels / styling as needed
    ax.set_title("Some Descriptive Title")

    plt.show()

In [None]:
def get_data_for_visual_1():
    """
    Fetch the data behind visualization #1.

    Not implemented yet: intended to query the SQL database (optionally into
    a pandas DataFrame) and return the result for ``plot_visual_1``.

    Raises:
        NotImplementedError: Always, until implemented.
    """
    # TODO: query the SQL database for the needed data; a pandas
    # DataFrame is a convenient return type.
    raise NotImplementedError()

In [None]:
# Fetch the data for visualization #1 and plot it
some_dataframe = get_data_for_visual_1()
plot_visual_1(some_dataframe)