In [1]:
import os
from datetime import datetime
import pandas as pd
import glob


import pyarrow as pa
import pyarrow.parquet as pq


In [2]:
date_range = pd.date_range(start='2021-01', end='2024-04', freq='ME')
print("Date range:", date_range)

Date range: DatetimeIndex(['2021-01-31', '2021-02-28', '2021-03-31', '2021-04-30',
               '2021-05-31', '2021-06-30', '2021-07-31', '2021-08-31',
               '2021-09-30', '2021-10-31', '2021-11-30', '2021-12-31',
               '2022-01-31', '2022-02-28', '2022-03-31', '2022-04-30',
               '2022-05-31', '2022-06-30', '2022-07-31', '2022-08-31',
               '2022-09-30', '2022-10-31', '2022-11-30', '2022-12-31',
               '2023-01-31', '2023-02-28', '2023-03-31', '2023-04-30',
               '2023-05-31', '2023-06-30', '2023-07-31', '2023-08-31',
               '2023-09-30', '2023-10-31', '2023-11-30', '2023-12-31',
               '2024-01-31', '2024-02-29', '2024-03-31'],
              dtype='datetime64[ns]', freq='ME')


In [3]:
cwd = os.getcwd()
print("Current Working Directory:", cwd)

Current Working Directory: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning


In [4]:
data_dir = os.path.join(os.getcwd(), "..", "Datasets", "taxi_parquets")
print(f"Data directory: {data_dir}")

# Check if the directory exists
if not os.path.exists(data_dir):
    print(f"Directory {data_dir} does not exist")
else:
    # List all files in the directory to check for existence and naming
    all_files_in_dir = os.listdir(data_dir)
    print(f"Files in directory {data_dir}: {all_files_in_dir}")

all_files = []


