In [75]:
# fetch the file from the server

import requests
import sys
from pathlib import Path

def fetch_raw_data(url, file_name, repo_dir='../data/raw', timeout=60):
    """
    Downloads the file from the provided URL if it doesn't already exist in the directory.

    The response body is streamed in chunks to a temporary '<file_name>.part'
    file, which is renamed to its final name only after the transfer has
    finished. An interrupted run therefore cannot leave a truncated file under
    the final name, which the "already exists" check would skip on later runs.

    Args:
    - url: The URL from which the file will be downloaded.
    - file_name: The name with which the file will be saved.
    - repo_dir: The directory where the file will be saved (default: '../data/raw').
    - timeout: Timeout in seconds handed to requests.get (default: 60).

    Returns:
    - None

    Raises:
    - Exception: If the server answers with a status code other than 200.
    - requests.exceptions.RequestException: On connection failures or timeouts.
    """
    target_dir = Path(repo_dir)
    file_path = target_dir / file_name

    # Check if the file already exists
    if file_path.exists():
        print(f'{file_name} already exists. Skipping download.')
        return

    # stream=True keeps the body out of memory until it is read chunk by chunk
    with requests.get(url, stream=True, timeout=timeout) as response:
        # Check if the download was successful
        if response.status_code != 200:
            raise Exception(f'Status code error: {response.status_code} - File not available!')

        target_dir.mkdir(parents=True, exist_ok=True)
        partial_path = file_path.with_name(file_path.name + '.part')
        try:
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
            # Only a fully written file ever appears under the final name
            partial_path.replace(file_path)
        finally:
            # Still present only if the transfer or the rename did not complete
            if partial_path.exists():
                partial_path.unlink()

    print(f'{file_name} downloaded successfully to {repo_dir}')


def download_yellow_monthly_data(year, month):
    """
    Downloads the 'yellow_tripdata' parquet file for one year-month via fetch_raw_data.

    Args:
    - year: The year, as an int or a numeric string (e.g. 2021 or '2021').
    - month: The month, as an int or a numeric string (e.g. 1, '1' or '01').

    Returns:
    - None
    """
    # Normalise to YYYY-MM so that (2021, 1) and ('2021', '01') name the same file
    period = f'{int(year)}-{int(month):02d}'
    file_name = f'yellow_tripdata_{period}.parquet'
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}'
    fetch_raw_data(url, file_name)

def download_green_monthly_data(year, month):
    """
    Downloads the 'green_tripdata' parquet file for one year-month via fetch_raw_data.

    Args:
    - year: The year, as an int or a numeric string (e.g. 2021 or '2021').
    - month: The month, as an int or a numeric string (e.g. 1, '1' or '01').

    Returns:
    - None
    """
    # Normalise to YYYY-MM so that (2021, 1) and ('2021', '01') name the same file
    period = f'{int(year)}-{int(month):02d}'
    file_name = f'green_tripdata_{period}.parquet'
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}'
    fetch_raw_data(url, file_name)

def download_fhv_monthly_data(year, month):
    """
    Downloads the 'fhv_tripdata' parquet file for one year-month via fetch_raw_data.

    Args:
    - year: The year, as an int or a numeric string (e.g. 2021 or '2021').
    - month: The month, as an int or a numeric string (e.g. 1, '1' or '01').

    Returns:
    - None
    """
    # Normalise to YYYY-MM so that (2021, 1) and ('2021', '01') name the same file
    period = f'{int(year)}-{int(month):02d}'
    file_name = f'fhv_tripdata_{period}.parquet'
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}'
    fetch_raw_data(url, file_name)

def download_fhvhv_monthly_data(year, month):
    """
    Downloads the 'fhvhv_tripdata' parquet file for one year-month via fetch_raw_data.

    Args:
    - year: The year, as an int or a numeric string (e.g. 2021 or '2021').
    - month: The month, as an int or a numeric string (e.g. 1, '1' or '01').

    Returns:
    - None
    """
    # Normalise to YYYY-MM so that (2021, 1) and ('2021', '01') name the same file
    period = f'{int(year)}-{int(month):02d}'
    file_name = f'fhvhv_tripdata_{period}.parquet'
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}'
    fetch_raw_data(url, file_name)

