# Understanding Hired Rides in NYC
## IEOR E4501 Final Project

## Project Setup

In [3]:
# all import statements needed for the project
import math
import os

import bs4
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.axes import Axes
from matplotlib.container import BarContainer

import numpy as np
import pandas as pd
import geopandas as gpd
from geopy.distance import distance
import requests
import re
import sqlalchemy as db

import folium
from folium.plugins import HeatMap

from typing import List, Union, Any, Dict,  Tuple
from matplotlib.container import BarContainer

In [4]:
# Page that get_taxi_data scrapes for links to the monthly trip-record files
TAXI_URL = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"
# Local folder where the downloaded yellow taxi parquet files are cached
TAXI_DIR = "data/taxi"

# Shapefile with the taxi zone geometries, read by load_taxi_zones
TAXI_ZONES_DIR = "data/taxi_zones"
TAXI_ZONES_SHAPEFILE = f"{TAXI_ZONES_DIR}/taxi_zones.shp"
# CSV with the Uber rides sample, read by get_uber_data
UBER_CSV = "data/uber_rides_sample.csv"
# Folder scanned for weather CSV files by load_and_clean_weather_data
WEATHER_CSV_DIR = "data/weather"

CRS = 4326 # coordinate reference system the taxi zone geometries are converted to

# Bounding boxes given as two (lat, lon) corners.
# NEW_YORK_BOX_COORDS is unpacked as ((south, west), (north, east)) by the cleaning code.
# NOTE(review): the airport boxes presumably use the same corner order - confirm.
NEW_YORK_BOX_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))
LGA_BOX_COORDS = ((40.763589, -73.891745), (40.778865, -73.854838))
JFK_BOX_COORDS = ((40.639263, -73.795642), (40.651376, -73.766264))
EWR_BOX_COORDS = ((40.686794, -74.194028), (40.699680, -74.165205))

# Connection URL handed to sqlalchemy's create_engine
DATABASE_URL = "sqlite:///project.db"
# NOTE(review): presumably the file the table schemas are written to - confirm.
DATABASE_SCHEMA_FILE = "schema.sql"
# Folder created during project setup
# NOTE(review): presumably holds the saved SQL query files - confirm.
QUERY_DIRECTORY = "queries"

In [None]:
# Create QUERY_DIRECTORY on the first run; an existing folder is only reported
try:
    os.mkdir(QUERY_DIRECTORY)
except FileExistsError:
    print(f"Folder {QUERY_DIRECTORY} already exists.")
else:
    print(f"Folder {QUERY_DIRECTORY} created successfully.")

In [None]:
# Make sure the TAXI_DIR exists.
# TAXI_DIR is a nested path ("data/taxi"), so use makedirs to also create the
# parent folder; os.mkdir would raise FileNotFoundError on a fresh checkout.
try:
    os.makedirs(TAXI_DIR)
    print(f"Folder {TAXI_DIR} created successfully.")
except FileExistsError:
    print(f"Folder {TAXI_DIR} already exists.")

## Part 1: Data Preprocessing

### Load Taxi Zones

In [None]:
def load_taxi_zones(shapefile: str) -> pd.DataFrame:
    """Read the taxi zone shapefile into a frame indexed by zone ID.

    Parameters
    ----------
    shapefile : str
        The relative path of the shape file including zone IDs and geometries.

    Returns
    -------
    pandas.DataFrame
        A frame with a single 'geometry' column, indexed by 'OBJECTID' and
        re-projected to the coordinate reference system given by CRS.

    """
    zones = (
        gpd.read_file(shapefile)[['OBJECTID', 'geometry']]
        .set_index('OBJECTID')
    )
    # Re-project so the centroids come out as longitude/latitude degrees
    return zones.to_crs(CRS)

In [None]:
def lookup_coords_for_taxi_zone_id(zone_loc_id: int,
                                   loaded_taxi_zones: pd.DataFrame) -> tuple:
    """Return the centroid coordinates of the zone with the given ID.

    Parameters
    ----------
    zone_loc_id : int
        The zone ID which needs to be searched.
    loaded_taxi_zones : pandas.DataFrame
        A dataframe indexed by zone ID with a 'geometry' column.

    Returns
    -------
    tuple
        The zone centroid as (longitude, latitude).

    """
    # The centroid serves as the approximate location of any trip in the zone
    centroid = loaded_taxi_zones.loc[zone_loc_id, 'geometry'].centroid
    return (centroid.x, centroid.y)

### Calculate distance

In [None]:
def calculate_distance_with_coords(from_coord: tuple, to_coord: tuple) -> float:
    """Return the great-circle distance between two coordinates.

    The distance is computed with the Haversine formula on a spherical
    Earth.

    Parameters
    ----------
    from_coord : tuple of float
        (longitude, latitude) of the starting point, in degrees.
    to_coord : tuple of float
        (longitude, latitude) of the destination point, in degrees.

    Returns
    -------
    float
        The distance between the two coordinates, in kilometers.

    """
    earth_radius_km = 6371

    # The trigonometric functions below work in radians
    from_lon = math.radians(from_coord[0])
    from_lat = math.radians(from_coord[1])
    to_lon = math.radians(to_coord[0])
    to_lat = math.radians(to_coord[1])

    delta_lon = to_lon - from_lon
    delta_lat = to_lat - from_lat

    # Haversine: 'a' is the squared half-chord length, 'c' the central angle
    lat_term = math.sin(delta_lat / 2)**2
    lon_term = math.cos(from_lat) * math.cos(to_lat) * math.sin(delta_lon / 2)**2
    a = lat_term + lon_term
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return earth_radius_km * c