Data directory: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets
Files in directory c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets: ['fhvhv_2021_01.parquet', 'fhvhv_2021_02.parquet', 'fhvhv_2021_03.parquet', 'fhvhv_2021_04.parquet', 'fhvhv_2021_05.parquet', 'fhvhv_2021_06.parquet', 'fhvhv_2021_07.parquet', 'fhvhv_2021_08.parquet', 'fhvhv_2021_09.parquet', 'fhvhv_2021_10.parquet', 'fhvhv_2021_11.parquet', 'fhvhv_2021_12.parquet', 'fhvhv_2022_01.parquet', 'fhvhv_2022_02.parquet', 'fhvhv_2022_03.parquet', 'fhvhv_2022_04.parquet', 'fhvhv_2022_05.parquet', 'fhvhv_2022_06.parquet', 'fhvhv_2022_07.parquet', 'fhvhv_2022_08.parquet', 'fhvhv_2022_09.parquet', 'fhvhv_2022_10.parquet', 'fhvhv_2022_11.parquet', 'fhvhv_2022_12.parquet', 'fhvhv_2023_01.parquet', 'fhvhv_2023_02.parquet', 'fhvhv_2023_03.parquet', 'fhvhv_2023_04.parquet', 'fhvhv_2023_05.parquet', 'fhvhv_2023_06.p

In [5]:
for date in date_range:
    search_pattern = os.path.join(data_dir, f"yellow_{date.strftime('%Y_%m')}*.parquet")
    print(f"Searching for files with pattern: {search_pattern}")
    files = glob.glob(search_pattern)
    if files:
        print(f"Files found for pattern {search_pattern}: {files}")
    all_files.extend(files)  # Add the found files to the list

print("All files found:", all_files)

Searching for files with pattern: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_01*.parquet
Files found for pattern c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_01*.parquet: ['c:\\Users\\35385\\Desktop\\CS_Summer_2024\\Shared_GH\\New-York-App\\data-analytics\\cleaning\\..\\Datasets\\taxi_parquets\\yellow_2021_01.parquet']
Searching for files with pattern: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_02*.parquet
Files found for pattern c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_02*.parquet: ['c:\\Users\\35385\\Desktop\\CS_Summer_2024\\Shared_GH\\New-York-App\\data-analytics\\cleaning\\..\\Datasets\\taxi_parquets\\yellow_2021_02.parquet']
Searching for files with pattern: c:\Users

In [6]:
print("Number of files found:", len(all_files))

Number of files found: 39


In [7]:
print("File path for yellow_2021_01:", r"/data-analytics/Datasets/taxi_parquets/yellow_2021_01.parquet")

File path for yellow_2021_01: /data-analytics/Datasets/taxi_parquets/yellow_2021_01.parquet


In [8]:
def renaming_yellow_to_standard(df):
    """ 
    Function for renaming the columns of a dataset to standard names, which will ease the cleaning process
    """
    if isinstance(df, pd.DataFrame):
        df.rename(columns={
            'tpep_pickup_datetime': 'pickup_datetime', 
            'tpep_dropoff_datetime': 'dropoff_datetime', 
            'PULocationID': 'pickup_zone', 
            'DOLocationID': 'dropoff_zone'
        }, inplace=True)
        return df
    else:
        print("Warning: The input is not a DataFrame in renaming_yellow_to_standard")


In [9]:
def convert_float_to_int(df):
    """ 
    Function for converting datatypes of specific columns of a DataFrame to appropriate types.
    """
    if isinstance(df, pd.DataFrame):
        if "RatecodeID" in df.columns:
            df["RatecodeID"] = df["RatecodeID"].fillna(0).astype("int32")
        if "passenger_count" in df.columns:
            df["passenger_count"] = df["passenger_count"].fillna(0).astype("int32")
        if "pickup_zone" in df.columns:
            df["pickup_zone"] = df["pickup_zone"].fillna(0).astype("int32")
        if "dropoff_zone" in df.columns:
            df["dropoff_zone"] = df["dropoff_zone"].fillna(0).astype("int32")    
        return df
    else:
        print("Warning: The input is not a DataFrame in convert_float_to_int")


In [10]:
print("Current Working Directory:", cwd)

taxi_zone_dir = os.path.join(os.getcwd(), "..", "Datasets", "taxi_other")


# Define the directory where the data is located relative to the current working directory
print("Taxi Zone CSV Directory:", taxi_zone_dir)

# Define the file path relative to the data directory
taxi_zone_path = os.path.join(cwd, taxi_zone_dir, "taxi_zone_lookup.csv")

taxi_zone = pd.read_csv(taxi_zone_path, keep_default_na=True, delimiter=",", skipinitialspace=True, encoding="Windows-1252")

def valid_zones_1(df):
    manhattan_df = df[df["Borough"] == "Manhattan"]
    unique_zones = manhattan_df["LocationID"].unique()
    
    print(f"Number of Unique Zones: {len(unique_zones)}")
    print("List of Unique Zones:", unique_zones)

valid_zones_1(taxi_zone)
    

Current Working Directory: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning
Taxi Zone CSV Directory: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_other
Number of Unique Zones: 69
List of Unique Zones: [  4  12  13  24  41  42  43  45  48  50  68  74  75  79  87  88  90 100
 103 104 105 107 113 114 116 120 125 127 128 137 140 141 142 143 144 148
 151 152 153 158 161 162 163 164 166 170 186 194 202 209 211 224 229 230
 231 232 233 234 236 237 238 239 243 244 246 249 261 262 263]


In [11]:
# Define a function to get unique zones for Manhattan
def get_manhattan_zones(df):
    manhattan_df = df[df["Borough"] == "Manhattan"]
    unique_zones = manhattan_df["LocationID"].unique()
    return set(unique_zones)

# Get the unique Manhattan zones from the taxi_zone DataFrame
manhattan_zones = get_manhattan_zones(taxi_zone)

In [12]:
# Define the function to check pickup and dropoff zones
def check_zones(df, manhattan_zones):
    # Check if both pickup_zone and dropoff_zone are not in manhattan_zones
    invalid_zones = df[~df["pickup_zone"].isin(manhattan_zones) & ~df["dropoff_zone"].isin(manhattan_zones)]
    
    print(f"Invalid zones count: {invalid_zones.shape[0]}")
    
    if not invalid_zones.empty:
        print("Examples of rows with invalid zones:")
        print(invalid_zones.head())  # Print first few invalid rows

In [13]:
def drop_yellow_invalid_rows(df, manhattan_zones):
    if isinstance(df, pd.DataFrame):
        # Drop duplicate rows
        df = df.drop_duplicates()
        
        if "airport_fee" in df.columns:
            df = df.drop("airport_fee", axis=1)
        if "Airport_fee" in df.columns:
            df = df.drop("Airport_fee", axis=1)
            
        # Drop rows where passenger_count == 0 or >= 6
        df = df[(df["passenger_count"] > 0) & (df["passenger_count"] < 6)]
        
        # Drop rows where fare_amount or total_amount <= 0
        df = df[(df["fare_amount"] > 0) & (df["total_amount"] > 0)]
        
        # Drop rows where extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, or congestion_surcharge < 0
        df = df[(df["extra"] >= 0) & (df["mta_tax"] >= 0) & (df["tip_amount"] >= 0) & 
                (df["tolls_amount"] >= 0) & (df["improvement_surcharge"] >= 0) & 
                (df["congestion_surcharge"] >= 0)]
        
        # Drop rows where pickup_datetime == dropoff_datetime
        df = df[df["pickup_datetime"] != df["dropoff_datetime"]]
        
        # Drop rows where trip_distance <= 0
        df = df[df["trip_distance"] > 0]
        
        # Drop rows where RateCodeID != 1-6
        df = df[df["RatecodeID"].isin([1, 2, 3, 4, 5, 6])]
        
        # Drop rows where payment_type == 4
        df = df[df["payment_type"] != 4]
        
        # Drop rows where both pickup_zone and dropoff_zone are not Manhattan zones
        df = df[df["pickup_zone"].isin(manhattan_zones) | df["dropoff_zone"].isin(manhattan_zones)]
        
    else:
        print("Warning: Input is not a DataFrame")

    return df

In [14]:
def drop_yellow_columns(df):
    columns_to_drop = ["VendorID", "trip_distance", "RatecodeID", "store_and_fwd_flag", "payment_type", 
                       "fare_amount", "extra", "mta_tax", "improvement_surcharge", "tip_amount", 
                       "tolls_amount", "total_amount", "congestion_surcharge"]
    
    # Drop only the columns that exist in the DataFrame
    existing_columns_to_drop = [col for col in columns_to_drop if col in df.columns]
    df.drop(columns=existing_columns_to_drop, inplace=True)

    return df

In [15]:
def drop_missing_values(df):
    """
    Drops any rows with missing values from the DataFrame.
    
    Parameters:
        df (pd.DataFrame): The DataFrame to clean.
        
    Returns:
        pd.DataFrame: The cleaned DataFrame.
        int: The number of rows that were dropped.
    """
    
    if isinstance(df, pd.DataFrame):
        initial_row_count = df.shape[0]
        df = df.dropna()
        final_row_count = df.shape[0]
        rows_dropped = initial_row_count - final_row_count
        print(f"Number of rows dropped: {rows_dropped}")
    return df

In [16]:
def clean_parquet_files(file_paths, manhattan_zones):
    cleaned_dfs = []
    
    for file_path in file_paths:
        print(f"Processing file: {file_path}")
        
        # Read the parquet file
        df = pd.read_parquet(file_path)
        
        print("DF Shape OLD", df.shape)

        # Apply the cleaning functions
        df = renaming_yellow_to_standard(df)
        df = convert_float_to_int(df)
        df = drop_yellow_invalid_rows(df, manhattan_zones)
        df = drop_yellow_columns(df)
        df = drop_missing_values(df)

        print("DF Shape NEW", df.shape)

        
        # Append the cleaned DataFrame to the list
        cleaned_dfs.append(df)
        
        # Save the cleaned DataFrame back to a parquet file (optional)
        cleaned_file_path = file_path.replace('.parquet', '_cleaned.parquet')
        df.to_parquet(cleaned_file_path)
        print(f"Saved cleaned file: {cleaned_file_path}")
    
    # Concatenate all cleaned DataFrames into a single DataFrame
    final_df = pd.concat(cleaned_dfs, ignore_index=True)
    return final_df

In [17]:
def get_parquet_files(data_dir, date_range):
    all_files = []
    
    for date in date_range:
        search_pattern = os.path.join(data_dir, f"yellow_{date.strftime('%Y_%m')}*.parquet")
        files = glob.glob(search_pattern)
        all_files.extend(files)
    
    return all_files

# Usage example:
data_dir = "c:\\Users\\35385\\Desktop\\CS_Summer_2024\\Shared_GH\\New-York-App\\data-analytics\\cleaning\\..\\Datasets\\taxi_parquets\\"
date_range = pd.date_range(start="2021-01-01", end="2024-03-31", freq="MS")

file_paths = get_parquet_files(data_dir, date_range)

In [18]:
final_df = clean_parquet_files(file_paths, manhattan_zones)

# Save the final concatenated DataFrame to a parquet file (optional)
final_df.to_parquet('c:\\Users\\35385\\Desktop\\CS_Summer_2024\\Shared_GH\\New-York-App\\data-analytics\\cleaning\\..\\Datasets\\taxi_parquets\\yellow_final_cleaned.parquet')
print("Saved final concatenated DataFrame: yellow_final_cleaned.parquet")

Processing file: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_01.parquet
DF Shape OLD (1369769, 19)
Number of rows dropped: 0
DF Shape NEW (1149499, 5)
Saved cleaned file: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_01_cleaned.parquet
Processing file: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_02.parquet
DF Shape OLD (1371709, 19)
Number of rows dropped: 0
DF Shape NEW (1160123, 5)
Saved cleaned file: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_02_cleaned.parquet
Processing file: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_2021_03.parquet
DF Shape OLD (1925152, 19)
Number of rows dropped: 0
DF Shape NEW

In [19]:
""" 
Begin by loading 1 parquet file as pandas dataframe from each of the 4 TLC genres.
Error catching across OSes implemented: cwd, data directory, paths etc.
"""

cwd = os.getcwd()
print("Current Working Directory:", cwd)

# Define the directory where the data is located relative to the current working directory
print("Data Directory:", data_dir)

# Define the file paths relative to the data directory
yellow_final_cleaned_path = os.path.join(data_dir, "yellow_final_cleaned.parquet")

# Print the constructed file paths to verify
print("yellow_final_cleaned:", yellow_final_cleaned_path)


Current Working Directory: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning
Data Directory: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\
yellow_final_cleaned: c:\Users\35385\Desktop\CS_Summer_2024\Shared_GH\New-York-App\data-analytics\cleaning\..\Datasets\taxi_parquets\yellow_final_cleaned.parquet


In [20]:
# Read the parquet files using the relative file paths
yellow_final_cleaned = pd.read_parquet(yellow_final_cleaned_path, engine='pyarrow')

In [21]:
yellow_final_cleaned.shape

(102318739, 5)

In [22]:
yellow_final_cleaned.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 102318739 entries, 0 to 102318738
Data columns (total 5 columns):
 #   Column            Dtype         
---  ------            -----         
 0   pickup_datetime   datetime64[us]
 1   dropoff_datetime  datetime64[us]
 2   passenger_count   int32         
 3   pickup_zone       int32         
 4   dropoff_zone      int32         
dtypes: datetime64[us](2), int32(3)
memory usage: 2.7 GB


In [23]:
yellow_final_cleaned.head(10)

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,pickup_zone,dropoff_zone
0,2021-01-01 00:30:10,2021-01-01 00:36:12,1,142,43
1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,238,151
2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,68,33
3,2021-01-01 00:16:29,2021-01-01 00:24:30,1,224,68
4,2021-01-01 00:12:29,2021-01-01 00:30:34,1,90,40
5,2021-01-01 00:26:12,2021-01-01 00:39:46,2,263,142
6,2021-01-01 00:15:52,2021-01-01 00:38:07,3,164,255
7,2021-01-01 00:10:46,2021-01-01 00:32:58,2,138,166
8,2021-01-01 00:31:06,2021-01-01 00:38:52,5,142,50
9,2021-01-01 00:42:11,2021-01-01 00:44:24,5,50,142


In [24]:
def calculate_zone_busy(yellow_df):
    # Combine pickup and dropoff data into a single DataFrame; renamed and combined into a single datetime column
    pickup_data = yellow_df[['pickup_datetime', 'passenger_count', 'pickup_zone']].rename(columns={'pickup_datetime': 'datetime', 'pickup_zone': 'zone'})
    dropoff_data = yellow_df[['dropoff_datetime', 'passenger_count', 'dropoff_zone']].rename(columns={'dropoff_datetime': 'datetime', 'dropoff_zone': 'zone'})
    combined_data = pd.concat([pickup_data, dropoff_data])
    
    # Round datetime to the nearest hour
    combined_data['datetime'] = combined_data['datetime'].dt.round('h')
    
    # Extract the required time components
    combined_data['hour'] = combined_data['datetime'].dt.hour
    combined_data['day_of_week'] = combined_data['datetime'].dt.dayofweek
    combined_data['week'] = combined_data['datetime'].dt.isocalendar().week
    combined_data['month'] = combined_data['datetime'].dt.month - 1  # Convert to 0-11 for Jan-Dec
    
    # Group by hour, day_of_week, week, month, and zone, summing passenger counts
    zone_busy_df = combined_data.groupby(['hour', 'day_of_week', 'week', 'month', 'zone'])['passenger_count'].sum().reset_index()
    
    return zone_busy_df

In [25]:
yellow_final_cleaned = calculate_zone_busy(yellow_final_cleaned)
print(yellow_final_cleaned)

MemoryError: Unable to allocate 1.52 GiB for an array with shape (204637478,) and data type datetime64[us]