def nyc_data_extrator(year, month):
    """
    Runs the yellow, green, fhv and fhvhv monthly downloaders, in that order, for one year-month.

    Args:
    - year: The year, forwarded unchanged to each downloader.
    - month: The month, forwarded unchanged to each downloader.

    Returns:
    - None
    """
    downloaders = (
        download_yellow_monthly_data,
        download_green_monthly_data,
        download_fhv_monthly_data,
        download_fhvhv_monthly_data,
    )
    for download in downloaders:
        download(year, month)

if __name__ == '__main__':
    # Periods to fetch: every month of each listed year, as zero-padded strings
    years = ['2021', '2022', '2023']
    months = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']

    # Year-major order: all twelve months of one year before moving to the next
    for year, month in ((y, m) for y in years for m in months):
        nyc_data_extrator(year, month)


yellow_tripdata_2021-01.parquet already exists. Skipping download.
green_tripdata_2021-01.parquet already exists. Skipping download.
fhv_tripdata_2021-01.parquet already exists. Skipping download.
fhvhv_tripdata_2021-01.parquet already exists. Skipping download.
yellow_tripdata_2021-02.parquet already exists. Skipping download.
green_tripdata_2021-02.parquet already exists. Skipping download.
fhv_tripdata_2021-02.parquet already exists. Skipping download.
fhvhv_tripdata_2021-02.parquet already exists. Skipping download.
yellow_tripdata_2021-03.parquet already exists. Skipping download.
green_tripdata_2021-03.parquet already exists. Skipping download.
fhv_tripdata_2021-03.parquet already exists. Skipping download.
fhvhv_tripdata_2021-03.parquet already exists. Skipping download.
yellow_tripdata_2021-04.parquet already exists. Skipping download.
green_tripdata_2021-04.parquet downloaded successfully to ../data/raw
fhv_tripdata_2021-04.parquet downloaded successfully to ../data/raw
fhvhv_

KeyboardInterrupt: 

In [12]:
# Columns requested when the yellow-trip parquet file is read
columns = [
    'VendorID',
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'passenger_count',
    'trip_distance',
    'RatecodeID',
    'store_and_fwd_flag',
    'PULocationID',
    'DOLocationID',
    'payment_type',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'congestion_surcharge',
]

# Subset kept for the date-range check: pickup timestamp and pickup location
important_columns = ['tpep_pickup_datetime', 'PULocationID']

In [40]:
import pandas as pd
# Period to load; both values are substituted verbatim into the raw file names
params = dict(year='2021', month='01')

# Read only the columns listed in `columns` from the yellow-trip file
df = pd.read_parquet(
    '../data/raw/yellow_tripdata_{year}-{month}.parquet'.format_map(params),
    columns=columns,
)

In [42]:
# Load every column of the green-trip file for the period held in `params`
df_green = pd.read_parquet(
    '../data/raw/green_tripdata_{year}-{month}.parquet'.format_map(params)
)

In [43]:
# Preview the green-trip frame (the notebook renders a truncated head/tail view)
df_green

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2021-01-01 00:15:56,2021-01-01 00:19:52,N,1.0,43,151,1.0,1.01,5.50,0.50,0.5,0.00,0.00,,0.3,6.80,2.0,1.0,0.00
1,2,2021-01-01 00:25:59,2021-01-01 00:34:44,N,1.0,166,239,1.0,2.53,10.00,0.50,0.5,2.81,0.00,,0.3,16.86,1.0,1.0,2.75
2,2,2021-01-01 00:45:57,2021-01-01 00:51:55,N,1.0,41,42,1.0,1.12,6.00,0.50,0.5,1.00,0.00,,0.3,8.30,1.0,1.0,0.00
3,2,2020-12-31 23:57:51,2021-01-01 00:04:56,N,1.0,168,75,1.0,1.99,8.00,0.50,0.5,0.00,0.00,,0.3,9.30,2.0,1.0,0.00
4,2,2021-01-01 00:16:36,2021-01-01 00:16:40,N,2.0,265,265,3.0,0.00,-52.00,0.00,-0.5,0.00,0.00,,-0.3,-52.80,3.0,1.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
76513,2,2021-01-31 21:38:00,2021-01-31 22:16:00,,,81,90,,17.63,56.23,2.75,0.0,0.00,6.12,,0.3,65.40,,,
76514,2,2021-01-31 22:43:00,2021-01-31 23:21:00,,,35,213,,18.36,46.66,0.00,0.0,12.20,6.12,,0.3,65.28,,,
76515,2,2021-01-31 22:16:00,2021-01-31 22:27:00,,,74,69,,2.50,18.95,2.75,0.0,0.00,0.00,,0.3,22.00,,,
76516,2,2021-01-31 23:10:00,2021-01-31 23:37:00,,,168,215,,14.48,48.87,2.75,0.0,0.00,6.12,,0.3,58.04,,,


