## Part 1: Data Preprocessing

In [1]:
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import re
from typing import List

### 1.1 Downloading

In [2]:
def get_parquet_links(url: str, keyword: str, start_date: str = "2020-01", end_date: str = "2024-08") -> List[str]:
    """
    Fetch all Parquet file links from a webpage that match the specified keyword 
    and fall within the given date range (YYYY-MM format).
    """
    start_dt = datetime.strptime(start_date, "%Y-%m")
    end_dt = datetime.strptime(end_date, "%Y-%m")

    response = requests.get(url)  
    soup = BeautifulSoup(response.text, "html.parser")
    links = soup.find_all("a", href=True)  # Find all <a> tags with href attribute
    parquet_links = []

    for link in links:
        href = link['href'].strip() #Remove extra spaces around href
        # Check if the link contains the keyword and ends with .parquet
        if keyword in href and href.endswith('.parquet'):
            # Extract the date in YYYY-MM format from the file name
            date_match = re.search(r'(\d{4}-\d{2})', href)
            if date_match:
                file_date = datetime.strptime(date_match.group(1), "%Y-%m")
                # Check if the file date falls within the specified range
                if start_dt <= file_date <= end_dt:
                    parquet_links.append(href)
    return parquet_links


In [3]:
# Function to download the Parquet files from the provided links to the specified directory
def download_parquet_files(parquet_links: List[str], download_directory: str) -> None:
    for idx, file_url in enumerate(parquet_links):
        # Generate the complete path for the file
        file_name = file_url.split("/")[-1]
        file_path = os.path.join(download_directory, file_name)
        # If the file already exists, skip the download
        if os.path.exists(file_path):
            print(f"File {file_name} already downloaded, skipping.")
            continue
        # Download the file
        response = requests.get(file_url, stream=True)
        if response.status_code == 200:
            with open(file_path, "wb") as f:
                f.write(response.content)
            print(f"Downloaded {file_name} successfully.")

# Main function to download Parquet data
def download_parquet_data(url: str, keyword: str, download_directory: str) -> None:
    # Get the links for Parquet files containing the keyword
    parquet_links = get_parquet_links(url, keyword,start_date="2020-01", end_date="2024-08")
    if parquet_links:
        # Download the files that are not already present in the directory
        download_parquet_files(parquet_links, download_directory)
    else:
        print(f"No matching Parquet files found for keyword: {keyword}")

In [4]:
url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"  

# Download "Yellow Taxi" data
keyword_yellow_taxi = "yellow"
download_directory_yellow_taxi = r"E:\2024 Fall Academic\Tools for Analytics\Project\Yellow Taxi"  # Folder to store downloaded files
download_parquet_data(url, keyword_yellow_taxi, download_directory_yellow_taxi)


# Download "High Volume For-Hire Vehicle" data
keyword_hvfhv = "fhvhv"
download_directory_hvfhv = r"E:\2024 Fall Academic\Tools for Analytics\Project\HVFHV"  # Folder to store downloaded files
download_parquet_data(url, keyword_hvfhv, download_directory_hvfhv)


File yellow_tripdata_2024-01.parquet already downloaded, skipping.
File yellow_tripdata_2024-02.parquet already downloaded, skipping.
File yellow_tripdata_2024-03.parquet already downloaded, skipping.
File yellow_tripdata_2024-04.parquet already downloaded, skipping.
File yellow_tripdata_2024-05.parquet already downloaded, skipping.
File yellow_tripdata_2024-06.parquet already downloaded, skipping.
File yellow_tripdata_2024-07.parquet already downloaded, skipping.
File yellow_tripdata_2024-08.parquet already downloaded, skipping.
File yellow_tripdata_2023-01.parquet already downloaded, skipping.
File yellow_tripdata_2023-02.parquet already downloaded, skipping.
File yellow_tripdata_2023-03.parquet already downloaded, skipping.
File yellow_tripdata_2023-04.parquet already downloaded, skipping.
File yellow_tripdata_2023-05.parquet already downloaded, skipping.
File yellow_tripdata_2023-06.parquet already downloaded, skipping.
File yellow_tripdata_2023-07.parquet already downloaded, skipp

