In [5]:
# Import necessary libraries
import pandas as pd
import numpy as np
from google.cloud import storage

# Display configuration: render floats in fixed-point notation with two
# decimals (never scientific notation), and allow up to 1000 characters per
# line before wide frames wrap.
pd.options.display.float_format = '{:.2f}'.format
pd.options.display.width = 1000


In [26]:

def clean_data(df):
    """Return a cleaned copy of a frame of raw yellow-taxi trip records.

    Steps, in order:
      1. Drop columns not needed downstream (names matched case-insensitively).
      2. Drop every row that still contains a null in any remaining column.
      3. Rename the raw TLC column names to ``Capitalized_snake`` names.
      4. Cast each column to an explicit dtype.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw trip records as read from a landing-zone parquet file. Every
        source column listed in ``rename_columns`` must be present, otherwise
        the final ``astype`` raises ``KeyError``.

    Returns
    -------
    pandas.DataFrame
        A new frame; ``df`` itself is not modified. The index is the original
        index filtered by ``dropna`` (it is not reset).

    Notes
    -----
    Only nulls are removed. Out-of-range values (e.g. negative amounts, zero
    passengers, implausible distances or timestamps) are kept as-is.
    """
    # Columns not needed downstream. Matched case-insensitively so that a
    # differently-cased variant (e.g. 'airport_fee' vs 'Airport_fee') is also
    # removed; if it survived, the dropna() below would discard every row in
    # which that column is null.
    columns_to_drop = ['VendorID', 'RatecodeID', 'store_and_fwd_flag', 'payment_type', 'congestion_surcharge', 'Airport_fee']
    drop_keys = {name.lower() for name in columns_to_drop}
    present_to_drop = [col for col in df.columns if str(col).lower() in drop_keys]
    df = df.drop(columns=present_to_drop)

    # Remove rows with a null in any of the remaining columns
    df = df.dropna()

    rename_columns = {
        'tpep_pickup_datetime': 'Pick_up_time',
        'tpep_dropoff_datetime': 'Drop_off_time',
        'passenger_count': 'Passenger_count',
        'trip_distance': 'Trip_distance',
        'PULocationID': 'Pick_up_location',
        'DOLocationID': 'Drop_off_location',
        'fare_amount': 'Fare_amount',
        'extra': 'Extra_charge',
        'mta_tax': 'Mta_tax',
        'tip_amount': 'Tip_amount',
        'tolls_amount': 'Tolls_amount',
        'improvement_surcharge': 'Improvement_surcharge',
        'total_amount': 'Total_amount'
    }
    df = df.rename(columns=rename_columns)

    # Apply explicit data types (astype raises KeyError if a column is missing)
    df = df.astype({
        'Pick_up_time': 'datetime64[us]',
        'Drop_off_time': 'datetime64[us]',
        'Passenger_count': 'float64',
        'Trip_distance': 'float64',
        'Pick_up_location': 'int64',
        'Drop_off_location': 'int64',
        'Fare_amount': 'float64',
        'Extra_charge': 'float64',
        'Mta_tax': 'float64',
        'Tip_amount': 'float64',
        'Tolls_amount': 'float64',
        'Improvement_surcharge': 'float64',
        'Total_amount': 'float64'
    })

    return df


In [27]:
# Create a GCS client (no explicit credentials or project are passed here).
storage_client = storage.Client()

# Bucket and folder prefixes: raw files are read from `input_prefix` and the
# cleaned versions are written under `output_prefix` in the same bucket.
bucket_name = 'my-bigdata-project-md'
input_prefix = 'landing/'
output_prefix = 'cleaned/'

# Materialize the listing: list_blobs returns a single-use iterator, so keeping
# the raw iterator would make the processing cell fail to iterate it a second
# time (e.g. when that cell is re-run without re-running this one).
blobs = list(storage_client.list_blobs(bucket_name, prefix=input_prefix))