In [44]:
# Load every column of the fhv file for the period held in `params`
df_fhv = pd.read_parquet(
    '../data/raw/fhv_tripdata_{year}-{month}.parquet'.format_map(params)
)

In [45]:
# Preview the fhv frame; note its pickup-location column is spelled 'PUlocationID'
df_fhv

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2021-01-01 00:27:00,2021-01-01 00:44:00,,,,B00009
1,B00009,2021-01-01 00:50:00,2021-01-01 01:07:00,,,,B00009
2,B00013,2021-01-01 00:01:00,2021-01-01 01:51:00,,,,B00013
3,B00037,2021-01-01 00:13:09,2021-01-01 00:21:26,,72.0,,B00037
4,B00037,2021-01-01 00:38:31,2021-01-01 00:53:44,,61.0,,B00037
...,...,...,...,...,...,...,...
1154107,B03266,2021-01-31 23:43:03,2021-01-31 23:51:48,7.0,7.0,,B03266
1154108,B03284,2021-01-31 23:50:27,2021-02-01 00:48:03,44.0,91.0,,
1154109,B03285,2021-01-31 23:13:46,2021-01-31 23:29:58,171.0,171.0,,B03285
1154110,B03285,2021-01-31 23:58:03,2021-02-01 00:17:29,15.0,15.0,,B03285


In [54]:
# NOTE(review): same statement as the fhv load a few cells earlier; it re-reads
# the same file into the same name and could be dropped.
df_fhv = pd.read_parquet('../data/raw/fhv_tripdata_{year}-{month}.parquet'.format(**params))

In [55]:
# Preview the re-loaded fhv frame (same output as the earlier preview cell)
df_fhv

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2021-01-01 00:27:00,2021-01-01 00:44:00,,,,B00009
1,B00009,2021-01-01 00:50:00,2021-01-01 01:07:00,,,,B00009
2,B00013,2021-01-01 00:01:00,2021-01-01 01:51:00,,,,B00013
3,B00037,2021-01-01 00:13:09,2021-01-01 00:21:26,,72.0,,B00037
4,B00037,2021-01-01 00:38:31,2021-01-01 00:53:44,,61.0,,B00037
...,...,...,...,...,...,...,...
1154107,B03266,2021-01-31 23:43:03,2021-01-31 23:51:48,7.0,7.0,,B03266
1154108,B03284,2021-01-31 23:50:27,2021-02-01 00:48:03,44.0,91.0,,
1154109,B03285,2021-01-31 23:13:46,2021-01-31 23:29:58,171.0,171.0,,B03285
1154110,B03285,2021-01-31 23:58:03,2021-02-01 00:17:29,15.0,15.0,,B03285


In [11]:
# Preview the yellow-trip frame restricted to the columns listed in `columns`
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2,8.00,3.00,0.5,0.00,0.00,0.3,11.80,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2,3.00,0.50,0.5,0.00,0.00,0.3,4.30,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1,42.00,0.50,0.5,8.65,0.00,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1,29.00,0.50,0.5,6.05,0.00,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.50,0.50,0.5,4.06,0.00,0.3,24.36,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1369764,2,2021-01-31 23:03:00,2021-01-31 23:33:00,,8.89,,,229,181,0,27.78,0.00,0.5,7.46,0.00,0.3,38.54,
1369765,2,2021-01-31 23:29:00,2021-01-31 23:51:00,,7.43,,,41,70,0,32.58,0.00,0.5,0.00,6.12,0.3,39.50,
1369766,2,2021-01-31 23:25:00,2021-01-31 23:38:00,,6.26,,,74,137,0,16.85,0.00,0.5,3.90,0.00,0.3,24.05,
1369767,6,2021-01-31 23:01:06,2021-02-01 00:02:03,,19.70,,,265,188,0,53.68,0.00,0.5,0.00,0.00,0.3,54.48,


