## Project Setup

In [1]:
# all import statements needed for the project, for example:
import os
import bs4
from bs4 import BeautifulSoup
import matplotlib.pyplot as plt
import pandas as pd
import geopandas as gpd
from geopandas import GeoDataFrame
import requests
import sqlalchemy as db
import re
from datetime import datetime, timedelta
import numpy as np
import fiona
import math
import pytest
from shapely.geometry import Point
from typing import Optional, Tuple, List

To accomplish our project, we use several libraries to handle tasks such as data processing, visualization, database interaction, and geospatial analysis. Here’s a brief overview:

- **File and Directory Management**:
  - `os`: Helps manage directories and file paths.

- **Web Scraping**:
  - `requests`: Downloads web pages and data files over HTTP.
  - `bs4` and `BeautifulSoup`: Parse HTML to extract links from web pages.

- **Data Handling**:
  - `pandas`: Works with tabular datasets, like spreadsheets.
  - `geopandas`: Adds geospatial capabilities to pandas for working with maps and location data.

- **Database Interaction**:
  - `sqlalchemy`: Connects our data to a SQLite database for storage and querying.

- **Math and Statistics**:
  - `math` and `numpy`: Perform calculations and handle arrays of data.

- **Date and Time**:
  - `datetime`: Manage dates and times for our analysis.

- **Regular Expressions**:
  - `re`: Searches for patterns in text, such as picking out Parquet file URLs and monthly file names.

- **Testing**:
  - `pytest`: Ensures our functions work as expected through automated testing.

- **Visualization**:
  - `matplotlib.pyplot`: Creates graphs and charts to visualize trends and patterns.

- **Geospatial Data**:
  - `fiona`, `shapely`, and `GeoDataFrame`: Handle maps, locations, and geographic shapes.

This mix of tools enables us to handle various aspects of the project, from downloading and cleaning data to storing it in a database and creating visualizations.


In [2]:
# Project-wide constants

TLC_URL = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

TAXI_ZONES_DIR = "taxi_zones"
TAXI_ZONES_SHAPEFILE = os.path.join(TAXI_ZONES_DIR, "taxi_zones.shp")
WEATHER_CSV_DIR = ""  # placeholder: set to the folder holding the cleaned weather CSVs

CRS = 4326  # coordinate reference system: EPSG:4326 (WGS 84 latitude/longitude)

# Bounding boxes: ((south-west lat, lon), (north-east lat, lon))
NEW_YORK_BOX_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))
LGA_BOX_COORDS = ((40.763589, -73.891745), (40.778865, -73.854838))
JFK_BOX_COORDS = ((40.639263, -73.795642), (40.651376, -73.766264))
EWR_BOX_COORDS = ((40.686794, -74.194028), (40.699680, -74.165205))

DATABASE_URL = "sqlite:///project.db"
DATABASE_SCHEMA_FILE = "schema.sql"
QUERY_DIRECTORY = "queries"

## Defining Project Constants

Constants are fixed values that we use throughout the project. By defining them in one place, it’s easier to maintain consistency and make changes if needed. Below is a simplified explanation of the constants used in the project:

### Data Source URLs
- **`TLC_URL`**:  
  The website link where we download ride data for New York City, including taxi and Uber trips.

### Taxi Zones Information
- **`TAXI_ZONES_DIR`**:  
  The folder where information about NYC taxi zones is stored.
- **`TAXI_ZONES_SHAPEFILE`**:  
  A file that maps the boundaries of taxi zones in NYC. This is important for identifying pickup and drop-off locations geographically.

### Weather Data Location
- **`WEATHER_CSV_DIR`**:  
  The folder where we keep cleaned weather data files.

### Geographic Settings
- **`CRS`**:  
  The "Coordinate Reference System" used for output coordinates. `4326` is the EPSG code for WGS 84, i.e. latitude and longitude in degrees.

#### Bounding Boxes for Geographic Areas
Each bounding box is a pair of (latitude, longitude) corners: the south-west corner first and the north-east corner second. The boxes are used to filter rides or weather data by location:
- **`NEW_YORK_BOX_COORDS`**:  
  The boundaries of New York City. Used to filter rides within the city.
- **`LGA_BOX_COORDS`**:  
  The boundaries of LaGuardia Airport.
- **`JFK_BOX_COORDS`**:  
  The boundaries of John F. Kennedy International Airport.
- **`EWR_BOX_COORDS`**:  
  The boundaries of Newark Liberty International Airport.
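Because each box is stored as (south-west, north-east) corners, testing whether a point falls inside reduces to two range checks. A minimal sketch; `in_box` is a hypothetical helper for illustration, not a function defined elsewhere in this notebook. The first test point is a pickup location taken from the sample taxi rows shown in this notebook.

```python
from typing import Tuple

# ((south-west lat, lon), (north-east lat, lon)), matching the constants above
BoxCoords = Tuple[Tuple[float, float], Tuple[float, float]]

NEW_YORK_BOX_COORDS: BoxCoords = ((40.560445, -74.242330), (40.908524, -73.717047))
JFK_BOX_COORDS: BoxCoords = ((40.639263, -73.795642), (40.651376, -73.766264))


def in_box(lat: float, lon: float, box: BoxCoords) -> bool:
    """Hypothetical helper: True if (lat, lon) lies inside the box, edges included."""
    (lat_min, lon_min), (lat_max, lon_max) = box
    return lat_min <= lat <= lat_max and lon_min <= lon <= lon_max


print(in_box(40.646985, -73.78653, JFK_BOX_COORDS))       # True
print(in_box(40.646985, -73.78653, NEW_YORK_BOX_COORDS))  # True
print(in_box(40.764421, -73.977569, JFK_BOX_COORDS))      # False
```

Note that longitudes in NYC are negative, so the "minimum" longitude is the western (more negative) edge.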

### Database Configuration
- **`DATABASE_URL`**:  
  The location of the SQLite database where all cleaned data will be stored.
- **`DATABASE_SCHEMA_FILE`**:  
  A file that defines the structure (or schema) of the database tables.
- **`QUERY_DIRECTORY`**:  
  A folder where SQL query files for analysis will be saved.


In [3]:
# Make sure the QUERY_DIRECTORY exists
try:
    os.mkdir(QUERY_DIRECTORY)
except FileExistsError:
    # The directory already exists; any other OSError propagates unchanged
    pass

## Ensuring the Existence of the Query Directory

This part of the code ensures that the folder (directory) where we will save our SQL query files is created and ready to use. The directory is defined by the constant `QUERY_DIRECTORY`.

### Why Is This Important?
During the project, we will generate and save SQL query files. To do this, we need a specific folder to keep them organized. This code checks if the folder already exists, and if not, it creates it.

### How It Works
1. **Create the Folder**:  
   The program attempts to create the folder specified by `QUERY_DIRECTORY`.

2. **Handle Existing Folders**:  
   - If the folder already exists, `os.mkdir` raises `FileExistsError` (errno 17, `EEXIST`). The code detects this specific case and ignores it, so the program continues.
   - Any other error (for example, a permissions problem) is re-raised, stopping the program with the original error message.

This makes the cell safe to re-run: an existing folder and its contents are left untouched.

Creating the folder at the start avoids failures later when query files are saved.
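Python's `os.makedirs` with `exist_ok=True` gives the same "create if missing" behavior in a single call, and also creates any missing parent folders. A minimal sketch, using a temporary directory in place of `QUERY_DIRECTORY` so it does not touch the project folder:

```python
import os
import tempfile

# Throwaway location standing in for QUERY_DIRECTORY
base_dir = tempfile.mkdtemp()
query_dir = os.path.join(base_dir, "queries")

# exist_ok=True turns a repeated call into a no-op instead of raising FileExistsError
os.makedirs(query_dir, exist_ok=True)
os.makedirs(query_dir, exist_ok=True)  # second call: no error

print(os.path.isdir(query_dir))  # True
```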


# Part 1: Data Preprocessing

## Load Taxi Zones

This section focuses on handling geographic data related to NYC taxi zones, which is essential for mapping and analyzing taxi and Uber rides.

### 1. Loading Taxi Zone Information

We use the **`load_taxi_zones`** function to load data about taxi zones from a geospatial file, such as a shapefile or GeoJSON. This data is stored in a **GeoDataFrame**, which allows us to perform geographic operations (e.g., finding locations or boundaries).

#### Why Is This Important?
- Taxi zones help us identify where rides start and end geographically.
- This information is used throughout the project for mapping and spatial analysis.

### 2. Getting the Coordinates for a Taxi Zone

The **`lookup_coords_for_taxi_zone_id`** function helps find the central point (latitude and longitude) of a taxi zone using its **LocationID**.

#### How It Works:
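- If the LocationID is valid, the function projects the zone to EPSG:2263 (the New York State Plane coordinate system for Long Island, in feet), computes the **centroid** (geometric center) there, and converts it back to latitude and longitude (EPSG:4326). Computing the centroid in a planar, projected system avoids the distortion of working directly in degrees. If the layer has no CRS set, it is assumed to be EPSG:2263.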
- If the LocationID is not found, the function returns `None`, indicating the zone is invalid or missing.

#### Why Is This Important?
- Coordinates allow us to connect geographic zones with ride data, making it possible to analyze trips based on pickup and drop-off locations.
- This is especially helpful for visualizing the data on maps or filtering rides by specific areas.

These functions are crucial for linking spatial data with ride information, enabling us to analyze and visualize geographic trends effectively.
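To make "centroid" concrete: for a simple polygon without holes, the centroid is the standard area-weighted (shoelace) centroid, which is the quantity shapely's `centroid` returns in that case. The formula assumes planar coordinates, which is why `lookup_coords_for_taxi_zone_id` projects to EPSG:2263 first. A minimal pure-Python sketch; `polygon_centroid` is a hypothetical helper, and the L-shaped zone is made up.

```python
from typing import List, Tuple


def polygon_centroid(vertices: List[Tuple[float, float]]) -> Tuple[float, float]:
    """Area-weighted centroid of a simple (non-self-intersecting) polygon.

    `vertices` are (x, y) pairs in a planar, projected CRS, listed in order,
    without repeating the first vertex at the end.
    """
    area2 = 0.0  # twice the signed area
    cx = cy = 0.0
    n = len(vertices)
    for i in range(n):
        x0, y0 = vertices[i]
        x1, y1 = vertices[(i + 1) % n]
        cross = x0 * y1 - x1 * y0
        area2 += cross
        cx += (x0 + x1) * cross
        cy += (y0 + y1) * cross
    if area2 == 0:
        raise ValueError("degenerate polygon with zero area")
    # C = (1 / (6A)) * sum(...), and 6A = 3 * area2
    return cx / (3 * area2), cy / (3 * area2)


# Made-up L-shaped zone: a 2x2 square with its top-right 1x1 corner removed
print(polygon_centroid([(0, 0), (2, 0), (2, 1), (1, 1), (1, 2), (0, 2)]))  # ≈ (0.8333, 0.8333)
```

The centroid shifts away from the removed corner, toward the larger part of the shape, as expected.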

In [4]:
def load_taxi_zones(file_path: str) -> GeoDataFrame:
    """
    Load taxi zones from a shapefile or GeoJSON file into a GeoDataFrame.

    Args:
        file_path (str): The path to the geospatial file (e.g., a shapefile or GeoJSON).

    Returns:
        GeoDataFrame: A GeoDataFrame containing the taxi zones data.
    """
    geofile: GeoDataFrame = gpd.read_file(file_path)
    return geofile

In [5]:
taxi_zones = load_taxi_zones("taxi_zones/taxi_zones.shp")

In [14]:
def lookup_coords_for_taxi_zone_id(
    zone_loc_id: int, 
    loaded_taxi_zones: GeoDataFrame
) -> Optional[Tuple[float, float]]:
    """
    Look up the latitude and longitude coordinates for a taxi zone by its LocationID.

    Args:
        zone_loc_id (int): The LocationID of the taxi zone.
        loaded_taxi_zones (GeoDataFrame): A GeoDataFrame containing taxi zone geometries.

    Returns:
        Optional[Tuple[float, float]]: A tuple containing the latitude and longitude 
        coordinates if the zone is found, or None if the LocationID is not in the data.
    """
    # Ensure the CRS is set if missing
    if loaded_taxi_zones.crs is None:
        loaded_taxi_zones = loaded_taxi_zones.set_crs(epsg=2263)

    # Find the zone matching the LocationID
    zone = loaded_taxi_zones[loaded_taxi_zones['LocationID'] == zone_loc_id]
    if zone.empty:
        return None
    projected_zone = zone.to_crs(epsg=2263)
    centroid = projected_zone.geometry.centroid.iloc[0]
    centroid_geo = gpd.GeoSeries([centroid], crs=2263).to_crs(epsg=4326)

    # Return latitude and longitude as a tuple
    return (centroid_geo.geometry.iloc[0].y, centroid_geo.geometry.iloc[0].x)

## Calculate Sample Size

This section explains how we determine the required sample size for analyzing large datasets. We use a statistical method called **Cochran's formula**, which is adjusted for finite population sizes.

### Purpose
When working with massive datasets, analyzing every single data point can be inefficient. Instead, we calculate a representative sample size to ensure accurate results while saving time and resources.

### Function: `calculate_sample_size`

This function calculates the sample size based on the population size and desired statistical confidence.

#### Inputs:
1. **`population`**: The total number of available data points.
2. **`p`** (default: 0.5): The expected proportion of the population with the characteristic of interest. Because $p(1-p)$ is largest at $p = 0.5$, this default gives the largest, most conservative sample size.

#### Process:
1. **Confidence Level**: The function uses a fixed Z-value of **1.96**, which corresponds to a **95% confidence level**.
2. **Margin of Error**: Uses a fixed **5% margin of error** (0.05).
3. **Cochran's Formula**: The initial sample size (`n_0`) is calculated using the formula:
   $$
   n_0 = \frac{Z^2 \cdot p \cdot (1-p)}{(\text{margin of error})^2}
   $$
4. **Adjustment for Finite Population**: The sample size is refined to account for finite population sizes:
   $$
   \text{Adjusted Sample Size} = \frac{n_0}{1 + \frac{n_0 - 1}{\text{population}}}
   $$
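A worked example with the function's fixed values ($Z = 1.96$, $p = 0.5$, margin of error $0.05$) and an illustrative population of 3,000,000 trips (a hypothetical size, not a count taken from the data):

$$
n_0 = \frac{1.96^2 \cdot 0.5 \cdot 0.5}{0.05^2} = \frac{0.9604}{0.0025} = 384.16
$$

$$
\text{Adjusted Sample Size} = \frac{384.16}{1 + \frac{384.16 - 1}{3{,}000{,}000}} \approx 384.11 \quad\Rightarrow\quad \lceil 384.11 \rceil = 385
$$

For populations in the millions, $(n_0 - 1)/\text{population}$ is tiny, so the finite-population correction barely changes $n_0$. With $p = 0.4$ (the value `get_and_clean_uber_month` uses for the Uber data), $n_0 = 1.96^2 \cdot 0.4 \cdot 0.6 / 0.05^2 = 368.7936$, which gives 369 for the same population.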

#### Output:
- The function returns the **adjusted sample size**, rounded up to the nearest whole number.

#### Why Is This Important?
- Ensures the sample is large enough to represent the dataset accurately.
- Reduces the computational load by avoiding the need to process the entire dataset.
- Maintains statistical reliability for analysis.

In [17]:
def calculate_sample_size(population: int, p: float = 0.5) -> int:
    """
    Calculate the required sample size using Cochran's formula, adjusted for finite populations.

    Args:
        population (int): The total population size.
        p (float, optional): Proportion of the population with the desired characteristic. 
            Defaults to 0.5 for maximum variability.

    Returns:
        int: The calculated sample size, rounded up to the nearest whole number.
    """
    # Z-value for 95% confidence level
    z: float = 1.96
    
    # Margin of error (fixed at 5%)
    margin_of_error: float = 0.05
    
    # Complementary proportion
    q: float = 1 - p
    
    # Cochran's formula
    n_0: float = (z**2 * p * q) / (margin_of_error**2)
    
    # Adjust for finite population
    sample_size: float = n_0 / (1 + (n_0 - 1) / population)
    
    return math.ceil(sample_size)

## Common Functions

This section provides two utility functions that retrieve and filter dataset links from a webpage. Packaging these steps as functions lets us reuse them for every dataset instead of rewriting the same code each time.

### 1. `get_all_urls_from_page`

#### Purpose:
- Extracts all web links (URLs) from a given webpage.

#### Key Features:
- **Fetching the Webpage**: Uses the `requests` library to download the HTML content of the page.
- **Parsing Links**: Uses `BeautifulSoup` to find all anchor tags (`<a>`) that have an `href` attribute, and returns each `href` exactly as written in the page (relative links are not resolved).
- **Error Handling**: Network failures and HTTP error status codes (detected with `raise_for_status()`) are re-raised as an exception whose message names the URL.
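Since `get_all_urls_from_page` returns hrefs as written, a relative link would come back without its host. As a point of comparison, here is a dependency-free sketch that uses the standard library's `html.parser` instead of BeautifulSoup and resolves relative links with `urllib.parse.urljoin`. `HrefCollector` is a hypothetical name, and the example.com URLs are placeholders.

```python
from html.parser import HTMLParser
from typing import List, Optional, Tuple
from urllib.parse import urljoin


class HrefCollector(HTMLParser):
    """Hypothetical stdlib alternative: collect every <a href>, resolved against base_url."""

    def __init__(self, base_url: str) -> None:
        super().__init__()
        self.base_url = base_url
        self.urls: List[str] = []

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        if tag != "a":
            return
        for name, value in attrs:
            if name == "href" and value:
                # Relative links such as "/site/..." become absolute URLs
                self.urls.append(urljoin(self.base_url, value.strip()))


html = """
<a href="https://example.com/trip-data/yellow_tripdata_2024-01.parquet">Jan</a>
<a href="/site/tlc/about/data.page">About</a>
<a name="no-link">anchor without href</a>
"""
collector = HrefCollector("https://example.com/site/tlc/index.page")
collector.feed(html)
print(collector.urls)
```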

---

### 2. `filter_parquet_urls`

#### Purpose:
- Filters the list of extracted links to include only Parquet files.

#### Key Features:
- **URL Normalization**: Strips leading and trailing whitespace from each link.
- **Regular Expression Matching**: Uses a pattern to identify URLs that end with `.parquet` (e.g., `file.parquet` or `file.parquet?params`).
- **Efficient Filtering**: Returns only the Parquet links, so later steps never try to download HTML pages, PDFs, or other files.
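A quick demonstration of the same regular expression on a few candidate links; the example.com URLs are hypothetical placeholders, not real TLC links.

```python
import re
from typing import List

# Same pattern as filter_parquet_urls: ".parquet" at the end, optionally followed by a query string
PARQUET_RE = re.compile(r"\.parquet(\?.*)?$")

candidate_links: List[str] = [
    " https://example.com/trip-data/yellow_tripdata_2024-01.parquet ",  # stray spaces
    "https://example.com/trip-data/fhvhv_tripdata_2024-01.parquet?download=1",
    "https://example.com/trip-data/data_dictionary.pdf",
    "https://example.com/parquet-guide.html",
]

matches = [url.strip() for url in candidate_links if PARQUET_RE.search(url.strip())]
print(matches)
```

Stripping before matching matters: with a trailing space, the `$` anchor would not directly follow `.parquet` and the first link would be missed.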

---

### Why Are These Functions Important?

- They automate collecting and filtering dataset links, eliminating manual URL extraction.
- They identify the required file format (Parquet) consistently.
- They save significant time on large or frequently updated pages with many links.


In [20]:
def get_all_urls_from_page(page_url: str) -> List[str]:
    """
    Fetch all URLs from a given webpage.

    Args:
        page_url (str): URL of the webpage to scrape.

    Returns:
        List[str]: A list of all URLs found on the webpage.

    Raises:
        Exception: If there is an error accessing the webpage or scraping its content.
    """
    try:
        response = requests.get(page_url)
        response.raise_for_status()  # Raise an HTTPError for bad responses
    except requests.exceptions.RequestException as e:
        raise Exception(f"Failed to access the URL: {page_url}. Error: {e}")
    
    # Parse the HTML content of the page
    soup = BeautifulSoup(response.content, "html.parser")
    links = soup.find_all("a", href=True)
    all_urls: List[str] = [link["href"] for link in links]
    
    return all_urls

In [21]:
def filter_parquet_urls(links: List[str]) -> List[str]:
    """
    Filter a list of URLs to include only those pointing to Parquet files.

    Args:
        links (List[str]): A list of URLs to filter.

    Returns:
        List[str]: A list of URLs that point to Parquet files.
    """
    parquet_urls: List[str] = []
    for url in links:
        # Normalize the URL
        url = url.strip()
        if re.search(r"\.parquet(\?.*)?$", url):
            parquet_urls.append(url)
    
    return parquet_urls

## Process Taxi Data

This section processes Yellow Taxi data from the NYC Taxi & Limousine Commission (TLC) webpage, using the Parquet URLs extracted from the site. The goal is to clean and prepare the data for analysis, ensuring it is accurate, consistent, and manageable.

### Steps Involved

#### 1. Download and Process Monthly Data
##### Function: `get_and_clean_taxi_month`
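- Downloads a monthly Yellow Taxi data file in Parquet format (skipped if the file is already saved locally).
- Draws a random sample (with a fixed `random_state=42`) whose size comes from `calculate_sample_size`, so each month contributes a manageable number of rows.
- Saves the sampled data locally for reuse.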
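One caveat of the download step: the file is written straight to its final path, so an interrupted download leaves a partial file that the `os.path.exists` check treats as complete on the next run. A minimal sketch of a safer pattern, assuming we are free to restructure the download: stream the chunks to a temporary file in the same folder and move it into place with `os.replace` only after every chunk is written. `write_chunks_atomically` is a hypothetical helper; in the notebook the chunks would come from `response.iter_content(...)`.

```python
import os
import tempfile
from typing import Iterable


def write_chunks_atomically(chunks: Iterable[bytes], final_path: str) -> None:
    """Hypothetical helper: write chunks so final_path only appears once complete."""
    directory = os.path.dirname(final_path) or "."
    os.makedirs(directory, exist_ok=True)
    # Temporary file in the same directory, so os.replace is a same-filesystem rename
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as f:
            for chunk in chunks:
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)
        os.replace(tmp_path, final_path)
    except BaseException:
        # Leave no partial file behind if writing fails or is interrupted
        os.remove(tmp_path)
        raise


# In-memory chunks standing in for response.iter_content(...)
target = os.path.join(tempfile.mkdtemp(), "yellow_tripdata_example.parquet")
write_chunks_atomically([b"abc", b"", b"def"], target)
with open(target, "rb") as fh:
    print(fh.read())  # b'abcdef'
```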

---

#### 2. Process Multiple Files
##### Function: `get_and_clean_taxi_data`
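- Keeps only Yellow Taxi files from January 2020 through August 2024, selected with a regular expression on the file name.
- Reuses a previously saved sample for each month when one exists; otherwise calls `get_and_clean_taxi_month`.
- Combines all the monthly samples into one comprehensive dataset.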
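The file-name regular expression in `get_and_clean_taxi_data` is what restricts the date range. A quick check on generated names (hypothetical file names following the `yellow_tripdata_YYYY-MM.parquet` form the pattern expects):

```python
import re

# Same pattern as in get_and_clean_taxi_data
yellow_taxi_pattern = re.compile(
    r"yellow_tripdata_(2020-(0[1-9]|1[0-2])|202[1-3]-(0[1-9]|1[0-2])|2024-(0[1-8]))\.parquet"
)

# Hypothetical names for every month from 2019 through 2024
candidates = [
    f"yellow_tripdata_{year}-{month:02d}.parquet"
    for year in range(2019, 2025)
    for month in range(1, 13)
]
kept = [name for name in candidates if yellow_taxi_pattern.search(name)]

print(len(kept))          # 56 (January 2020 through August 2024)
print(kept[0], kept[-1])  # yellow_tripdata_2020-01.parquet yellow_tripdata_2024-08.parquet
```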

---

#### 3. Clean and Filter the Data
##### Function: `clean_taxi_data`
This function cleans the combined dataset to make it consistent and ready for analysis:
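- **Renames Columns**: Renames `tpep_pickup_datetime`/`tpep_dropoff_datetime` to `pickup_datetime`/`dropoff_datetime` and `trip_distance` to `trip_miles`, then lowercases all column names.
- **Data Types**: Converts data into the correct formats (e.g., dates and numbers).
- **Coordinates**: Adds latitude and longitude for pickup and drop-off zones.
- **Invalid Data**: Removes trips with a non-positive distance, fare, or total amount, and trips whose pickup time is not before the drop-off time.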
- **NYC Bounding Box**: Filters trips to include only those within New York City's geographic boundaries.
- **Relevant Columns**: Keeps only the essential columns needed for analysis.
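The coordinate step uses `.apply`, so `lookup_coords_for_taxi_zone_id` (a filter plus two reprojections) runs once per trip row even though there are only a few hundred distinct zone IDs. A minimal sketch of a cached alternative, which calls the lookup once per unique ID; `add_zone_coords` and `toy_lookup` are hypothetical names, and the coordinates are made up.

```python
from typing import Callable, Dict, Hashable, List, Optional, Tuple

import pandas as pd

Coords = Optional[Tuple[float, float]]


def add_zone_coords(
    df: pd.DataFrame,
    id_col: str,
    prefix: str,
    lookup: Callable[[Hashable], Coords],
) -> pd.DataFrame:
    """Hypothetical helper: return a copy of df with <prefix>_lat and <prefix>_lon.

    `lookup` is called once per unique zone ID instead of once per row.
    Rows whose ID has no coordinates get NaN.
    """
    cache: Dict[Hashable, Coords] = {
        zone_id: lookup(zone_id) for zone_id in df[id_col].dropna().unique()
    }
    coords: List[Coords] = [cache.get(zone_id) for zone_id in df[id_col]]
    out = df.copy()
    out[f"{prefix}_lat"] = [c[0] if c is not None else float("nan") for c in coords]
    out[f"{prefix}_lon"] = [c[1] if c is not None else float("nan") for c in coords]
    return out


# Toy lookup with made-up coordinates that records every call
calls: List[Hashable] = []

def toy_lookup(zone_id: Hashable) -> Coords:
    calls.append(zone_id)
    return {10: (40.70, -73.90), 20: (40.80, -73.95)}.get(zone_id)

trips = pd.DataFrame({"PULocationID": [10, 10, 20, 999]})
result = add_zone_coords(trips, "PULocationID", "PU", toy_lookup)
print(result)
print(len(calls))  # 3: one call per unique ID, not one per row
```

In the notebook this could be called as `add_zone_coords(taxi_data, "PULocationID", "PU", lambda z: lookup_coords_for_taxi_zone_id(z, taxi_zones))`, followed by dropping rows with missing coordinates, as `clean_taxi_data` already does.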

---

#### 4. Combine Everything
##### Function: `get_taxi_data`
- Combines the steps above to:
  1. Fetch all links on the TLC webpage.
  2. Keep only links to Parquet files.
  3. Download, sample, and combine the monthly Yellow Taxi files.

Cleaning is a separate step: the output of `get_taxi_data` is passed to `clean_taxi_data`.

This process ensures the data is ready for analysis and visualization in later stages of the project.


In [24]:
def get_and_clean_taxi_month(parquet_url: str) -> pd.DataFrame:
    """
    Download, process, and sample a Yellow Taxi dataset for a given month.

    Args:
        parquet_url (str): URL of the Yellow Taxi Parquet file.

    Returns:
        pd.DataFrame: A sampled and processed DataFrame.
    """
    # Default directory for Yellow Taxi data
    save_dir = "processed_data/yellow_taxi"
    os.makedirs(save_dir, exist_ok=True)

    # Extract file name and define local path
    file_name = parquet_url.split("/")[-1]
    local_file_path = os.path.join(save_dir, file_name)

    # Download the file if it does not exist locally yet
    if not os.path.exists(local_file_path):
        response = requests.get(parquet_url, stream=True)
        response.raise_for_status()
        with open(local_file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                if chunk:
                    f.write(chunk)

    # Load the dataset
    data = pd.read_parquet(local_file_path)

    # Calculate sample size (using p = 0.5 for Yellow Taxi data)
    population = len(data)
    sample_size = calculate_sample_size(population, p = 0.5)
    sampled_data = data.sample(n=sample_size, random_state=42) if population > sample_size else data
    
    processed_file_path = os.path.join(save_dir, f"sampled_{file_name}")
    sampled_data.to_parquet(processed_file_path)
    return sampled_data


In [25]:
def get_and_clean_taxi_data(parquet_urls: List[str]) -> pd.DataFrame:
    """
    Download, process, and combine Yellow Taxi data from multiple Parquet file URLs.

    Args:
        parquet_urls (List[str]): A list of URLs pointing to Yellow Taxi Parquet files.

    Returns:
        pd.DataFrame: A combined DataFrame containing processed data from all valid Parquet files.
    """
    all_taxi_dataframes: List[pd.DataFrame] = []

    # Regex pattern for Yellow Taxi Parquet files from January 2020 through August 2024
    yellow_taxi_pattern: re.Pattern = re.compile(
        r"yellow_tripdata_(2020-(0[1-9]|1[0-2])|202[1-3]-(0[1-9]|1[0-2])|2024-(0[1-8]))\.parquet"
    )

    # Filter URLs matching the pattern
    yellow_taxi_urls: List[str] = [url for url in parquet_urls if yellow_taxi_pattern.search(url)]

    for url in yellow_taxi_urls:
        save_dir: str = "processed_data/yellow_taxi"
        file_name: str = f"sampled_{url.split('/')[-1]}"
        processed_file_path: str = os.path.join(save_dir, file_name)

        if os.path.exists(processed_file_path):
            dataframe: pd.DataFrame = pd.read_parquet(processed_file_path)
        else:
            dataframe: pd.DataFrame = get_and_clean_taxi_month(url)

        all_taxi_dataframes.append(dataframe)

    # Combine all DataFrames into one
    taxi_data: pd.DataFrame = pd.concat(all_taxi_dataframes, ignore_index=True)

    return taxi_data

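### Note on column (variable) selection
We decompose the total fare as base fare + all surcharges + taxes + tolls. For Yellow Taxi trips, the total amount is:

$$
\text{total\_amount} = \text{fare\_amount} + \text{extra} + \text{mta\_tax} + \text{tip\_amount} + \text{tolls\_amount} + \text{improvement\_surcharge} + \text{congestion\_surcharge} + \text{airport\_fee}
$$

Here `fare_amount` is the base fare, `mta_tax` is the tax, and `tolls_amount` is the tolls. We assume the surcharge component consists of `extra`, `tip_amount`, `improvement_surcharge`, `congestion_surcharge`, and `airport_fee` (so tips are grouped with the surcharges).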
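Before relying on this decomposition, it can be checked row by row. A minimal sketch, assuming the lowercase column names produced by `clean_taxi_data`; `flag_total_mismatch` is a hypothetical helper and the one-cent tolerance is an arbitrary choice. The sample values are copied from rows 1, 2, and 4 of the `taxi_data.head()` output in this notebook.

```python
import numpy as np
import pandas as pd

COMPONENTS = [
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "congestion_surcharge", "airport_fee",
]


def flag_total_mismatch(df: pd.DataFrame, tol: float = 0.01) -> pd.Series:
    """Hypothetical check: True where the components do not sum to total_amount (within tol dollars)."""
    component_sum = df[COMPONENTS].sum(axis=1)
    return pd.Series(~np.isclose(component_sum, df["total_amount"], atol=tol), index=df.index)


# Rows 1, 2 and 4 of the taxi_data.head() output
sample = pd.DataFrame({
    "fare_amount": [13.5, 10.0, 3.7],
    "extra": [1.0, 1.0, 1.0],
    "mta_tax": [0.5, 0.5, 0.5],
    "tip_amount": [4.0, 0.0, 0.0],
    "tolls_amount": [0.0, 0.0, 0.0],
    "improvement_surcharge": [1.0, 1.0, 1.0],
    "congestion_surcharge": [2.5, 0.0, 0.0],
    "airport_fee": [0.0, 0.0, 0.0],
    "total_amount": [22.5, 12.5, 6.2],
})
print(flag_total_mismatch(sample).tolist())  # [False, False, False]
```

Not every row passes: in rows 0 and 3 of the same output the components sum to 89.21 and 36.45, against totals of 90.96 and 33.95, so such a check is worth running on the full dataset.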

In [71]:
def clean_taxi_data(taxi_data: pd.DataFrame) -> pd.DataFrame:
    """
    Clean the taxi data by retaining specific columns, normalizing column names,
    converting column types, and removing invalid trips. This includes unifying column names
    across different data sources using a predefined mapping.

    Args:
        taxi_data (pd.DataFrame): The input taxi data DataFrame.

    Returns:
        pd.DataFrame: The cleaned and filtered taxi data.
    """

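    # Normalize column names using the column_mapping
    column_mapping = {
        'tpep_pickup_datetime': 'pickup_datetime',
        'tpep_dropoff_datetime': 'dropoff_datetime',
        'trip_distance': 'trip_miles',
    }
    taxi_data = taxi_data.rename(columns=column_mapping)

    # Some monthly files spell the airport fee column "Airport_fee"; after
    # concatenation both spellings can exist, so merge them into "airport_fee"
    if "Airport_fee" in taxi_data.columns:
        if "airport_fee" in taxi_data.columns:
            taxi_data["airport_fee"] = taxi_data["airport_fee"].combine_first(taxi_data["Airport_fee"])
            taxi_data = taxi_data.drop(columns=["Airport_fee"])
        else:
            taxi_data = taxi_data.rename(columns={"Airport_fee": "airport_fee"})
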
    
    # Add latitude and longitude
    taxi_data["PU_coords"] = taxi_data["PULocationID"].apply(lambda x: lookup_coords_for_taxi_zone_id(x, taxi_zones))
    taxi_data["DO_coords"] = taxi_data["DOLocationID"].apply(lambda x: lookup_coords_for_taxi_zone_id(x, taxi_zones))
    taxi_data = taxi_data.dropna(subset=["PU_coords", "DO_coords"]).reset_index(drop=True)
    taxi_data[["PU_lat", "PU_lon"]] = pd.DataFrame(taxi_data["PU_coords"].tolist(), index=taxi_data.index)
    taxi_data[["DO_lat", "DO_lon"]] = pd.DataFrame(taxi_data["DO_coords"].tolist(), index=taxi_data.index)
    taxi_data = taxi_data.drop(columns=["PU_coords", "DO_coords"])
    
    # Retain only the required columns
    required_columns = [
        'pickup_datetime', 'dropoff_datetime', 'trip_miles',
        'PU_lat', 'PU_lon', 'DO_lat', 'DO_lon', 'total_amount',
        'fare_amount', 'extra', 'mta_tax', 'tip_amount', 
        'tolls_amount', 'improvement_surcharge', 'congestion_surcharge', 'airport_fee'
    ]
    taxi_data = taxi_data[required_columns]

    # Removing Invalid Data Points
    taxi_data = taxi_data[taxi_data["trip_miles"] > 0]
    taxi_data = taxi_data[taxi_data["total_amount"] > 0]
    taxi_data = taxi_data[taxi_data["fare_amount"] > 0]
    # .copy() avoids pandas' SettingWithCopyWarning when columns are reassigned below
    taxi_data = taxi_data[taxi_data["pickup_datetime"] < taxi_data["dropoff_datetime"]].copy()

    # Lowercase column names (e.g. PU_lat -> pu_lat)
    taxi_data.columns = [col.lower() for col in taxi_data.columns]

    # Normalizing and Using Appropriate Column Types
    taxi_data["pickup_datetime"] = pd.to_datetime(taxi_data["pickup_datetime"], errors="coerce")
    taxi_data["dropoff_datetime"] = pd.to_datetime(taxi_data["dropoff_datetime"], errors="coerce")
    numeric_columns = [
        "trip_miles", "pu_lat", "pu_lon", "do_lat", "do_lon", "total_amount",
        'fare_amount', 'extra', 'mta_tax', 'tip_amount', 
        'tolls_amount', 'improvement_surcharge', 'congestion_surcharge', 'airport_fee'
    ]
    # Process numeric columns
    taxi_data[numeric_columns] = taxi_data[numeric_columns].apply(pd.to_numeric, errors="coerce")
    
    # Drop rows where pickup_datetime or dropoff_datetime is NaN
    taxi_data = taxi_data.dropna(subset=["pickup_datetime", "dropoff_datetime"])
    
    # Fill NaN values in numeric columns with 0
    taxi_data[numeric_columns] = taxi_data[numeric_columns].fillna(0)

    # Removing Trips Outside the Latitude/Longitude Bounding Box
    (lat_min, lon_min), (lat_max, lon_max) = NEW_YORK_BOX_COORDS
    taxi_data = taxi_data[
        (taxi_data["pu_lat"] >= lat_min) & (taxi_data["pu_lat"] <= lat_max) &
        (taxi_data["pu_lon"] >= lon_min) & (taxi_data["pu_lon"] <= lon_max) &
        (taxi_data["do_lat"] >= lat_min) & (taxi_data["do_lat"] <= lat_max) &
        (taxi_data["do_lon"] >= lon_min) & (taxi_data["do_lon"] <= lon_max)
    ]

    # Reset index after filtering
    taxi_data = taxi_data.reset_index(drop=True)

    return taxi_data

In [73]:
def get_taxi_data() -> pd.DataFrame:
    """
    Retrieve and combine sampled Yellow Taxi data from the TLC data page.

    This function performs the following steps:
    1. Fetches all URLs from the TLC webpage.
    2. Filters the URLs to retain only those pointing to Parquet files.
    3. Downloads, samples, and combines the monthly Yellow Taxi files.

    The result is not cleaned yet; pass it to clean_taxi_data.

    Returns:
        pd.DataFrame: A DataFrame containing the combined, sampled taxi data.
    """
    all_urls: list[str] = get_all_urls_from_page(TLC_URL)
    all_parquet_urls: list[str] = filter_parquet_urls(all_urls)
    taxi_data: pd.DataFrame = get_and_clean_taxi_data(all_parquet_urls)

    return taxi_data

In [75]:
taxi_data = get_taxi_data()
taxi_data = clean_taxi_data(taxi_data)

In [76]:
taxi_data.head()

| | pickup_datetime | dropoff_datetime | trip_miles | pu_lat | pu_lon | do_lat | do_lon | total_amount | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2024-01-20 13:31:30 | 2024-01-20 14:03:25 | 17.14 | 40.646985 | -73.78653 | 40.749914 | -73.970443 | 90.96 | 70.0 | 0.0 | 0.5 | 8.27 | 6.94 | 1.0 | 2.5 | 0.0 |
| 1 | 2024-01-18 21:52:46 | 2024-01-18 22:03:21 | 2.49 | 40.764421 | -73.977569 | 40.790011 | -73.94575 | 22.5 | 13.5 | 1.0 | 0.5 | 4.0 | 0.0 | 1.0 | 2.5 | 0.0 |
| 2 | 2024-01-01 03:43:58 | 2024-01-01 03:50:47 | 1.84 | 40.866075 | -73.919308 | 40.857779 | -73.885867 | 12.5 | 10.0 | 1.0 | 0.5 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 |
| 3 | 2024-01-19 22:20:12 | 2024-01-19 22:50:12 | 3.6 | 40.748497 | -73.992438 | 40.778766 | -73.95101 | 33.95 | 23.3 | 3.5 | 0.5 | 5.65 | 0.0 | 1.0 | 2.5 | 0.0 |
| 4 | 2024-01-06 22:41:50 | 2024-01-06 22:43:24 | 0.04 | 40.791705 | -73.973049 | 40.791705 | -73.973049 | 6.2 | 3.7 | 1.0 | 0.5 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 |


In [77]:
taxi_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20710 entries, 0 to 20709
Data columns (total 16 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   pickup_datetime        20710 non-null  datetime64[ns]
 1   dropoff_datetime       20710 non-null  datetime64[ns]
 2   trip_miles             20710 non-null  float64       
 3   pu_lat                 20710 non-null  float64       
 4   pu_lon                 20710 non-null  float64       
 5   do_lat                 20710 non-null  float64       
 6   do_lon                 20710 non-null  float64       
 7   total_amount           20710 non-null  float64       
 8   fare_amount            20710 non-null  float64       
 9   extra                  20710 non-null  float64       
 10  mta_tax                20710 non-null  float64       
 11  tip_amount             20710 non-null  float64       
 12  tolls_amount           20710 non-null  float64       
 13  i

In [78]:
taxi_data.describe()

| | pickup_datetime | dropoff_datetime | trip_miles | pu_lat | pu_lon | do_lat | do_lon | total_amount | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 20710 | 20710 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 | 20710.0 |
| mean | 2022-04-29 15:32:02.506904832 | 2022-04-29 15:48:05.330613248 | 3.271791 | 40.753381 | -73.966982 | 40.755782 | -73.970786 | 22.497828 | 15.378428 | 1.221689 | 0.497586 | 2.701182 | 0.437548 | 0.551994 | 2.21282 | 0.033679 |
| min | 2020-01-01 00:11:06 | 2020-01-01 00:30:50 | 0.01 | 40.576961 | -74.029893 | 40.576961 | -74.174002 | 1.3 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 25% | 2021-02-27 01:11:26.750000128 | 2021-02-27 01:26:18.249999872 | 1.09 | 40.740337 | -73.989845 | 40.740337 | -73.989845 | 12.8 | 7.5 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 2.5 | 0.0 |
| 50% | 2022-04-28 13:02:38.500000 | 2022-04-28 13:12:18 | 1.805 | 40.758028 | -73.977698 | 40.758028 | -73.977698 | 17.02 | 10.7 | 0.5 | 0.5 | 2.16 | 0.0 | 0.3 | 2.5 | 0.0 |
| 75% | 2023-06-27 15:31:13 | 2023-06-27 15:53:30.500000 | 3.31 | 40.773633 | -73.961764 | 40.775932 | -73.959635 | 24.5 | 17.0 | 2.5 | 0.5 | 3.45 | 0.0 | 1.0 | 2.5 | 0.0 |
| max | 2024-08-31 22:43:47 | 2024-08-31 23:26:23 | 67.9 | 40.899528 | -73.739337 | 40.899528 | -73.726655 | 262.7 | 209.5 | 11.75 | 0.5 | 50.0 | 40.0 | 1.0 | 2.5 | 1.25 |
| std | | | 4.111148 | 0.032344 | 0.044978 | 0.033091 | 0.036046 | 17.35857 | 13.648988 | 1.507951 | 0.034661 | 3.160579 | 1.795556 | 0.336437 | 0.797188 | 0.202403 |


In [79]:
output_file = "taxi_data_cleaned.csv"

# Save the DataFrame to a CSV file
taxi_data.to_csv(output_file, index=False)

## Processing Uber Data

This section processes Uber trips from the TLC High-Volume For-Hire Vehicle (HVFHV) trip records, ensuring the data is clean, consistent, and ready for analysis. The process involves downloading, sampling, cleaning, and saving the data.

### Steps Involved

#### 1. Download and Sample Data
##### **Function: `get_and_clean_uber_month`**
- Downloads data for a specific month if it doesn't already exist locally.
- Samples the dataset using Cochran's formula (with `p = 0.4` for the Uber data), which keeps the file size manageable while keeping the sample statistically representative.
- Saves the processed sample locally for reuse.

##### **Function: `get_and_clean_uber_data`**
- Loops through the HVFHV Parquet file URLs, keeping only the months from January 2020 through August 2024.
- Reuses a previously saved sample for each month when one exists; otherwise downloads and samples that month's data.
- Combines all monthly datasets into a single, larger DataFrame.

---

#### 2. Clean and Process the Data
##### **Function: `load_and_clean_uber_data`**
This function ensures the Uber dataset is ready for analysis by performing the following steps:
- **Filter Uber Rides**: Keeps only trips whose `hvfhs_license_num` is `HV0003`, the TLC license number for Uber.
- **Add Coordinates**: Converts location IDs into latitude and longitude coordinates for both pickup and dropoff locations.
- **Keep Essential Columns**: Retains only the columns required for analysis, such as:
  - Pickup/Dropoff datetime
  - Coordinates
  - Trip distance
  - Fare details
- **Remove Invalid Data**: Excludes trips with issues like:
  - Zero or negative distance
  - Negative fare amounts
  - Incorrect timestamps
- **Normalize Data**:
  - Renames columns for consistency.
  - Ensures numeric fields (e.g., distance, fares) are in the correct format.
- **Filter NYC Trips**: Keeps only trips within NYC’s geographic boundaries using latitude and longitude filtering.

---

#### 3. Combine All Steps
##### **Function: `get_uber_data`**
- Fetches all URLs for Uber HVFHV Parquet files from the TLC webpage.
- Filters the URLs to retain only relevant data files.
- Downloads, processes, cleans, and combines all monthly datasets.

The cleaned Uber data is now ready for analysis and integration with other datasets like weather or Yellow Taxi data.


In [36]:
def get_and_clean_uber_month(parquet_url: str) -> pd.DataFrame:
    """
    Download, process, and sample an Uber HVHF dataset for a given month.

    Args:
        parquet_url (str): URL of the Uber HVHF Parquet file.

    Returns:
        pd.DataFrame: A sampled and processed DataFrame.
    """
    # Define the directory for processed Uber HVHF data
    save_dir: str = "processed_data/hvhf"
    os.makedirs(save_dir, exist_ok=True)

    # Extract file name and define local path
    file_name: str = parquet_url.split("/")[-1]
    local_file_path: str = os.path.join(save_dir, file_name)

    # Download the file
    if not os.path.exists(local_file_path):
        try:
            response = requests.get(parquet_url, stream=True)
            response.raise_for_status()
            with open(local_file_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                    if chunk:
                        f.write(chunk)
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to download the file from {parquet_url}. Error: {e}")
    data: pd.DataFrame = pd.read_parquet(local_file_path)

    # Sample the dataset
    population: int = len(data)
    sample_size: int = calculate_sample_size(population, p=0.4)
    sampled_data: pd.DataFrame = data.sample(n=sample_size, random_state=42) if population > sample_size else data
    processed_file_path: str = os.path.join(save_dir, f"sampled_{file_name}")
    if not os.path.exists(processed_file_path):
        sampled_data.to_parquet(processed_file_path)

    return sampled_data

In [37]:
def get_and_clean_uber_data(parquet_urls: List[str]) -> pd.DataFrame:
    """
    Download, process, and combine Uber HVHF data from multiple Parquet file URLs.

    Args:
        parquet_urls (List[str]): A list of URLs pointing to Uber HVHF Parquet files.

    Returns:
        pd.DataFrame: A combined DataFrame containing processed data from all valid Parquet files.
    """
    all_uber_dataframes: List[pd.DataFrame] = []

    # Regex pattern for Uber HVFHV Parquet files from January 2020 through August 2024
    hvfhv_pattern: re.Pattern = re.compile(
        r"fhvhv_tripdata_(2020-(0[1-9]|1[0-2])|202[1-3]-(0[1-9]|1[0-2])|2024-(0[1-8]))\.parquet"
    )
    hvfhv_urls: List[str] = [url for url in parquet_urls if hvfhv_pattern.search(url)]

    for url in hvfhv_urls:
        save_dir: str = "processed_data/hvhf"
        file_name: str = f"sampled_{url.split('/')[-1]}"
        processed_file_path: str = os.path.join(save_dir, file_name)

        if os.path.exists(processed_file_path):
            dataframe: pd.DataFrame = pd.read_parquet(processed_file_path)
        else:
            dataframe: pd.DataFrame = get_and_clean_uber_month(url)

        all_uber_dataframes.append(dataframe)
    uber_data: pd.DataFrame = pd.concat(all_uber_dataframes, ignore_index=True)
    return uber_data
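To make the date window encoded in `hvfhv_pattern` concrete, the sketch below reuses the same regular expression and checks a few file names against it. The file names follow the `fhvhv_tripdata_YYYY-MM.parquet` naming the pattern expects; `is_in_study_window` is a hypothetical helper for illustration, not part of the notebook.

```python
import re

# Same pattern as in get_and_clean_uber_data: 2020-01 .. 2023-12 plus 2024-01 .. 2024-08
HVFHV_PATTERN = re.compile(
    r"fhvhv_tripdata_(2020-(0[1-9]|1[0-2])|202[1-3]-(0[1-9]|1[0-2])|2024-(0[1-8]))\.parquet"
)


def is_in_study_window(url: str) -> bool:
    """Return True if the URL names an HVFHV monthly file inside the study window."""
    return HVFHV_PATTERN.search(url) is not None


candidates = [
    "fhvhv_tripdata_2019-12.parquet",   # before the window
    "fhvhv_tripdata_2020-01.parquet",   # first month kept
    "fhvhv_tripdata_2024-08.parquet",   # last month kept
    "fhvhv_tripdata_2024-09.parquet",   # after the window
    "yellow_tripdata_2022-05.parquet",  # different dataset
]
for name in candidates:
    print(name, is_in_study_window(name))
```

Because the pattern is applied with `search` rather than `fullmatch`, it also accepts full URLs that end in one of these file names.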

### Note: Column (variable) selection
Following the definition total amount = base fare + all surcharges + taxes + tolls, we compute the Uber total as `fare_amount + tolls_amount + bcf + sales_tax + tip_amount + congestion_surcharge + airport_fee`, where `fare_amount`, `tolls_amount`, and `tip_amount` are the renamed `base_passenger_fare`, `tolls`, and `tips` columns. We assume that the surcharge component consists of `bcf`, `tip_amount`, `congestion_surcharge`, and `airport_fee`.
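The formula above can be sketched as a row-wise sum over the renamed Uber columns. `add_uber_total_amount` and `UBER_TOTAL_COMPONENTS` are hypothetical names for illustration; the sample rows are copied from rows 0 and 2 of the `uber_data.head()` output shown further down.

```python
import pandas as pd

# Column names after load_and_clean_uber_data renames base_passenger_fare,
# tolls and tips to fare_amount, tolls_amount and tip_amount.
UBER_TOTAL_COMPONENTS = [
    "fare_amount",           # base fare
    "tolls_amount",          # tolls
    "bcf",                   # surcharge (assumed)
    "sales_tax",             # tax
    "tip_amount",            # surcharge (assumed)
    "congestion_surcharge",  # surcharge
    "airport_fee",           # surcharge
]


def add_uber_total_amount(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: return a copy of df with a total_amount column."""
    out = df.copy()
    # Row-wise sum of the components; NaN values are skipped (treated as 0)
    out["total_amount"] = out[UBER_TOTAL_COMPONENTS].sum(axis=1)
    return out


# Two rows taken from uber_data.head() (index 0 and 2)
sample = pd.DataFrame({
    "fare_amount": [27.49, 24.57],
    "tolls_amount": [0.0, 0.0],
    "bcf": [0.76, 0.68],
    "sales_tax": [2.44, 2.18],
    "tip_amount": [0.0, 3.01],
    "congestion_surcharge": [0.0, 2.75],
    "airport_fee": [0.0, 0.0],
})
print(add_uber_total_amount(sample)["total_amount"].round(2).tolist())
```

Returning a copy keeps the input DataFrame unchanged, so the helper can be applied to `uber_data` without side effects.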

In [39]:
def load_and_clean_uber_data(uber_data: pd.DataFrame) -> pd.DataFrame:
    """
    Clean and process Uber HVHF data, including filtering by license, adding coordinates,
    retaining required columns, and normalizing data.

    Args:
        uber_data (pd.DataFrame): The raw Uber HVHF data.

    Returns:
        pd.DataFrame: A cleaned and filtered DataFrame with necessary columns and normalized values.
    """
    # Keep only Uber trips (HV0003 is Uber's license number in the TLC HVFHV data)
    uber_data['hvfhs_license_num'] = uber_data['hvfhs_license_num'].astype(str)
    uber_data = uber_data[uber_data['hvfhs_license_num'] == 'HV0003'].copy()

    # Add coordinates for pickup and dropoff zones (uses the taxi_zones data loaded earlier in the notebook)
    uber_data["PU_coords"] = uber_data["PULocationID"].apply(lambda x: lookup_coords_for_taxi_zone_id(x, taxi_zones))
    uber_data["DO_coords"] = uber_data["DOLocationID"].apply(lambda x: lookup_coords_for_taxi_zone_id(x, taxi_zones))
    uber_data = uber_data.dropna(subset=["PU_coords", "DO_coords"]).reset_index(drop=True)
    uber_data[["PU_lat", "PU_lon"]] = pd.DataFrame(uber_data["PU_coords"].tolist(), index=uber_data.index)
    uber_data[["DO_lat", "DO_lon"]] = pd.DataFrame(uber_data["DO_coords"].tolist(), index=uber_data.index)
    uber_data = uber_data.drop(columns=["PU_coords", "DO_coords"])

    # Retain required columns
    required_columns = [
        'pickup_datetime', 'dropoff_datetime', 'PU_lat', 'PU_lon', 
        'DO_lat', 'DO_lon', 'trip_miles', 'base_passenger_fare', 
        'tolls', 'bcf', 'sales_tax', 'tips', 'airport_fee', 'congestion_surcharge'
    ]
    uber_data = uber_data[required_columns]

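    # Remove invalid trips: non-positive distance or fare, or a dropoff that is not after the pickup.
    # .copy() yields an independent frame, so the column assignments below do not raise SettingWithCopyWarning.
    valid = (
        (uber_data["trip_miles"] > 0)
        & (uber_data["base_passenger_fare"] > 0)
        & (uber_data["pickup_datetime"] < uber_data["dropoff_datetime"])
    )
    uber_data = uber_data[valid].copy()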

    # Normalize column names
    uber_data.columns = [col.lower() for col in uber_data.columns]

    # Ensure numeric columns are float or int
    numeric_columns = [
        'pu_lat', 'pu_lon', 'do_lat', 'do_lon', 'trip_miles', 
        'base_passenger_fare', 'tolls', 'bcf', 'sales_tax', 
        'tips', 'airport_fee', 'congestion_surcharge'
    ]
    uber_data[numeric_columns] = uber_data[numeric_columns].apply(pd.to_numeric, errors="coerce")

    uber_data[numeric_columns] = uber_data[numeric_columns].fillna(0)
    uber_data = uber_data.dropna(subset=["pickup_datetime", "dropoff_datetime"])

    # Rename columns to the names used for the taxi data
    column_mapping = {
        'base_passenger_fare': 'fare_amount',
        'tips': 'tip_amount',
        'tolls': 'tolls_amount',
    }

    uber_data = uber_data.rename(columns=column_mapping)

    # Filter trips within the latitude/longitude bounding box
    lat_min, lon_min = 40.560445, -74.242330
    lat_max, lon_max = 40.908524, -73.717047
    uber_data = uber_data[
        (uber_data["pu_lat"] >= lat_min) & (uber_data["pu_lat"] <= lat_max) &
        (uber_data["pu_lon"] >= lon_min) & (uber_data["pu_lon"] <= lon_max) &
        (uber_data["do_lat"] >= lat_min) & (uber_data["do_lat"] <= lat_max) &
        (uber_data["do_lon"] >= lon_min) & (uber_data["do_lon"] <= lon_max)
    ]

    # Reset index after filtering
    uber_data = uber_data.reset_index(drop=True)

    return uber_data
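The bounding-box filter at the end of `load_and_clean_uber_data` can also be written as a small reusable helper, which is easier to test on its own. This is a sketch: `filter_to_bbox` is a hypothetical name, the box limits are copied from the function above, the first sample row comes from the `head()` output below, and the second row is invented to fall outside the box.

```python
import pandas as pd

# Bounding box copied from load_and_clean_uber_data
LAT_MIN, LON_MIN = 40.560445, -74.242330
LAT_MAX, LON_MAX = 40.908524, -73.717047


def filter_to_bbox(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: keep trips whose pickup and dropoff both lie inside the box."""
    inside = pd.Series(True, index=df.index)
    for prefix in ("pu", "do"):
        # Series.between is inclusive on both ends by default, matching >= and <=
        inside &= df[f"{prefix}_lat"].between(LAT_MIN, LAT_MAX)
        inside &= df[f"{prefix}_lon"].between(LON_MIN, LON_MAX)
    return df[inside].reset_index(drop=True)


trips = pd.DataFrame({
    "pu_lat": [40.646116, 41.2],      # second pickup (invented) is north of the box
    "pu_lon": [-73.951623, -73.95],
    "do_lat": [40.666559, 40.7],
    "do_lon": [-73.895364, -73.9],
})
print(filter_to_bbox(trips))
```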

In [40]:
def get_uber_data() -> pd.DataFrame:
    """
    Retrieve, filter, process, and clean Uber HVHF data from the TLC data page.

    This function performs the following steps:
    1. Fetch all URLs from the TLC webpage.
    2. Filter the URLs to retain only those pointing to Parquet files.
    3. Download, process, and combine data from these Parquet files.
    4. Perform additional cleaning and normalization on the combined data.

    Returns:
        pd.DataFrame: A cleaned and processed DataFrame containing Uber HVHF trip data.
    """
    all_urls: list[str] = get_all_urls_from_page(TLC_URL)
    all_parquet_urls: list[str] = filter_parquet_urls(all_urls)
    uber_data: pd.DataFrame = get_and_clean_uber_data(all_parquet_urls)
    uber_data = load_and_clean_uber_data(uber_data)
    return uber_data

In [41]:
uber_data = get_uber_data()

In [42]:
uber_data.head()

| | pickup_datetime | dropoff_datetime | pu_lat | pu_lon | do_lat | do_lon | trip_miles | fare_amount | tolls_amount | bcf | sales_tax | tip_amount | airport_fee | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2024-01-26 08:07:17 | 2024-01-26 08:35:38 | 40.646116 | -73.951623 | 40.666559 | -73.895364 | 4.29 | 27.49 | 0.0 | 0.76 | 2.44 | 0.0 | 0.0 | 0.0 |
| 1 | 2024-01-19 02:17:05 | 2024-01-19 02:29:12 | 40.882403 | -73.910665 | 40.857108 | -73.932832 | 2.55 | 15.14 | 0.0 | 0.42 | 1.34 | 0.0 | 0.0 | 0.0 |
| 2 | 2024-01-21 01:44:00 | 2024-01-21 02:08:30 | 40.748575 | -73.985156 | 40.71537 | -73.936794 | 6.37 | 24.57 | 0.0 | 0.68 | 2.18 | 3.01 | 0.0 | 2.75 |
| 3 | 2024-01-20 12:58:40 | 2024-01-20 13:15:42 | 40.758028 | -73.977698 | 40.753309 | -74.004016 | 1.99 | 18.96 | 0.0 | 0.52 | 1.68 | 0.0 | 0.0 | 2.75 |
| 4 | 2024-01-02 08:40:48 | 2024-01-02 08:54:28 | 40.666559 | -73.895364 | 40.676644 | -73.913632 | 2.23 | 16.08 | 0.0 | 0.44 | 1.43 | 0.0 | 0.0 | 0.0 |


In [43]:
uber_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14383 entries, 0 to 14382
Data columns (total 14 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   pickup_datetime       14383 non-null  datetime64[ns]
 1   dropoff_datetime      14383 non-null  datetime64[ns]
 2   pu_lat                14383 non-null  float64       
 3   pu_lon                14383 non-null  float64       
 4   do_lat                14383 non-null  float64       
 5   do_lon                14383 non-null  float64       
 6   trip_miles            14383 non-null  float64       
 7   fare_amount           14383 non-null  float64       
 8   tolls_amount          14383 non-null  float64       
 9   bcf                   14383 non-null  float64       
 10  sales_tax             14383 non-null  float64       
 11  tip_amount            14383 non-null  float64       
 12  airport_fee           14383 non-null  float64       
 13  congestion_surch

In [44]:
uber_data.describe()

| | pickup_datetime | dropoff_datetime | pu_lat | pu_lon | do_lat | do_lon | trip_miles | fare_amount | tolls_amount | bcf | sales_tax | tip_amount | airport_fee | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 14383 | 14383 | 14383.0 | 14383.0 | 14383.0 | 14383.0 | 14383.0 | 14383.0 | 14383.0 | 14383.0 | 14383.0 | 14383.0 | 14383.0 | 14383.0 |
| mean | 2022-05-03 15:37:43.487311360 | 2022-05-03 15:55:31.077104896 | 40.737349 | -73.935103 | 40.737331 | -73.935269 | 4.372007 | 21.021585 | 0.678372 | 0.616084 | 1.867329 | 0.79651 | 0.136533 | 1.056525 |
| min | 2020-01-01 03:27:51 | 2020-01-01 03:30:21 | 40.561994 | -74.170885 | 40.561994 | -74.174002 | 0.01 | 0.61 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 25% | 2021-02-27 15:08:56 | 2021-02-27 15:21:33 | 40.691201 | -73.984197 | 40.691201 | -73.984197 | 1.55 | 10.49 | 0.0 | 0.29 | 0.91 | 0.0 | 0.0 | 0.0 |
| 50% | 2022-05-01 06:12:14 | 2022-05-01 06:17:35 | 40.736824 | -73.948788 | 40.737698 | -73.948136 | 2.83 | 16.68 | 0.0 | 0.47 | 1.45 | 0.0 | 0.0 | 0.0 |
| 75% | 2023-07-02 21:51:33.500000 | 2023-07-02 22:17:46.500000 | 40.774376 | -73.900317 | 40.774376 | -73.899735 | 5.53 | 26.35 | 0.0 | 0.77 | 2.35 | 0.0 | 0.0 | 2.75 |
| max | 2024-08-31 21:37:05 | 2024-08-31 21:57:29 | 40.899528 | -73.726655 | 40.899528 | -73.726655 | 33.17 | 188.58 | 43.31 | 5.45 | 17.57 | 50.0 | 5.0 | 2.75 |
| std | | | 0.068382 | 0.064742 | 0.068795 | 0.068168 | 4.263149 | 15.197834 | 2.713401 | 0.493748 | 1.412463 | 2.41649 | 0.571035 | 1.333243 |


In [45]:
output_file = "uber_data_cleaned.csv"

# Save the DataFrame to a CSV file
uber_data.to_csv(output_file, index=False)

## Processing Weather Data

This section processes raw weather data from CSV files into clean, ready-to-use datasets for analysis. The weather data includes both hourly and daily records, which are cleaned and stored separately.

### Steps Involved

#### 1. Fetch Weather CSV Files
##### **Function: `get_all_weather_csvs`**
- **Purpose**: Returns the paths of all `.csv` files in the directory that holds the weather data.

---

#### 2. Clean Hourly Weather Data
##### **Function: `clean_month_weather_data_hourly`**
- **Purpose**: Processes and cleans hourly weather data from a single CSV file.
- **Steps**:
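  - Reads the file and extracts the relevant columns:
    - `date` (observation timestamp)
    - `precipitation` (amount of rain/snow)
    - `wind_speed` (wind speed)
  - Replaces the trace-precipitation marker `T` with `0`.
  - Replaces any other value containing characters other than digits or `.` with `NaN` (Not a Number), then converts the columns to numeric types.
  - Normalizes column names for consistency.
- **Output**: A clean DataFrame with hourly weather data, or an empty DataFrame if the file cannot be processed.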
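The two replacement rules above can be illustrated on individual values. `parse_lcd_value` is a hypothetical helper, not part of the notebook: it mirrors the notebook's `replace("T", 0)`, `replace(regex=r"[^\d.]", value=np.nan)`, and `pd.to_numeric(..., errors="coerce")` sequence for one cell at a time, and the sample values are invented.

```python
import re

import numpy as np
import pandas as pd

# Any character other than a digit or "." marks a value as non-numeric
NON_NUMERIC = re.compile(r"[^\d.]")


def parse_lcd_value(value):
    """Hypothetical helper: interpret one raw weather cell the way the notebook does."""
    if value == "T":                      # trace amount -> 0
        return 0.0
    if isinstance(value, str) and NON_NUMERIC.search(value):
        return np.nan                     # whole value dropped, not just the bad character
    try:
        return float(value)               # like pd.to_numeric(errors="coerce")
    except (TypeError, ValueError):
        return np.nan


raw = pd.Series(["0.05", "T", "0.01s", "M", "", 7])
parsed = raw.map(parse_lcd_value)
print(parsed.tolist())
```

Note that a value such as `"0.01s"` becomes `NaN` as a whole, because the regex replacement swaps out the entire cell when the pattern is found anywhere in it.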

---

#### 3. Clean Daily Weather Data
##### **Function: `clean_month_weather_data_daily`**
- **Purpose**: Processes and cleans daily weather data from a single CSV file.
- **Steps**:
  - Extracts and cleans columns for:
    - `date`
    - `precipitation` (total rainfall/snowfall)
    - `average_wind_speed` (daily average wind velocity)
    - `snowfall` (total snowfall)
    - `sunrise` (sunrise time)
    - `sunset` (sunset time)
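  - Applies the same cleaning rules as for the hourly data (`T` becomes `0`; other non-numeric values become `NaN`), converts the measurement columns to numeric types, and drops rows in which all five value columns are missing.
  - Converts `sunrise` and `sunset` from `HHMM` values to times of day, and `date` to a calendar date.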
  - Normalizes column names for consistency.
- **Output**: A clean DataFrame with daily weather data.
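The daily cleaning treats `Sunrise` and `Sunset` as `HHMM` numbers (that is what the `zfill(4)` padding and `format='%H%M'` in the code imply), so a value like `720` must be padded to `"0720"` before parsing. The sketch below shows that conversion for single values; `hhmm_to_time` is a hypothetical helper, and returning `None` for bad input is an assumption meant to resemble `errors="coerce"`.

```python
import math
from datetime import datetime, time
from typing import Optional


def hhmm_to_time(value) -> Optional[time]:
    """Hypothetical helper: convert an HHMM value such as 720 or "1639" to a time.

    Missing, non-numeric, or out-of-range values return None, roughly mirroring
    errors="coerce" in pd.to_datetime.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    if not math.isfinite(number):         # NaN or infinity
        return None
    try:
        # Zero-pad to four digits so 720 becomes "0720", then parse hours and minutes
        return datetime.strptime(f"{int(number):04d}", "%H%M").time()
    except ValueError:                    # e.g. 2575 is not a valid time
        return None


for raw in [720, "1639", 1639.0, float("nan"), 2575]:
    print(repr(raw), hhmm_to_time(raw))
```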

---

#### 4. Combine Weather Data
##### **Function: `load_and_clean_weather_data`**
- **Purpose**: Processes all weather CSV files and combines them into unified datasets.
- **Steps**:
  - Iterates over all weather files.
  - Cleans hourly and daily weather data for each file.
  - Combines all hourly data into a single DataFrame (`hourly_data`).
  - Combines all daily data into another DataFrame (`daily_data`).
  - Replaces any remaining missing values with `0`, so a missing reading is treated the same as a zero reading.
- **Output**: Two comprehensive DataFrames:
  - `hourly_data`: Hourly weather data for all files.
  - `daily_data`: Daily weather data for all files.

---

#### 5. Save Processed Data
- **Outputs**:
  - Hourly weather data is saved to `cleaned_hourly_weather_data.csv`.
  - Daily weather data is saved to `cleaned_daily_weather_data.csv`.

---

### Outcome
The cleaned weather data is stored in two CSV files, ready for integration with ride data or for standalone analysis:
- **Hourly Weather Data**: Provides detailed hourly precipitation and wind speed.
- **Daily Weather Data**: Offers daily summaries of precipitation, wind speed, and snowfall, plus sunrise and sunset times.


In [85]:
def get_all_weather_csvs(directory: str) -> List[str]:
    """
    Retrieve all CSV file paths from a specified directory.

    Args:
        directory (str): Path to the directory containing CSV files.

    Returns:
        List[str]: A list of file paths to all CSV files in the directory.
    """
    # Sort so files (and therefore the concatenated rows) are processed in a stable order
    csv_files: List[str] = sorted(
        os.path.join(directory, file) for file in os.listdir(directory) if file.endswith(".csv")
    )
    return csv_files

In [87]:
def clean_month_weather_data_hourly(csv_file: str) -> pd.DataFrame:
    """
    Clean and process hourly weather data from a CSV file.

    Args:
        csv_file (str): Path to the CSV file containing weather data.

    Returns:
        pd.DataFrame: A DataFrame containing cleaned and processed hourly weather data.
                      Columns include 'date', 'precipitation', and 'wind_speed'.
                      Returns an empty DataFrame if the processing fails.
    """
    try:
        df: pd.DataFrame = pd.read_csv(csv_file, low_memory=False)
        df.rename(columns=lambda x: x.strip(), inplace=True)

        # Convert 'DATE' column to datetime
        df["DATE"] = pd.to_datetime(df["DATE"], errors="coerce")

        # Select and clean relevant columns
        hourly_data: pd.DataFrame = df[["DATE", "HourlyPrecipitation", "HourlyWindSpeed"]].copy()
        hourly_data.replace("T", 0, inplace=True)
        hourly_data.replace(regex=r"[^\d.]", value=np.nan, inplace=True)

        # Rename columns for consistency
        hourly_data.columns = ["date", "precipitation", "wind_speed"]

        # Convert columns to numeric
        hourly_data["precipitation"] = pd.to_numeric(hourly_data["precipitation"], errors="coerce")
        hourly_data["wind_speed"] = pd.to_numeric(hourly_data["wind_speed"], errors="coerce")

        return hourly_data

    except Exception as e:
        print(f"Error processing file {csv_file}: {e}")
        return pd.DataFrame()

In [89]:
def clean_month_weather_data_daily(csv_file: str) -> pd.DataFrame:
    """
    Clean and process daily weather data from a CSV file.

    Args:
        csv_file (str): Path to the CSV file containing weather data.

    Returns:
        pd.DataFrame: A DataFrame containing cleaned and processed daily weather data.
                      Columns include 'date', 'precipitation', 'average_wind_speed', 'snowfall',
                      'sunrise', and 'sunset'.
    """
    try:
        # Load data
        df: pd.DataFrame = pd.read_csv(csv_file, low_memory=False)
        df.rename(columns=lambda x: x.strip(), inplace=True)

        # Convert 'DATE' column to datetime
        df["DATE"] = pd.to_datetime(df["DATE"], errors="coerce")

        # Select and clean relevant columns
        daily_data: pd.DataFrame = df[
            ["DATE", "DailyPrecipitation", "DailyAverageWindSpeed", "DailySnowfall", "Sunrise", "Sunset"]
        ].copy()

        # Replace placeholder and non-numeric values
        daily_data.replace("T", 0, inplace=True)
        daily_data.replace(regex=r"[^\d.]", value=np.nan, inplace=True)

        # Rename columns for consistency
        daily_data.columns = ["date", "precipitation", "average_wind_speed", "snowfall", "sunrise", "sunset"]

        # Convert columns to numeric
        daily_data["precipitation"] = pd.to_numeric(daily_data["precipitation"], errors="coerce")
        daily_data["average_wind_speed"] = pd.to_numeric(daily_data["average_wind_speed"], errors="coerce")
        daily_data["snowfall"] = pd.to_numeric(daily_data["snowfall"], errors="coerce")

        # Drop rows where all of these columns are NaN
        daily_data = daily_data.dropna(subset=["precipitation", "average_wind_speed", "snowfall", "sunrise", "sunset"], how="all")

        # Convert HHMM values (e.g. 720) to times; missing or unparsable values become NaT
        # instead of raising (astype(int) fails on NaN and would discard the whole file)
        for col in ("sunrise", "sunset"):
            hhmm = pd.to_numeric(daily_data[col], errors="coerce")
            padded = hhmm.map(lambda v: f"{int(v):04d}" if pd.notna(v) else None)
            daily_data[col] = pd.to_datetime(padded, format="%H%M", errors="coerce").dt.time

        # Convert 'date' column to date
        daily_data["date"] = daily_data["date"].dt.date

        return daily_data

    except Exception as e:
        print(f"Error processing file {csv_file}: {e}")
        return pd.DataFrame()


In [91]:
def load_and_clean_weather_data(directory: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load and clean weather data from all CSV files in the specified directory.

    This function processes each file to extract and clean both hourly and daily weather data.

    Args:
        directory (str): Path to the directory containing CSV files.

    Returns:
        tuple: Two DataFrames:
            - hourly_data (pd.DataFrame): Combined cleaned hourly weather data.
            - daily_data (pd.DataFrame): Combined cleaned daily weather data.
    """
    weather_csv_files = get_all_weather_csvs(directory)
    hourly_dataframes = []
    daily_dataframes = []
    for csv_file in weather_csv_files:
        hourly_dataframe = clean_month_weather_data_hourly(csv_file)
        daily_dataframe = clean_month_weather_data_daily(csv_file)

        # Append cleaned data to the respective lists
        if not hourly_dataframe.empty:
            hourly_dataframes.append(hourly_dataframe)
        if not daily_dataframe.empty:
            daily_dataframes.append(daily_dataframe)

    hourly_data = pd.concat(hourly_dataframes, ignore_index=True) if hourly_dataframes else pd.DataFrame()
    daily_data = pd.concat(daily_dataframes, ignore_index=True) if daily_dataframes else pd.DataFrame()
    
    # Replace NaN values with 0
    hourly_data.fillna(0, inplace=True)
    daily_data.fillna(0, inplace=True)
    
    return hourly_data, daily_data

In [93]:
if __name__ == "__main__":
    
    WEATHER_CSV_DIR = "weather/"  # Path to your weather data directory
    hourly_weather_data, daily_weather_data = load_and_clean_weather_data(WEATHER_CSV_DIR)

    hourly_weather_data.to_csv("cleaned_hourly_weather_data.csv", index=False)
    daily_weather_data.to_csv("cleaned_daily_weather_data.csv", index=False)

In [95]:
hourly_weather_data.head()

| | date | precipitation | wind_speed |
|---|---|---|---|
| 0 | 2020-01-01 00:51:00 | 0.0 | 8.0 |
| 1 | 2020-01-01 01:51:00 | 0.0 | 8.0 |
| 2 | 2020-01-01 02:51:00 | 0.0 | 14.0 |
| 3 | 2020-01-01 03:51:00 | 0.0 | 11.0 |
| 4 | 2020-01-01 04:51:00 | 0.0 | 6.0 |


In [97]:
hourly_weather_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 56098 entries, 0 to 56097
Data columns (total 3 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   date           56098 non-null  datetime64[ns]
 1   precipitation  56098 non-null  float64       
 2   wind_speed     56098 non-null  float64       
dtypes: datetime64[ns](1), float64(2)
memory usage: 1.3 MB


In [99]:
hourly_weather_data.describe()

| | date | precipitation | wind_speed |
|---|---|---|---|
| count | 56098 | 56098.0 | 56098.0 |
| mean | 2022-05-29 21:14:19.618881024 | 0.010288 | 4.537238 |
| min | 2020-01-01 00:51:00 | 0.0 | 0.0 |
| 25% | 2021-03-18 19:01:45 | 0.0 | 0.0 |
| 50% | 2022-05-28 01:21:00 | 0.0 | 5.0 |
| 75% | 2023-08-15 05:39:00 | 0.0 | 7.0 |
| max | 2024-10-22 18:51:00 | 3.47 | 2237.0 |
| std | | 0.056033 | 13.883208 |


In [101]:
daily_weather_data.head()

| | date | precipitation | average_wind_speed | snowfall | sunrise | sunset |
|---|---|---|---|---|---|---|
| 0 | 2020-01-01 | 0.0 | 8.6 | 0.0 | 07:20:00 | 16:39:00 |
| 1 | 2020-01-02 | 0.0 | 5.4 | 0.0 | 07:20:00 | 16:40:00 |
| 2 | 2020-01-03 | 0.15 | 3.4 | 0.0 | 07:20:00 | 16:41:00 |
| 3 | 2020-01-04 | 0.27 | 4.4 | 0.0 | 07:20:00 | 16:42:00 |
| 4 | 2020-01-05 | 0.0 | 11.3 | 0.0 | 07:20:00 | 16:43:00 |


In [103]:
daily_weather_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1755 entries, 0 to 1754
Data columns (total 6 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   date                1755 non-null   object 
 1   precipitation       1755 non-null   float64
 2   average_wind_speed  1755 non-null   float64
 3   snowfall            1755 non-null   float64
 4   sunrise             1755 non-null   object 
 5   sunset              1755 non-null   object 
dtypes: float64(3), object(3)
memory usage: 82.4+ KB


In [105]:
daily_weather_data.describe()

| | precipitation | average_wind_speed | snowfall |
|---|---|---|---|
| count | 1755.0 | 1755.0 | 1755.0 |
| mean | 0.141966 | 4.835499 | 0.039088 |
| std | 0.414574 | 2.467952 | 0.493457 |
| min | 0.0 | 0.0 | 0.0 |
| 25% | 0.0 | 3.0 | 0.0 |
| 50% | 0.0 | 4.5 | 0.0 |
| 75% | 0.06 | 6.2 | 0.0 |
| max | 7.13 | 14.2 | 14.8 |


## Part 2: Storing Cleaned Data

In this part, we store the cleaned datasets into a SQLite database by creating appropriate schemas for each dataset and loading the data into the database. This ensures our data is structured and easily queryable for subsequent analyses.

#### 1. Load Cleaned Datasets
- **Datasets loaded**:
  - `daily_data`: Daily weather data.
  - `hourly_data`: Hourly weather data.
  - `taxi_data`: Cleaned Yellow Taxi trip data.
  - `uber_data`: Cleaned Uber HVHF trip data.
- These datasets are read from their respective cleaned CSV files.
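CSV files store no type information, so reading the cleaned files back with `pd.read_csv` returns the timestamp columns as strings rather than the `datetime64[ns]` columns shown in the earlier `info()` output. If later pandas steps need real timestamps, `parse_dates` restores them. The sketch below assumes the column layout of `uber_data_cleaned.csv` (a subset of columns, with two rows copied from the `head()` output).

```python
import io

import pandas as pd

# Two rows in the layout of uber_data_cleaned.csv (subset of columns)
CSV_TEXT = """pickup_datetime,dropoff_datetime,trip_miles
2024-01-26 08:07:17,2024-01-26 08:35:38,4.29
2024-01-19 02:17:05,2024-01-19 02:29:12,2.55
"""

# Default read: timestamps come back as strings, not datetimes
plain = pd.read_csv(io.StringIO(CSV_TEXT))

# parse_dates converts the listed columns to datetime64
parsed = pd.read_csv(io.StringIO(CSV_TEXT), parse_dates=["pickup_datetime", "dropoff_datetime"])

print(plain.dtypes["pickup_datetime"], parsed.dtypes["pickup_datetime"])
# Datetime columns support arithmetic such as trip duration in seconds
durations = (parsed["dropoff_datetime"] - parsed["pickup_datetime"]).dt.total_seconds().tolist()
print(durations)
```

For writing into SQLite either form works, since the values end up stored as text; the distinction matters mainly for analysis done in pandas.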

#### 2. Define Database Schemas
- **Schema definitions**:
  - `hourly_weather`: Contains hourly weather observations: timestamp (`date`), precipitation, and wind speed.
  - `daily_weather`: Contains daily weather data: date, total precipitation, average wind speed, total snowfall, and sunrise and sunset times.
  - `taxi_trips`: Contains Yellow Taxi trip data: pickup/dropoff datetimes and coordinates, trip miles, total amount, and its fare components.
  - `uber_trips`: Contains Uber trip data with the same time, location, and distance columns as `taxi_trips`, plus the Uber-specific fee columns `bcf` and `sales_tax`; it has no `total_amount` column.

#### 3. Save Schema to `schema.sql`
- All schema definitions are written to a file named `schema.sql`.
- This file serves as a record of the database structure and can be reused to recreate the tables in another database.

#### 4. Create Tables
- The `schema.sql` file is executed to create the database tables.
- **Execution steps**:
  1. Open a connection to the SQLite database.
  2. Read the `schema.sql` file.
  3. Execute each schema creation statement.

By the end of this part, the SQLite database contains these four tables, populated with the cleaned data (appended with `DataFrame.to_sql`), ready for queries and further analysis.


In [107]:
engine = db.create_engine(DATABASE_URL)

In [109]:
# Load cleaned datasets
daily_data = pd.read_csv("cleaned_daily_weather_data.csv")
hourly_data = pd.read_csv("cleaned_hourly_weather_data.csv")
taxi_data = pd.read_csv("taxi_data_cleaned.csv")
uber_data = pd.read_csv("uber_data_cleaned.csv")

In [111]:
# if using SQL (as opposed to SQLAlchemy), define the commands 
# to create your 4 tables/dataframes
HOURLY_WEATHER_SCHEMA = """
CREATE TABLE IF NOT EXISTS hourly_weather (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    date DATETIME NOT NULL,
    precipitation FLOAT,
    wind_speed FLOAT
);
"""

DAILY_WEATHER_SCHEMA = """
CREATE TABLE IF NOT EXISTS daily_weather (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    date DATE NOT NULL,
    precipitation FLOAT,
    average_wind_speed FLOAT,
    snowfall FLOAT,
    sunrise TIME,
    sunset TIME
);
"""

TAXI_TRIPS_SCHEMA = """
CREATE TABLE IF NOT EXISTS taxi_trips (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pickup_datetime DATETIME NOT NULL,
    dropoff_datetime DATETIME NOT NULL,
    pu_lat FLOAT,
    pu_lon FLOAT,
    do_lat FLOAT,
    do_lon FLOAT,
    trip_miles FLOAT,
    total_amount FLOAT,
    fare_amount FLOAT,
    extra FLOAT,
    mta_tax FLOAT,
    tip_amount FLOAT,
    tolls_amount FLOAT,
    improvement_surcharge FLOAT,
    congestion_surcharge FLOAT,
    airport_fee FLOAT
);
"""

UBER_TRIPS_SCHEMA = """
CREATE TABLE IF NOT EXISTS uber_trips (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pickup_datetime DATETIME NOT NULL,
    dropoff_datetime DATETIME NOT NULL,
    pu_lat FLOAT,
    pu_lon FLOAT,
    do_lat FLOAT,
    do_lon FLOAT,
    trip_miles FLOAT,
    fare_amount FLOAT,
    tolls_amount FLOAT,
    bcf FLOAT,
    sales_tax FLOAT,
    tip_amount FLOAT,
    airport_fee FLOAT,
    congestion_surcharge FLOAT
);
"""

In [113]:
# create that required schema.sql file
DATABASE_SCHEMA_FILE = "schema.sql"

with open(DATABASE_SCHEMA_FILE, "w") as f:
    f.write(HOURLY_WEATHER_SCHEMA)
    f.write(DAILY_WEATHER_SCHEMA)
    f.write(TAXI_TRIPS_SCHEMA)
    f.write(UBER_TRIPS_SCHEMA)

In [115]:
# create the tables with the schema file; engine.begin() commits the transaction on exit
with engine.begin() as connection:
    with open(DATABASE_SCHEMA_FILE, "r") as schema_file:
        schema_script = schema_file.read()
    # Run each CREATE TABLE statement separately
    for statement in schema_script.split(";"):
        statement = statement.strip()
        if statement:
            connection.execute(db.text(statement))

### Add Data to Database

In [118]:
def write_dataframes_to_table(table_to_df_dict: dict[str, pd.DataFrame]) -> None:
    """
    Write DataFrames to the corresponding SQL tables without replacing the tables.

    Rows are appended, so running this function twice inserts every row twice.

    Args:
        table_to_df_dict (dict[str, pd.DataFrame]): A dictionary where keys are table names
                                                    and values are the respective DataFrames.
    """
    for table_name, df in table_to_df_dict.items():
        # Use 'append' mode to insert data without replacing the table;
        # the DataFrames have no id column, so SQLite assigns it (AUTOINCREMENT)
        df.to_sql(table_name, con=engine, if_exists='append', index=False)
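Because `write_dataframes_to_table` uses `if_exists='append'` and the DataFrames carry no `id` column, SQLite fills in `id` itself. The self-contained sketch below demonstrates this against an in-memory `sqlite3` connection (an assumption standing in for the notebook's SQLAlchemy `engine`; `DataFrame.to_sql` accepts both), using two rows copied from the hourly weather `head()` output. It also shows a side effect worth knowing: appending the same frame twice duplicates the rows.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")
# Same shape as the hourly_weather table in schema.sql
conn.execute(
    "CREATE TABLE hourly_weather ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, date DATETIME NOT NULL, "
    "precipitation FLOAT, wind_speed FLOAT)"
)

rows = pd.DataFrame({
    "date": ["2020-01-01 00:51:00", "2020-01-01 01:51:00"],
    "precipitation": [0.0, 0.0],
    "wind_speed": [8.0, 8.0],
})

# The DataFrame has no id column, so SQLite assigns ids via AUTOINCREMENT
rows.to_sql("hourly_weather", con=conn, if_exists="append", index=False)
first_ids = [r[0] for r in conn.execute("SELECT id FROM hourly_weather ORDER BY id")]

# Appending the same frame again inserts duplicate rows with new ids
rows.to_sql("hourly_weather", con=conn, if_exists="append", index=False)
all_ids = [r[0] for r in conn.execute("SELECT id FROM hourly_weather ORDER BY id")]
print(first_ids, all_ids)
```

If the notebook may be re-run, dropping and recreating the tables (or using `if_exists='replace'`, at the cost of pandas redefining the schema) avoids these duplicates.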

In [120]:
map_table_name_to_dataframe = {
    "taxi_trips": taxi_data,
    "uber_trips": uber_data,
    "hourly_weather": hourly_data,
    "daily_weather": daily_data,
}

In [122]:
write_dataframes_to_table(map_table_name_to_dataframe)

## Part 3: Understanding the Data

In [None]:
# Helper function to write the queries to file
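def write_query_to_file(query: str, outfile: str) -> None:
    # Save the SQL text of a query to its own .sql file
    with open(outfile, "w") as f:
        f.write(query.strip() + "\n")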

### Query 1

In [None]:
QUERY_1_FILENAME = ""

QUERY_1 = """
TODO
"""

In [None]:
# execute query either via sqlalchemy
with engine.connect() as con:
    results = con.execute(db.text(QUERY_1)).fetchall()
results

# or via pandas
pd.read_sql(QUERY_1, con=engine)

In [None]:
write_query_to_file(QUERY_1, QUERY_1_FILENAME)

## Part 4: Visualizing the Data

### Visualization 1

In [None]:
# use a more descriptive name for your function
def plot_visual_1(dataframe):
    figure, axes = plt.subplots(figsize=(20, 10))
    
    values = "..."  # use the dataframe to pull out values needed to plot
    
    # you may want to use matplotlib to plot your visualizations;
    # there are also many other plot types (other 
    # than axes.plot) you can use
    axes.plot(values, "...")
    # there are other methods to use to label your axes, to style 
    # and set up axes labels, etc
    axes.set_title("Some Descriptive Title")
    
    plt.show()

In [None]:
def get_data_for_visual_1():
    # Query SQL database for the data needed.
    # You can put the data queried into a pandas dataframe, if you wish
    raise NotImplementedError()

In [None]:
some_dataframe = get_data_for_visual_1()
plot_visual_1(some_dataframe)