In [None]:
def add_distance_column(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Add the 'distance' column to the dataframe.

    The Haversine distance between pickup and dropoff is computed for all
    rows at once with NumPy instead of a row-wise ``apply``. This is much
    faster on large frames and also works on an empty frame. The formula and
    Earth radius match ``calculate_distance_with_coords``.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        The dataframe which needs to be processed. It must contain the
        columns 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude'
        and 'dropoff_latitude', in degrees.

    Returns
    -------
    pandas.DataFrame
        The same dataframe object, modified in place, with the new column
        'distance' in kilometers.

    """
    earth_radius_km = 6371

    pickup_lon, pickup_lat, dropoff_lon, dropoff_lat = (
        np.radians(dataframe[column].to_numpy(dtype=float))
        for column in ('pickup_longitude', 'pickup_latitude',
                       'dropoff_longitude', 'dropoff_latitude')
    )

    delta_lon = dropoff_lon - pickup_lon
    delta_lat = dropoff_lat - pickup_lat

    a = (np.sin(delta_lat / 2)**2
         + np.cos(pickup_lat) * np.cos(dropoff_lat) * np.sin(delta_lon / 2)**2)
    # Rounding can push 'a' marginally outside [0, 1]; clip so sqrt stays real
    a = np.clip(a, 0.0, 1.0)
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

    # Assign on the passed object: callers rely on the in-place update
    dataframe['distance'] = earth_radius_km * c

    return dataframe

### Remove outside trips

In [None]:
def remove_outside_trip(df: pd.DataFrame,
                        box_coords: Union[tuple, None] = None) -> pd.DataFrame:
    """Remove the trip records outside the defined region.

    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe which needs to be processed. It must contain the
        columns 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude'
        and 'dropoff_latitude'.
    box_coords : tuple, optional
        Bounding box as ((south, west), (north, east)), i.e. two (lat, lon)
        corners. Defaults to NEW_YORK_BOX_COORDS.

    Returns
    -------
    pandas.DataFrame
        The rows whose pickup and dropoff both lie inside the box (limits
        included). Rows with a missing coordinate are removed.

    """
    if box_coords is None:
        box_coords = NEW_YORK_BOX_COORDS
    (southlimit, westlimit), (northlimit, eastlimit) = box_coords

    # Series.between is inclusive on both ends and False for NaN
    inside_box = (
        df['pickup_longitude'].between(westlimit, eastlimit)
        & df['pickup_latitude'].between(southlimit, northlimit)
        & df['dropoff_longitude'].between(westlimit, eastlimit)
        & df['dropoff_latitude'].between(southlimit, northlimit)
    )

    return df[inside_box]

### Processing Taxi Data

In [None]:
def get_all_urls_from_taxi_page(taxi_page: str) -> list:
    """Scrape the URLs from the given page and return them as a list.

    Parameters
    ----------
    taxi_page : str
        The URL of the target page.

    Returns
    -------
    list
        The 'href' of every link on the page, with surrounding whitespace
        stripped. Anchors without an 'href' attribute are skipped.

    Raises
    ------
    requests.HTTPError
        If the page request returns an error status.

    """
    # Fetch the page that was asked for, not the module-level default
    response = requests.get(taxi_page, timeout=30)
    response.raise_for_status()

    soup = bs4.BeautifulSoup(response.text, 'lxml')
    # href=True skips anchors without a link, which would otherwise yield None
    return [link['href'].strip() for link in soup.find_all("a", href=True)]

In [None]:
def filter_taxi_parquet_urls(all_urls: list) -> list:
    """Find the URLs for yellow taxi data and return them as a list.

    Parameters
    ----------
    all_urls : list
        A list of URLs. Entries that are not strings (e.g. None) are ignored.

    Returns
    -------
    list
        A list of URLs for yellow taxi parquet files from 2009-01 to 2015-06,
        in their original order.

    """
    file_pattern = re.compile(r".*yellow_tripdata.*parquet\Z")
    time_pattern = re.compile(
        r"(2009-(0[1-9]|1[0-2]))|(201[0-4]-(0[1-9]|1[0-2]))|(2015-(0[1-6]))")

    return [
        url for url in all_urls
        if isinstance(url, str)
        # The URL must point to a yellow taxi parquet file ...
        and file_pattern.search(url)
        # ... for a month inside the project's time range
        and time_pattern.search(url)
    ]

In [None]:
def generate_coords_from_zones(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Generate the coordinates from zone IDs in a DataFrame.

    Each zone ID is replaced by the centroid of its zone. When pickup and
    dropoff are in the same zone, or when the dropoff centroid is outside
    NEW_YORK_BOX_COORDS, the dropoff is instead placed 'trip_distance' miles
    from the pickup, trying bearings 0, 90, 180 and 270 degrees in turn. A
    record is dropped if no candidate lands inside the box.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        The dataframe which needs to be processed, with the columns
        'pickup_zoneid', 'dropoff_zoneid' and 'trip_distance'. It is
        modified in place.

    Returns
    -------
    pandas.DataFrame
        A dataframe after transforming zone IDs to longitude and latitude coordinates.

    """
    loaded_taxi_zones = load_taxi_zones(TAXI_ZONES_SHAPEFILE)
    southlimit, westlimit = NEW_YORK_BOX_COORDS[0]
    northlimit, eastlimit = NEW_YORK_BOX_COORDS[1]

    for index, row in dataframe.iterrows():
        pickup_zoneid = row['pickup_zoneid']
        dropoff_zoneid = row['dropoff_zoneid']

        # (longitude, latitude) of the pickup zone centroid
        pickup_coords = lookup_coords_for_taxi_zone_id(pickup_zoneid, loaded_taxi_zones)
        # geopy expects (latitude, longitude), so reverse once and reuse it
        pickup_lat_lon = pickup_coords[::-1]

        # define the initial bearing
        direction = 0
        # check if pickup and dropoff zones are the same
        if pickup_zoneid == dropoff_zoneid:
            # generate dropoff coordinates using distance and bearing;
            # [1::-1] turns geopy's (lat, lon, alt) back into (lon, lat)
            dropoff_coords = distance(
                miles=row['trip_distance']).destination(pickup_lat_lon, bearing=direction)[1::-1]
        else:
            # generate dropoff coordinates using dropoff zone ID
            dropoff_coords = lookup_coords_for_taxi_zone_id(dropoff_zoneid, loaded_taxi_zones)

        # check if dropoff coordinates fall outside the defined box
        while not ((westlimit <= dropoff_coords[0] <= eastlimit) and
                   (southlimit <= dropoff_coords[1] <= northlimit)):
            # Generate new dropoff coordinates by changing the bearing
            direction += 90
            # If all four bearings do not work, drop this record instead
            if direction == 360:
                break
            # Start from the (lat, lon) point here as well; passing the
            # (lon, lat) tuple would project from the wrong location
            dropoff_coords = distance(
                miles=row['trip_distance']).destination(pickup_lat_lon, bearing=direction)[1::-1]

        # update the dataframe with the generated coordinates
        if direction != 360:
            dataframe.loc[index, 'pickup_longitude'] = pickup_coords[0]
            dataframe.loc[index, 'pickup_latitude'] = pickup_coords[1]
            dataframe.loc[index, 'dropoff_longitude'] = dropoff_coords[0]
            dataframe.loc[index, 'dropoff_latitude'] = dropoff_coords[1]
        else:
            dataframe.drop(index=index, inplace=True)

    # Drop the unnecessary columns
    dataframe.drop(['trip_distance', 'pickup_zoneid', 'dropoff_zoneid'], axis=1, inplace=True)

    return dataframe

In [None]:
def clean_taxi_df_2009_to_2010(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Clean the yellow taxi data from 2009 to 2010.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        The dataframe with records from 2009 to 2010. Its 18 columns are
        renamed in place.

    Returns
    -------
    pandas.DataFrame
        A dataframe after normalizing column names, removing invalid data, and
        sampling at most 2500 rows.

    """
    sample_size = 2500

    # Normalize the column names
    dataframe.columns = ['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
                         'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
                         'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
                         'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount',
                         'tolls_amount', 'total_amount']

    # Remove the trips outside the required coordinate box
    dataframe = remove_outside_trip(dataframe)
    # Remove the trips with zero passenger count
    dataframe = dataframe[dataframe['passenger_count'] != 0]
    # Remove the trips without a fare
    dataframe = dataframe[dataframe['fare_amount'] != 0]
    # Remove the trips with no distance between pickup and dropoff locations
    dataframe = dataframe[dataframe['trip_distance'] != 0]
    # A trip has moved when longitude OR latitude changed; requiring both to
    # differ would also discard trips that share a single coordinate.
    has_moved = ((dataframe['pickup_longitude'] != dataframe['dropoff_longitude'])
                 | (dataframe['pickup_latitude'] != dataframe['dropoff_latitude']))
    dataframe = dataframe[has_moved]

    # Sample the taxi data; cap at the row count so small months do not raise
    dataframe = dataframe.sample(n=min(sample_size, len(dataframe)), random_state=1)

    # Choose useful columns for the coming analysis
    columns_to_keep = ['pickup_datetime', 'pickup_longitude', 'pickup_latitude',
                       'dropoff_longitude', 'dropoff_latitude', 'tip_amount']
    # Copy so the assignment below does not write into a slice
    dataframe = dataframe[columns_to_keep].copy()

    # Transform the pickup datetime column from strings to datetime
    dataframe['pickup_datetime'] = pd.to_datetime(dataframe['pickup_datetime'],
                                                  format='%Y-%m-%d %H:%M:%S')

    return dataframe

In [None]:
def clean_taxi_df_2011_to_2015(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Clean the yellow taxi data from 2011 to 2015.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        The dataframe with records from 2011 to 2015. Its 19 columns are
        renamed in place.

    Returns
    -------
    pandas.DataFrame
        A dataframe after normalizing column names, removing invalid data,
        sampling at most 2500 rows, and converting zone IDs to coordinates.

    """
    sample_size = 2500

    # Normalize the column names
    dataframe.columns = ['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
                         'trip_distance', 'rate_code', 'store_and_fwd_flag', 'pickup_zoneid',
                         'dropoff_zoneid', 'payment_type', 'fare_amount', 'surcharge', 'mta_tax',
                         'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount',
                         'congestion_surcharge', 'airport_fee']

    # Remove the trips outside the 1-263 zones. A range check also removes any
    # other or missing ID, which would fail the zone lookup later.
    in_known_zone = (dataframe['pickup_zoneid'].between(1, 263)
                     & dataframe['dropoff_zoneid'].between(1, 263))
    dataframe = dataframe[in_known_zone]
    # Remove the trips with zero passenger count
    dataframe = dataframe[dataframe['passenger_count'] != 0]
    # Remove the trips without a fare
    dataframe = dataframe[dataframe['fare_amount'] != 0]
    # Remove the trips with no distance between pickup and dropoff locations
    dataframe = dataframe[dataframe['trip_distance'] != 0]

    # Sample the taxi data; cap at the row count so small months do not raise
    dataframe = dataframe.sample(n=min(sample_size, len(dataframe)), random_state=1)

    # Choose useful columns for the coming analysis
    columns_to_keep = ['pickup_datetime', 'trip_distance',
                       'pickup_zoneid', 'dropoff_zoneid', 'tip_amount']
    # Copy because generate_coords_from_zones edits the frame in place
    dataframe = dataframe[columns_to_keep].copy()

    # Generate the coordinates from zone IDs
    dataframe = generate_coords_from_zones(dataframe)

    return dataframe

In [None]:
def get_and_clean_month(url: str) -> pd.DataFrame:
    """Load and clean the parquet file for the URL, return it as a DataFrame.

    The file is downloaded into TAXI_DIR on first use and read from there on
    later calls.

    Parameters
    ----------
    url : str
        The URL for the parquet file. It must contain a year-month between
        2009-01 and 2015-06.

    Returns
    -------
    pandas.DataFrame
        A dataframe after loading and cleaning the parquet file from the given URL.

    Raises
    ------
    ValueError
        If the URL does not contain a supported year-month.
    requests.HTTPError
        If the download request returns an error status.

    """
    time_pattern = r"(2009-(0[1-9]|1[0-2]))|(201[0-4]-(0[1-9]|1[0-2]))|(2015-(0[1-6]))"

    match = re.search(time_pattern, url)
    if match is None:
        # Fail early with a clear message rather than cleaning an empty frame
        raise ValueError(f"No supported year-month found in URL: {url}")
    time = match.group(0)
    file_path = f"{TAXI_DIR}/yellow_taxi_{time}.parquet"

    # Check if the parquet file has already been downloaded
    if os.path.exists(file_path):
        print(f"File {file_path} already exists.")
    else:
        # If not, download the file from the given URL
        print(f"File {file_path} does not exist. Downloading...")
        # Write to a temporary name first so an interrupted download is never
        # mistaken for a complete cached file on the next run
        partial_path = f"{file_path}.part"
        with requests.get(url, stream=True) as response:
            response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
        os.replace(partial_path, file_path)
        print(f"File {file_path} downloaded successfully.")

    dataframe = pd.read_parquet(file_path, engine='pyarrow')

    # The 2009-2010 files carry coordinates; later files carry zone IDs
    if time.startswith(("2009", "2010")):
        dataframe_cleaned = clean_taxi_df_2009_to_2010(dataframe)
    else:
        dataframe_cleaned = clean_taxi_df_2011_to_2015(dataframe)

    return dataframe_cleaned

In [None]:
def get_and_clean_taxi_data(parquet_urls: list) -> pd.DataFrame:
    """Build one DataFrame from all monthly yellow taxi files.

    Parameters
    ----------
    parquet_urls : list
        A list of URLs for parquet files of yellow taxi data.

    Returns
    -------
    pandas.DataFrame
        The cleaned monthly dataframes, each with a 'distance' column,
        concatenated in the order of the URLs.

    """
    # Clean each month and add its 'distance' column before stacking
    monthly_frames = [
        add_distance_column(get_and_clean_month(month_url))
        for month_url in parquet_urls
    ]

    return pd.concat(monthly_frames)

In [None]:
def get_taxi_data() -> pd.DataFrame:
    """Scrape the yellow taxi data and return the result as a DataFrame.

    Returns
    -------
    pandas.DataFrame
        A dataframe including all cleaned and sampled records for yellow taxi data.

    """
    # Collect every link on the TLC page, then keep the yellow taxi parquet files
    page_links = get_all_urls_from_taxi_page(TAXI_URL)
    yellow_taxi_links = filter_taxi_parquet_urls(page_links)

    return get_and_clean_taxi_data(yellow_taxi_links)

In [None]:
# Scrape, download (or reuse cached files), clean and sample the yellow taxi data
taxi_data = get_taxi_data()

In [None]:
# Preview the first rows of the sampled taxi data
taxi_data.head()

In [None]:
# Save the sampled yellow taxi data as a CSV file (row index not written)
taxi_data.to_csv('data/taxi/yellow_taxi_sampled.csv', index=False)
# Reload the sampled yellow taxi data from that CSV file
# NOTE(review): no parse_dates is given, so 'pickup_datetime' presumably comes
# back as strings - confirm this is what later cells expect.
taxi_data = pd.read_csv('data/taxi/yellow_taxi_sampled.csv')

### Processing Uber Data

In [None]:
def load_and_clean_uber_data(csv_file: str) -> pd.DataFrame:
    """Load and clean the Uber data and return it as a DataFrame.

    Parameters
    ----------
    csv_file : str
        The relative path of the CSV file of Uber data.

    Returns
    -------
    pandas.DataFrame
        The pickup time and coordinates of every trip that lies inside the
        defined coordinate box and has different pickup and dropoff locations.

    """
    columns_to_keep = ['pickup_datetime', 'pickup_longitude', 'pickup_latitude',
                       'dropoff_longitude', 'dropoff_latitude']
    dataframe = pd.read_csv(csv_file, usecols=columns_to_keep)
    # Transform the pickup datetime column from strings to datetime
    dataframe['pickup_datetime'] = pd.to_datetime(dataframe['pickup_datetime'],
                                                  format='%Y-%m-%d %H:%M:%S %Z')
    # Drop the timezone information to get naive timestamps
    dataframe['pickup_datetime'] = dataframe['pickup_datetime'].dt.tz_convert(None)

    # Remove the trips outside the defined coordinate box
    dataframe = remove_outside_trip(dataframe)
    # Remove the trips with no distance between pickup and dropoff locations.
    # A trip has moved when longitude OR latitude changed; requiring both to
    # differ would also discard trips that share a single coordinate.
    has_moved = ((dataframe['pickup_longitude'] != dataframe['dropoff_longitude'])
                 | (dataframe['pickup_latitude'] != dataframe['dropoff_latitude']))
    dataframe = dataframe[has_moved]

    return dataframe

In [None]:
def get_uber_data() -> pd.DataFrame:
    """Load the Uber sample, clean it and attach trip distances.

    Returns
    -------
    pandas.DataFrame
        A dataframe after preprocessing all the Uber data and adding column 'distance'.

    """
    rides = load_and_clean_uber_data(UBER_CSV)
    # add_distance_column writes the 'distance' column onto the frame it is given
    rides = add_distance_column(rides)
    # Discard every row that still has a missing value
    rides.dropna(axis=0, inplace=True)

    return rides

In [None]:
# Load and clean the Uber rides sample, including the 'distance' column
uber_data = get_uber_data()

In [None]:
# Preview the first rows of the cleaned Uber data
uber_data.head()

### Processing Weather Data

In [5]:
def drop_invalid_rows_for_HourlyWindSpeed(dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Keep the 23:59 rows and the rows with a usable HourlyWindSpeed.

    A row is dropped only when its DATE is not at 23:59 and its
    HourlyWindSpeed is either not a float or is NaN.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Input dataframe with 'DATE' and 'HourlyWindSpeed' columns.

    Returns
    -------
    pd.DataFrame
        The dataframe after dropping invalid rows.
    """
    def _is_usable_speed(value) -> bool:
        # Only genuine, non-missing floats count as a valid reading
        return isinstance(value, float) and not pd.isna(value)

    timestamps = dataframe['DATE']
    # Rows stamped 23:59 are kept whatever their hourly wind speed is
    is_2359_row = (timestamps.dt.hour == 23) & (timestamps.dt.minute == 59)
    has_usable_speed = dataframe['HourlyWindSpeed'].apply(_is_usable_speed)

    rows_to_keep = is_2359_row | has_usable_speed
    return dataframe[rows_to_keep]

In [6]:
def generate_missing_dates(df: pd.DataFrame) -> pd.DataFrame:
    """
    Generate a DataFrame containing missing daily records.

    For every calendar day between the earliest and latest 'DATE' in the
    input, a record stamped 23:59 is expected. The result holds the expected
    timestamps that do not occur in the input.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing weather data with a 'DATE' column.

    Returns
    -------
    pd.DataFrame
        A DataFrame with one 'DATE' row per missing daily record, or an empty
        DataFrame when nothing is missing or the input has no rows.
    """
    # Without rows there is no date range to check
    if df.empty:
        return pd.DataFrame()

    # Find the minimum and maximum dates in the input DataFrame
    min_date = df['DATE'].min().normalize()
    max_date = df['DATE'].max().normalize()
    # Expected daily timestamps: every day of the range at 23:59:00
    expected = (pd.date_range(start=min_date, end=max_date, freq='D')
                + pd.Timedelta(hours=23, minutes=59))
    # One membership test for all days instead of scanning 'DATE' once per day
    missing = expected[~expected.isin(df['DATE'])]

    if missing.empty:
        return pd.DataFrame()
    return pd.DataFrame({'DATE': missing})

In [7]:
def fill_missing_daily_wind_speed(new_df: pd.DataFrame) -> None:
    """
    Fill missing daily wind speed values by calculating the mean hourly wind speed.

    Every 23:59 row whose 'DailyAverageWindSpeed' is missing receives the mean
    of the non-missing 'HourlyWindSpeed' values of the other rows on the same
    calendar day. The DataFrame is modified in place.

    Parameters
    ----------
    new_df : pd.DataFrame
        The input DataFrame containing weather data with the columns 'DATE',
        'HourlyWindSpeed' and 'DailyAverageWindSpeed'.
    """
    timestamps = new_df['DATE']
    # These masks do not change while filling, so compute them once up front
    # instead of once per daily row
    calendar_day = timestamps.dt.date
    has_hourly_speed = ~new_df['HourlyWindSpeed'].isna()
    is_daily_row = timestamps.dt.strftime('%H:%M') == '23:59'
    needs_fill = is_daily_row & new_df['DailyAverageWindSpeed'].isna()

    for index, row in new_df.loc[needs_fill].iterrows():
        # Hourly readings of the same day, excluding the daily row itself
        same_day_hourly = (
            (calendar_day == row['DATE'].date())
            & (timestamps != row['DATE'])
            & has_hourly_speed
        )
        # Replace the missing daily wind speed value with the calculated mean
        new_df.loc[index, 'DailyAverageWindSpeed'] = new_df.loc[
            same_day_hourly, 'HourlyWindSpeed'].mean()

In [8]:
def get_all_weather_csvs(directory: str) -> List[pd.DataFrame]:
    """
    Load and clean the CSV files, return them as a list of DataFrames.

    Each file is restricted to 2009-01-01 through 2015-06-30, stripped of
    rows without a usable hourly wind speed, completed with the missing
    23:59 daily rows, and has its missing daily average wind speed filled.

    Parameters
    ----------
    directory : str
        The directory path containing the CSV files.

    Returns
    -------
    List[pd.DataFrame]
        A list of cleaned DataFrames, one per CSV file, ordered by file name.
    """
    # List all CSV files in the directory. The 'with' block closes the
    # directory handle, and sorting makes the order independent of the
    # file system.
    with os.scandir(directory) as entries:
        csv_files = sorted(
            entry.name for entry in entries
            if entry.name.endswith('.csv') and entry.is_file()
        )
    dfs = []
    # Choose useful columns for the coming analysis
    columns_to_keep = [
        'DATE', 'HourlyPrecipitation', 'HourlyWindSpeed',
        'DailyAverageWindSpeed', 'Sunrise', 'Sunset'
    ]

    # Process each CSV file
    for csv_file in csv_files:
        # Join the directory path and CSV file name to form the full file path
        file_path = os.path.join(directory, csv_file)
        # Read the CSV file into a DataFrame
        df = pd.read_csv(file_path, usecols=columns_to_keep, parse_dates=['DATE'], engine='python')

        # Filter the DataFrame by date range
        df = df[(df['DATE'] >= '2009-01-01 00:00:00') & (df['DATE'] <= '2015-06-30 23:59:59')]

        # Drop rows that do not meet the requirement of 'HourlyWindSpeed'.
        df = drop_invalid_rows_for_HourlyWindSpeed(df)

        # Generate a DataFrame with missing dates
        missing_df = generate_missing_dates(df)

        # Combine the original DataFrame and the missing dates DataFrame
        new_df = pd.concat([df, missing_df], ignore_index=True)
        # Sort the new DataFrame by date
        new_df.sort_values(by='DATE', inplace=True)

        # Fill in missing daily wind speed values
        fill_missing_daily_wind_speed(new_df)

        # Append the cleaned DataFrame to the list of DataFrames
        dfs.append(new_df)

    return dfs

In [9]:
def _normalize_precipitation_value(value: Any) -> Any:
    """Strip the 's' marker from a text reading and map 'T' to 0."""
    if isinstance(value, str):
        stripped = value.replace('s', '')
        return 0 if stripped == 'T' else stripped
    # Numbers and missing values pass through untouched
    return value


def clean_hourly_precipitation(hourly_data: pd.DataFrame) -> pd.DataFrame:
    """
    Clean the 'HourlyPrecipitation' column of the input DataFrame.

    Text readings lose their 's' marker, 'T' becomes 0, missing values become
    0, and the column is converted to numbers. Values that are already
    numeric are kept, whether the column holds text, numbers or a mix.

    Parameters
    ----------
    hourly_data : pd.DataFrame
        Input DataFrame containing hourly weather data. Its
        'HourlyPrecipitation' column is replaced in place.

    Returns
    -------
    pd.DataFrame
        The same DataFrame with a numeric 'HourlyPrecipitation' column.

    Raises
    ------
    ValueError
        If a reading cannot be parsed as a number after cleaning.
    """
    # Per-value cleaning avoids the .str accessor, which turns non-string
    # entries into NaN and fails outright on a purely numeric column
    cleaned = hourly_data['HourlyPrecipitation'].map(_normalize_precipitation_value)
    # Convert to numeric and treat missing readings as no precipitation.
    # Assign the result back rather than filling a column selection in place.
    hourly_data['HourlyPrecipitation'] = pd.to_numeric(cleaned).fillna(0)
    return hourly_data

In [10]:
def aggregate_hourly_data(hourly_data: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate the input DataFrame's data to remove multiple reports within an hour.

    Parameters
    ----------
    hourly_data : pd.DataFrame
        Input DataFrame with the columns 'date', 'hourly_precipitation' and
        'hourly_windspeed'. Its 'date' column is truncated in place.

    Returns
    -------
    pd.DataFrame
        One row per hour with the maximum precipitation and wind speed
        reported within that hour.
    """
    # Truncate each timestamp down to the start of its hour (this floors, it
    # does not round). 'h' is used because newer pandas releases deprecate
    # the upper-case 'H' alias.
    hourly_data['date'] = hourly_data['date'].dt.floor('h')
    # Group by hour and aggregate the data by taking the maximum of hourly_precipitation and hourly_windspeed
    return hourly_data.groupby('date').agg(
        {'hourly_precipitation': 'max', 'hourly_windspeed': 'max'}
    ).reset_index()

In [11]:
def clean_month_weather_data_hourly(csv_file: pd.DataFrame) -> pd.DataFrame:
    """
    Clean the DataFrame and return only hourly weather data.

    Parameters
    ----------
    csv_file : pd.DataFrame
        Input DataFrame containing weather data, with the columns 'DATE',
        'HourlyPrecipitation' and 'HourlyWindSpeed'. It is not modified.

    Returns
    -------
    pd.DataFrame
        DataFrame with aggregated hourly data in the columns 'date',
        'hourly_precipitation' and 'hourly_windspeed'.
    """
    # Rows stamped 23:59 hold the daily summary; everything else is hourly
    is_daily_row = (csv_file['DATE'].dt.hour == 23) & (csv_file['DATE'].dt.minute == 59)
    # Select only relevant columns for further processing. Work on a copy so
    # the cleaning steps below never write into a slice of the input.
    columns_to_keep = ['DATE', 'HourlyPrecipitation', 'HourlyWindSpeed']
    hourly_data = csv_file.loc[~is_daily_row, columns_to_keep].copy()

    # Clean the 'HourlyPrecipitation' column
    hourly_data = clean_hourly_precipitation(hourly_data)
    # Clean the 'HourlyWindSpeed' column: missing readings become 0. Assign
    # the result back rather than filling a column selection in place.
    hourly_data['HourlyWindSpeed'] = hourly_data['HourlyWindSpeed'].fillna(0)
    # Normalize the column names
    hourly_data.columns = ['date', 'hourly_precipitation', 'hourly_windspeed']
    # Aggregate the hourly data to remove multiple reports within an hour
    result = aggregate_hourly_data(hourly_data)

    return result

In [12]:
def float_to_time_string(value: float) -> str:
    """
    Format a numeric HHMM value as an 'HH:MM' string.

    Parameters
    ----------
    value : float
        Float value representing time, e.g. 720.0 for 07:20.

    Returns
    -------
    str
        Formatted time string in HH:MM format, or None when the value is
        missing.
    """
    # Missing values stay missing
    if pd.isna(value):
        return None

    # The hundreds carry the hours, the remainder the minutes
    hours, minutes = divmod(value, 100)
    return f"{int(hours):02d}:{int(minutes):02d}"

In [13]:
def convert_time_to_datetime(row: Dict[str, Any]) -> Dict[str, Any]:
    """
    Turn the 'Sunrise' and 'Sunset' time strings of a row into full datetimes.

    The calendar date is taken from the row's 'DATE' value. Entries that are
    None are left untouched.

    Parameters
    ----------
    row : Dict[str, Any]
        A row with 'DATE', 'Sunrise' and 'Sunset' entries, where the times are
        'HH:MM' strings or None.

    Returns
    -------
    Dict[str, Any]
        The same row object with 'Sunrise' and 'Sunset' converted to datetime objects.
    """
    day = row['DATE'].date()

    for column in ('Sunrise', 'Sunset'):
        if row[column] is None:
            continue
        # Parse the 'HH:MM' string, then attach it to the row's calendar date
        time_of_day = pd.to_datetime(row[column], format="%H:%M").time()
        row[column] = pd.to_datetime(f"{day} {time_of_day}", format="%Y-%m-%d %H:%M:%S")

    return row

In [14]:
def clean_month_weather_data_daily(csv_file: pd.DataFrame) -> pd.DataFrame:
    """
    Clean the DataFrame and return only daily weather data.

    Only the rows stamped 23:59 are used. Their sunrise and sunset values are
    converted to datetimes, the columns are renamed, and the timestamp is
    reduced to a calendar date.

    Parameters
    ----------
    csv_file : pd.DataFrame
        DataFrame containing weather data.

    Returns
    -------
    pd.DataFrame
        DataFrame containing cleaned daily weather data in the columns
        'date', 'daily_average_windspeed', 'sunrise' and 'sunset'.
    """
    timestamps = csv_file['DATE']
    # The daily summary rows are the ones stamped 23:59
    is_daily_row = (timestamps.dt.hour == 23) & (timestamps.dt.minute == 59)
    daily = csv_file[is_daily_row]
    daily = daily[['DATE', 'DailyAverageWindSpeed', 'Sunrise', 'Sunset']]

    # Numeric HHMM values -> 'HH:MM' strings -> datetimes on the row's date
    for column in ('Sunrise', 'Sunset'):
        daily[column] = daily[column].apply(float_to_time_string)
    daily = daily.apply(convert_time_to_datetime, axis=1)

    # Normalize the column names
    daily.columns = ['date', 'daily_average_windspeed', 'sunrise', 'sunset']
    # Keep only the calendar date of the 23:59 timestamp
    daily["date"] = pd.to_datetime(daily["date"]).dt.date

    return daily

In [15]:
def load_and_clean_weather_data() -> tuple:
    """
    Build the hourly and daily weather DataFrames from the weather CSVs.

    Every item returned by ``get_all_weather_csvs(WEATHER_CSV_DIR)`` is passed
    through ``clean_month_weather_data_hourly`` and
    ``clean_month_weather_data_daily``; the per-file results are then
    concatenated into one hourly and one daily DataFrame.

    Returns
    -------
    tuple
        Tuple containing two DataFrames: (hourly_data, daily_data).
    """
    # For each file the hourly cleaning runs first, then the daily cleaning
    cleaned_pairs = [
        (
            clean_month_weather_data_hourly(weather_df),
            clean_month_weather_data_daily(weather_df),
        )
        for weather_df in get_all_weather_csvs(WEATHER_CSV_DIR)
    ]

    # Stack the per-file results into a single DataFrame per granularity
    hourly_data = pd.concat([hourly for hourly, _ in cleaned_pairs])
    daily_data = pd.concat([daily for _, daily in cleaned_pairs])

    return hourly_data, daily_data

In [16]:
# Load and clean all weather files once; both DataFrames are reused in later cells
hourly_weather_data, daily_weather_data = load_and_clean_weather_data()

In [17]:
# Preview the first rows of the cleaned hourly weather data
hourly_weather_data.head()

Unnamed: 0,date,hourly_precipitation,hourly_windspeed
0,2009-01-01 00:00:00,0.0,18.0
1,2009-01-01 01:00:00,0.0,18.0
2,2009-01-01 02:00:00,0.0,18.0
3,2009-01-01 03:00:00,0.0,8.0
4,2009-01-01 04:00:00,0.0,11.0


In [18]:
# Preview the first rows of the cleaned daily weather data
daily_weather_data.head()

Unnamed: 0,date,daily_average_windspeed,sunrise,sunset
11123,2009-01-01,11.041667,NaT,NaT
55,2009-01-02,6.806452,2009-01-02 07:20:00,2009-01-02 16:40:00
11124,2009-01-03,9.875,NaT,NaT
11125,2009-01-04,7.37037,NaT,NaT
11126,2009-01-05,6.925926,NaT,NaT


## Part 2: Storing Cleaned Data

In [None]:
# SQLAlchemy engine for the project database (DATABASE_URL points at a SQLite file)
engine = db.create_engine(DATABASE_URL)

In [None]:
# Create the table schema
# Each constant holds one CREATE TABLE IF NOT EXISTS statement. The same
# strings are written to the schema file and executed against the database,
# so they must remain valid SQL.
# Every table has an auto-incrementing integer "id" primary key.

# Hourly weather observations: timestamp, precipitation and wind speed
HOURLY_WEATHER_SCHEMA = """
CREATE TABLE IF NOT EXISTS hourly_weather (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    date DATETIME,
    hourly_precipitation FLOAT,
    hourly_windspeed FLOAT
);
"""

# Daily weather records: date, average wind speed, sunrise and sunset
DAILY_WEATHER_SCHEMA = """
CREATE TABLE IF NOT EXISTS daily_weather (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    date DATETIME,
    daily_average_windspeed FLOAT,
    sunrise DATETIME,
    sunset DATETIME
);
"""

# Taxi trips: pickup time, pickup/dropoff coordinates, tip amount and distance
TAXI_TRIPS_SCHEMA = """
CREATE TABLE IF NOT EXISTS taxi_trips (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pickup_datetime DATETIME,
    pickup_longitude FLOAT,
    pickup_latitude FLOAT,
    dropoff_longitude FLOAT,
    dropoff_latitude FLOAT,
    tip_amount FLOAT,
    distance FLOAT
);
"""

# Uber trips: same columns as taxi_trips except that there is no tip_amount
UBER_TRIPS_SCHEMA = """
CREATE TABLE IF NOT EXISTS uber_trips (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pickup_datetime DATETIME,
    pickup_longitude FLOAT,
    pickup_latitude FLOAT,
    dropoff_longitude FLOAT,
    dropoff_latitude FLOAT,
    distance FLOAT
);
"""

In [None]:
# Write all four CREATE TABLE statements to the required schema file,
# in the order hourly weather, daily weather, taxi trips, uber trips
with open(DATABASE_SCHEMA_FILE, "w") as schema_file:
    schema_file.writelines(
        [
            HOURLY_WEATHER_SCHEMA,
            DAILY_WEATHER_SCHEMA,
            TAXI_TRIPS_SCHEMA,
            UBER_TRIPS_SCHEMA,
        ]
    )

In [None]:
# Create the tables from the schema definitions.
# The raw SQL strings are wrapped in text(): passing a plain str to
# Connection.execute() is deprecated in SQLAlchemy 1.4 and rejected in 2.0.
# engine.begin() opens a transaction and commits it when the block exits
# without an error, so the tables are persisted on every SQLAlchemy version.
with engine.begin() as connection:
    for schema in (
        HOURLY_WEATHER_SCHEMA,
        DAILY_WEATHER_SCHEMA,
        TAXI_TRIPS_SCHEMA,
        UBER_TRIPS_SCHEMA,
    ):
        connection.execute(db.text(schema))

### Add Data to Database

In [None]:
def write_dataframes_to_table(table_to_df_dict: Dict[str, pd.DataFrame]) -> None:
    """
    Append every DataFrame to the database table it is mapped to.

    Parameters
    ----------
    table_to_df_dict : Dict[str, pd.DataFrame]
        Mapping from table name to the DataFrame whose rows are written to
        that table. Tables are written in the mapping's iteration order.
    """
    for table_name in table_to_df_dict:
        frame = table_to_df_dict[table_name]
        # Rows are appended to the table; the DataFrame index is not written
        frame.to_sql(table_name, con=engine, if_exists='append', index=False)

In [None]:
# Map each database table name to the DataFrame that populates it.
# The keys match the table names used in the CREATE TABLE statements.
map_table_name_to_dataframe = {
    "taxi_trips": taxi_data,
    "uber_trips": uber_data,
    "hourly_weather": hourly_weather_data,
    "daily_weather": daily_weather_data,
}

In [None]:
# Write every DataFrame in the mapping to its database table
write_dataframes_to_table(map_table_name_to_dataframe)

## Part 3: Understanding the Data

In [None]:
# Helper function to write the queries to file
def write_query_to_file(query: str, outfile: str) -> None:
    """
    Write the text of a SQL query to a file.

    Parameters
    ----------
    query : str
        The query text to store.
    outfile : str
        Path of the file to write. The path is used exactly as given, and an
        existing file is overwritten.

    Raises
    ------
    OSError
        If the file cannot be opened for writing (for example when the
        parent directory does not exist or ``outfile`` is empty).
    """
    with open(outfile, "w", encoding="utf-8") as f:
        f.write(query)

### Query 1

In [None]:
# Placeholder: name of the file the text of query 1 is written to (TODO: fill in)
QUERY_1_FILENAME = ""

# Placeholder: SQL text of query 1 (TODO: replace with the real statement)
QUERY_1 = """
TODO
"""

In [None]:
# Run query 1 on an explicit connection. Engine.execute() was removed in
# SQLAlchemy 2.0, and raw SQL strings have to be wrapped in text().
with engine.connect() as connection:
    query_1_rows = connection.execute(db.text(QUERY_1)).fetchall()
# Last expression of the cell, so the fetched rows are displayed
query_1_rows

In [None]:
# Save the text of query 1 to the file named by QUERY_1_FILENAME
write_query_to_file(QUERY_1, QUERY_1_FILENAME)

## Part 4: Visualizing the Data

### Visualization 1

In [None]:
# use a more descriptive name for your function
def plot_visual_1(dataframe):
    """
    Template for the first visualization.

    Parameters
    ----------
    dataframe
        Data intended for plotting. The template does not read it yet; the
        plotted values are still a placeholder.
    """
    fig, ax = plt.subplots(figsize=(20, 10))

    # placeholder: use the dataframe to pull out values needed to plot
    plot_values = "..."

    # matplotlib offers many other plot types besides Axes.plot
    ax.plot(plot_values, "...")
    # further Axes methods can be used to label and style the axes
    ax.set_title("Some Descriptive Title")

    plt.show()

In [None]:
def get_data_for_visual_1():
    """
    Placeholder for the data query behind the first visualization.

    The intended implementation queries the SQL database for the data needed;
    the result may be put into a pandas DataFrame.

    Raises
    ------
    NotImplementedError
        Always, because the function is not implemented yet.
    """
    raise NotImplementedError

In [None]:
# Fetch the data for the first visualization and pass it to the plotting function
some_dataframe = get_data_for_visual_1()
plot_visual_1(some_dataframe)