# NORMALIZE AND CLEAN DATA

In [1]:
import os
import sys

import pandas as pd

# The notebook's data paths are relative to its parent directory ('../raw',
# '../processed', ...), so that parent is the project root. Putting it on
# sys.path lets the `src.utils` imports below resolve.
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if project_root not in sys.path:
    sys.path.append(project_root)

# Project utilities
from src.utils.normalizing import normalize                           # enrich, derive features, drop columns
from src.utils.qa_rules import run_quality_check, summarize_qa_flags  # build and summarize QA flags
from src.utils.cleaning import clean                                  # remove rows per the cleaning strategy

## Normalize one month of data using the project's utility functions
**Objective:** Load the raw January data and apply the `normalize` function (from `src.utils.normalizing`), which:
* **Enriches the data:** maps IDs to readable labels: `PU_Borough`/`PU_Zone`, `DO_Borough`/`DO_Zone`, `payment_type_name`, and `ratecodeID_name`.
* **Engineers features:** derives `trip_duration_seconds`, `trip_duration_minutes`, `avg_speed_mph`, and `pickup_day_of_week`.
* **Selects features:** drops the columns identified in Notebook 1: `airport_fee` (only 5 non-null values), `store_and_fwd_flag` and `VendorID` (irrelevant to the analysis), and `mta_tax` and `improvement_surcharge` (low variance).

In [2]:
# Single-month walkthrough: January 2021 raw trips.
january_raw_path = "../raw/yellow_tripdata_2021-01.parquet"
df1 = pd.read_parquet(january_raw_path)

df1_normalized = normalize(df1)
print("Successfully normalized January data")

Successfully normalized January data


In [3]:
# Raw January rows exactly as read from parquet, for comparison with the normalized frame.
print("First 10 rows of data before normalized: ")
df1.iloc[:10]

First 10 rows of data before normalized: 


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
5,1,2021-01-01 00:16:29,2021-01-01 00:24:30,1.0,1.6,1.0,N,224,68,1,8.0,3.0,0.5,2.35,0.0,0.3,14.15,2.5,
6,1,2021-01-01 00:00:28,2021-01-01 00:17:28,1.0,4.1,1.0,N,95,157,2,16.0,0.5,0.5,0.0,0.0,0.3,17.3,0.0,
7,1,2021-01-01 00:12:29,2021-01-01 00:30:34,1.0,5.7,1.0,N,90,40,2,18.0,3.0,0.5,0.0,0.0,0.3,21.8,2.5,
8,1,2021-01-01 00:39:16,2021-01-01 01:00:13,1.0,9.1,1.0,N,97,129,4,27.5,0.5,0.5,0.0,0.0,0.3,28.8,0.0,
9,1,2021-01-01 00:26:12,2021-01-01 00:39:46,2.0,2.7,1.0,N,263,142,1,12.0,3.0,0.5,3.15,0.0,0.3,18.95,2.5,


In [4]:
# Same rows after normalize(); the label and derived columns are appended at the end.
print("First 10 rows of data after normalized with new columns at the end: ")
df1_normalized.iloc[:10]

First 10 rows of data after normalized with new columns at the end: 


Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,...,ratecodeID_name,payment_type_name,PU_Borough,PU_Zone,DO_Borough,DO_Zone,trip_duration_seconds,trip_duration_minutes,avg_speed_mph,pickup_day_of_week
0,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,142,43,2,8.0,3.0,...,Standard rate,Cash,Manhattan,Lincoln Square East,Manhattan,Central Park,362.0,6.0,20.88,Friday
1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,238,151,2,3.0,0.5,...,Standard rate,Cash,Manhattan,Upper West Side North,Manhattan,Manhattan Valley,59.0,1.0,12.2,Friday
2,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,132,165,1,42.0,0.5,...,Standard rate,Credit card,Queens,JFK Airport,Brooklyn,Midwood,1656.0,28.0,31.96,Friday
3,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,138,132,1,29.0,0.5,...,Standard rate,Credit card,Queens,LaGuardia Airport,Queens,JFK Airport,913.0,15.0,41.8,Friday
4,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,68,33,1,16.5,0.5,...,Standard rate,Credit card,Manhattan,East Chelsea,Brooklyn,Brooklyn Heights,992.0,17.0,17.93,Friday
5,2021-01-01 00:16:29,2021-01-01 00:24:30,1.0,1.6,1.0,224,68,1,8.0,3.0,...,Standard rate,Credit card,Manhattan,Stuy Town/Peter Cooper Village,Manhattan,East Chelsea,481.0,8.0,11.98,Friday
6,2021-01-01 00:00:28,2021-01-01 00:17:28,1.0,4.1,1.0,95,157,2,16.0,0.5,...,Standard rate,Cash,Queens,Forest Hills,Queens,Maspeth,1020.0,17.0,14.47,Friday
7,2021-01-01 00:12:29,2021-01-01 00:30:34,1.0,5.7,1.0,90,40,2,18.0,3.0,...,Standard rate,Cash,Manhattan,Flatiron,Brooklyn,Carroll Gardens,1085.0,18.0,18.91,Friday
8,2021-01-01 00:39:16,2021-01-01 01:00:13,1.0,9.1,1.0,97,129,4,27.5,0.5,...,Standard rate,Dispute,Brooklyn,Fort Greene,Queens,Jackson Heights,1257.0,21.0,26.06,Friday
9,2021-01-01 00:26:12,2021-01-01 00:39:46,2.0,2.7,1.0,263,142,1,12.0,3.0,...,Standard rate,Credit card,Manhattan,Yorkville West,Manhattan,Lincoln Square East,814.0,14.0,11.94,Friday


In [5]:
# Column dtypes and non-null counts after normalization.
normalized_info_title = "Data after normalized info: "
print(normalized_info_title)
df1_normalized.info()

Data after normalized info: 
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1369769 entries, 0 to 1369768
Data columns (total 24 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   tpep_pickup_datetime   1369769 non-null  datetime64[ns]
 1   tpep_dropoff_datetime  1369769 non-null  datetime64[ns]
 2   passenger_count        1271417 non-null  float64       
 3   trip_distance          1369769 non-null  float64       
 4   RatecodeID             1271417 non-null  float64       
 5   PULocationID           1369769 non-null  int64         
 6   DOLocationID           1369769 non-null  int64         
 7   payment_type           1369769 non-null  int64         
 8   fare_amount            1369769 non-null  float64       
 9   extra                  1369769 non-null  float64       
 10  tip_amount             1369769 non-null  float64       
 11  tolls_amount           1369769 non-null  float64       
 12 

## Apply QA rules
**Objective:** Apply the `run_quality_check` function (from `src.utils.qa_rules`) to the normalized data.
* It returns `df1_flag`, a DataFrame with 11 boolean flag columns, one per rule (True = the trip violates that rule).
* `summarize_qa_flags` then turns these flags into "count/pct%" strings for the report. It produces one entry per rule, plus a final entry with the number of trips that violate at least one rule.

In [6]:
# Pass the month explicitly, as the 12-month loop below does (run_quality_check(df, month_int)),
# so this January walkthrough is computed the same way as the January column of the final QA report.
df1_flag = run_quality_check(df1_normalized, 1)
january = summarize_qa_flags(df1_flag)
print("Successfully run quality check!")

Successfully run quality check!


In [7]:
# One boolean column per QA rule; True marks a violation of that rule for the trip.
print("First 10 rows of January's flag: ")
df1_flag.iloc[:10]

First 10 rows of January's flag: 


Unnamed: 0,is_duplicate,invalid_time_order,invalid_duration,invalid_distance,invalid_speed,invalid_fare_amount,invalid_tip_amount,invalid_total_amount,invalid_payment_type,invalid_passenger_count,invalid_zone
0,False,False,False,False,False,False,False,False,False,False,False
1,False,False,False,False,False,False,False,False,False,False,False
2,False,False,False,False,False,False,False,False,False,False,False
3,False,False,False,False,False,False,False,False,False,True,False
4,False,False,False,False,False,False,False,False,False,False,False
5,False,False,False,False,False,False,False,False,False,False,False
6,False,False,False,False,False,False,False,False,False,False,False
7,False,False,False,False,False,False,False,False,False,False,False
8,False,False,False,False,False,False,False,False,False,False,False
9,False,False,False,False,False,False,False,False,False,False,False


In [8]:
# summarize_qa_flags yields one "count/pct%" entry per rule. Index 0-10 hold rule IDs 1-11
# (same order as the QA report table below), and index 11 holds the number of trips
# with at least one violation.
print("Summary of January's flags: index 0-10 are rule IDs 1-11, index 11 is the number of trips with at least one violation: ")
january

Summary of January's flag, 0-10 indicates rule ID, row 11 is the sum of trips that has violations: 


0          0/0.0%
1      5642/0.41%
2     16119/1.18%
3     19952/1.46%
4     24893/1.82%
5      7411/0.54%
6         59/0.0%
7      7114/0.52%
8          0/0.0%
9     26726/1.95%
10    27137/1.98%
11    79108/5.78%
dtype: object

## Clean one month of data with the `clean` function
**Objective:** Apply the `clean` function (from `src.utils.cleaning`).
* It takes `df1_normalized` and `df1_flag` as input.
* It removes rule-violating rows according to our cleaning strategy (see the Action column of the QA rule table below). It returns the cleaned data together with the flags of the rows that were kept.

In [9]:
# clean() returns the cleaned frame plus the flags of the rows it kept.
cleaned_result = clean(df1_normalized, df1_flag)
df1_cleaned, df1_standard = cleaned_result
print("Successfully cleaned data!")
print(f"Cleaned data has shape:  {df1_cleaned.shape}")

Successfully cleaned data!
Cleaned data has shape:  (1340386, 24)


In [10]:
# dtypes / non-null counts after cleaning. The index keeps the original row labels,
# so it is no longer contiguous once rows are removed.
cleaned_info_title = "Data after cleaned info: "
print(cleaned_info_title)
df1_cleaned.info()

Data after cleaned info: 
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1340386 entries, 0 to 1369768
Data columns (total 24 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   tpep_pickup_datetime   1340386 non-null  datetime64[ns]
 1   tpep_dropoff_datetime  1340386 non-null  datetime64[ns]
 2   passenger_count        1249613 non-null  float64       
 3   trip_distance          1340386 non-null  float64       
 4   RatecodeID             1249613 non-null  float64       
 5   PULocationID           1340386 non-null  int64         
 6   DOLocationID           1340386 non-null  int64         
 7   payment_type           1340386 non-null  int64         
 8   fare_amount            1340386 non-null  float64       
 9   extra                  1340386 non-null  float64       
 10  tip_amount             1340386 non-null  float64       
 11  tolls_amount           1340386 non-null  float64       
 12  to

In [11]:
# NOTE(review): in this cell's output, max trip_distance (~263,163 mi) and max avg_speed_mph
# (~1,054,122) survive clean(), even though invalid_distance / invalid_speed are marked
# 'Exclude' in the QA rule table. Confirm the thresholds in src.utils.qa_rules / cleaning.
# Negative fare/tip/total amounts presumably remain on purpose: those rules are 'Flag' only.
cleaned_summary = df1_cleaned.describe()
print("More information about cleaned data (numerical values):")
cleaned_summary

More information about cleaned data (numerical values):


Unnamed: 0,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,tip_amount,tolls_amount,total_amount,congestion_surcharge,trip_duration_seconds,trip_duration_minutes,avg_speed_mph
count,1249613.0,1340386.0,1249613.0,1340386.0,1340386.0,1340386.0,1340386.0,1340386.0,1340386.0,1340386.0,1340386.0,1249613.0,1340386.0,1340386.0,1340386.0
mean,1.415902,4.704354,1.022069,165.0348,161.7922,1.189912,11.92878,0.9854489,1.940023,0.2433507,17.36598,2.269395,845.5518,14.0922,19.11607
std,1.064091,398.1971,0.3911283,67.51307,71.93394,0.5668484,12.50073,1.233965,2.533959,1.514854,14.3086,0.7562342,3759.726,62.66288,1417.377
min,0.0,0.01,1.0,1.0,1.0,0.0,-150.0,-5.5,-30.72,-31.12,-176.42,-2.5,31.0,1.0,0.01
25%,1.0,1.02,1.0,125.0,107.0,1.0,6.0,0.0,0.0,0.0,10.8,2.5,341.0,6.0,9.59
50%,1.0,1.7,1.0,162.0,162.0,1.0,8.5,0.5,1.86,0.0,13.8,2.5,548.0,9.0,11.89
75%,1.0,3.05,1.0,236.0,236.0,1.0,13.0,2.5,2.75,0.0,18.96,2.5,877.0,15.0,15.3
max,8.0,263163.3,99.0,265.0,265.0,4.0,6960.5,8.25,1140.44,811.75,7661.28,2.5,1729062.0,28818.0,1054122.0


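## Clean all 12 months and save the results to `processed/`
Each monthly file is normalized, QA-checked, and cleaned. Outputs go to three places: cleaned data to `processed/cleaned_data`, the flags of the kept rows to `processed/flags_for_analysis`, and the per-month QA summary table to `reports/qa_summary.csv`.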


In [21]:
from pathlib import Path

# Year of the yellow-taxi trip files to process.
YEAR = 2021

raw_dir = Path("../raw")
# glob() yields files in arbitrary, filesystem-dependent order. Sorting the zero-padded
# names processes months January -> December, so the QA report's month columns come out in order.
raw_files = sorted(raw_dir.glob(f"yellow_tripdata_{YEAR}-*.parquet"))

# Output locations for cleaned data, per-row flags, and the QA summary report.
cleaned_dir = Path("../processed/cleaned_data")
flag_dir = Path("../processed/flags_for_analysis")

reports_dir = Path("../reports")

In [None]:
# QA rule table: one row per rule (IDs 1-11) plus a final "Total:" row, which holds the
# number of trips with at least one violation. The 12-month loop adds one column per month.
# Named `rule_ids` rather than `id` so the built-in id() is not shadowed for the rest of the notebook.
rule_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 'NaN']
rules = ['is_duplicate', 'invalid_time_order', 'invalid_duration', 'invalid_distance',
         'invalid_speed', 'invalid_fare_amount', 'invalid_tip_amount', 'invalid_total_amount',
         'invalid_payment_type', 'invalid_passenger_count', 'invalid_zone (Unknown)', 'NaN']

# 'Exclude' rules are the ones whose violating rows clean() removes; 'Flag' rules are only reported.
actions = ['Exclude', 'Exclude', 'Exclude', 'Exclude',
           'Exclude', 'Flag', 'Flag', 'Flag', 'Flag',
           'Flag', 'Flag', 'Total:']

rule_definition = {'ID': rule_ids, 'Rule': rules, 'Action': actions}
final_qa_report_df = pd.DataFrame(data=rule_definition, columns=['ID', 'Rule', 'Action'])
final_qa_report_df


Unnamed: 0,ID,Rule,Action
0,1.0,is_duplicate,Exclude
1,2.0,invalid_time_order,Exclude
2,3.0,invalid_duration,Exclude
3,4.0,invalid_distance,Exclude
4,5.0,invalid_speed,Exlucde
5,6.0,invalid_fare_amount,Flag
6,7.0,invalid_tip_amount,Flag
7,8.0,invalid_total_amount,Flag
8,9.0,invalid_payment_type,Flag
9,10.0,invalid_passenger_count,Flag


In [None]:
# Existing outputs are left untouched by default. Set this to True after changing the
# normalize/QA/clean logic; otherwise the files on disk go stale.
OVERWRITE_OUTPUTS = False

# to_parquet / to_csv do not create missing parent directories.
for out_dir in (cleaned_dir, flag_dir, reports_dir):
    out_dir.mkdir(parents=True, exist_ok=True)


def save_parquet(df, out_path, description, overwrite=False):
    """Write ``df`` to ``out_path`` as parquet, skipping an existing file unless ``overwrite``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to write; its index is not stored.
    out_path : pathlib.Path
        Destination file.
    description : str
        Label used in the progress message, e.g. "cleaned file".
    overwrite : bool
        Replace ``out_path`` if it already exists.
    """
    if out_path.exists() and not overwrite:
        print(f"Skipped (already exists): {out_path.name}")
        return
    df.to_parquet(out_path, index=False, engine="pyarrow")
    print(f"Saved {description}: {out_path.name}")


for file in raw_files:
    # File names look like yellow_tripdata_<YYYY>-<MM>.parquet. Parse the year and month
    # from the name instead of hard-coding the year, and build the date only once.
    year_str, month_str = file.stem.rsplit('_', 1)[-1].split('-')
    month_start = pd.to_datetime(f"{year_str}-{month_str}-01")
    month_col_name = month_start.strftime('%B')
    month_int = month_start.month

    print(f"--- Processing: {month_col_name}: {file.name} ---")
    df_file = pd.read_parquet(file)

    # Normalize data:
    normalized_file = normalize(df_file)

    # Apply QA rules and add this month's summary as a column of the final QA report.
    file_flag = run_quality_check(normalized_file, month_int)
    flag_summary = summarize_qa_flags(file_flag)
    # Column assignment aligns on the index, so a length mismatch would silently produce NaNs.
    assert len(flag_summary) == len(final_qa_report_df), (
        f"{month_col_name}: {len(flag_summary)} summary rows vs "
        f"{len(final_qa_report_df)} QA report rows"
    )
    final_qa_report_df[month_col_name] = flag_summary

    # Clean data; file_standard holds the flags of the rows that were not removed.
    file_cleaned, file_standard = clean(normalized_file, file_flag)

    # Cleaned data -> processed/cleaned_data, flags -> processed/flags_for_analysis.
    save_parquet(file_cleaned, cleaned_dir / f"cleaned_{file.name}",
                 "cleaned file", overwrite=OVERWRITE_OUTPUTS)
    save_parquet(file_standard, flag_dir / f"flag_{file.name}",
                 "flag file for upcoming analysis", overwrite=OVERWRITE_OUTPUTS)

# Save QA summary to folder reports
qa_path = reports_dir / "qa_summary.csv"
final_qa_report_df.to_csv(qa_path, index=False)
print(f"QA report saved to {qa_path}")
print("Data is ready for analysis!")


--- Processing: January: yellow_tripdata_2021-01.parquet ---
Saved cleaned file: cleaned_yellow_tripdata_2021-01.parquet
Saved flag file for upcoming analysis: flag_yellow_tripdata_2021-01.parquet
--- Processing: February: yellow_tripdata_2021-02.parquet ---
Saved cleaned file: cleaned_yellow_tripdata_2021-02.parquet
Saved flag file for upcoming analysis: flag_yellow_tripdata_2021-02.parquet
--- Processing: March: yellow_tripdata_2021-03.parquet ---
Saved cleaned file: cleaned_yellow_tripdata_2021-03.parquet
Saved flag file for upcoming analysis: flag_yellow_tripdata_2021-03.parquet
--- Processing: April: yellow_tripdata_2021-04.parquet ---
Saved cleaned file: cleaned_yellow_tripdata_2021-04.parquet
Saved flag file for upcoming analysis: flag_yellow_tripdata_2021-04.parquet
--- Processing: May: yellow_tripdata_2021-05.parquet ---
Saved cleaned file: cleaned_yellow_tripdata_2021-05.parquet
Saved flag file for upcoming analysis: flag_yellow_tripdata_2021-05.parquet
--- Processing: June: 

In [32]:
# Spot-check one saved output: the cleaned May 2021 file.
may_cleaned_path = "../processed/cleaned_data/cleaned_yellow_tripdata_2021-05.parquet"
df3 = pd.read_parquet(may_cleaned_path)
df3.iloc[:10]

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,...,ratecodeID_name,payment_type_name,PU_Borough,PU_Zone,DO_Borough,DO_Zone,trip_duration_seconds,trip_duration_minutes,avg_speed_mph,pickup_day_of_week
0,2021-05-01 00:37:18,2021-05-01 00:41:07,2.0,0.7,1.0,141,263,1,5.0,3.0,...,Standard rate,Credit card,Manhattan,Lenox Hill West,Manhattan,Yorkville West,229.0,4.0,11.0,Saturday
1,2021-05-01 00:43:01,2021-05-01 00:49:19,1.0,1.4,1.0,263,75,2,6.5,3.0,...,Standard rate,Cash,Manhattan,Yorkville West,Manhattan,East Harlem South,378.0,6.0,13.33,Saturday
2,2021-05-01 00:05:54,2021-05-01 00:31:46,1.0,5.7,1.0,142,129,2,21.5,3.0,...,Standard rate,Cash,Manhattan,Lincoln Square East,Queens,Jackson Heights,1552.0,26.0,13.22,Saturday
3,2021-05-01 00:08:21,2021-05-01 00:19:20,1.0,3.04,1.0,231,97,1,11.5,0.5,...,Standard rate,Credit card,Manhattan,TriBeCa/Civic Center,Brooklyn,Fort Greene,659.0,11.0,16.61,Saturday
4,2021-05-01 00:32:44,2021-05-01 00:48:44,1.0,4.04,1.0,148,17,1,15.5,0.5,...,Standard rate,Credit card,Manhattan,Lower East Side,Brooklyn,Bedford,960.0,16.0,15.15,Saturday
5,2021-05-01 00:08:33,2021-05-01 00:19:25,0.0,3.1,1.0,231,68,1,11.5,3.0,...,Standard rate,Credit card,Manhattan,TriBeCa/Civic Center,Manhattan,East Chelsea,652.0,11.0,17.12,Saturday
6,2021-05-01 00:34:16,2021-05-01 00:42:12,0.0,2.1,1.0,142,68,1,8.5,3.0,...,Standard rate,Credit card,Manhattan,Lincoln Square East,Manhattan,East Chelsea,476.0,8.0,15.88,Saturday
7,2021-05-01 00:25:53,2021-05-01 00:36:21,1.0,3.6,1.0,148,112,1,13.0,3.0,...,Standard rate,Credit card,Manhattan,Lower East Side,Brooklyn,Greenpoint,628.0,10.0,20.64,Saturday
8,2021-05-01 00:49:19,2021-05-01 01:01:18,2.0,3.1,1.0,148,229,1,12.0,3.0,...,Standard rate,Credit card,Manhattan,Lower East Side,Manhattan,Sutton Place/Turtle Bay North,719.0,12.0,15.52,Saturday
9,2021-05-01 00:06:23,2021-05-01 00:18:50,1.0,2.9,1.0,68,143,2,11.5,3.0,...,Standard rate,Cash,Manhattan,East Chelsea,Manhattan,Lincoln Square West,747.0,12.0,13.98,Saturday


In [36]:
# The columns dropped during feature selection (see the normalization notes at the top)
# must not reappear in the saved cleaned files.
dropped_columns = {'airport_fee', 'store_and_fwd_flag', 'VendorID', 'mta_tax', 'improvement_surcharge'}
assert dropped_columns.isdisjoint(df3.columns), (
    f"unexpected columns in saved file: {sorted(dropped_columns & set(df3.columns))}"
)
df3.columns

Index(['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count',
       'trip_distance', 'RatecodeID', 'PULocationID', 'DOLocationID',
       'payment_type', 'fare_amount', 'extra', 'tip_amount', 'tolls_amount',
       'total_amount', 'congestion_surcharge', 'ratecodeID_name',
       'payment_type_name', 'PU_Borough', 'PU_Zone', 'DO_Borough', 'DO_Zone',
       'trip_duration_seconds', 'trip_duration_minutes', 'avg_speed_mph',
       'pickup_day_of_week'],
      dtype='object')