### 1.2 Data Sampling

In [5]:
import os
import numpy as np
import pandas as pd

In [6]:
# Function to calculate the sample size using Cochran's formula
def calculate_sample_size(population_size: int, e: float = 0.05, p: float = 0.5) -> int:
    Z = 1.96  # Z value for 95% confidence level
    numerator = Z**2 * p * (1 - p)
    denominator = e**2 * (population_size - 1) + Z**2 * p * (1 - p)
    sample_size = (numerator / denominator) * population_size
    return int(np.ceil(sample_size))  # Round up to ensure enough sample size

In [7]:
# Function to perform sampling for each Parquet file
def sample_parquet_file(file_path: str, output_directory: str) -> None:
    # Read the Parquet file into a DataFrame
    df = pd.read_parquet(file_path)
    
    # Get the population size (total number of rows in the file)
    population_size = len(df)
    
    # Calculate the required sample size
    sample_size = calculate_sample_size(population_size)
    print(f"File: {file_path} - Total records: {population_size}, Sample size: {sample_size}")
    
    # Perform random sampling
    sampled_data = df.sample(n=sample_size, random_state=40)
    
    # Save the sampled data to a new file in the output directory
    file_name = os.path.basename(file_path)
    sampled_file_path = os.path.join(output_directory, f"sampled_{file_name}")
    sampled_data.to_parquet(sampled_file_path, compression='snappy')
    print(f"Sampled data saved to: {sampled_file_path}")

In [8]:
# Main function to sample all Parquet files in a directory
def sample_all_parquet_files(input_directory: str, output_directory: str) -> None:    
    # Loop through each file in the input directory
    for file_name in os.listdir(input_directory):
        file_path = os.path.join(input_directory, file_name)
        
        # If the file is already in the output directory, skip the sampling
        if os.path.exists(os.path.join(output_directory, f"sampled_{file_name}")):
            print(f"File {file_name} already sampled, skipping.")
            continue
        
        # Check if the file is a Parquet file
        if file_name.endswith('.parquet'):
            # Sample the Parquet file and save the result
            sample_parquet_file(file_path, output_directory)

In [9]:
def merge_sampled_parquet_files(input_directory: str) -> pd.DataFrame:
    dataframes = []
    # Iterate over all files in the input directory
    for file_name in os.listdir(input_directory):
        if file_name.endswith('.parquet'):  # Process only Parquet files
            file_path = os.path.join(input_directory, file_name)
            print(f"Reading file: {file_path}")
            # Read the Parquet file into a DataFrame
            df = pd.read_parquet(file_path)
            # Append the DataFrame to the list
            dataframes.append(df)
    # Concatenate all DataFrames in the list
    combined_dataframe = pd.concat(dataframes, ignore_index=True)
    print(f"Successfully merged {len(dataframes)} files into a single DataFrame.")
    return combined_dataframe

#### 1.2.1 Sampling yellow taxi data

In [10]:
input_directory = "E:/2024 Fall Academic/Tools for Analytics/Project/Yellow Taxi"  
output_directory = "E:/2024 Fall Academic/Tools for Analytics/Project/Yellow Taxi Sample" 

# Perform sampling on all Parquet files in the input directory
sample_all_parquet_files(input_directory, output_directory)
# Merge all sampled Parquet files into a single DataFrame
yellow_taxi_df = merge_sampled_parquet_files(output_directory)

yellow_taxi_df=yellow_taxi_df.dropna(axis=1, how="all")
yellow_taxi_df

