# NYC Yellow Taxi Data - Comprehensive Data Cleaning
## Removing Invalid Records Across All Years

This notebook removes:
- Zero and negative trip distances
- Negative fares
- Other data quality issues

**Output**: Cleaned parquet files in `data/processed/`

In [2]:
# Import required libraries
import pandas as pd
import numpy as np
import os
from glob import glob
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

print("=" * 80)
print("NYC YELLOW TAXI DATA - COMPREHENSIVE CLEANING PIPELINE")
print("=" * 80)
print(f"Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

NYC YELLOW TAXI DATA - COMPREHENSIVE CLEANING PIPELINE
Start Time: 2025-10-06 21:47:25


In [3]:
# Configuration
RAW_DATA_PATH = '/Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/data/raw/'
PROCESSED_DATA_PATH = '/Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/data/processed/'

# Create output directory
os.makedirs(PROCESSED_DATA_PATH, exist_ok=True)

# Find all parquet files
all_files = []
for year in ['2022', '2023', '2024', '2025']:
    year_path = os.path.join(RAW_DATA_PATH, year)
    if os.path.exists(year_path):
        files = glob(os.path.join(year_path, '*.parquet'))
        all_files.extend(files)

print(f"\nüìÅ Found {len(all_files)} parquet files to process")
print(f"üìÇ Output directory: {PROCESSED_DATA_PATH}")


üìÅ Found 44 parquet files to process
üìÇ Output directory: /Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/data/processed/


In [4]:
# Define cleaning function
def clean_taxi_data(df):
    """
    Apply all data cleaning rules to a DataFrame
    
    Returns: cleaned_df, cleaning_stats
    """
    initial_count = len(df)
    stats = {}
    
    # 1. Remove zero and negative trip distances
    zero_distance = (df['trip_distance'] <= 0).sum()
    df = df[df['trip_distance'] > 0]
    stats['zero_negative_distance'] = zero_distance
    
    # 2. Remove negative fares
    negative_fare = (df['fare_amount'] < 0).sum()
    df = df[df['fare_amount'] >= 0]
    stats['negative_fare'] = negative_fare
    
    # 3. Remove zero fares (likely errors or no-charges that shouldn't be analyzed)
    zero_fare = (df['fare_amount'] == 0).sum()
    df = df[df['fare_amount'] > 0]
    stats['zero_fare'] = zero_fare
    
    # 4. Remove zero or negative passenger counts
    invalid_passengers = (df['passenger_count'] <= 0).sum()
    df = df[df['passenger_count'] > 0]
    stats['invalid_passengers'] = invalid_passengers
    
    # 5. Remove trips with negative duration
    df['trip_duration_minutes'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60
    negative_duration = (df['trip_duration_minutes'] <= 0).sum()
    df = df[df['trip_duration_minutes'] > 0]
    stats['negative_duration'] = negative_duration
    
    # 6. Remove extreme outliers (likely data entry errors)
    extreme_fare = (df['fare_amount'] > 1000).sum()
    df = df[df['fare_amount'] <= 1000]
    stats['extreme_fare'] = extreme_fare
    
    extreme_distance = (df['trip_distance'] > 200).sum()
    df = df[df['trip_distance'] <= 200]
    stats['extreme_distance'] = extreme_distance
    
    extreme_duration = (df['trip_duration_minutes'] > 300).sum()  # 5 hours
    df = df[df['trip_duration_minutes'] <= 300]
    stats['extreme_duration'] = extreme_duration
    
    # 7. Remove duplicate records
    duplicates = df.duplicated().sum()
    df = df.drop_duplicates()
    stats['duplicates'] = duplicates
    
    # Calculate cleaning summary
    stats['initial_count'] = initial_count
    stats['final_count'] = len(df)
    stats['total_removed'] = initial_count - len(df)
    stats['removal_percentage'] = (stats['total_removed'] / initial_count * 100) if initial_count > 0 else 0
    
    # Drop temporary columns
    if 'trip_duration_minutes' in df.columns:
        df = df.drop(columns=['trip_duration_minutes'])
    
    return df, stats

print("‚úÖ Cleaning function defined")

‚úÖ Cleaning function defined


In [5]:
# Process all files
print("\n" + "=" * 80)
print("üßπ PROCESSING FILES")
print("=" * 80)

overall_stats = {
    'files_processed': 0,
    'total_initial_records': 0,
    'total_final_records': 0,
    'total_removed_records': 0,
    'zero_negative_distance': 0,
    'negative_fare': 0,
    'zero_fare': 0,
    'invalid_passengers': 0,
    'negative_duration': 0,
    'extreme_fare': 0,
    'extreme_distance': 0,
    'extreme_duration': 0,
    'duplicates': 0
}

for i, file_path in enumerate(sorted(all_files), 1):
    try:
        # Extract year and filename
        year = file_path.split('/')[-2]
        filename = os.path.basename(file_path)
        
        print(f"\n[{i}/{len(all_files)}] Processing: {year}/{filename}")
        
        # Load data
        df = pd.read_parquet(file_path)
        initial_size = len(df)
        
        # Clean data
        df_clean, stats = clean_taxi_data(df)
        
        # Create output directory for year
        year_output_path = os.path.join(PROCESSED_DATA_PATH, year)
        os.makedirs(year_output_path, exist_ok=True)
        
        # Save cleaned data
        output_file = os.path.join(year_output_path, filename)
        df_clean.to_parquet(output_file, index=False)
        
        # Update overall stats
        overall_stats['files_processed'] += 1
        overall_stats['total_initial_records'] += stats['initial_count']
        overall_stats['total_final_records'] += stats['final_count']
        overall_stats['total_removed_records'] += stats['total_removed']
        
        for key in ['zero_negative_distance', 'negative_fare', 'zero_fare', 'invalid_passengers',
                    'negative_duration', 'extreme_fare', 'extreme_distance', 'extreme_duration', 'duplicates']:
            overall_stats[key] += stats[key]
        
        # Print file summary
        print(f"  Initial: {stats['initial_count']:,} | Final: {stats['final_count']:,} | Removed: {stats['total_removed']:,} ({stats['removal_percentage']:.2f}%)")
        print(f"  ‚úÖ Saved to: {output_file}")
        
    except Exception as e:
        print(f"  ‚ùå Error processing {filename}: {str(e)}")
        continue

print("\n" + "=" * 80)
print("‚úÖ ALL FILES PROCESSED")
print("=" * 80)


üßπ PROCESSING FILES

[1/44] Processing: 2022/yellow_tripdata_2022-01.parquet
  Initial: 2,463,931 | Final: 2,299,197 | Removed: 164,734 (6.69%)
  ‚úÖ Saved to: /Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/data/processed/2022/yellow_tripdata_2022-01.parquet

[2/44] Processing: 2022/yellow_tripdata_2022-02.parquet
  Initial: 2,979,431 | Final: 2,768,518 | Removed: 210,913 (7.08%)
  ‚úÖ Saved to: /Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/data/processed/2022/yellow_tripdata_2022-02.parquet

[3/44] Processing: 2022/yellow_tripdata_2022-03.parquet
  Initial: 3,627,882 | Final: 3,376,555 | Removed: 251,327 (6.93%)
  ‚úÖ Saved to: /Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/data/processed/2022/yellow_tripdata_2022-03.parquet

[4/44] Processing: 2022/yellow_tripdata_2022-04.parquet
  Initial: 3,599,920 | Final: 3,348,837 | Removed: 251,083 (6.97%)
  ‚úÖ Saved to: /Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/data/processed/2022/yellow_tripda

In [6]:
# Display overall cleaning summary
print("\n" + "=" * 80)
print("üìä OVERALL CLEANING SUMMARY")
print("=" * 80)

print(f"\n‚úì Files Processed: {overall_stats['files_processed']}")
print(f"\n‚úì Record Counts:")
print(f"  Initial Records:  {overall_stats['total_initial_records']:,}")
print(f"  Final Records:    {overall_stats['total_final_records']:,}")
print(f"  Removed Records:  {overall_stats['total_removed_records']:,}")
print(f"  Retention Rate:   {(overall_stats['total_final_records']/overall_stats['total_initial_records']*100):.2f}%")

print(f"\n‚úì Records Removed by Category:")
print(f"  Zero/Negative Distance:  {overall_stats['zero_negative_distance']:,}")
print(f"  Negative Fare:           {overall_stats['negative_fare']:,}")
print(f"  Zero Fare:               {overall_stats['zero_fare']:,}")
print(f"  Invalid Passengers:      {overall_stats['invalid_passengers']:,}")
print(f"  Negative Duration:       {overall_stats['negative_duration']:,}")
print(f"  Extreme Fare (>$1000):   {overall_stats['extreme_fare']:,}")
print(f"  Extreme Distance (>200mi): {overall_stats['extreme_distance']:,}")
print(f"  Extreme Duration (>5hrs): {overall_stats['extreme_duration']:,}")
print(f"  Duplicates:              {overall_stats['duplicates']:,}")

print(f"\nüìÇ Cleaned data saved to: {PROCESSED_DATA_PATH}")
print(f"\n‚è±Ô∏è  Completion Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("\n" + "=" * 80)
print("‚úÖ DATA CLEANING COMPLETE - READY FOR ANALYSIS")
print("=" * 80)


üìä OVERALL CLEANING SUMMARY

‚úì Files Processed: 44

‚úì Record Counts:
  Initial Records:  150,692,482
  Final Records:    130,836,094
  Removed Records:  19,856,388
  Retention Rate:   86.82%

‚úì Records Removed by Category:
  Zero/Negative Distance:  3,014,621
  Negative Fare:           2,904,853
  Zero Fare:               34,624
  Invalid Passengers:      1,866,558
  Negative Duration:       295,419
  Extreme Fare (>$1000):   153
  Extreme Distance (>200mi): 1,071
  Extreme Duration (>5hrs): 110,031
  Duplicates:              4

üìÇ Cleaned data saved to: /Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/data/processed/

‚è±Ô∏è  Completion Time: 2025-10-06 21:54:52

‚úÖ DATA CLEANING COMPLETE - READY FOR ANALYSIS


In [7]:
# Save cleaning report
report_path = '/Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/outputs/reports/'
os.makedirs(report_path, exist_ok=True)

report_file = os.path.join(report_path, f'cleaning_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')

with open(report_file, 'w') as f:
    f.write("=" * 80 + "\n")
    f.write("NYC YELLOW TAXI DATA - CLEANING REPORT\n")
    f.write("=" * 80 + "\n\n")
    f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
    f.write(f"Files Processed: {overall_stats['files_processed']}\n\n")
    f.write("RECORD COUNTS:\n")
    f.write(f"  Initial Records:  {overall_stats['total_initial_records']:,}\n")
    f.write(f"  Final Records:    {overall_stats['total_final_records']:,}\n")
    f.write(f"  Removed Records:  {overall_stats['total_removed_records']:,}\n")
    f.write(f"  Retention Rate:   {(overall_stats['total_final_records']/overall_stats['total_initial_records']*100):.2f}%\n\n")
    f.write("RECORDS REMOVED BY CATEGORY:\n")
    f.write(f"  Zero/Negative Distance:    {overall_stats['zero_negative_distance']:,}\n")
    f.write(f"  Negative Fare:             {overall_stats['negative_fare']:,}\n")
    f.write(f"  Zero Fare:                 {overall_stats['zero_fare']:,}\n")
    f.write(f"  Invalid Passengers:        {overall_stats['invalid_passengers']:,}\n")
    f.write(f"  Negative Duration:         {overall_stats['negative_duration']:,}\n")
    f.write(f"  Extreme Fare (>$1000):     {overall_stats['extreme_fare']:,}\n")
    f.write(f"  Extreme Distance (>200mi): {overall_stats['extreme_distance']:,}\n")
    f.write(f"  Extreme Duration (>5hrs):  {overall_stats['extreme_duration']:,}\n")
    f.write(f"  Duplicates:                {overall_stats['duplicates']:,}\n\n")
    f.write(f"Output Location: {PROCESSED_DATA_PATH}\n")
    f.write("=" * 80 + "\n")

print(f"\nüìÑ Cleaning report saved to: {report_file}")


üìÑ Cleaning report saved to: /Users/yash/Documents/Projects/NYC_Yellow_Taxi_Analytics/outputs/reports/cleaning_report_20251006_215452.txt


In [8]:
# Quick validation - Load and verify one cleaned file
print("\n" + "=" * 80)
print("üîç VALIDATION CHECK")
print("=" * 80)

# Get first cleaned file
cleaned_files = glob(os.path.join(PROCESSED_DATA_PATH, '**/*.parquet'), recursive=True)
if cleaned_files:
    validation_file = cleaned_files[0]
    df_validation = pd.read_parquet(validation_file)
    
    print(f"\n‚úì Validating: {os.path.basename(validation_file)}")
    print(f"\n  Records: {len(df_validation):,}")
    print(f"\n  Quality Checks:")
    print(f"    Zero/Negative Distance: {(df_validation['trip_distance'] <= 0).sum()} (Expected: 0) {'‚úÖ' if (df_validation['trip_distance'] <= 0).sum() == 0 else '‚ùå'}")
    print(f"    Negative Fare:          {(df_validation['fare_amount'] < 0).sum()} (Expected: 0) {'‚úÖ' if (df_validation['fare_amount'] < 0).sum() == 0 else '‚ùå'}")
    print(f"    Zero Fare:              {(df_validation['fare_amount'] == 0).sum()} (Expected: 0) {'‚úÖ' if (df_validation['fare_amount'] == 0).sum() == 0 else '‚ùå'}")
    print(f"    Invalid Passengers:     {(df_validation['passenger_count'] <= 0).sum()} (Expected: 0) {'‚úÖ' if (df_validation['passenger_count'] <= 0).sum() == 0 else '‚ùå'}")
    
    print(f"\n  Sample Data (First 5 rows):")
    print(df_validation[['tpep_pickup_datetime', 'trip_distance', 'fare_amount', 'total_amount', 'passenger_count']].head())
    
    print("\n‚úÖ Validation passed! Cleaned data is ready for analysis.")
else:
    print("\n‚ùå No cleaned files found for validation.")

print("\n" + "=" * 80)


üîç VALIDATION CHECK

‚úì Validating: yellow_tripdata_2022-10.parquet

  Records: 3,395,481

  Quality Checks:
    Zero/Negative Distance: 0 (Expected: 0) ‚úÖ
    Negative Fare:          0 (Expected: 0) ‚úÖ
    Zero Fare:              0 (Expected: 0) ‚úÖ
    Invalid Passengers:     0 (Expected: 0) ‚úÖ

  Sample Data (First 5 rows):
  tpep_pickup_datetime  trip_distance  fare_amount  total_amount  \
0  2022-10-01 00:03:41           1.70          9.5         15.95   
1  2022-10-01 00:14:30           0.72          5.5          9.30   
2  2022-10-01 00:27:13           1.74          9.0         12.36   
3  2022-10-01 00:22:52           6.80         25.5         29.30   
4  2022-10-01 00:33:19           1.88         10.5         14.30   

   passenger_count  
0              1.0  
1              2.0  
2              1.0  
3              1.0  
4              3.0  

‚úÖ Validation passed! Cleaned data is ready for analysis.