In [22]:
# Independent copy of just the pickup timestamp and pickup location columns,
# so later assignments do not touch `df`
important_df = df.loc[:, important_columns].copy()

In [23]:
# Parse the pickup timestamps so the .dt accessor can be used on the column.
# (The bare column lookup that preceded this line was discarded and is removed.)
important_df['tpep_pickup_datetime'] = pd.to_datetime(important_df['tpep_pickup_datetime'])

In [24]:
# `params` may hold strings, so coerce before comparing with the .dt fields
year, month = int(params['year']), int(params['month'])

# Rows that are NOT (in the expected year AND in the expected month)
outside_range = important_df[
    ~(
        (important_df['tpep_pickup_datetime'].dt.year == year)
        & (important_df['tpep_pickup_datetime'].dt.month == month)
    )
]

In [29]:
import pandas as pd

def check_date_range(df, year, month, date_column='tpep_pickup_datetime'):
    """
    Keeps only the rows whose date falls in the given year and month.

    The date column is parsed with pd.to_datetime on a new frame; the frame
    passed in by the caller is left unmodified.

    Prints True and the number of dropped rows when rows outside the period
    were found, and prints False otherwise.

    Args:
    - df: The pandas DataFrame containing the data.
    - year: The expected year for the data.
    - month: The expected month for the data.
    - date_column: The column name that contains the datetime data (default is 'tpep_pickup_datetime').

    Returns:
    - filtered_df: DataFrame containing rows that match the specified year and month,
      with date_column converted to datetime.
    """
    # Build a new frame instead of assigning into the caller's frame
    converted = df.assign(**{date_column: pd.to_datetime(df[date_column])})

    # One mask serves both the count and the filter; unparseable/missing dates
    # compare unequal and therefore count as outside the range
    stamps = converted[date_column]
    in_range = (stamps.dt.year == year) & (stamps.dt.month == month)
    rows_outside = int((~in_range).sum())

    if rows_outside:
        print(True)  # Found data outside the expected range
        filtered_df = converted[in_range]
        print(f'{rows_outside} rows were outside the specified year-month and have been filtered.')
    else:
        print(False)  # No data outside the expected range
        filtered_df = converted  # No filtering needed if all rows match

    return filtered_df




True
24 rows were outside the specified year-month and have been filtered.


In [37]:
# Preview the yellow-trip frame again
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2,8.00,3.00,0.5,0.00,0.00,0.3,11.80,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2,3.00,0.50,0.5,0.00,0.00,0.3,4.30,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1,42.00,0.50,0.5,8.65,0.00,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1,29.00,0.50,0.5,6.05,0.00,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.50,0.50,0.5,4.06,0.00,0.3,24.36,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1369764,2,2021-01-31 23:03:00,2021-01-31 23:33:00,,8.89,,,229,181,0,27.78,0.00,0.5,7.46,0.00,0.3,38.54,
1369765,2,2021-01-31 23:29:00,2021-01-31 23:51:00,,7.43,,,41,70,0,32.58,0.00,0.5,0.00,6.12,0.3,39.50,
1369766,2,2021-01-31 23:25:00,2021-01-31 23:38:00,,6.26,,,74,137,0,16.85,0.00,0.5,3.90,0.00,0.3,24.05,
1369767,6,2021-01-31 23:01:06,2021-02-01 00:02:03,,19.70,,,265,188,0,53.68,0.00,0.5,0.00,0.00,0.3,54.48,


In [32]:
import os

def save_filtered_data(df, original_name, output_dir='../data/filtered'):
    """
    Writes df to '<output_dir>/filtered_<original_name>.parquet' and reports the path.

    Args:
    - df: The filtered pandas DataFrame.
    - original_name: Base name of the source file, without the '.parquet' extension.
    - output_dir: Destination directory, created if missing (default is '../data/filtered').

    Returns:
    - None
    """
    # Create the destination (and any missing parents) before writing
    os.makedirs(output_dir, exist_ok=True)

    output_path = os.path.join(output_dir, f'filtered_{original_name}.parquet')
    df.to_parquet(output_path)
    print(f'Filtered data saved to {output_path}')

In [33]:
# Example usage
# Integer period this time (an earlier cell stored the same period as strings)
params = {'year': 2021, 'month': 1}
# NOTE(review): rebinds the global `columns` to a two-item list; the name is not
# used again in this cell.
columns = ['tpep_pickup_datetime', 'PULocationID']