File yellow_tripdata_2020-01.parquet already sampled, skipping.
File yellow_tripdata_2020-02.parquet already sampled, skipping.
File yellow_tripdata_2020-03.parquet already sampled, skipping.
File yellow_tripdata_2020-04.parquet already sampled, skipping.
File yellow_tripdata_2020-05.parquet already sampled, skipping.
File yellow_tripdata_2020-06.parquet already sampled, skipping.
File yellow_tripdata_2020-07.parquet already sampled, skipping.
File yellow_tripdata_2020-08.parquet already sampled, skipping.
File yellow_tripdata_2020-09.parquet already sampled, skipping.
File yellow_tripdata_2020-10.parquet already sampled, skipping.
File yellow_tripdata_2020-11.parquet already sampled, skipping.
File yellow_tripdata_2020-12.parquet already sampled, skipping.
File yellow_tripdata_2021-01.parquet already sampled, skipping.
File yellow_tripdata_2021-02.parquet already sampled, skipping.
File yellow_tripdata_2021-03.parquet already sampled, skipping.
File yellow_tripdata_2021-04.parquet alr

  combined_dataframe = pd.concat(dataframes, ignore_index=True)


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,Airport_fee
0,2,2020-01-21 18:56:18,2020-01-21 19:08:41,1.0,2.27,1.0,N,170,141,1,10.0,1.0,0.5,1.70,0.00,0.3,16.00,2.5,,
1,2,2020-01-20 15:55:43,2020-01-20 16:00:45,2.0,0.87,1.0,N,164,170,1,5.5,0.0,0.5,1.00,0.00,0.3,9.80,2.5,,
2,2,2020-01-22 20:05:14,2020-01-22 20:13:53,1.0,0.76,1.0,N,162,229,1,7.0,0.5,0.5,2.16,0.00,0.3,12.96,2.5,,
3,1,2020-01-22 07:04:51,2020-01-22 07:54:32,1.0,0.00,1.0,N,159,89,1,46.2,0.0,0.5,0.00,6.12,0.3,53.12,0.0,,
4,2,2020-01-22 06:58:21,2020-01-22 07:01:47,2.0,0.75,1.0,N,100,50,1,4.5,0.0,0.5,1.95,0.00,0.3,9.75,2.5,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
21551,2,2024-08-08 22:16:46,2024-08-08 22:31:47,2.0,2.37,1.0,N,68,163,2,15.6,1.0,0.5,0.00,0.00,1.0,20.60,2.5,,0.00
21552,2,2024-08-24 18:53:23,2024-08-24 19:11:06,1.0,8.12,1.0,N,138,263,1,32.4,5.0,0.5,0.00,6.94,1.0,50.09,2.5,,1.75
21553,2,2024-08-18 18:21:03,2024-08-18 18:37:13,1.0,1.86,1.0,N,170,90,1,14.9,0.0,0.5,3.78,0.00,1.0,22.68,2.5,,0.00
21554,2,2024-08-15 06:34:03,2024-08-15 06:42:49,2.0,1.64,1.0,N,161,239,1,11.4,0.0,0.5,3.08,0.00,1.0,18.48,2.5,,0.00


In [11]:
filepath = r"E:/2024 Fall Academic/Tools for Analytics/Project/yellow_taxi_sampled.csv"
if not os.path.exists(filepath):
    # If it doesn't exist, save the dataframe to the file
    yellow_taxi_df.to_csv(filepath, index=False)
    print("File saved successfully.")
else:
    print("File already exists. Skipping save.")

File already exists. Skipping save.


#### 1.2.2 Sampling Uber data

In [12]:
input_directory = "E:/2024 Fall Academic/Tools for Analytics/Project/HVFHV"  # Folder containing original Parquet files
output_directory = "E:/2024 Fall Academic/Tools for Analytics/Project/HVFHV Sample"  # Folder to save the sampled Parquet files

# Perform sampling on all Parquet files in the input directory
sample_all_parquet_files(input_directory, output_directory)
# Merge all sampled Parquet files into a single DataFrame
HVFHV_df = merge_sampled_parquet_files(output_directory)

HVFHV_df=HVFHV_df.dropna(axis=1, how="all")
HVFHV_df

