# Part 1: Data Preprocessing


## 1. Downloading

Programmatically download the Yellow Taxi trip data

Using the re module, write a regular expression to help pull out the desired links for Yellow Taxi Parquet files

Use the 3rd-party packages requests and  BeautifulSoup to programmatically download the Yellow Taxi Parquet files


In [1]:
#imports 
import os
import re
import unittest
from typing import List
from unittest.mock import patch

import requests
from bs4 import BeautifulSoup


In [2]:
# Define the URL for taxi data
taxi_data_url = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

In [3]:
# Define functions
def download_file(url: str, local_filename: str) -> str:
    """
    Download a file from the given URL and save it to the specified local file.

    :param url: The URL of the file to download.
    :param local_filename: The path to save the downloaded file.
    :return: The path of the saved file.
    """
    # Send an HTTP request to the URL
    with requests.get(url, stream=True) as r:
        # Raise an exception if the request fails
        r.raise_for_status()
        # Save the file to the specified path
        with open(local_filename, 'wb') as f:
            # Write the content of the response in chunks
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return local_filename

def get_yellow_taxi_parquet_links(soup: BeautifulSoup) -> List[str]:
    """
    Get yellow taxi parquet links from the BeautifulSoup object.

    :param soup: BeautifulSoup object containing the parsed HTML content.
    :return: A list of yellow taxi parquet links.
    """
    # Initialize an empty list to store the links
    yellow_taxi_parquet_links = []
    # Iterate through all 'a' tags with 'href' attribute
    for link in soup.find_all('a', href=True):
        href = link['href']
        # Check if the link matches the regex pattern
        if re.match(r'.*yellow_tripdata_(2009|2010|2011|2012|2013|2014|2015).*\.parquet', href):
            # Check if the month is between January and June for 2015 data
            if "2015" in href:
                date_part = re.search(r"(\d{4})-(\d{2})", href)
                if date_part:
                    year, month = int(date_part.group(1)), int(date_part.group(2))
                    if year == 2015 and month > 6:
                        continue
            # Add the link to the list
            yellow_taxi_parquet_links.append(href)
    return yellow_taxi_parquet_links

def test_download_file():
    import tempfile
    test_url = "https://www.example.com/"
    with tempfile.NamedTemporaryFile() as temp_file:
        local_path = temp_file.name
        assert download_file(test_url, local_path) == local_path
        assert os.path.getsize(local_path) > 0

In [4]:
# Send an HTTP request to the taxi data URL
response = requests.get(taxi_data_url)
# Parse the HTML content of the response
soup = BeautifulSoup(response.content, 'lxml')
# Get the list of yellow taxi parquet links
yellow_taxi_parquet_links = get_yellow_taxi_parquet_links(soup)

# Define the directory to store the downloaded files
yellow_taxi_data_dir = "yellow_taxi_data"
# Create the directory if it doesn't exist
os.makedirs(yellow_taxi_data_dir, exist_ok=True)

In [5]:
# Download and save each file from the list of links
for url in yellow_taxi_parquet_links:
    file_name = url.split("/")[-1]
    local_path = os.path.join(yellow_taxi_data_dir, file_name)
    print(f"Downloading {file_name} ...")
    download_file(url, local_path)
    print

Downloading yellow_tripdata_2015-01.parquet ...
Downloading yellow_tripdata_2015-02.parquet ...
Downloading yellow_tripdata_2015-03.parquet ...
Downloading yellow_tripdata_2015-04.parquet ...
Downloading yellow_tripdata_2015-05.parquet ...
Downloading yellow_tripdata_2015-06.parquet ...
Downloading yellow_tripdata_2014-01.parquet ...
Downloading yellow_tripdata_2014-02.parquet ...
Downloading yellow_tripdata_2014-03.parquet ...
Downloading yellow_tripdata_2014-04.parquet ...
Downloading yellow_tripdata_2014-05.parquet ...
Downloading yellow_tripdata_2014-06.parquet ...
Downloading yellow_tripdata_2014-07.parquet ...
Downloading yellow_tripdata_2014-08.parquet ...
Downloading yellow_tripdata_2014-09.parquet ...
Downloading yellow_tripdata_2014-10.parquet ...
Downloading yellow_tripdata_2014-11.parquet ...
Downloading yellow_tripdata_2014-12.parquet ...
Downloading yellow_tripdata_2013-01.parquet ...
Downloading yellow_tripdata_2013-02.parquet ...
Downloading yellow_tripdata_2013-03.parq

## 2. Cleaning & filtering:


looking up the latitude and longitude for some months where only location IDs are given for pickups and dropoffs


In [5]:
import geopandas as gpd
import pandas as pd
from typing import Tuple

def read_zones_from_shapefile(shapefile_path: str) -> gpd.GeoDataFrame:
    """
    Read the shapefile and return the GeoDataFrame with the required columns.

    :param shapefile_path: The path to the shapefile.
    :return: A GeoDataFrame with the required columns.
    """
    # Read the shapefile
    zones = gpd.read_file(shapefile_path)
    
    # Convert the coordinate reference system
    zones = zones.to_crs(4326)
    
    # Calculate the longitude and latitude for each zone
    zones['Lon'] = zones.centroid.x
    zones['Lat'] = zones.centroid.y
    
    # Drop unnecessary columns
    zones = zones.drop(columns=['OBJECTID', "Shape_Leng", 'Shape_Area', "zone", 'borough', 'geometry'])
    
    return zones


def test_read_zones_from_shapefile():
    import os
    test_shapefile_path = "path/to/your/test/shapefile.shp"
    
    if os.path.exists(test_shapefile_path):
        zones = read_zones_from_shapefile(test_shapefile_path)
        assert isinstance(zones, gpd.GeoDataFrame)
        assert "Lon" in zones.columns
        assert "Lat" in zones.columns
    else:
        print("Test shapefile not found, skipping test.")


In [6]:
# Set the path to shapefile
shapefile_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/taxi_zones/taxi_zones.shp"

# Read the GeoDataFrame from the shapefile
zones = read_zones_from_shapefile(shapefile_path)

# Print the GeoDataFrame
print(zones)


     LocationID        Lon        Lat
0             1 -74.174000  40.691831
1             2 -73.831299  40.616745
2             3 -73.847422  40.864474
3             4 -73.976968  40.723752
4             5 -74.188484  40.552659
..          ...        ...        ...
258         259 -73.852215  40.897932
259         260 -73.906306  40.744235
260         261 -74.013023  40.709139
261         262 -73.946510  40.775932
262         263 -73.951010  40.778766