# Keep only the rows of `important_df` that fall in the requested year and month
filtered_df = check_date_range(important_df, params['year'], params['month'])
# Save the filtered data
# ':02d' zero-pads the month so the base name reads 'yellow_tripdata_2021-01'
original_filename = f'yellow_tripdata_{params["year"]}-{params["month"]:02d}'
save_filtered_data(filtered_df, original_filename)

True
24 rows were outside the specified year-month and have been filtered.
Filtered data saved to ../data/filtered/filtered_yellow_tripdata_2021-01.parquet


In [76]:
import pandas as pd
import os

def check_date_range(df, year, month, date_column):
    """
    Keeps only the rows whose pickup date falls in the given year and month.

    The column named by date_column is renamed to 'pickup_datetime' and parsed
    with pd.to_datetime. Both steps happen on the new frame returned by
    DataFrame.rename, so the caller's frame is not modified.

    Prints True and the number of dropped rows when rows outside the period
    were found; otherwise prints 'No data outside the expected range'.

    Args:
    - df: The pandas DataFrame containing the data.
    - year: The expected year for the data.
    - month: The expected month for the data.
    - date_column: The column name that contains the datetime data (required, no default).

    Returns:
    - filtered_df: DataFrame containing rows that match the specified year and month,
      with the date column exposed as 'pickup_datetime'.
    """
    # Step 1: Rename the datetime column to 'pickup_datetime'
    df = df.rename(columns={date_column: 'pickup_datetime'})

    # Convert the renamed 'pickup_datetime' column to datetime
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])

    # One mask serves both the count and the filter; missing dates compare
    # unequal and therefore count as outside the range
    pickup = df['pickup_datetime']
    in_range = (pickup.dt.year == year) & (pickup.dt.month == month)
    rows_outside = int((~in_range).sum())

    if rows_outside:
        print(True)  # Found data outside the expected range
        filtered_df = df[in_range]
        print(f'{rows_outside} rows were outside the specified year-month and have been filtered.')
    else:
        print('No data outside the expected range')
        filtered_df = df  # No filtering needed if all rows match

    return filtered_df


def data_filter(df, file_type):
    """
    Reduces a trip frame to 'pickup_datetime' and 'PULocationID' and drops incomplete rows.

    The frame must already expose its pickup timestamp as 'pickup_datetime';
    this function does not rename or convert that column. The caller's frame
    is not modified.

    Args:
    - df: The pandas DataFrame.
    - file_type: The type of file being processed (e.g., 'fhv_tripdata', 'yellow_tripdata').

    Returns:
    - New DataFrame holding only 'pickup_datetime' and 'PULocationID', without
      rows where either value is NaN.
    """
    # 'fhv_tripdata' files spell the column 'PUlocationID' instead of 'PULocationID'
    if file_type == 'fhv_tripdata':
        df = df.rename(columns={'PUlocationID': 'PULocationID'})

    important = ['pickup_datetime', 'PULocationID']
    return df[important].dropna(subset=important)


def save_filtered_data(df, original_name, output_dir='../data/filtered'):
    """
    Stores a filtered DataFrame as parquet under a 'filtered_'-prefixed file name.

    Args:
    - df: The filtered pandas DataFrame.
    - original_name: Name of the source data file without its extension; the
      output file is 'filtered_<original_name>.parquet'.
    - output_dir: Target directory; it is created when absent (default is '../data/filtered').

    Returns:
    - None
    """
    os.makedirs(output_dir, exist_ok=True)

    target_name = f'filtered_{original_name}.parquet'
    output_path = os.path.join(output_dir, target_name)

    df.to_parquet(output_path)
    print(f'Filtered data saved to {output_path}')