File fhvhv_tripdata_2020-01.parquet already sampled, skipping.
File fhvhv_tripdata_2020-02.parquet already sampled, skipping.
File fhvhv_tripdata_2020-03.parquet already sampled, skipping.
File fhvhv_tripdata_2020-04.parquet already sampled, skipping.
File fhvhv_tripdata_2020-05.parquet already sampled, skipping.
File fhvhv_tripdata_2020-06.parquet already sampled, skipping.
File fhvhv_tripdata_2020-07.parquet already sampled, skipping.
File fhvhv_tripdata_2020-08.parquet already sampled, skipping.
File fhvhv_tripdata_2020-09.parquet already sampled, skipping.
File fhvhv_tripdata_2020-10.parquet already sampled, skipping.
File fhvhv_tripdata_2020-11.parquet already sampled, skipping.
File fhvhv_tripdata_2020-12.parquet already sampled, skipping.
File fhvhv_tripdata_2021-01.parquet already sampled, skipping.
File fhvhv_tripdata_2021-02.parquet already sampled, skipping.
File fhvhv_tripdata_2021-03.parquet already sampled, skipping.
File fhvhv_tripdata_2021-04.parquet already sampled, sk

  combined_dataframe = pd.concat(dataframes, ignore_index=True)


Unnamed: 0,hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,...,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
0,HV0003,B02872,B02872,2020-01-25 06:32:19,2020-01-25 06:38:11,2020-01-25 06:38:42,2020-01-25 06:44:40,32,18,1.860,...,0.50,0.00,,0.00,5.39,Y,N,,N,N
1,HV0005,B02510,,2020-01-23 01:44:51,NaT,2020-01-23 01:49:21,2020-01-23 02:12:36,88,7,9.211,...,2.61,2.75,,0.00,21.60,N,N,N,N,N
2,HV0003,B02872,B02872,2020-01-06 18:16:36,2020-01-06 18:16:51,2020-01-06 18:19:01,2020-01-06 18:29:42,92,53,2.090,...,0.59,0.00,,0.00,7.57,N,N,,N,N
3,HV0003,B02872,B02872,2020-01-22 21:25:56,2020-01-22 21:28:29,2020-01-22 21:29:23,2020-01-22 21:38:26,161,50,1.520,...,0.77,2.75,,0.00,6.14,N,N,,N,N
4,HV0005,B02510,,2020-01-24 17:02:22,NaT,2020-01-24 17:05:01,2020-01-24 17:10:08,132,132,2.289,...,0.88,0.00,,5.00,5.39,N,N,N,N,N
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
21555,HV0003,B03404,B03404,2024-08-14 19:46:30,2024-08-14 19:51:21,2024-08-14 19:53:23,2024-08-14 20:09:17,188,181,2.370,...,1.57,0.00,0.0,0.00,12.49,N,N,N,N,N
21556,HV0003,B03404,B03404,2024-08-06 21:14:28,2024-08-06 21:22:51,2024-08-06 21:23:07,2024-08-06 21:37:52,158,13,2.610,...,3.35,2.75,0.0,4.49,20.40,N,N,N,N,N
21557,HV0003,B03404,B03404,2024-08-17 13:26:17,2024-08-17 13:27:41,2024-08-17 13:28:59,2024-08-17 13:46:35,82,157,2.640,...,1.58,0.00,0.0,0.00,13.85,N,N,N,N,N
21558,HV0005,B03406,,2024-08-31 11:57:25,NaT,2024-08-31 12:21:53,2024-08-31 12:37:53,238,262,2.275,...,2.06,2.75,0.0,4.29,19.20,N,N,N,N,N


In [13]:
filepath = r"E:/2024 Fall Academic/Tools for Analytics/Project/HVFHV_sampled.csv"
if not os.path.exists(filepath):
    # If it doesn't exist, save the dataframe to the file
    HVFHV_df.to_csv(filepath, index=False)
    print("File saved successfully.")
else:
    print("File already exists. Skipping save.")

File already exists. Skipping save.