In [28]:
# Clean every parquet file under `input_prefix` and write the result under
# `output_prefix`. After the loop, `cleaned_df` (and `df`) remain bound to the
# LAST file processed; the summary cell below relies on that.
for blob in blobs:
    if not blob.name.endswith('.parquet'):
        continue  # skip non-parquet objects

    print(f"Processing file: {blob.name}")
    # Read the data
    df = pd.read_parquet(f"gs://{bucket_name}/{blob.name}", engine='pyarrow')

    # Clean the data
    cleaned_df = clean_data(df)

    # Swap only the leading prefix (list_blobs was filtered by input_prefix).
    # str.replace would also rewrite any later occurrence of the prefix text
    # inside the object path.
    output_blob_name = output_prefix + blob.name[len(input_prefix):]

    # index=False: after dropna() the index is no longer a RangeIndex, and
    # pyarrow would otherwise persist it as an extra __index_level_0__ column.
    cleaned_df.to_parquet(f"gs://{bucket_name}/{output_blob_name}", engine='pyarrow', index=False)
    print(f"Cleaned data saved to: {output_blob_name}")

print("Data cleaning completed.")


Processing file: landing/yellow_tripdata_2015-01.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2015-01.parquet
Processing file: landing/yellow_tripdata_2015-02.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2015-02.parquet
Processing file: landing/yellow_tripdata_2015-03.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2015-03.parquet
Processing file: landing/yellow_tripdata_2015-04.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2015-04.parquet
Processing file: landing/yellow_tripdata_2015-05.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2015-05.parquet
Processing file: landing/yellow_tripdata_2015-06.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2015-06.parquet
Processing file: landing/yellow_tripdata_2015-07.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2015-07.parquet
Processing file: landing/yellow_tripdata_2015-08.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2015-08.parquet
Processing file: landing/yellow_tripdata

Cleaned data saved to: cleaned/yellow_tripdata_2020-09.parquet
Processing file: landing/yellow_tripdata_2020-10.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2020-10.parquet
Processing file: landing/yellow_tripdata_2020-11.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2020-11.parquet
Processing file: landing/yellow_tripdata_2020-12.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2020-12.parquet
Processing file: landing/yellow_tripdata_2021-01.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2021-01.parquet
Processing file: landing/yellow_tripdata_2021-02.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2021-02.parquet
Processing file: landing/yellow_tripdata_2021-03.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2021-03.parquet
Processing file: landing/yellow_tripdata_2021-04.parquet
Cleaned data saved to: cleaned/yellow_tripdata_2021-04.parquet
Processing file: landing/yellow_tripdata_2021-05.parquet
Cleaned data saved to: cleaned/yellow_tr

In [29]:
# Summary statistics for `cleaned_df`. The processing loop rebinds it on every
# iteration, so this describes only the LAST file processed, not the dataset.
(
    cleaned_df
    .describe()
)


Unnamed: 0,Pick_up_time,Drop_off_time,Passenger_count,Trip_distance,Pick_up_location,Drop_off_location,Fare_amount,Extra_charge,Mta_tax,Tip_amount,Tolls_amount,Improvement_surcharge,Total_amount
count,3196564,3196564,3196564.0,3196564.0,3196564.0,3196564.0,3196564.0,3196564.0,3196564.0,3196564.0,3196564.0,3196564.0,3196564.0
mean,2023-12-15 04:05:00.811431,2023-12-15 04:23:08.783988,1.41,3.39,165.39,164.32,19.51,1.55,0.48,3.62,0.58,0.97,28.54
min,2002-12-31 22:16:54,2003-01-01 15:02:08,0.0,0.0,1.0,1.0,-1087.3,-7.5,-0.5,-80.0,-70.0,-1.0,-1094.05
25%,2023-12-07 20:59:03.750000,2023-12-07 21:16:01,1.0,1.0,132.0,114.0,9.3,0.0,0.5,1.0,0.0,1.0,15.96
50%,2023-12-14 16:43:47,2023-12-14 17:07:41,1.0,1.7,162.0,162.0,13.5,1.0,0.5,2.88,0.0,1.0,21.35
75%,2023-12-21 16:31:34,2023-12-21 16:53:03.250000,2.0,3.2,234.0,234.0,22.6,2.5,0.5,4.56,0.0,1.0,31.44
max,2024-01-03 19:42:57,2024-01-03 20:15:55,9.0,161726.1,265.0,265.0,2320.11,51.68,4.0,4174.0,161.38,1.0,4269.16
std,,,0.91,94.98,63.83,69.55,19.18,1.83,0.12,4.78,2.24,0.22,24.15