def process_file(filename, input_dir='../data/raw', output_dir='../data/filtered'):
    """
    Processes a single parquet file by filtering data, removing NaN values, and saving the result.

    The file is skipped when 'filtered_<filename>' already exists in output_dir,
    and silently ignored when its name starts with none of the known prefixes.
    Errors are reported with print and never propagate to the caller.

    Args:
    - filename: The name of the file to process, e.g. 'yellow_tripdata_2021-01.parquet'.
    - input_dir: The directory where the parquet file is located.
    - output_dir: The directory where the filtered parquet file will be saved.

    Returns:
    - None
    """
    # Known file-name prefixes and the pickup-timestamp column each one uses
    file_patterns = {
        'yellow_tripdata': 'tpep_pickup_datetime',
        'green_tripdata': 'lpep_pickup_datetime',
        'fhv_tripdata': 'pickup_datetime',
        'fhvhv_tripdata': 'pickup_datetime'
    }

    try:
        # Skip files whose filtered counterpart is already on disk
        filtered_filename = f'filtered_{filename}'
        if os.path.exists(os.path.join(output_dir, filtered_filename)):
            print(f'{filtered_filename} already exists. Skipping processing.')
            return

        # First prefix that matches, in the insertion order of the mapping
        matched = next(
            (item for item in file_patterns.items() if filename.startswith(item[0])),
            None,
        )
        if matched is None:
            return  # Not one of the known trip-data files
        pattern, original_date_column = matched

        # '<prefix>_<YYYY>-<MM>.parquet' -> (YYYY, MM)
        period = filename.split('_')[-1].split('.')[0]
        file_year, file_month = period.split('-')
        file_year = int(file_year)
        file_month = int(file_month)

        # Load, keep the rows of the expected month, reduce columns, then save
        df = pd.read_parquet(os.path.join(input_dir, filename))
        df = check_date_range(df, file_year, file_month, original_date_column)
        df = data_filter(df, pattern)  # The prefix selects the column-name handling
        save_filtered_data(df, filename.replace('.parquet', ''), output_dir)

    except FileNotFoundError:
        # Listed first so that the broader handler below cannot shadow it
        print(f"File {filename} not found.")
    except Exception as e:
        print(f"Error processing file {filename}: {e}")


def process_all_parquet_files_in_directory(input_dir='../data/raw', output_dir='../data/filtered'):
    """
    Runs process_file on every '.parquet' entry found in input_dir.

    A failure on one file is printed and does not stop the remaining files.

    Args:
    - input_dir: The directory where the raw parquet files are located.
    - output_dir: The directory where the filtered parquet files will be saved.

    Returns:
    - None
    """
    parquet_names = (entry for entry in os.listdir(input_dir) if entry.endswith('.parquet'))
    for filename in parquet_names:
        try:
            process_file(filename, input_dir, output_dir)
        except Exception as e:
            # Report and move on to the next file
            print(f"Failed to process {filename}: {e}")


In [77]:
# Test script for processing parquet files

# Directory setup for test (change if needed)
# Read by test_parquet_processing below: source of the raw parquet files ...
input_directory = '../data/raw'
# ... and destination of the 'filtered_*' parquet files
output_directory = '../data/filtered'

# Assuming you have some .parquet files in the input directory
# Example: 'yellow_tripdata_2021-01.parquet', 'green_tripdata_2021-01.parquet'

def test_parquet_processing():
    """
    Runs the whole-directory processing on the module-level input_directory and
    output_directory, printing a line before and after.

    Returns:
    - None
    """
    print(f"Starting the parquet processing from {input_directory} to {output_directory}...")

    # Process every parquet file found in the input directory
    process_all_parquet_files_in_directory(input_directory, output_directory)

    print(f"Processing complete. Check {output_directory} for the filtered files.")

# Run the test
# Run the processing when the cell is executed (in a notebook kernel __name__
# is "__main__"), but not when this code is imported as a module
if __name__ == "__main__":
    test_parquet_processing()


Starting the parquet processing from ../data/raw to ../data/filtered...
No data outside the expected range
Filtered data saved to ../data/filtered/filtered_fhvhv_tripdata_2022-03.parquet
filtered_yellow_tripdata_2023-07.parquet already exists. Skipping processing.
No data outside the expected range
Filtered data saved to ../data/filtered/filtered_fhv_tripdata_2021-12.parquet
filtered_yellow_tripdata_2021-08.parquet already exists. Skipping processing.
True
2 rows were outside the specified year-month and have been filtered.
Filtered data saved to ../data/filtered/filtered_green_tripdata_2021-06.parquet
No data outside the expected range
Filtered data saved to ../data/filtered/filtered_fhvhv_tripdata_2021-07.parquet
filtered_yellow_tripdata_2022-04.parquet already exists. Skipping processing.
No data outside the expected range
Filtered data saved to ../data/filtered/filtered_fhvhv_tripdata_2021-12.parquet
filtered_yellow_tripdata_2022-05.parquet already exists. Skipping processing.
filt