[263 rows x 3 columns]



  zones['Lon'] = zones.centroid.x

  zones['Lat'] = zones.centroid.y


###  Calculate distance（Missing data)

Add to the datasets missing information that can be calculated. Mainly: the distance between a trip’s starting point and ending point

In [7]:
import math
from typing import Tuple

def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Calculate the distance between two points on Earth, given their latitudes and longitudes.

    :param lat1: Latitude of the first point.
    :param lon1: Longitude of the first point.
    :param lat2: Latitude of the second point.
    :param lon2: Longitude of the second point.
    :return: The distance between the two points in kilometers.
    """
    # Approximate radius of Earth in km
    R = 6373.0

    # Convert degrees to radians
    lat1 = math.radians(lat1)
    lon1 = math.radians(lon1)
    lat2 = math.radians(lat2)
    lon2 = math.radians(lon2)

    # Calculate the differences in latitudes and longitudes
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    # Apply the Haversine formula
    a = (math.sin(dlat / 2)) ** 2 + math.cos(lat1) * math.cos(lat2) * (math.sin(dlon / 2)) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    
    # Calculate the distance in kilometers
    distance = R * c
    return distance

def test_calculate_distance():
    lat1, lon1 = 40.7128, -74.0060  # New York City
    lat2, lon2 = 51.5074, -0.1278  # London

    distance = calculate_distance(lat1, lon1, lat2, lon2)
    assert isinstance(distance, float)
    assert round(distance) == 5571  # The distance between New York City and London is approximately 5571 km


### Processing uber rides sample(csv)+Missing data


removing unnecessary columns and only keeping columns needed to answer questions in the other parts of this project

removing invalid data points

normalizing column names

normalizing and using appropriate column types for the respective data


In [8]:
def process_uber_data(csv_path: str) -> pd.DataFrame:
    """
    Process the Uber data from the given CSV file and return a DataFrame with the required columns.

    :param csv_path: The path to the CSV file.
    :return: A DataFrame with the required columns.
    """
    # Read the CSV file
    uber_data = pd.read_csv(csv_path)

    # Calculate distance and add it to the DataFrame
    uber_data['Distance'] = uber_data.apply(lambda row: calculate_distance(row['pickup_latitude'], row['pickup_longitude'],
                                                                  row['dropoff_latitude'], row['dropoff_longitude']), axis=1)

    # Define coordinate box boundaries
    lat_min, lat_max = 40.560445, 40.908524
    lng_min, lng_max = -74.242330, -73.717047

    # Filter trips within the coordinate box
    uber_data = uber_data[(uber_data['pickup_latitude'] >= lat_min) & (uber_data['pickup_latitude'] <= lat_max) &
                (uber_data['dropoff_latitude'] >= lat_min) & (uber_data['dropoff_latitude'] <= lat_max) &
                (uber_data['pickup_longitude'] >= lng_min) & (uber_data['pickup_longitude'] <= lng_max) &
                (uber_data['dropoff_longitude'] >= lng_min) & (uber_data['dropoff_longitude'] <= lng_max)]

    # Drop rows with missing values
    uber_data.dropna(axis=0, how='any', inplace=True)

    # Drop trips with zero passenger count or fare amount
    uber_data = uber_data[(uber_data["passenger_count"] != 0) & (uber_data["fare_amount"] != 0)]

    # Drop trips with zero distance
    uber_data = uber_data[uber_data["Distance"] != 0]

    # Convert 'pickup_datetime' column to pandas datetime object and extract hour and day of week
    uber_data['pickup_datetime'] = pd.to_datetime(uber_data['pickup_datetime']).dt.tz_localize(None)
    uber_data['pickup_hour'] = uber_data['pickup_datetime'].dt.hour
    uber_data['day_of_week'] = uber_data['pickup_datetime'].dt.dayofweek

    # Select required columns and rename them
    uber_data = uber_data[['pickup_datetime', 'pickup_hour', 'day_of_week', 'pickup_longitude', 'pickup_latitude',
                           'dropoff_longitude', 'dropoff_latitude', "Distance"]]
    uber_data.rename(columns={
            'pickup_longitude': 'Start_Lon',
            'pickup_latitude': 'Start_Lat',
            'dropoff_longitude': 'End_Lon',
            'dropoff_latitude': 'End_Lat',
            'Distance': 'Trip_Distance',
            'pickup_datetime': 'Pickup_Datetime'
        }, inplace=True)

    # Sort by pickup datetime and reset index
    uber_data = uber_data.sort_values(by='Pickup_Datetime').reset_index(drop=True)

    return uber_data

def test_process_uber_data():
    test_csv_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/uber_rides_sample.csv"
    
    if os.path.exists(test_csv_path):
        processed_data = process_uber_data(test_csv_path)
        assert isinstance(processed_data, pd.DataFrame)
        assert "Pickup_Datetime" in processed_data.columns
        assert "Start_Lon" in processed_data.columns
        assert "Start_Lat" in processed_data.columns
    else:
        print("Test CSV file not found, skipping test.")


In [9]:
#save the processed data to a new CSV file
def save_processed_data(processed_data: pd.DataFrame, output_csv_path: str) -> None:
    """
    Save the processed Uber data to a new CSV file.

    :param processed_data: The DataFrame containing the processed data.
    :param output_csv_path: The path to the output CSV file.
    """
    processed_data.to_csv(output_csv_path, index=False)


def test_save_processed_data():
    test_output_csv_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/output_uber_rides_sample.csv"
    test_processed_data = pd.DataFrame({
        'Pickup_Datetime': [pd.Timestamp('2015-05-17 09:43:00')],
        'pickup_hour': [9],
        'day_of_week': [6],
        'Start_Lon': [-73.9871],
        'Start_Lat': [40.7339],
        'End_Lon': [-73.9894],
        'End_Lat': [40.7411],
        'Trip_Distance': [0.817]},
        index=[0])

    save_processed_data(test_processed_data, test_output_csv_path)
    assert os.path.exists(test_output_csv_path)

    loaded_data = pd.read_csv(test_output_csv_path)
    print("Loaded Data:\n", loaded_data)
    print("Expected Processed Data:\n", test_processed_data)
    assert loaded_data.equals(test_processed_data)



In [10]:
# Usage of the functions:
input_csv_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/uber_rides_sample.csv"
output_csv_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/processed_uber_rides_sample.csv"

uber_data = process_uber_data(input_csv_path)
save_processed_data(uber_data, output_csv_path)


In [11]:
# Run test functions here to check if everything is working correctly.
#test_calculate_distance()
test_process_uber_data()
#test_save_processed_data()

### Processing yellow_taxi files+Sampling+Missing data

Cleaning & filtering:

removing unnecessary columns and only keeping columns needed to answer questions in the other parts of this project

removing invalid data points

normalizing column names

normalizing and using appropriate column types for the respective data

for Uber and Yellow Taxi data, removing trips that start and/or end outside of the following latitude/longitude coordinate box: (40.560445, -74.242330) and (40.908524, -73.717047)

Sampling: Each month of Yellow Taxi data contains millions of trips. However, the provided Uber dataset is only a sampling of all data. Therefore, you will need to generate a sampling of Yellow Taxi data that’s roughly equal to the sample size of the Uber dataset. 

Missing data: To help answer the questions later in the project, you will need to add to the datasets missing information that can be calculated. Mainly: the distance between a trip’s starting point and ending point 


#### clean the data for year 2009

In [63]:
def process_dataframe_2009(raw_dataframe_2009: pd.DataFrame) -> pd.DataFrame:
    """
    Process the input DataFrame for year 2009 and clean the data.

    :param raw_dataframe_2009: The DataFrame containing the raw data for year 2009.
    :return: A cleaned DataFrame with the required columns.
    """
    # Drop trips with zero passenger count
    cleaned_df_2009 = raw_dataframe_2009[raw_dataframe_2009['Passenger_Count'] != 0]
    # Drop trips with no fare
    cleaned_df_2009 = cleaned_df_2009[cleaned_df_2009['Fare_Amt'] != 0]
    cleaned_df_2009 = cleaned_df_2009[cleaned_df_2009['Tip_Amt'] != 0]

    # Drop unnecessary columns
    cleaned_df_2009 = cleaned_df_2009.drop(columns=["vendor_name", "Passenger_Count", "Rate_Code","store_and_forward", "Payment_Type", 
                          "surcharge", "mta_tax", "Tolls_Amt", "Total_Amt","Trip_Dropoff_DateTime","Fare_Amt"])

    # Filter out trips that start and/or end outside of the defined latitude/longitude bounds
    lat_min, lat_max = 40.560445, 40.908524
    lng_min, lng_max = -74.242330, -73.717047
    cleaned_df_2009 = cleaned_df_2009[(cleaned_df_2009['Start_Lat'] >= lat_min) & (cleaned_df_2009['Start_Lat'] <= lat_max) &
            (cleaned_df_2009['End_Lat'] >= lat_min) & (cleaned_df_2009['End_Lat'] <= lat_max) &
            (cleaned_df_2009['Start_Lon'] >= lng_min) & (cleaned_df_2009['Start_Lon'] <= lng_max) &
            (cleaned_df_2009['End_Lon'] >= lng_min) & (cleaned_df_2009['End_Lon'] <= lng_max)]

    # Filter out rows with the same pickup and dropoff location
    cleaned_df_2009 = cleaned_df_2009[(cleaned_df_2009['Start_Lon'] != cleaned_df_2009['End_Lon']) &(cleaned_df_2009['Start_Lat'] != cleaned_df_2009['End_Lat'])]

    cleaned_df_2009.rename(columns = {'Trip_Pickup_DateTime':'Pickup_Datetime',
                         "Trip_Dropoff_DateTime":"Dropoff_Datetime",
                        },inplace = True)

    # Select necessary columns
    cleaned_df_2009 = cleaned_df_2009.loc[:, ["Pickup_Datetime","Trip_Distance","Start_Lon",
                    "Start_Lat","End_Lon","End_Lat","Tip_Amt"]]

    # Drop NA
    cleaned_df_2009.dropna(inplace=True)

    return cleaned_df_2009


In [64]:
def process_data_2009(year: int, sample_size: int):
    """
    Process the data for the specified year and sample size.

    :param year: The year of the data to process.
    :param sample_size: The sample size to use for each month's data.
    """
    # Define the file paths for each month for the specified year
    file_paths = [f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/yellow_tripdata_{year}-{month:02}.parquet' for month in range(1, 13)]

    all_dfs_2009 = []

    for file_path in file_paths:
        # Load the entire dataset
        entire_df_2009 = pd.read_parquet(file_path)

        # Process the entire dataset
        cleaned_entire_df_2009 = process_dataframe_2009(entire_df_2009)

        # Sample the cleaned dataframe
        sampled_df_2009 = cleaned_entire_df_2009.sample(n=sample_size, random_state=42)

        all_dfs_2009.append(sampled_df_2009)

    final_df_2009 = pd.concat(all_dfs_2009, ignore_index=True)
    output_file_path = f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_{year}.parquet'
    final_df_2009.to_parquet(output_file_path)

# Set the sample size
sample_size = 2500

# Process the data for the year 2009
process_data_2009(2009, sample_size)


In [65]:
file_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_2009.parquet"
df = pd.read_parquet(file_path)
print("Length of the data in cleaned_yellow_tripdata_2009.parquet:", len(df))

Length of the data in cleaned_yellow_tripdata_2009.parquet: 30000


In [41]:
# Test function 
import os

def test_process_data_2009():
    """
    Test the process_data function for years 2009
    """
    sample_size = 10
    year = 2009
    process_data_2009(year, sample_size)
    
    output_file_path = f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/test_cleaned_yellow_tripdata_{year}.parquet'
    
    # Check if the output file is created
    assert os.path.exists(output_file_path)
    
    # Load the output file and check the number of rows
    test_df = pd.read_parquet(output_file_path)
    assert len(test_df) == sample_size * 12  # 12 months of data

# Run the test function
test_process_data_2009()


#### clean the data for year 2010

In [66]:
def process_dataframe_2010(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean the dataframe for year 2010.

    :param df: The input dataframe to clean.
    :return: The cleaned dataframe.
    """
    # Drop trips with zero passenger count
    df = df[df['passenger_count'] != 0]

    # Drop trips with no fare
    df = df[df['fare_amount'] != 0]
    df = df[df['tip_amount'] != 0]

    # Drop unnecessary columns
    df = df.drop(columns=["vendor_id", "passenger_count", "rate_code", "dropoff_datetime", "fare_amount",
                          "store_and_fwd_flag", "payment_type", "surcharge", "mta_tax", "tolls_amount", "total_amount"])

    df.rename(columns={'pickup_datetime': 'Pickup_Datetime',
                       "dropoff_datetime": "Dropoff_Datetime",
                       "trip_distance": "Trip_Distance",
                       "fare_amount": "Fare_Amt",
                       "tip_amount": "Tip_Amt",
                       "pickup_longitude": "Start_Lon",
                       "pickup_latitude": "Start_Lat",
                       "dropoff_longitude": "End_Lon",
                       "dropoff_latitude": "End_Lat"
                       }, inplace=True)

    # Filter out trips that start and/or end outside of the defined latitude/longitude bounds
    # Define the latitude/longitude bounds
    lat_min, lat_max = 40.560445, 40.908524
    lng_min, lng_max = -74.242330, -73.717047
    df = df[(df['Start_Lat'] >= lat_min) & (df['Start_Lat'] <= lat_max) &
            (df['End_Lat'] >= lat_min) & (df['End_Lat'] <= lat_max) &
            (df['Start_Lon'] >= lng_min) & (df['Start_Lon'] <= lng_max) &
            (df['End_Lon'] >= lng_min) & (df['End_Lon'] <= lng_max)]

    # Filter out rows with the same pickup and dropoff location
    df = df[(df['Start_Lon'] != df['End_Lon']) & (df['Start_Lat'] != df['End_Lat'])]

    # Select necessary columns
    df = df.loc[:, ["Pickup_Datetime", "Trip_Distance", "Start_Lon",
                    "Start_Lat", "End_Lon", "End_Lat", "Tip_Amt"]]

    # Remove NaN
    df.dropna()

    return df



