# Understanding Hired Rides in NYC

_[Project prompt](https://docs.google.com/document/d/1VERPjEZcC1XSs4-02aM-DbkNr_yaJVbFjLJxaYQswqA/edit#)_

_This scaffolding notebook may be used to help setup your final project. It's **totally optional** whether you make use of this or not._

_If you do use this notebook, everything provided is optional as well - you may remove or add prose and code as you wish._

_Anything in italics (prose) or comments (in code) is meant to provide you with guidance. **Remove the italic lines and provided comments** before submitting the project, if you choose to use this scaffolding. We don't need the guidance when grading._

_**All code below should be consider "pseudo-code" - not functional by itself, and only a suggestion at the approach.**_

## Project Setup

In [1]:
# all import statements needed for the project, for example:

import os

from bs4 import BeautifulSoup
import matplotlib.pyplot as plt
import pandas as pd
import requests
import sqlalchemy as db
import re
import geopandas as gpd
from shapely.geometry import Point

## Part 1: Data Preprocessing

### Calculate Sample Size

### Common Functions

In [141]:
def get_all_urls_from_tlc_page(tlc_page_url):
    """
    Extract all URLs from the NYC TLC webpage.

    Parameters:
        tlc_page_url (str): The URL of the NYC TLC webpage.

    Returns:
        list: A list of all stripped URLs found on the page.
    """
    # Fetch the webpage content
    response = requests.get(tlc_page_url)
    response.raise_for_status()  # Raise an error for invalid response
    
    # Parse the HTML content with BeautifulSoup
    soup = BeautifulSoup(response.text, 'html.parser')
    
    # Extract and strip all <a> tag href attributes
    urls = [link['href'].strip() for link in soup.find_all('a', href=True)]
    return urls


In [None]:
def filter_taxi_and_hvfhv_urls(all_urls):
    """
    Filter URLs for Yellow Taxi and HVFHV Parquet files for the years 2020–2024.

    Parameters:
        all_urls (list): A list of URLs.

    Returns:
        tuple: (list of Yellow Taxi URLs, list of HVFHV URLs)
    """
    taxi_pattern = re.compile(r".*yellow_tripdata_20(20|21|22|23|24)-.*\.parquet$", re.IGNORECASE)
    hvfhv_pattern = re.compile(r".*fhvhv_tripdata_20(20|21|22|23|24)-.*\.parquet$", re.IGNORECASE)

    yellow_taxi_urls = [url for url in all_urls if taxi_pattern.search(url)]
    hvfhv_urls = [url for url in all_urls if hvfhv_pattern.search(url)]

    return yellow_taxi_urls, hvfhv_urls

### Download Taxi and HVFHV Data

In [150]:
def download_yellow_taxi_data(tlc_page_url, save_dir="yellow_taxi_data"):
    """
    Download all Yellow Taxi Parquet files for the years 2020–2024.

    Parameters:
        tlc_page_url (str): URL of the TLC page containing data links.
        save_dir (str): Directory to save Yellow Taxi data files.
    """
    # Fetch and process URLs
    all_urls = get_all_urls_from_tlc_page(tlc_page_url)
    base_url = "https://www1.nyc.gov"
    all_urls = convert_to_absolute_urls(all_urls, base_url)
    yellow_taxi_urls, _ = filter_taxi_and_hvfhv_urls(all_urls)

    print(f"Found {len(yellow_taxi_urls)} Yellow Taxi Parquet files.")

    # Download files
    os.makedirs(save_dir, exist_ok=True)
    for url in yellow_taxi_urls:
        download_parquet_file(url, save_dir)


In [151]:
# TLC page URL
tlc_page_url = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

# Download Yellow Taxi data
download_yellow_taxi_data(tlc_page_url, save_dir="yellow_taxi_data")

Found 57 Yellow Taxi Parquet files.
Downloaded yellow_tripdata_2024-01.parquet to yellow_taxi_data/yellow_tripdata_2024-01.parquet
Downloaded yellow_tripdata_2024-02.parquet to yellow_taxi_data/yellow_tripdata_2024-02.parquet
Downloaded yellow_tripdata_2024-03.parquet to yellow_taxi_data/yellow_tripdata_2024-03.parquet
Downloaded yellow_tripdata_2024-04.parquet to yellow_taxi_data/yellow_tripdata_2024-04.parquet
Downloaded yellow_tripdata_2024-05.parquet to yellow_taxi_data/yellow_tripdata_2024-05.parquet
Downloaded yellow_tripdata_2024-06.parquet to yellow_taxi_data/yellow_tripdata_2024-06.parquet
Downloaded yellow_tripdata_2024-07.parquet to yellow_taxi_data/yellow_tripdata_2024-07.parquet
Downloaded yellow_tripdata_2024-08.parquet to yellow_taxi_data/yellow_tripdata_2024-08.parquet
Downloaded yellow_tripdata_2024-09.parquet to yellow_taxi_data/yellow_tripdata_2024-09.parquet
Downloaded yellow_tripdata_2023-01.parquet to yellow_taxi_data/yellow_tripdata_2023-01.parquet
Downloaded yel

In [153]:
def download_hvfhv_data(tlc_page_url, save_dir="hvfhv_data"):
    """
    Download all HVFHV Parquet files for the years 2020–2024.

    Parameters:
        tlc_page_url (str): URL of the TLC page containing data links.
        save_dir (str): Directory to save HVFHV data files.
    """
    # Fetch and process URLs
    all_urls = get_all_urls_from_tlc_page(tlc_page_url)
    base_url = "https://www1.nyc.gov"
    all_urls = convert_to_absolute_urls(all_urls, base_url)
    _, hvfhv_urls = filter_taxi_and_hvfhv_urls(all_urls)

    print(f"Found {len(hvfhv_urls)} HVFHV Parquet files.")

    # Download files
    os.makedirs(save_dir, exist_ok=True)
    for url in hvfhv_urls:
        download_parquet_file(url, save_dir)


In [154]:
# TLC page URL
tlc_page_url = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

# Download HVFHV data
download_hvfhv_data(tlc_page_url, save_dir="hvfhv_data")


Found 57 HVFHV Parquet files.
Downloaded fhvhv_tripdata_2024-01.parquet to hvfhv_data/fhvhv_tripdata_2024-01.parquet
Downloaded fhvhv_tripdata_2024-02.parquet to hvfhv_data/fhvhv_tripdata_2024-02.parquet
Downloaded fhvhv_tripdata_2024-03.parquet to hvfhv_data/fhvhv_tripdata_2024-03.parquet
Downloaded fhvhv_tripdata_2024-04.parquet to hvfhv_data/fhvhv_tripdata_2024-04.parquet
Downloaded fhvhv_tripdata_2024-05.parquet to hvfhv_data/fhvhv_tripdata_2024-05.parquet
Downloaded fhvhv_tripdata_2024-06.parquet to hvfhv_data/fhvhv_tripdata_2024-06.parquet
Downloaded fhvhv_tripdata_2024-07.parquet to hvfhv_data/fhvhv_tripdata_2024-07.parquet
Downloaded fhvhv_tripdata_2024-08.parquet to hvfhv_data/fhvhv_tripdata_2024-08.parquet
Downloaded fhvhv_tripdata_2024-09.parquet to hvfhv_data/fhvhv_tripdata_2024-09.parquet
Downloaded fhvhv_tripdata_2023-01.parquet to hvfhv_data/fhvhv_tripdata_2023-01.parquet
Downloaded fhvhv_tripdata_2023-02.parquet to hvfhv_data/fhvhv_tripdata_2023-02.parquet
Downloaded fh

## Sampleing Taxi Data

In [4]:
import math

def cochran_sample_size(population, confidence_level=0.95, margin_of_error=0.05, proportion=0.5):
    """
    Calculate sample size using Cochran's formula with finite population correction.

    Parameters:
        population (int): Total population size (e.g., number of records for a month).
        confidence_level (float): Desired confidence level (e.g., 0.95 for 95%).
        margin_of_error (float): Desired margin of error (e.g., 0.05 for 5%).
        proportion (float): Estimated proportion of the population (default 0.5 for max variability).

    Returns:
        int: Sample size.
    """
    # Z-score for given confidence level
    z_scores = {0.9: 1.645, 0.95: 1.96, 0.99: 2.576}
    z = z_scores[confidence_level]

    # Initial sample size calculation
    n0 = (z**2 * proportion * (1 - proportion)) / (margin_of_error**2)

    # Apply finite population correction
    if population > 0:
        n = n0 / (1 + (n0 - 1) / population)
    else:
        n = n0

    return math.ceil(n)


## Calculate sample size for taxi

In [155]:
# Directory containing cleaned files
input_dir = "yellow_taxi_data"
output_dir = "sampled_taxi_data"
os.makedirs(output_dir, exist_ok=True)  # Create the output directory if it doesn't exist

# Sampling parameters
confidence_level = 0.99
margin_of_error = 0.05
proportion = 0.5

# Process each file
for file_name in os.listdir(input_dir):
    if file_name.endswith(".parquet"):
        file_path = os.path.join(input_dir, file_name)
        
        print(f"Processing file: {file_name}")
        
        # Load the monthly data
        monthly_data = pd.read_parquet(file_path)
        population_size = len(monthly_data)
        
        # Calculate the sample size for the month
        sample_size = cochran_sample_size(population_size, confidence_level, margin_of_error, proportion)
        print(f"Population size: {population_size}, Sample size: {sample_size}")
        
        # Perform sampling
        sampled_data = monthly_data.sample(n=sample_size, random_state=42)
        
        # Save the sampled data
        output_file = os.path.join(output_dir, f"sampled_{file_name}")
        sampled_data.to_parquet(output_file)
        print(f"Saved sampled data to: {output_file}")



Processing file: yellow_tripdata_2023-06.parquet
Population size: 3307234, Sample size: 664
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2023-06.parquet
Processing file: yellow_tripdata_2022-10.parquet
Population size: 3675411, Sample size: 664
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2022-10.parquet
Processing file: yellow_tripdata_2020-03.parquet
Population size: 3007687, Sample size: 664
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2020-03.parquet
Processing file: yellow_tripdata_2021-05.parquet
Population size: 2507109, Sample size: 664
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2021-05.parquet
Processing file: yellow_tripdata_2022-09.parquet
Population size: 3183767, Sample size: 664
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2022-09.parquet
Processing file: yellow_tripdata_2024-04.parquet
Population size: 3514289, Sample size: 664
Saved sampled data to: sampled_taxi_data/sa

## Set sample size = 664 for taxi 

In [171]:
fixed_sample_size = 664  # we set 664 as the desired sample size 

input_dir = "yellow_taxi_data"
output_dir = "sampled_taxi_data"
for file_name in os.listdir(input_dir):
    if file_name.endswith(".parquet"):
        file_path = os.path.join(input_dir, file_name)
        
        print(f"Processing file: {file_name}")
        
        # Load the monthly data
        monthly_data = pd.read_parquet(file_path)
        
        # Perform sampling
        sampled_data = monthly_data.sample(n=fixed_sample_size, random_state=42)
        
        # Save the sampled data
        output_file = os.path.join(output_dir, f"sampled_{file_name}")
        sampled_data.to_parquet(output_file)
        print(f"Saved sampled data to: {output_file}")

Processing file: yellow_tripdata_2023-06.parquet
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2023-06.parquet
Processing file: yellow_tripdata_2022-10.parquet
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2022-10.parquet
Processing file: yellow_tripdata_2020-03.parquet
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2020-03.parquet
Processing file: yellow_tripdata_2021-05.parquet
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2021-05.parquet
Processing file: yellow_tripdata_2022-09.parquet
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2022-09.parquet
Processing file: yellow_tripdata_2024-04.parquet
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2024-04.parquet
Processing file: yellow_tripdata_2020-12.parquet
Saved sampled data to: sampled_taxi_data/sampled_yellow_tripdata_2020-12.parquet
Processing file: yellow_tripdata_2020-02.parquet
Saved sampled data to: sampled_taxi_data/

## Calculate Sample size for HVFHV

In [168]:
# Directory containing cleaned files
input_dir = "hvfhv_data"
output_dir = "sampled_hvfhv_data"
os.makedirs(output_dir, exist_ok=True)  # Create the output directory if it doesn't exist

# Sampling parameters
confidence_level = 0.99
margin_of_error = 0.05
proportion = 0.5

# Process each file
for file_name in os.listdir(input_dir):
    if file_name.endswith(".parquet"):
        file_path = os.path.join(input_dir, file_name)
        
        print(f"Processing file: {file_name}")
        
        # Load the monthly data
        monthly_data = pd.read_parquet(file_path)
        population_size = len(monthly_data)
        
        # Calculate the sample size for the month
        sample_size = cochran_sample_size(population_size, confidence_level, margin_of_error, proportion)
        print(f"Population size: {population_size}, Sample size: {sample_size}")
        
        # Perform sampling
        sampled_data = monthly_data.sample(n=sample_size, random_state=42)
        
        # Save the sampled data
        output_file = os.path.join(output_dir, f"sampled_{file_name}")
        sampled_data.to_parquet(output_file)
        print(f"Saved sampled data to: {output_file}")



Processing file: fhvhv_tripdata_2021-03.parquet
Population size: 14227393, Sample size: 664
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2021-03.parquet
Processing file: fhvhv_tripdata_2024-02.parquet
Population size: 19359148, Sample size: 664
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2024-02.parquet
Processing file: fhvhv_tripdata_2023-09.parquet
Population size: 19851123, Sample size: 664
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2023-09.parquet
Processing file: fhvhv_tripdata_2020-05.parquet
Population size: 6089999, Sample size: 664
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2020-05.parquet
Processing file: fhvhv_tripdata_2022-06.parquet
Population size: 17780075, Sample size: 664
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2022-06.parquet
Processing file: fhvhv_tripdata_2023-10.parquet
Population size: 20186330, Sample size: 664
Saved sampled data to: sampled_hvfhv_data/sa

## Set sample size = 664 for HVFHV 

In [170]:
fixed_sample_size = 664  # we set 64 as the desired sample size 

for file_name in os.listdir(input_dir):
    if file_name.endswith(".parquet"):
        file_path = os.path.join(input_dir, file_name)
        
        print(f"Processing file: {file_name}")
        
        # Load the monthly data
        monthly_data = pd.read_parquet(file_path)
        
        # Perform sampling
        sampled_data = monthly_data.sample(n=fixed_sample_size, random_state=42)
        
        # Save the sampled data
        output_file = os.path.join(output_dir, f"sampled_{file_name}")
        sampled_data.to_parquet(output_file)
        print(f"Saved sampled data to: {output_file}")

Processing file: fhvhv_tripdata_2021-03.parquet
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2021-03.parquet
Processing file: fhvhv_tripdata_2024-02.parquet
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2024-02.parquet
Processing file: fhvhv_tripdata_2023-09.parquet
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2023-09.parquet
Processing file: fhvhv_tripdata_2020-05.parquet
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2020-05.parquet
Processing file: fhvhv_tripdata_2022-06.parquet
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2022-06.parquet
Processing file: fhvhv_tripdata_2023-10.parquet
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2023-10.parquet
Processing file: fhvhv_tripdata_2022-07.parquet
Saved sampled data to: sampled_hvfhv_data/sampled_fhvhv_tripdata_2022-07.parquet
Processing file: fhvhv_tripdata_2023-01.parquet
Saved sampled data to: sampled_hvfhv_data/sampled

### Filtering Uber Data

In [174]:
def filter_uber_by_license(input_dir, output_dir):
    """
    Filter Uber rides from HVFHV dataset using 'Hvfhs_license_num'.

    Parameters:
        input_dir (str): Directory containing raw HVFHV Parquet files.
        output_dir (str): Directory to save filtered Uber rides.

    Returns:
        None: Saves filtered Uber rides to the output directory.
    """
    os.makedirs(output_dir, exist_ok=True)

    for file_name in os.listdir(input_dir):
        if file_name.endswith(".parquet"):
            file_path = os.path.join(input_dir, file_name)
            print(f"Processing file: {file_name}")

            # Load the HVFHV data
            hvfhv_data = pd.read_parquet(file_path)

            # Filter for Uber rides where Hvfhs_license_num is 'HV0003'
            uber_data = hvfhv_data[hvfhv_data['hvfhs_license_num'] == 'HV0003']

            # Save the filtered data
            output_file = os.path.join(output_dir, f"uber_{file_name}")
            uber_data.to_parquet(output_file)
            print(f"Saved filtered Uber data to: {output_file}")


In [173]:
input_dir = "sampled_hvfhv_data"
output_dir = "uber_data"

# Filter Uber rides
filter_uber_by_license(hvfhv_input_dir, uber_output_dir)

Processing file: sampled_fhvhv_tripdata_2022-01.parquet
Saved filtered Uber data to: uber_data/uber_sampled_fhvhv_tripdata_2022-01.parquet
Processing file: sampled_fhvhv_tripdata_2022-11.parquet
Saved filtered Uber data to: uber_data/uber_sampled_fhvhv_tripdata_2022-11.parquet
Processing file: sampled_fhvhv_tripdata_2023-07.parquet
Saved filtered Uber data to: uber_data/uber_sampled_fhvhv_tripdata_2023-07.parquet
Processing file: sampled_fhvhv_tripdata_2024-05.parquet
Saved filtered Uber data to: uber_data/uber_sampled_fhvhv_tripdata_2024-05.parquet
Processing file: sampled_fhvhv_tripdata_2022-08.parquet
Saved filtered Uber data to: uber_data/uber_sampled_fhvhv_tripdata_2022-08.parquet
Processing file: sampled_fhvhv_tripdata_2021-04.parquet
Saved filtered Uber data to: uber_data/uber_sampled_fhvhv_tripdata_2021-04.parquet
Processing file: sampled_fhvhv_tripdata_2020-12.parquet
Saved filtered Uber data to: uber_data/uber_sampled_fhvhv_tripdata_2020-12.parquet
Processing file: sampled_fh

## Data Cleaning for taxi

In [219]:
# Define directories
input_directory = "sampled_taxi_data/"
output_directory = "Cleaned_Taxi_Data/"
os.makedirs(output_directory, exist_ok=True)

# Load taxi zone shapefile
zones_gdf = gpd.read_file('taxi_zones')  # Replace with your actual path
zones_gdf = zones_gdf.to_crs(epsg=4326)  # Ensure CRS is WGS84 for latitude and longitude

# Calculate centroids for pickup and dropoff locations
zones_gdf['centroid'] = zones_gdf.geometry.centroid
zones_gdf['latitude'] = zones_gdf.centroid.y
zones_gdf['longitude'] = zones_gdf.centroid.x
zones_df = zones_gdf[['LocationID', 'latitude', 'longitude']]

# Latitude and longitude bounds for NYC
LAT_MIN, LAT_MAX = 40.4774, 40.9176
LON_MIN, LON_MAX = -74.2591, -73.7004

def clean_taxi_data(file_path):
    # Read the Parquet file
    trips_df = pd.read_parquet(file_path)
    print('Processing file:', file_path)

    # Merge trip data with zone centroids for pickups
    trips_with_pickup = trips_df.merge(
        zones_df,
        how='left',
        left_on='PULocationID',
        right_on='LocationID'
    ).rename(columns={'latitude': 'pickup_latitude', 'longitude': 'pickup_longitude'})

    # Merge trip data with zone centroids for dropoffs
    trips_with_locations = trips_with_pickup.merge(
        zones_df,
        how='left',
        left_on='DOLocationID',
        right_on='LocationID',
        suffixes=('', '_dropoff')
    ).rename(columns={'latitude': 'dropoff_latitude', 'longitude': 'dropoff_longitude'})

    # Filter out trips with invalid location IDs
    valid_trips = trips_with_locations.dropna(subset=['pickup_latitude', 'dropoff_latitude'])
    valid_trips = valid_trips[
        (valid_trips['pickup_latitude'].between(LAT_MIN, LAT_MAX)) &
        (valid_trips['pickup_longitude'].between(LON_MIN, LON_MAX)) &
        (valid_trips['dropoff_latitude'].between(LAT_MIN, LAT_MAX)) &
        (valid_trips['dropoff_longitude'].between(LON_MIN, LON_MAX))
    ]

    # Drop original LocationID columns
    valid_trips.drop(['PULocationID', 'DOLocationID', 'LocationID', 'LocationID_dropoff'], axis=1, inplace=True)

    # Convert column names to lowercase
    valid_trips.columns = valid_trips.columns.str.lower()

    # Filter out trips with non-positive or missing trip distances
    valid_trips = valid_trips.dropna(subset=['trip_distance'])
    valid_trips = valid_trips[valid_trips['trip_distance'] > 0]
    valid_trips['trip_distance'] = valid_trips['trip_distance'].astype(float)

    # Filter out trips with non-positive or missing passenger counts
    valid_trips = valid_trips.dropna(subset=['passenger_count'])
    valid_trips = valid_trips[valid_trips['passenger_count'] > 0]
    valid_trips['passenger_count'] = valid_trips['passenger_count'].astype(int)

    # Filter out trips with negative fare amounts
    valid_trips = valid_trips[
        (valid_trips['fare_amount'] >= 0) &
        (valid_trips['total_amount'] >= 0) &
        (valid_trips['tolls_amount'] >= 0)
    ]

    # Filter out trips with invalid payment types
    valid_trips['payment_type'] = valid_trips['payment_type'].astype(int)
    valid_trips = valid_trips[valid_trips['payment_type'].between(1, 6)]

    # Filter out trips with invalid RateCodeID values
    valid_trips['ratecodeid'] = valid_trips['ratecodeid'].astype(int)
    valid_trips = valid_trips[valid_trips['ratecodeid'].between(1, 6)]

    # Convert store_and_fwd_flag to binary
    valid_trips['store_and_fwd_flag'] = valid_trips['store_and_fwd_flag'].map({'Y': 1, 'N': 0}).fillna(0)

    # Convert airport_fee to float
    valid_trips['airport_fee'] = pd.to_numeric(valid_trips['airport_fee'], errors='coerce').fillna(0)

    # Rename columns
    valid_trips = valid_trips.rename(
        columns={'extra': 'Miscellaneous_Extras', 'tpep_pickup_datetime': 'pickup_datetime', 'tpep_dropoff_datetime': 'dropoff_datetime'}
    )

    # Filter out trips where dropoff is earlier than pickup
    valid_trips = valid_trips[valid_trips['dropoff_datetime'] >= valid_trips['pickup_datetime']]

    return valid_trips

# Process all files in the input directory
all_cleaned_data = pd.DataFrame()
taxi_file_names = [f for f in os.listdir(input_directory) if f.endswith('.parquet')]

for file in taxi_file_names:
    file_path = os.path.join(input_directory, file)
    cleaned_data = clean_taxi_data(file_path)
    output_file = os.path.join(output_directory, file)
    cleaned_data.to_parquet(output_file)
    all_cleaned_data = pd.concat([all_cleaned_data, cleaned_data], axis=0)
    print(f'File {file} processed and saved.')

# Save the consolidated cleaned data
final_output_file = os.path.join(output_directory, 'Taxi_all.parquet')
all_cleaned_data.to_parquet(final_output_file)
print('All files have been processed and consolidated.')


  zones_gdf['centroid'] = zones_gdf.geometry.centroid

  zones_gdf['latitude'] = zones_gdf.centroid.y

  zones_gdf['longitude'] = zones_gdf.centroid.x


Processing file: sampled_taxi_data/sampled_yellow_tripdata_2020-03.parquet
File sampled_yellow_tripdata_2020-03.parquet processed and saved.
Processing file: sampled_taxi_data/sampled_yellow_tripdata_2024-04.parquet
File sampled_yellow_tripdata_2024-04.parquet processed and saved.
Processing file: sampled_taxi_data/sampled_yellow_tripdata_2021-05.parquet
File sampled_yellow_tripdata_2021-05.parquet processed and saved.
Processing file: sampled_taxi_data/sampled_yellow_tripdata_2022-09.parquet
File sampled_yellow_tripdata_2022-09.parquet processed and saved.
Processing file: sampled_taxi_data/sampled_yellow_tripdata_2023-06.parquet
File sampled_yellow_tripdata_2023-06.parquet processed and saved.
Processing file: sampled_taxi_data/sampled_yellow_tripdata_2022-10.parquet
File sampled_yellow_tripdata_2022-10.parquet processed and saved.
Processing file: sampled_taxi_data/sampled_yellow_tripdata_2023-07.parquet
File sampled_yellow_tripdata_2023-07.parquet processed and saved.
Processing fi

In [252]:
# Define the path to the consolidated Parquet file
file_path = 'Cleaned_Taxi_Data/Taxi_all.parquet'

# Load the dataset into a DataFrame
df_taxi = pd.read_parquet(file_path)

# Display the first few rows of the DataFrame
df_taxi.describe()

Unnamed: 0,vendorid,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,payment_type,fare_amount,Miscellaneous_Extras,...,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude
count,33773.0,33773,33773,33773.0,33773.0,33773.0,33773.0,33773.0,33773.0,33773.0,...,33773.0,33773.0,33773.0,33773.0,33773.0,33773.0,33773.0,33773.0,33773.0,33773.0
mean,1.720576,2022-05-14 04:58:48.261599744,2022-05-14 05:14:45.448405760,1.428656,3.153477,1.044473,0.009593,1.230954,15.024451,1.261126,...,2.790389,0.41687,0.559586,22.302831,2.335223,0.084513,40.753638,-73.967629,40.755853,-73.97179
min,1.0,2020-01-01 00:11:06,2020-01-01 00:30:50,1.0,0.01,1.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,40.525495,-74.233534,40.525495,-74.233534
25%,1.0,2021-03-15 11:32:47,2021-03-15 12:02:01,1.0,1.06,1.0,0.0,1.0,7.2,0.0,...,0.05,0.0,0.3,12.6,2.5,0.0,40.740439,-73.989845,40.740337,-73.989845
50%,2.0,2022-05-14 20:06:07,2022-05-14 20:16:51,1.0,1.75,1.0,0.0,1.0,10.5,0.5,...,2.2,0.0,0.3,16.63,2.5,0.0,40.758028,-73.977698,40.758028,-73.977698
75%,2.0,2023-07-13 19:09:09,2023-07-13 19:30:28,1.0,3.17,1.0,0.0,1.0,16.5,2.5,...,3.55,0.0,1.0,23.88,2.5,0.0,40.773633,-73.965146,40.774376,-73.959635
max,2.0,2024-09-30 23:52:50,2024-09-30 23:56:08,6.0,73.19,5.0,1.0,4.0,228.0,11.75,...,422.7,40.0,1.0,453.55,2.5,1.75,40.899529,-73.711026,40.899529,-73.711026
std,0.448723,,,0.971209,4.037154,0.277529,0.097477,0.459047,13.716804,1.526065,...,3.935447,1.756412,0.338178,17.813367,0.620324,0.349249,0.030732,0.04478,0.031435,0.034966


In [233]:
import os
import pandas as pd
import geopandas as gpd

# Define directories
input_directory = "uber_data/"
output_directory = "Cleaned_Uber_Data/"
os.makedirs(output_directory, exist_ok=True)

# Load taxi zone shapefile
zones_gdf = gpd.read_file('taxi_zones')  # Replace with your actual path
zones_gdf = zones_gdf.to_crs(epsg=4326)  # Ensure CRS is WGS84 for latitude and longitude

# Calculate centroids for pickup and dropoff locations
zones_gdf['centroid'] = zones_gdf.geometry.centroid
zones_gdf['latitude'] = zones_gdf.centroid.y
zones_gdf['longitude'] = zones_gdf.centroid.x
zones_df = zones_gdf[['LocationID', 'latitude', 'longitude']]

# Latitude and longitude bounds for NYC
LAT_MIN, LAT_MAX = 40.560445, 40.908524
LON_MIN, LON_MAX = -74.242330, -73.717047

def clean_uber_data(file_path):
    # Read the Parquet file
    trips_df = pd.read_parquet(file_path)
    print('Processing file:', file_path)
    
    # Retain records that are Uber rides
    trips_df = trips_df[trips_df['hvfhs_license_num'] == 'HV0003']

    # Merge trip data with zone centroids for pickups
    trips_with_pickup = trips_df.merge(
        zones_df,
        how='left',
        left_on='PULocationID',
        right_on='LocationID'
    ).rename(columns={'latitude': 'pickup_latitude', 'longitude': 'pickup_longitude'})

    # Merge trip data with zone centroids for dropoffs
    trips_with_locations = trips_with_pickup.merge(
        zones_df,
        how='left',
        left_on='DOLocationID',
        right_on='LocationID',
        suffixes=('', '_dropoff')
    ).rename(columns={'latitude': 'dropoff_latitude', 'longitude': 'dropoff_longitude'})

    # Filter out trips with invalid location IDs
    valid_trips = trips_with_locations.dropna(subset=['pickup_latitude', 'dropoff_latitude'])

    # Delete records that start_pos or end_pos is out of range
    valid_trips = valid_trips[
        (valid_trips['pickup_latitude'].between(LAT_MIN, LAT_MAX)) &
        (valid_trips['pickup_longitude'].between(LON_MIN, LON_MAX)) &
        (valid_trips['dropoff_latitude'].between(LAT_MIN, LAT_MAX)) &
        (valid_trips['dropoff_longitude'].between(LON_MIN, LON_MAX))
    ]
    
    # Delete original LocationID columns
    valid_trips.drop(['PULocationID', 'DOLocationID', 'LocationID', 'LocationID_dropoff'], axis=1, inplace=True)
    
    # Convert column names to lowercase
    valid_trips.columns = valid_trips.columns.str.lower()
    
    # Delete records where trip_miles is missing or <= 0, and convert datatype into float
    valid_trips = valid_trips.dropna(subset=['trip_miles'])
    valid_trips = valid_trips[valid_trips['trip_miles'] > 0]
    valid_trips['trip_miles'] = valid_trips['trip_miles'].astype(float)
    
    # Delete records where trip_time is missing or <= 0, and convert datatype into float
    valid_trips = valid_trips.dropna(subset=['trip_time'])
    valid_trips = valid_trips[valid_trips['trip_time'] > 0]
    valid_trips['trip_time'] = valid_trips['trip_time'].astype(float)
    
    # Delete records where base_passenger_fare, tolls, sales_tax, bcf, tips, congestion_surcharge, or driver_pay are negative
    valid_trips = valid_trips[
        (valid_trips['base_passenger_fare'] >= 0) &
        (valid_trips['tolls'] >= 0) &
        (valid_trips['sales_tax'] >= 0) &
        (valid_trips['bcf'] >= 0) &
        (valid_trips['tips'] >= 0) &
        (valid_trips['congestion_surcharge'] >= 0) &
        (valid_trips['driver_pay'] >= 0)
    ]
    
    # Convert certain flags into 0 and 1
    flag_columns = ['shared_request_flag', 'shared_match_flag', 'access_a_ride_flag', 'wav_request_flag', 'wav_match_flag']
    for col in flag_columns:
        valid_trips[col] = valid_trips[col].map({'Y': 1, 'N': 0}).fillna(0)
    
    # Delete records where dropoff_datetime is earlier than pickup_datetime
    valid_trips = valid_trips[valid_trips['dropoff_datetime'] >= valid_trips['pickup_datetime']]
    
    # Delete records where on_scene_datetime is earlier than request_datetime
    valid_trips = valid_trips[valid_trips['on_scene_datetime'] >= valid_trips['request_datetime']]
    
    # Rename 'bcf' to 'Black_Car_Fund'
    valid_trips = valid_trips.rename(columns={'bcf': 'Black_Car_Fund'})
    
    # Delete unnecessary columns
    valid_trips = valid_trips.drop(['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num'], axis=1)
    
    return valid_trips

# Process all files in the input directory
all_cleaned_data = pd.DataFrame()
uber_file_names = [f for f in os.listdir(input_directory) if f.endswith('.parquet')]

for file in uber_file_names:
    file_path = os.path.join(input_directory, file)
    cleaned_data = clean_uber_data(file_path)
    output_file = os.path.join(output_directory, file)
    cleaned_data.to_parquet(output_file)
    all_cleaned_data = pd.concat([all_cleaned_data, cleaned_data], axis=0)
    print(f'File {file} processed and saved.')

# Save the consolidated cleaned data
final_output_file = os.path.join(output_directory, 'Uber_all.parquet')
all_cleaned_data.to_parquet(final_output_file)
print('All files have been processed and consolidated.')



  zones_gdf['centroid'] = zones_gdf.geometry.centroid

  zones_gdf['latitude'] = zones_gdf.centroid.y

  zones_gdf['longitude'] = zones_gdf.centroid.x


Processing file: uber_data/uber_sampled_fhvhv_tripdata_2023-07.parquet
File uber_sampled_fhvhv_tripdata_2023-07.parquet processed and saved.
Processing file: uber_data/uber_sampled_fhvhv_tripdata_2022-01.parquet
File uber_sampled_fhvhv_tripdata_2022-01.parquet processed and saved.
Processing file: uber_data/uber_sampled_fhvhv_tripdata_2022-11.parquet
File uber_sampled_fhvhv_tripdata_2022-11.parquet processed and saved.
Processing file: uber_data/uber_sampled_fhvhv_tripdata_2020-12.parquet
File uber_sampled_fhvhv_tripdata_2020-12.parquet processed and saved.
Processing file: uber_data/uber_sampled_fhvhv_tripdata_2020-02.parquet
File uber_sampled_fhvhv_tripdata_2020-02.parquet processed and saved.
Processing file: uber_data/uber_sampled_fhvhv_tripdata_2024-05.parquet
File uber_sampled_fhvhv_tripdata_2024-05.parquet processed and saved.
Processing file: uber_data/uber_sampled_fhvhv_tripdata_2022-08.parquet
File uber_sampled_fhvhv_tripdata_2022-08.parquet processed and saved.
Processing fi

In [253]:
# Define the path to the consolidated Parquet file
file_path = 'Cleaned_Uber_Data/Uber_all.parquet'

# Load the dataset into a DataFrame
df_uber = pd.read_parquet(file_path)

# Display the first few rows of the DataFrame
df_uber.describe()

Unnamed: 0,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,trip_miles,trip_time,base_passenger_fare,tolls,Black_Car_Fund,sales_tax,...,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude
count,26339,26339,26339,26339,26339.0,26339.0,26339.0,26339.0,26339.0,26339.0,...,26339.0,26339.0,26339.0,26339.0,26339.0,26339.0,26339.0,26339.0,26339.0,26339.0
mean,2022-05-18 10:37:28.525760256,2022-05-18 10:41:09.112912384,2022-05-18 10:42:14.951972096,2022-05-18 11:00:06.771365632,4.382245,1071.832833,21.071663,0.664221,0.616029,1.878243,...,16.940955,0.021565,0.009074,0.000152,0.001898,0.062455,40.737396,-73.934473,40.737116,-73.93487
min,2020-01-01 03:15:18,2020-01-01 03:24:58,2020-01-01 03:27:51,2020-01-01 03:30:21,0.01,47.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,40.561994,-74.170887,40.561994,-74.186419
25%,2021-03-04 15:12:59,2021-03-04 15:16:29,2021-03-04 15:16:40.500000,2021-03-04 15:50:35,1.54,557.0,10.535,0.0,0.29,0.92,...,8.32,0.0,0.0,0.0,0.0,0.0,40.690787,-73.984196,40.690787,-73.984052
50%,2022-05-16 04:43:12,2022-05-16 04:50:20,2022-05-16 04:50:26,2022-05-16 04:54:27,2.8,882.0,16.64,0.0,0.47,1.46,...,13.36,0.0,0.0,0.0,0.0,0.0,40.736824,-73.948522,40.737699,-73.947442
75%,2023-07-29 11:57:39,2023-07-29 12:01:35,2023-07-29 12:03:14,2023-07-29 12:30:24.500000,5.55,1380.0,26.41,0.0,0.77,2.36,...,21.65,0.0,0.0,0.0,0.0,0.0,40.774376,-73.899735,40.774376,-73.899536
max,2024-09-30 21:32:29,2024-09-30 21:35:32,2024-09-30 21:35:40,2024-09-30 21:50:59,35.71,8886.0,188.58,43.6,5.45,17.57,...,130.66,1.0,1.0,1.0,1.0,1.0,40.899529,-73.726655,40.899529,-73.726655
std,,,,,4.293501,727.302471,15.238093,2.640032,0.491771,1.417914,...,12.156578,0.145261,0.094826,0.012323,0.043529,0.241985,0.068783,0.064737,0.069002,0.067779


In [274]:
df_uber.columns.tolist()

['request_datetime',
 'on_scene_datetime',
 'pickup_datetime',
 'dropoff_datetime',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'Black_Car_Fund',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'shared_request_flag',
 'shared_match_flag',
 'access_a_ride_flag',
 'wav_request_flag',
 'wav_match_flag',
 'pickup_latitude',
 'pickup_longitude',
 'dropoff_latitude',
 'dropoff_longitude']

### Processing Weather Data

We load weather data from CSV files stored in the Datasets/weather directory and select relevant columns related to answering the 6 queries in part III.
Relevant columns include hourly weather conditions, such as temperature, humidity, and wind speed.
We exclude all other columns because we believe they are not as useful in answering the questions and thus are irrelevant to our analysis.
We split the original DATE entries into Date, Hour, and Minute.
Data after August 2024 are excluded to correspond to the range data range of Trip Data

In [279]:
from typing import List
import pandas as pd
from pandas import DataFrame

def get_weather_from_files(file_paths: List[str]) -> DataFrame:
    """
    Reads weather data from multiple CSV files, processes the data to include only relevant columns, 
    splits the 'DATE' column into 'Date' and 'Hour', and filters out rows after August 2024.

    Args:
        file_paths (List[str]): A list of file paths to the CSV files containing weather data.

    Returns:
        DataFrame: A single DataFrame containing the processed weather data.
    """
    # Relevant columns to load
    relevant_columns = [
        'DATE', 
        'DailyPrecipitation', 'DailyAverageWindSpeed',
        'DailySnowfall', 'DailySnowDepth',
        'HourlyPrecipitation', 'HourlyWindSpeed', 'HourlyWindDirection', 'Sunset', 'Sunrise'
    ]
    
    # Initialize an empty list to store dataframes
    weather_data: List[DataFrame] = []

    for file_path in file_paths:
        # Read the CSV, only load relevant columns
        df = pd.read_csv(file_path, usecols=relevant_columns, low_memory=False)

        # Split the 'DATE' column into date and hour
        df[['Date', 'Hour']] = df['DATE'].str.split('T', expand=True)

        # Extract hour and minute from the 'Hour' column
        df['Minute'] = df['Hour'].str.split(':').str[1].astype(int)  # Extract minute part as integer
        df['Hour'] = df['Hour'].str.split(':').str[0].astype(int)  # Extract and convert hour to integer

        # Drop the original 'DATE' column
        df = df.drop(columns=['DATE'])
        
        # Reorder columns: 'Date' and 'Hour' should be the first two columns
        df = df[['Date', 'Hour', 'Minute'] + [col for col in df.columns if col not in ['Date', 'Hour', 'Minute']]]
        
        weather_data.append(df)

    # Concatenate all dataframes into one
    weather_df = pd.concat(weather_data, ignore_index=True)

    # Filter out rows where the date is after August 2024
    filtered_df = weather_df[weather_df['Date'] < '2024-09-01']
    
    return filtered_df


# Loading csv:
file_paths = [
    'Datasets/weather/2020_weather.csv',  
    'Datasets/weather/2021_weather.csv',
    'Datasets/weather/2022_weather.csv',  
    'Datasets/weather/2023_weather.csv',
    'Datasets/weather/2024_weather.csv'
]
weather_data = get_weather_from_files(file_paths)

Generate Hourly Weather Data

In [280]:
def weather_hourly(df: DataFrame) -> DataFrame:
    """Processes the weather data to select relevant columns and fill missing values."""
    relevant_columns = ['Date', 'Hour', 'Minute', 'HourlyPrecipitation', 'HourlyWindSpeed']
    df = df[relevant_columns].copy()  # Create a copy to avoid SettingWithCopyWarning

    # Replace missing values (NaN) in 'HourlyPrecipitation' with 0
    df['HourlyPrecipitation'] = df['HourlyPrecipitation'].fillna(0.00)

    # Replace missing values in 'HourlyWindSpeed' with the mean
    df['HourlyWindSpeed'] = df['HourlyWindSpeed'].fillna(df['HourlyWindSpeed'].mean())

    return df


In [282]:
weather_hour = weather_hourly(weather_data)
weather_hour

Unnamed: 0,Date,Hour,Minute,HourlyPrecipitation,HourlyWindSpeed
0,2020-01-01,0,51,0.00,8.000000
1,2020-01-01,1,51,0.00,8.000000
2,2020-01-01,2,51,0.00,14.000000
3,2020-01-01,3,51,0.00,11.000000
4,2020-01-01,4,51,0.00,6.000000
...,...,...,...,...,...
54587,2024-08-31,22,5,0.0,0.000000
54588,2024-08-31,22,51,0.00,0.000000
54589,2024-08-31,23,51,0.00,0.000000
54590,2024-08-31,23,59,0.0,5.148104


In [283]:
def aggregate_weather_hourly(df: DataFrame) -> DataFrame:
    """Aggregates hourly weather data by date and hour, and fills missing values."""
    df = df.copy()

    # Convert 'HourlyPrecipitation' to numeric if it's still an object
    df['HourlyPrecipitation'] = pd.to_numeric(df['HourlyPrecipitation'], errors='coerce')

    # Fill missing wind speed values with the daily mean wind speed
    df['HourlyWindSpeed'] = df.groupby('Date')['HourlyWindSpeed'].transform(
        lambda x: x.fillna(x.mean())
    )

    # Aggregate the data
    aggregated_df = df.groupby(['Date', 'Hour'], as_index=False).agg({
        'HourlyPrecipitation': 'sum',  # Sum of precipitation for the hour
        'HourlyWindSpeed': 'mean',  # Mean wind speed for the hour
    })

    # Ensure there are no NaN values in the aggregated dataframe
    aggregated_df['HourlyPrecipitation'] = aggregated_df['HourlyPrecipitation'].fillna(0.0)
    aggregated_df['HourlyWindSpeed'] = aggregated_df['HourlyWindSpeed'].fillna(
        aggregated_df['HourlyWindSpeed'].mean()
    )

    return aggregated_df

In [285]:
aggregated_hourly = aggregate_weather_hourly(weather_hour)

In [286]:
aggregated_hourly.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 40905 entries, 0 to 40904
Data columns (total 4 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   Date                 40905 non-null  object 
 1   Hour                 40905 non-null  int64  
 2   HourlyPrecipitation  40905 non-null  float64
 3   HourlyWindSpeed      40905 non-null  float64
dtypes: float64(2), int64(1), object(1)
memory usage: 1.2+ MB


Generate Daily Weather Data

In [287]:
def weather_daily(df: DataFrame) -> DataFrame:
    """Processes daily weather data to select relevant columns."""
    relevant_columns = ['Date', 'DailyAverageWindSpeed', 'DailyPrecipitation', 'DailySnowDepth', 'DailySnowfall']
    return df[relevant_columns].copy()

In [290]:
raw_weather_daily = weather_daily(weather_data)

In [291]:
raw_weather_daily.info()

<class 'pandas.core.frame.DataFrame'>
Index: 54592 entries, 0 to 54591
Data columns (total 5 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Date                   54592 non-null  object 
 1   DailyAverageWindSpeed  1646 non-null   float64
 2   DailyPrecipitation     1704 non-null   object 
 3   DailySnowDepth         1704 non-null   object 
 4   DailySnowfall          1704 non-null   object 
dtypes: float64(1), object(4)
memory usage: 2.5+ MB


In [292]:
def clean_daily_weather(df: DataFrame) -> DataFrame:
    """Cleans the daily weather data by removing rows where all key columns are NaN."""
    cleaned_df = df.copy()

    # Drop rows where all the specified columns are NaN
    cleaned_df = cleaned_df.dropna(subset=['DailyAverageWindSpeed', 'DailyPrecipitation', 'DailySnowDepth', 'DailySnowfall'], how='all')
    
    return cleaned_df

In [293]:
cleaned_daily_weather = clean_daily_weather(raw_weather_daily)

In [294]:
# Force conversion to float and replace any non-numeric values with 0
cleaned_daily_weather['DailySnowfall'] = pd.to_numeric(cleaned_daily_weather['DailySnowfall'], errors='coerce').fillna(0)
cleaned_daily_weather['DailyPrecipitation'] = pd.to_numeric(cleaned_daily_weather['DailyPrecipitation'], errors='coerce').fillna(0)
cleaned_daily_weather['DailySnowDepth'] = pd.to_numeric(cleaned_daily_weather['DailySnowDepth'], errors='coerce').fillna(0)


# Check the info again to verify the column type
cleaned_daily_weather.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1704 entries, 24 to 54590
Data columns (total 5 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Date                   1704 non-null   object 
 1   DailyAverageWindSpeed  1646 non-null   float64
 2   DailyPrecipitation     1704 non-null   float64
 3   DailySnowDepth         1704 non-null   float64
 4   DailySnowfall          1704 non-null   float64
dtypes: float64(4), object(1)
memory usage: 79.9+ KB


In [295]:
def replace_missing_daily_avg_wind_speed(hourly_df: DataFrame, daily_df: DataFrame) -> DataFrame:
    """Replaces missing daily average wind speed values in the daily dataset using hourly data."""
    # Step 1: Calculate daily average wind speed from the hourly data
    daily_avg_wind_speed = hourly_df.groupby('Date', as_index=False).agg({
        'HourlyWindSpeed': 'mean'  # Calculate mean wind speed for each day
    }).rename(columns={'HourlyWindSpeed': 'CalculatedDailyAvgWindSpeed'})
    
    # Step 2: Merge the daily calculated values into the daily weather DataFrame
    merged_df = daily_df.merge(daily_avg_wind_speed, on='Date', how='left')
    
    # Step 3: Replace missing values in 'DailyAverageWindSpeed' with calculated values
    merged_df['DailyAverageWindSpeed'] = merged_df['DailyAverageWindSpeed'].fillna(
        merged_df['CalculatedDailyAvgWindSpeed']
    )
    
    # Drop the helper column 'CalculatedDailyAvgWindSpeed' if no longer needed
    merged_df = merged_df.drop(columns=['CalculatedDailyAvgWindSpeed'])
    
    return merged_df

In [296]:
updated_daily_weather = replace_missing_daily_avg_wind_speed(aggregated_hourly, cleaned_daily_weather)
updated_daily_weather.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1704 entries, 0 to 1703
Data columns (total 5 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Date                   1704 non-null   object 
 1   DailyAverageWindSpeed  1704 non-null   float64
 2   DailyPrecipitation     1704 non-null   float64
 3   DailySnowDepth         1704 non-null   float64
 4   DailySnowfall          1704 non-null   float64
dtypes: float64(4), object(1)
memory usage: 66.7+ KB


Daily data on Sunrise and Sunset

In [350]:
def sunrise_daily(df: DataFrame) -> DataFrame:
    """Processes daily weather data to select relevant columns."""
    relevant_columns = ['Date','Sunset', 'Sunrise']
    return df[relevant_columns].copy()

In [351]:
sunset_sunrise = sunrise_daily(weather_data)

In [352]:
sunset_sunrise = sunset_sunrise.dropna(subset=['Sunset', 'Sunrise'], how='all')

In [353]:
sunset_sunrise.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1704 entries, 24 to 54590
Data columns (total 3 columns):
 #   Column   Non-Null Count  Dtype  
---  ------   --------------  -----  
 0   Date     1704 non-null   object 
 1   Sunset   1704 non-null   float64
 2   Sunrise  1704 non-null   float64
dtypes: float64(2), object(1)
memory usage: 53.2+ KB


In [354]:
def convert_to_time(value):
    value = int(value)  # Ensure the value is an integer
    hours = value // 100  # Extract hours
    minutes = value % 100  # Extract minutes
    return f"{hours:02d}:{minutes:02d}"  # Format as HH:MM

sunset_sunrise = sunset_sunrise.copy()

sunset_sunrise["Sunset"] = sunset_sunrise["Sunset"].apply(convert_to_time)
sunset_sunrise["Sunrise"] = sunset_sunrise["Sunrise"].apply(convert_to_time)

In [355]:
sunset_sunrise

Unnamed: 0,Date,Sunset,Sunrise
24,2020-01-01,16:39,07:20
49,2020-01-02,16:40,07:20
86,2020-01-03,16:41,07:20
144,2020-01-04,16:42,07:20
169,2020-01-05,16:43,07:20
...,...,...,...
54461,2024-08-27,18:35,05:19
54486,2024-08-28,18:33,05:20
54518,2024-08-29,18:32,05:21
54552,2024-08-30,18:30,05:22


## Part 2: Storing Cleaned Data

In [363]:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Define the database URL
DATABASE_URL = 'sqlite:///project.db'

# Create an engine instance
engine = create_engine(DATABASE_URL)

# Create a declarative base class
Base = declarative_base()

# Create a configured "Session" class
Session = sessionmaker(bind=engine)

# Create a Session
session = Session()


  Base = declarative_base()


In [364]:
from sqlalchemy import Column, Integer, String, Float, DateTime, Date

class HourlyWeather(Base):
    __tablename__ = 'hourly_weather'
    id = Column(Integer, primary_key=True, autoincrement=True)
    date = Column(Date, nullable=False)
    hour = Column(Integer)
    hourly_precipitation = Column(Float)
    hourly_wind_speed = Column(Float)
    hourly_wind_direction = Column(String)

class DailyWeather(Base):
    __tablename__ = 'daily_weather'
    id = Column(Integer, primary_key=True, autoincrement=True)
    date = Column(Date, nullable=False)
    daily_avg_wind_speed = Column(Float)
    daily_precipitation = Column(Float)
    daily_snow_depth = Column(Float)
    daily_snow_fall = Column(Float)

class UberTrip(Base):
    __tablename__ = 'uber_trips'
    id = Column(Integer, primary_key=True, autoincrement=True)
    request_datetime = Column(DateTime)
    on_scene_datetime = Column(DateTime)
    pickup_datetime = Column(DateTime)
    dropoff_datetime = Column(DateTime)
    trip_miles = Column(Float)
    trip_time = Column(Float)
    base_passenger_fare = Column(Float)
    tolls = Column(Float)
    black_car_fund = Column(Float)  
    sales_tax = Column(Float)
    congestion_surcharge = Column(Float)
    airport_fee = Column(Float)
    tips = Column(Float)
    driver_pay = Column(Float)
    shared_request_flag = Column(Integer)  # Binary Variable
    shared_match_flag = Column(Integer)    # Binary Variable
    access_a_ride_flag = Column(Integer)   # Binary Variable
    wav_request_flag = Column(Integer)     # Binary Variable
    wav_match_flag = Column(Integer)       # Binary Variable
    pickup_latitude = Column(Float)
    pickup_longitude = Column(Float)
    dropoff_latitude = Column(Float)
    dropoff_longitude = Column(Float)

class YellowTaxiTrip(Base):
    __tablename__ = 'yellow_taxi_trips'
    id = Column(Integer, primary_key=True, autoincrement=True)
    vendorid = Column(Integer)
    pickup_datetime = Column(DateTime)
    dropoff_datetime = Column(DateTime)
    passenger_count = Column(Integer)
    trip_distance = Column(Float)
    rate_code_id = Column(Integer)  
    store_and_fwd_flag = Column(Integer)  # Binary Variable
    payment_type = Column(Integer)
    fare_amount = Column(Float)
    miscellaneous_extras = Column(Float)  
    mta_tax = Column(Float)
    tip_amount = Column(Float)
    tolls_amount = Column(Float)
    improvement_surcharge = Column(Float)
    total_amount = Column(Float)
    congestion_surcharge = Column(Float)
    airport_fee = Column(Float)
    pickup_latitude = Column(Float)
    pickup_longitude = Column(Float)
    dropoff_latitude = Column(Float)
    dropoff_longitude = Column(Float)

class Sun_Data(Base):
    __tablename__ = 'sun_data'
    id = Column(Integer, primary_key=True, autoincrement=True)
    Date = Column(Date)
    Sunrise = Column(DateTime)
    Sunset = Column(DateTime)

In [365]:
# Create all tables in the database
Base.metadata.create_all(engine)


In [367]:
# Load the Uber trips dataset
df_uber = pd.read_parquet('Cleaned_Uber_Data/Uber_all.parquet')

# Load the Yellow Taxi trips dataset
df_taxi = pd.read_parquet('Cleaned_Taxi_Data/Taxi_all.parquet')

# Assuming aggregated_hourly and updated_daily_weather are functions
# that return DataFrames for hourly and daily weather data respectively
df_hourly_weather = aggregated_hourly
df_daily_weather = updated_daily_weather
df_sunset_sunrise = sunset_sunrise

In [368]:
# Insert data into the Uber trips table
df_uber.to_sql('uber_trips', con=engine, if_exists='replace', index=False)

# Insert data into the Yellow Taxi trips table
df_taxi.to_sql('yellow_taxi_trips', con=engine, if_exists='replace', index=False)

# Insert data into the Hourly Weather table
df_hourly_weather.to_sql('hourly_weather', con=engine, if_exists='replace', index=False)

# Insert data into the Daily Weather table
df_daily_weather.to_sql('daily_weather', con=engine, if_exists='replace', index=False)

df_sunset_sunrise.to_sql('daily_weather', con=engine, if_exists='replace', index=False)


1704

## Create a SQL Schema 

In [369]:
from sqlalchemy.schema import CreateTable

# Open a file to write the schema
with open('schema.sql', 'w') as f:
    for table in Base.metadata.sorted_tables:
        create_table_sql = str(CreateTable(table).compile(engine))
        f.write(f"{create_table_sql};\n\n")


## Part 3: Understanding the Data

In [None]:
# Helper function to write the queries to file
def write_query_to_file(query, outfile):
    raise NotImplementedError()

### Query 1

In [None]:
QUERY_1_FILENAME = ""

QUERY_1 = """
TODO
"""

In [None]:
# execute query either via sqlalchemy
with engine.connect() as con:
    results = con.execute(db.text(QUERY_1)).fetchall()
results

# or via pandas
pd.read_sql(QUERY_1, con=engine)

In [None]:
write_query_to_file(QUERY_1, QUERY_1_FILENAME)

## Part 4: Visualizing the Data

### Visualization 1

In [None]:
# use a more descriptive name for your function
def plot_visual_1(dataframe):
    figure, axes = plt.subplots(figsize=(20, 10))
    
    values = "..."  # use the dataframe to pull out values needed to plot
    
    # you may want to use matplotlib to plot your visualizations;
    # there are also many other plot types (other 
    # than axes.plot) you can use
    axes.plot(values, "...")
    # there are other methods to use to label your axes, to style 
    # and set up axes labels, etc
    axes.set_title("Some Descriptive Title")
    
    plt.show()

In [None]:
def get_data_for_visual_1():
    # Query SQL database for the data needed.
    # You can put the data queried into a pandas dataframe, if you wish
    raise NotImplementedError()

In [None]:
some_dataframe = get_data_for_visual_1()
plot_visual_1(some_dataframe)