In [67]:
def process_data_2010(year: int, sample_size: int):
    """
    Process the data for the specified year and sample size.

    :param year: The year of the data to process.
    :param sample_size: The sample size to use for each month's data.
    """
    # Define the file paths for each month for the specified year
    file_paths = [f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/yellow_tripdata_{year}-{month:02}.parquet' for month in range(1, 13)]

    all_dfs_2010 = []

    for file_path in file_paths:
        # Load the entire dataset
        entire_df_2010 = pd.read_parquet(file_path)

        # Process the entire dataset
        cleaned_entire_df_2010 = process_dataframe_2010(entire_df_2010)

        # Sample the cleaned dataframe
        sampled_df_2010 = cleaned_entire_df_2010.sample(n=sample_size, random_state=42)

        all_dfs_2010.append(sampled_df_2010)

    final_df_2010 = pd.concat(all_dfs_2010, ignore_index=True)
    output_file_path = f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_{year}.parquet'
    final_df_2010.to_parquet(output_file_path)

#Set the sample size
sample_size = 2500

#Process the data for the year 2010
process_data_2010(2010, sample_size)



In [68]:
file_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_2010.parquet"
df = pd.read_parquet(file_path)
print("Length of the data in cleaned_yellow_tripdata_2010.parquet:", len(df))

Length of the data in cleaned_yellow_tripdata_2010.parquet: 30000


In [45]:
# Test function 
import os

def test_process_data_2010():
    """
    Test the process_data function for years 2010
    """
    sample_size = 10
    year = 2010
    process_data_2010(year, sample_size)
    
    output_file_path = f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/test_cleaned_yellow_tripdata_{year}.parquet'
    
    # Check if the output file is created
    assert os.path.exists(output_file_path)
    
    # Load the output file and check the number of rows
    test_df = pd.read_parquet(output_file_path)
    assert len(test_df) == sample_size * 12  # 12 months of data

# Run the test function
test_process_data_2010()


#### clean the data for year 2011-2014

In [69]:
def process_dataframe_2011_2014(raw_data: pd.DataFrame) -> pd.DataFrame:
    """
    Clean and preprocess the given taxi data DataFrame.

    :param raw_data: Input taxi data DataFrame.
    :return: Cleaned and preprocessed taxi data DataFrame.
    """
    # Drop trips with zero passenger count
    valid_passengers = raw_data[raw_data['passenger_count'] != 0]

    # Drop trips with no fare
    valid_fare = valid_passengers[valid_passengers['fare_amount'] != 0]
    valid_tip = valid_fare[valid_fare['tip_amount'] != 0]

    # Drop unnecessary columns
    reduced_columns = valid_tip.drop(columns=["tolls_amount", "mta_tax", "extra", "VendorID", "passenger_count", "tpep_dropoff_datetime",
                          "RatecodeID", "store_and_fwd_flag", "payment_type", "improvement_surcharge",
                          "total_amount", "congestion_surcharge", "airport_fee", "fare_amount"])

    # Merge with location id and id
    pickup_locations = reduced_columns.drop_duplicates().merge(zones, left_on='PULocationID', right_on="LocationID",
          how='left').rename(columns={'Lon':'Start_Lon', "Lat":"Start_Lat"}).drop(columns=["PULocationID", "LocationID"])
    cleaned_data = pickup_locations.merge(zones, left_on='DOLocationID', right_on="LocationID", how='left'
          ).rename(columns={'Lon':'End_Lon', "Lat":"End_Lat"}).drop(columns=["DOLocationID", "LocationID"])

    cleaned_data.rename(columns={'tpep_pickup_datetime':'Pickup_Datetime',
                                  "trip_distance" : "Trip_Distance",
                                  "tip_amount" : "Tip_Amt"
                                 }, inplace=True)

    # Define the latitude/longitude bounds
    lat_min, lat_max = 40.560445, 40.908524
    lng_min, lng_max = -74.242330, -73.717047

    cleaned_data = cleaned_data[(cleaned_data['Start_Lat'] >= lat_min) & (cleaned_data['Start_Lat'] <= lat_max) &
                        (cleaned_data['End_Lat'] >= lat_min) & (cleaned_data['End_Lat'] <= lat_max) &
                        (cleaned_data['Start_Lon'] >= lng_min) & (cleaned_data['Start_Lon'] <= lng_max) &
                        (cleaned_data['End_Lon'] >= lng_min) & (cleaned_data['End_Lon'] <= lng_max)]

    # Filter out rows with the same pickup and dropoff location
    cleaned_data = cleaned_data[(cleaned_data['Start_Lon'] != cleaned_data['End_Lon']) &
                                 (cleaned_data['Start_Lat'] != cleaned_data['End_Lat'])]

    cleaned_data = cleaned_data.reindex(columns=["Pickup_Datetime", "Trip_Distance", "Start_Lon",
                                                 "Start_Lat", "End_Lon", "End_Lat", "Tip_Amt"])

    # Drop rows with missing values
    cleaned_data.dropna(inplace=True)

    return cleaned_data



In [70]:
def process_data_2011_2014(start_year: int, end_year: int, sample_size: int) -> None:
    """
    Process and save the cleaned taxi data for the specified years.

    :param start_year: Starting year for the data.
    :param end_year: Ending year for the data.
    :param sample_size: Number of samples per month.
    """
    all_dfs = []

    for year in range(start_year, end_year+1):
        # Define the file paths for each month for the specified year
        file_paths = [f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/yellow_tripdata_{year}-{month:02}.parquet' for month in range(1, 13)]

        year_dfs = []
        for file_path in file_paths:
            # Load the entire dataset
            entire_df = pd.read_parquet(file_path)

            # Process the entire dataset
            cleaned_entire_df = process_dataframe_2011_2014(entire_df)

            # Sample the cleaned dataframe
            sampled_df = cleaned_entire_df.sample(n=sample_size, random_state=42)

            year_dfs.append(sampled_df)

        # Concatenate all sampled dataframes for the year
        year_df = pd.concat(year_dfs, ignore_index=True)

        all_dfs.append(year_df)

    # Concatenate all sampled dataframes for all years
    final_df = pd.concat(all_dfs, ignore_index=True)

    output_file_path = f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_{start_year}-{end_year}.parquet'
    final_df.to_parquet(output_file_path)

start_year = 2011
end_year = 2014
sample_size = 2500

process_data_2011_2014(start_year, end_year, sample_size)


In [71]:
file_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_2011-2014.parquet"
df = pd.read_parquet(file_path)
print("Length of the data in cleaned_yellow_tripdata_2011-2014.parquet:", len(df))


Length of the data in cleaned_yellow_tripdata_2011-2014.parquet: 120000


In [49]:
def test_process_data_2011_2014():
    """
    Test the process_data function for years 2011-2014
    """
    start_year = 2011
    end_year = 2014
    sample_size = 10
    process_data_2011_2014(start_year, end_year, sample_size)

    output_file_path = f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/test_cleaned_yellow_tripdata_{start_year}-{end_year}.parquet'

    # Check if the output file is created
    assert os.path.exists(output_file_path)

    # Load the output file and check the number of rows
    test_df = pd.read_parquet(output_file_path)
    total_months = (end_year - start_year + 1) * 12
    assert len(test_df) == sample_size * total_months  # total months of data

# Run the test function
test_process_data_2011_2014()


#### clean the data for year 2015

In [72]:
def process_dataframe_2015(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean the dataframe for year 2015.

    :param df: The input dataframe
    :return: The cleaned dataframe
    """
    # Drop trips with zero passenger count
    df = df[df['passenger_count'] != 0]
    
    # Drop trips with no fare
    df = df[df['fare_amount'] != 0]
    df = df[df['tip_amount'] != 0]

    # Drop unnecessary columns
    df.drop(columns=["tolls_amount", "mta_tax", "extra", "VendorID", "passenger_count", "tpep_dropoff_datetime",
                     "RatecodeID", "store_and_fwd_flag", "payment_type", "improvement_surcharge",
                     "total_amount", "congestion_surcharge", "airport_fee", "fare_amount"], inplace=True)

    # Combine with location id and id
    df2 = df.drop_duplicates().merge(zones, left_on='PULocationID', right_on="LocationID",
                                     how='left').rename(columns={'Lon': 'Start_Lon', "Lat": "Start_Lat"}).drop(
        columns=["PULocationID", "LocationID"])
    clean_data = df2.merge(zones, left_on='DOLocationID', right_on="LocationID", how='inner'
                           ).rename(columns={'Lon': 'End_Lon', "Lat": "End_Lat"}).drop(columns=["DOLocationID", "LocationID"])

    clean_data.rename(columns={'tpep_pickup_datetime': 'Pickup_Datetime',
                               "tpep_dropoff_datetime": "Dropoff_Datetime",
                               "trip_distance": "Trip_Distance",
                               "fare_amount": "Fare_Amt",
                               "tip_amount": "Tip_Amt"
                               }, inplace=True)

    # Define the latitude/longitude bounds
    lat_min, lat_max = 40.560445, 40.908524
    lng_min, lng_max = -74.242330, -73.717047
    # Filter out trips that start and/or end outside of the defined latitude/longitude bounds
    clean_data = clean_data[(clean_data['Start_Lat'] >= lat_min) & (clean_data['Start_Lat'] <= lat_max) &
                            (clean_data['End_Lat'] >= lat_min) & (clean_data['End_Lat'] <= lat_max) &
                            (clean_data['Start_Lon'] >= lng_min) & (clean_data['Start_Lon'] <= lng_max) &
                            (clean_data['End_Lon'] >= lng_min) & (clean_data['End_Lon'] <= lng_max)]

    # Filter out rows with the same pickup and dropoff location
    clean_data = clean_data[(clean_data['Start_Lon'] != clean_data['End_Lon']) &
                            (clean_data['Start_Lat'] != clean_data['End_Lat'])]

    clean_data = clean_data[["Pickup_Datetime", "Trip_Distance", "Start_Lon",
                             "Start_Lat", "End_Lon", "End_Lat", "Tip_Amt"]]

    clean_data.dropna(inplace=True)
    return clean_data



In [73]:
def process_data_2015(year: int, sample_size: int) -> None:
    """
    Process data for the year 2015.

    :param year: The year of data to process
    :param sample_size: The number of samples to take for each month
    :return: None
    """
    # Define the file paths for each month for the specified year
    file_paths = [f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/yellow_tripdata_{year}-{month:02}.parquet' for month in range(1, 7)]

    all_dfs = []

    for file_path in file_paths:
        # Load the entire dataset
        entire_df = pd.read_parquet(file_path)

        # Process the entire dataset
        cleaned_entire_df = process_dataframe_2015(entire_df)

        # Sample the cleaned dataframe
        sampled_df = cleaned_entire_df.sample(n=sample_size, random_state=42)

        all_dfs.append(sampled_df)

    final_df = pd.concat(all_dfs, ignore_index=True)
    output_file_path = f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_{year}.parquet'
    final_df.to_parquet(output_file_path)

# Set the sample size
sample_size = 2500

# Process the data for the year 2015
process_data_2015(2015, sample_size)


In [74]:
file_path = "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_2015.parquet"
df = pd.read_parquet(file_path)
print("Length of the data in cleaned_yellow_tripdata_2015.parquet:", len(df))


Length of the data in cleaned_yellow_tripdata_2015.parquet: 15000


In [54]:
def test_process_data_2015():
    """
    Test the process_data function for the year 2015
    """
    year = 2015
    sample_size = 10
    process_data_2015(year, sample_size)

    output_file_path = f'/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/test_cleaned_yellow_tripdata_{year}.parquet'

    # Check if the output file is created
    assert os.path.exists(output_file_path)

    # Load the output file and check the number of rows
    test_df = pd.read_parquet(output_file_path)
    total_months = 6  # Since the data for 2015 is available for 6 months only
    assert len(test_df) == sample_size * total_months  # total months of data

# Run the test function
test_process_data_2015()


### Combine all the cleaned yellow taxi data together

In [12]:
def load_and_concatenate_parquets(file_paths: List[str]) -> pd.DataFrame:
    """
    Load Parquet files from the given file paths and concatenate them into a single DataFrame.

    Args:
        file_paths (List[str]): List of file paths to the Parquet files.

    Returns:
        pd.DataFrame: A concatenated DataFrame containing data from all the Parquet files.
    """
    # Load all the Parquet files into a list of dataframes
    dfs = [pd.read_parquet(f) for f in file_paths]

    # Concatenate all the dataframes together
    combined_data = pd.concat(dfs)

    return combined_data

def sort_and_reset_index(df: pd.DataFrame, datetime_column: str) -> pd.DataFrame:
    """
    Sort the DataFrame by a datetime column and reset the index.

    Args:
        df (pd.DataFrame): The input DataFrame.
        datetime_column (str): The name of the datetime column to sort by.

    Returns:
        pd.DataFrame: The sorted DataFrame with reset index.
    """
    # Convert the integer column to datetime format
    df[datetime_column] = pd.to_datetime(df[datetime_column])

    # Sort the DataFrame by the datetime column
    df = df.sort_values(by=datetime_column)

    # Reset the index
    df = df.reset_index(drop=True)

    return df

# List all the file paths
file_list = [
    "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_2009.parquet",
    "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_2010.parquet",
    "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_2011-2014.parquet",
    "/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/cleaned_yellow_tripdata_2015.parquet"
]

# Load and concatenate the Parquet files
yellow_taxi_data = load_and_concatenate_parquets(file_list)

# Sort the DataFrame and reset the index
yellow_taxi_data = sort_and_reset_index(yellow_taxi_data, 'Pickup_Datetime')


In [13]:
print(len(yellow_taxi_data))
print(yellow_taxi_data.head(5))

print(uber_data.head())
print(len(uber_data))

195000
      Pickup_Datetime  Trip_Distance  Start_Lon  Start_Lat    End_Lon  \
0 2009-01-01 01:17:09            3.7 -73.980560  40.730467 -73.989196   
1 2009-01-01 01:24:09            2.6 -73.978789  40.777265 -73.971543   
2 2009-01-01 01:38:29            0.5 -73.989369  40.735944 -73.987685   
3 2009-01-01 02:29:16            3.1 -73.987969  40.718590 -73.938873   
4 2009-01-01 02:45:46            5.7 -73.948401  40.809091 -73.987866   

     End_Lat  Tip_Amt  
0  40.763682     1.50  
1  40.751308     3.00  
2  40.773752     2.35  
3  40.707808     1.83  
4  40.748045     3.00  
      Pickup_Datetime  pickup_hour  day_of_week  Start_Lon  Start_Lat  \
0 2009-01-01 01:15:22            1            3 -73.981918  40.779456   
1 2009-01-01 01:59:17            1            3 -73.983759  40.721389   
2 2009-01-01 02:05:03            2            3 -73.956635  40.771254   
3 2009-01-01 02:09:13            2            3 -73.984605  40.728020   
4 2009-01-01 02:13:41            2           

## Processing weather data
1. We need 'DATE','HourlyPrecipitation','HourlyWindSpeed'
2. filter the time with minutes 51 as the daily data
3. replace null with 0 and T with 0 for the 'HourlyPrecipitation'

In [14]:
import glob
import pandas as pd
from typing import List


def read_weather_data(file_list: List[str]) -> pd.DataFrame:
    """
    Read CSV files into a DataFrame, filter the required columns,
    and concatenate all the filtered DataFrames together.
    """
    dfs = []
    for file in file_list:
        df = pd.read_csv(file)
        df['DATE'] = pd.to_datetime(df['DATE'])
        df_filtered = df[['DATE', 'HourlyPrecipitation', 'HourlyWindSpeed']]
        dfs.append(df_filtered)
    weather = pd.concat(dfs)
    return weather


def filter_weather_data(weather: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only rows with minute equal to 51, reset index, and modify the 'HourlyPrecipitation' column.
    """
    weather = weather[weather['DATE'].apply(lambda x: x.minute) == 51]
    weather.reset_index(drop=True, inplace=True)
    weather['HourlyPrecipitation'] = weather['HourlyPrecipitation'].fillna(0).replace('T', 0).str.extract('(\d+\.?\d*)').astype(float)
    weather['HourlyPrecipitation'] = weather['HourlyPrecipitation'].replace('T', '0')
    weather['HourlyWindSpeed'] = weather['HourlyWindSpeed'].replace('T', '0')
    weather['HourlyPrecipitation'] = weather['HourlyPrecipitation'].fillna(0).astype(float)
    weather['HourlyWindSpeed'] = weather['HourlyWindSpeed'].fillna(0).astype(float)
    return weather


def save_weather_data(weather: pd.DataFrame, filename: str) -> None:
    """
    Save the modified DataFrame as a new CSV file.
    """
    weather.to_csv(filename, index=False)


# List all the CSV file paths for the years 2009-2015
file_list = []
for year in range(2009, 2016):
    file_list.extend(glob.glob(f"/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/weather_data/*{year}*_weather.csv"))

# Read all CSV files and concatenate the DataFrames
weather = read_weather_data(file_list)

# Apply necessary transformations
weather = filter_weather_data(weather)

# Save the modified DataFrame as a new CSV file
save_weather_data(weather, '/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/weather_data/hourly_weather_2009_2015.csv')


  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  weather['HourlyPrecipitation'] = weather['HourlyPrecipitation'].fillna(0).replace('T', 0).str.extract('(\d+\.?\d*)').astype(float)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  weather['HourlyPrecipitation'] = weather['HourlyPrecipitation'].replace('T', '0')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[

In [15]:
# Check if the file exists in the specified file path

def check_file_exists(file_path: str) -> bool:
    """
    Checks if a file exists in the given file path.

    Args:
        file_path: The file path to check.

    Returns:
        True if the file exists, False otherwise.
    """
    return os.path.isfile('/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/weather_data/hourly_weather_2009_2015.csv')


## daily data type

In [16]:
import pandas as pd
import glob

def read_weather_data(file_path: str) -> pd.DataFrame:
    """
    Read hourly weather data from CSV files for the years 2009-2015 and filter the required columns.

    Args:
        file_path (str): The file path where the weather data CSV files are located.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the filtered weather data.
    """
    # List all the CSV file paths for the yers 2009-2015
    file_list = []
    for year in range(2009, 2016):
        file_list.extend(glob.glob(f"{file_path}/*{year}*_weather.csv"))

    # Read each CSV file into a DataFrame, filter the required columns, and store them in a list
    dfs = []
    for file in file_list:
        df = pd.read_csv(file)
        df['DATE'] = pd.to_datetime(df['DATE'])
        df_filtered = df[['DATE', 'HourlyPrecipitation', 'HourlyWindSpeed']]
        dfs.append(df_filtered)

    # Concatenate all the filtered DataFrames together
    weather = pd.concat(dfs)

    # Convert DATE column to datetime
    weather['DATE'] = pd.to_datetime(weather['DATE'])

    # Keep only rows with minute equal to 51
    weather = weather[weather['DATE'].apply(lambda x: x.minute) == 51]

    # Reset index
    weather.reset_index(drop=True, inplace=True)

    # Drop the 's' at the end of the HourlyWindSpeed column
    weather['HourlyPrecipitation'] = weather['HourlyPrecipitation'].fillna(0).replace('T', 0).str.extract('(\d+\.?\d*)').astype(float)

    # Replace 'T' with 0 in HourlyPrecipitation and HourlyWindSpeed columns
    weather['HourlyPrecipitation'] = weather['HourlyPrecipitation'].replace('T', '0')
    weather['HourlyWindSpeed'] = weather['HourlyWindSpeed'].replace('T', '0')

    # Convert HourlyPrecipitation and HourlyWindSpeed columns to float
    weather['HourlyPrecipitation'] = weather['HourlyPrecipitation'].fillna(0).astype(float)
    weather['HourlyWindSpeed'] = weather['HourlyWindSpeed'].fillna(0).astype(float)

    return weather


def get_daily_weather_data(weather_data: pd.DataFrame) -> pd.DataFrame:
    """
    Group the hourly weather data by day and calculate the mean precipitation and wind speed.

    Args:
        weather_data (pd.DataFrame): A pandas DataFrame containing the hourly weather data.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the daily weather data.
    """
    # Convert DATE column to date
    weather_data['DATE'] = weather_data['DATE'].dt.date

    # Group by date and calculate mean precipitation and wind speed
    precipitation = weather_data.groupby(['DATE']).mean()['HourlyPrecipitation']
    wind_speed = weather_data.groupby(['DATE']).mean()['HourlyWindSpeed']

    # Concatenate mean precipitation and wind speed into a new DataFrame
    daily_weather = pd.concat((precipitation, wind_speed), axis=1)

    # Rename columns
    daily_weather.rename(columns={"HourlyPrecipitation": "DailyPrecipitation", "HourlyWindSpeed": "DailyWindSpeed"}, inplace=True)

    # Reset index
    daily_weather = daily_weather.reset_index()

    return daily_weather

# Read the weather data from CSV files
weather_data = read_weather_data("/Users/Yolanda/CU2023spring/4501 tools of python/project/yellow_taxi_data/weather_data")

# Group the hourly weather data by day and calculate the mean precipitation and wind speed
daily_weather_data = get_daily_weather_data(weather_data)



  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)
  df = pd.read_csv(file)


## All datas

In [17]:
print(uber_data)

           Pickup_Datetime  pickup_hour  day_of_week  Start_Lon  Start_Lat  \
0      2009-01-01 01:15:22            1            3 -73.981918  40.779456   
1      2009-01-01 01:59:17            1            3 -73.983759  40.721389   
2      2009-01-01 02:05:03            2            3 -73.956635  40.771254   
3      2009-01-01 02:09:13            2            3 -73.984605  40.728020   
4      2009-01-01 02:13:41            2            3 -73.980127  40.737425   
...                    ...          ...          ...        ...        ...   
192823 2015-06-30 22:57:53           22            1 -73.971703  40.782207   
192824 2015-06-30 23:16:42           23            1 -74.001099  40.730961   
192825 2015-06-30 23:31:06           23            1 -73.999962  40.733135   
192826 2015-06-30 23:33:33           23            1 -73.980988  40.762020   
192827 2015-06-30 23:40:39           23            1 -73.984795  40.751411   

          End_Lon    End_Lat  Trip_Distance  
0      -73.957685

In [18]:
print(yellow_taxi_data)

           Pickup_Datetime  Trip_Distance  Start_Lon  Start_Lat    End_Lon  \
0      2009-01-01 01:17:09           3.70 -73.980560  40.730467 -73.989196   
1      2009-01-01 01:24:09           2.60 -73.978789  40.777265 -73.971543   
2      2009-01-01 01:38:29           0.50 -73.989369  40.735944 -73.987685   
3      2009-01-01 02:29:16           3.10 -73.987969  40.718590 -73.938873   
4      2009-01-01 02:45:46           5.70 -73.948401  40.809091 -73.987866   
...                    ...            ...        ...        ...        ...   
194995 2015-06-30 23:21:46           1.23 -73.988787  40.753513 -73.989845   
194996 2015-06-30 23:26:10           1.46 -73.946510  40.775932 -73.959635   
194997 2015-06-30 23:41:40           1.15 -73.999917  40.748428 -74.002875   
194998 2015-06-30 23:43:46           3.30 -74.002875  40.734576 -73.986114   
194999 2015-06-30 23:54:28           2.90 -74.007486  40.726290 -73.984196   

          End_Lat  Tip_Amt  
0       40.763682     1.50  
1    

In [19]:
print(weather)

                     DATE  HourlyPrecipitation  HourlyWindSpeed
0     2009-01-01 00:51:00                  0.0             18.0
1     2009-01-01 01:51:00                  0.0             18.0
2     2009-01-01 02:51:00                  0.0             18.0
3     2009-01-01 03:51:00                  0.0              8.0
4     2009-01-01 04:51:00                  0.0             11.0
...                   ...                  ...              ...
60306 2015-12-31 19:51:00                  0.0              6.0
60307 2015-12-31 20:51:00                  0.0             10.0
60308 2015-12-31 21:51:00                  0.0              0.0
60309 2015-12-31 22:51:00                  0.0              7.0
60310 2015-12-31 23:51:00                  0.0              5.0

[60311 rows x 3 columns]


In [20]:
print(daily_weather_data)

            DATE  DailyPrecipitation  DailyWindSpeed
0     2009-01-01            0.000000       11.041667
1     2009-01-02            0.000000        6.083333
2     2009-01-03            0.000000        9.875000
3     2009-01-04            0.000000        7.416667
4     2009-01-05            0.000000        7.000000
...          ...                 ...             ...
2546  2015-12-27            0.005000        5.416667
2547  2015-12-28            0.001250        7.791667
2548  2015-12-29            0.016957        6.782609
2549  2015-12-30            0.007917        4.250000
2550  2015-12-31            0.001250        4.958333

[2551 rows x 3 columns]


# Part 2: Storing Data

##  load in preprocessed datasets.
## create a schema.sql file that defines each table’s scheme

In [75]:
import glob
import pandas as pd
from sqlalchemy import create_engine
from typing import Dict

DATABASE_URL = "sqlite:///project.db"
engine = create_engine(DATABASE_URL)
DATABASE_SCHEMA_FILE = "database_schema.sql"

In [76]:
HOURLY_WEATHER_SCHEMA = """
CREATE TABLE IF NOT EXISTS hourly_weather
(
    id INTEGER PRIMARY KEY,
    DATE DATE,
    HourlyPrecipitation FLOAT,
    HourlyWindSpeed FLOAT
)
"""

DAILY_WEATHER_SCHEMA = """
CREATE TABLE IF NOT EXISTS daily_weather
(
    id INTEGER PRIMARY KEY,
    DATE DATE,
    DailyPrecipitation FLOAT,
    DailyWindSpeed FLOAT
)
"""

TAXI_TRIPS_SCHEMA = """
CREATE TABLE IF NOT EXISTS taxi_trips
(
    id INTEGER PRIMARY KEY,
    Pickup_Datetime DATETIME,
    Tip_Amt FLOAT,
    Start_Lon FLOAT,
    Start_Lat FLOAT,
    End_Lon FLOAT,
    End_Lat FLOAT,
    Trip_Distance FLOAT
)
"""

UBER_TRIPS_SCHEMA= """
CREATE TABLE IF NOT EXISTS uber_trips
(
    id INTEGER PRIMARY KEY,
    Pickup_Datetime DATETIME,
    Start_Lon FLOAT,
    Start_Lat FLOAT,
    End_Lon FLOAT,
    End_Lat FLOAT,
    Trip_Distance FLOAT,
    day_of_week INTEGER,
    pickup_hour INTEGER
)
"""


In [77]:
    """
    Create database schema based on the schema file.
    """
    with open(DATABASE_SCHEMA_FILE, "w") as f:
        f.write(HOURLY_WEATHER_SCHEMA + ";\n")
        f.write(DAILY_WEATHER_SCHEMA + ";\n")
        f.write(TAXI_TRIPS_SCHEMA + ";\n")
        f.write(UBER_TRIPS_SCHEMA + ";\n")

    # create the tables with the schema files
    with engine.connect() as connection:
        connection.execute(HOURLY_WEATHER_SCHEMA)
        connection.execute(DAILY_WEATHER_SCHEMA)
        connection.execute(TAXI_TRIPS_SCHEMA)
        connection.execute(UBER_TRIPS_SCHEMA)


In [78]:
def write_dataframes_to_table(table_to_df_dict: Dict[str, pd.DataFrame]) -> None:
    """
    Writes data from a dictionary of table names to dataframes
    to their respective tables in the SQLite database.

    Args:
        table_to_df_dict (Dict[str, pd.DataFrame]): A dictionary containing the table names and dataframes.

    Returns:
        None
    """
    # Write data from each dataframe to their respective tables in the database
    for table_name, df in table_to_df_dict.items():
        try:
            df.to_sql(table_name, con=engine, if_exists='replace', index=False)
            print(f"Data successfully written to table {table_name}.")
        except Exception as e:
            print(f"Error writing data to table {table_name}: {str(e)}")


In [79]:
# Example dictionary with table names and dataframes
tables_to_df_dict = {
    "hourly_weather": weather,
    "daily_weather": daily_weather_data,
    "taxi_trips": yellow_taxi_data,
    "uber_trips": uber_data
}

# Write the dataframes to the tables in the database
write_dataframes_to_table(tables_to_df_dict)


Data successfully written to table hourly_weather.
Data successfully written to table daily_weather.
Data successfully written to table taxi_trips.
Data successfully written to table uber_trips.


## Test the database and written

In [80]:
!sqlite3 project.db < database_schema.sql

In [81]:
#Check that the data has been successfully written to the tables by running a sample query
# Execute a query and read the results into a DataFrame object
with engine.connect() as connection:
    df = pd.read_sql_query("SELECT * FROM hourly_weather", connection)

# Print the DataFrame
print(df)


                             DATE  HourlyPrecipitation  HourlyWindSpeed
0      2009-01-01 00:51:00.000000                  0.0             18.0
1      2009-01-01 01:51:00.000000                  0.0             18.0
2      2009-01-01 02:51:00.000000                  0.0             18.0
3      2009-01-01 03:51:00.000000                  0.0              8.0
4      2009-01-01 04:51:00.000000                  0.0             11.0
...                           ...                  ...              ...
60306  2015-12-31 19:51:00.000000                  0.0              6.0
60307  2015-12-31 20:51:00.000000                  0.0             10.0
60308  2015-12-31 21:51:00.000000                  0.0              0.0
60309  2015-12-31 22:51:00.000000                  0.0              7.0
60310  2015-12-31 23:51:00.000000                  0.0              5.0

[60311 rows x 3 columns]
