For this project, **the NYC Taxi Trip dataset** (specifically, **the Yellow Taxi Trip dataset**) was used because it is publicly available and large enough to be suitable for large-scale data processing.

First, we mount Google Drive and create a folder in which to save the data.

In [1]:
from google.colab import drive
import os
import shutil

# Mount Google Drive
drive.mount('/content/drive')

# Define folder path
data_path = '/content/drive/MyDrive/NYC_Taxi_Trip'

# Clean up the folder if it already exists, and recreate it
if os.path.exists(data_path):
    shutil.rmtree(data_path)
os.makedirs(data_path, exist_ok=True)

print(f"Environment setup complete. Data will be saved in: {data_path}")


Mounted at /content/drive
Environment setup complete. Data will be saved in: /content/drive/MyDrive/NYC_Taxi_Trip


**Download the files**

To meet the 2+ GB requirement, four months of data (January, February, March, and April 2024) were downloaded and concatenated into a single DataFrame.
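The four URLs below share one pattern (`yellow_tripdata_YYYY-MM.parquet` under the same CloudFront path), so the dictionary could also be generated rather than typed out. A minimal sketch, assuming other months follow the same naming pattern; `build_trip_urls` is a hypothetical helper, not part of the original notebook:

```python
# Hypothetical helper: builds the same file-name -> URL mapping as the hand-written
# `files` dictionary, assuming every month follows the same naming pattern.
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"


def build_trip_urls(year, months, base_url=BASE_URL):
    """Return {file_name: url} for the Yellow Taxi Parquet files of the given months."""
    urls = {}
    for month in months:
        file_name = f"yellow_tripdata_{year}-{month:02d}.parquet"  # zero-padded month
        urls[file_name] = f"{base_url}/{file_name}"
    return urls


trip_files = build_trip_urls(2024, range(1, 5))  # January to April 2024
for name, url in trip_files.items():
    print(name, "->", url)
```

Extending the analysis to more months would then only require changing the `range`.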

In [2]:
files = {
    "yellow_tripdata_2024-01.parquet": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet",
    "yellow_tripdata_2024-02.parquet": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet",
    "yellow_tripdata_2024-03.parquet": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet",
    "yellow_tripdata_2024-04.parquet": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet"
}

# Download only if not already present
for file_name, url in files.items():
    file_path = os.path.join(data_path, file_name)
    if not os.path.exists(file_path):
        print(f"Downloading {file_name}...")
        !wget -P {data_path} {url}
    else:
        print(f"{file_name} already exists. Skipping download.")

print("\nAll files are downloaded and verified.")


Downloading yellow_tripdata_2024-01.parquet...
--2025-05-20 22:11:16--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.50, 65.8.245.51, 65.8.245.171, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.245.50|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49961641 (48M) [binary/octet-stream]
Saving to: ‘/content/drive/MyDrive/NYC_Taxi_Trip/yellow_tripdata_2024-01.parquet’


2025-05-20 22:11:17 (33.1 MB/s) - ‘/content/drive/MyDrive/NYC_Taxi_Trip/yellow_tripdata_2024-01.parquet’ saved [49961641/49961641]

Downloading yellow_tripdata_2024-02.parquet...
--2025-05-20 22:11:17--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.50, 65.8.245.51, 65.8.245.171, ...
Connecting to d37ci6vzurychx.cloudf

**Checking that all the files were downloaded to the folder**

In [3]:
print("Files in the folder:")
print(os.listdir(data_path))


Files in the folder:
['yellow_tripdata_2024-01.parquet', 'yellow_tripdata_2024-02.parquet', 'yellow_tripdata_2024-03.parquet', 'yellow_tripdata_2024-04.parquet']



**The code below is meant to keep the Google Colab session active so that it does not time out and force a restart from scratch**

In [4]:
import threading
import time
from IPython.display import Javascript, display

# Function to keep the session alive by logging to the browser console every 60 seconds
def prevent_timeout():
    while True:
        time.sleep(60)
        display(Javascript('console.log("Preventing Timeout...");'))

# Start the keep-alive loop in a daemon thread so it does not block the notebook
thread = threading.Thread(target=prevent_timeout)
thread.daemon = True
thread.start()
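One drawback of the loop above is that it runs forever and cannot be stopped without restarting the runtime; its display calls may also end up in the output of whichever cell happens to be running. A sketch of a stoppable variant, using a `threading.Event` as both the sleep timer and the stop signal; `start_keepalive` is a hypothetical helper, and the action passed in is up to the caller:

```python
import threading
import time


def start_keepalive(action, interval_s=60.0):
    """Run `action()` every `interval_s` seconds in a daemon thread until stopped.

    Returns (stop_event, thread); call stop_event.set() to end the loop.
    """
    stop_event = threading.Event()

    def _loop():
        # Event.wait(timeout) returns False on timeout and True once the event is set,
        # so the loop sleeps between calls and exits promptly after stop_event.set().
        while not stop_event.wait(interval_s):
            action()

    thread = threading.Thread(target=_loop, daemon=True)
    thread.start()
    return stop_event, thread


# Quick local demo with a fast interval and a harmless action
ticks = []
stop, worker = start_keepalive(lambda: ticks.append(1), interval_s=0.05)
time.sleep(0.3)
stop.set()
worker.join(timeout=2)
print(f"ticked {len(ticks)} times; thread alive: {worker.is_alive()}")
```

In Colab, the action could be the same console log, e.g. `stop, _ = start_keepalive(lambda: display(Javascript('console.log("alive");')))`, followed by `stop.set()` once the long-running cells have finished.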



**The next step is to read the files.**

We'll try different methods for reading these parquet files and compare their computational efficiency. The methods we'll explore are:

1) Pandas

2) Dask

**NB: Modin was also explored, but it failed with errors because there was not enough RAM to execute it. So, the decision was made to use only Pandas and Dask.**

**1**) **PANDAS**

In [5]:
import pandas as pd
import time

# Define the folder path and file names
data_path = '/content/drive/MyDrive/NYC_Taxi_Trip'
file_paths = [
    f"{data_path}/yellow_tripdata_2024-01.parquet",
    f"{data_path}/yellow_tripdata_2024-02.parquet",
    f"{data_path}/yellow_tripdata_2024-03.parquet",
    f"{data_path}/yellow_tripdata_2024-04.parquet"
]

# Start timer
start_time = time.time()

# Read all four files into DataFrames
pandas_dfs = [pd.read_parquet(file) for file in file_paths]

# Concatenate them into a single DataFrame
pandas_full_data = pd.concat(pandas_dfs, ignore_index=True)

# End timer
end_time = time.time()

# Display the time taken and DataFrame info
print(f"Time taken to read with Pandas: {end_time - start_time:.2f} seconds")
print(f"DataFrame shape: {pandas_full_data.shape}")
print("\n--- Sample Data ---")
print(pandas_full_data.head())


Time taken to read with Pandas: 6.01 seconds
DataFrame shape: (13069067, 19)

--- Sample Data ---
   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2024-01-01 00:57:55   2024-01-01 01:17:43              1.0   
1         1  2024-01-01 00:03:00   2024-01-01 00:09:36              1.0   
2         1  2024-01-01 00:17:06   2024-01-01 00:35:01              1.0   
3         1  2024-01-01 00:36:38   2024-01-01 00:44:56              1.0   
4         1  2024-01-01 00:46:51   2024-01-01 00:52:57              1.0   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           1.72         1.0                  N           186            79   
1           1.80         1.0                  N           140           236   
2           4.70         1.0                  N           236            79   
3           1.40         1.0                  N            79           211   
4           0.80         1.0                  N         

**Pandas Approach (pd.read_parquet)**:

This is the standard way of reading Parquet files in Python.
It loads the entire dataset into memory, which is fast as long as the data fits in RAM but does not scale to datasets larger than the available memory.

We measure the time it takes to load and check the shape of the DataFrame.
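The notebook times each read with `time.time()`. For benchmarks, `time.perf_counter()` is usually preferred because it is monotonic and has a higher resolution. A small sketch of a reusable timer; the `timed` context manager is hypothetical and not part of the original code:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the wall-clock seconds spent inside the `with` block in results[label]."""
    start = time.perf_counter()  # monotonic, high-resolution clock suited to benchmarks
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with timed("sum", timings):
    total = sum(range(1_000_000))
print(f"sum took {timings['sum']:.4f} s")
```

In the notebook, the same pattern would wrap the read, e.g. `with timed("pandas", timings): pandas_full_data = pd.concat(...)`, so every method is measured the same way.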

**What This Code Does**:

- Lists the file paths for each parquet file we downloaded.

- Reads each file into separate DataFrames.

- Concatenates them into one large DataFrame.

- Measures the time taken to read and concatenate.

- Displays the first few rows for inspection.

**Analysis of the output**:

1) Time Taken: 6.01 seconds, which is reasonable for about 13 million rows.

2) DataFrame Shape: 13,069,067 rows × 19 columns, as expected from combining four large Parquet files.

3) Data Sample: The data looks clean and well-structured, with clear column names.
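The shape alone does not say how much RAM the combined DataFrame occupies, which is the figure that matters for the 2+ GB requirement if it refers to in-memory size rather than the compressed Parquet files on disk. A minimal sketch using `DataFrame.memory_usage(deep=True)`; the helper name and the synthetic demo frame are assumptions:

```python
import numpy as np
import pandas as pd


def in_memory_size_mb(df):
    """Return the DataFrame's in-memory footprint in MB, including the index."""
    # deep=True also measures Python objects (e.g. strings in object columns)
    # instead of counting only their 8-byte pointers
    return df.memory_usage(deep=True).sum() / (1024 * 1024)


# Synthetic frame: 1 million rows of a float64 and an int32 column
demo = pd.DataFrame({
    "trip_distance": np.ones(1_000_000, dtype="float64"),
    "PULocationID": np.ones(1_000_000, dtype="int32"),
})
print(f"{in_memory_size_mb(demo):.2f} MB")
```

Applied to the real data, `in_memory_size_mb(pandas_full_data)` would report the footprint of all 13,069,067 rows.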

**2**)  **DASK**

In [6]:
import dask.dataframe as dd
import time

# Start timer
start_time = time.time()

# Read all Parquet files using Dask
dask_df = dd.read_parquet(f"{data_path}/*.parquet")

# Compute the DataFrame to load it fully into memory and measure the time
dask_full_data = dask_df.compute()

# End timer
end_time = time.time()

# Display the time taken and DataFrame info
print(f"Time taken to read with Dask: {end_time - start_time:.2f} seconds")
print(f"DataFrame shape: {dask_full_data.shape}")
print("\n--- Sample Data ---")
print(dask_full_data.head())



Time taken to read with Dask: 10.94 seconds
DataFrame shape: (13069067, 19)

--- Sample Data ---
   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2024-01-01 00:57:55   2024-01-01 01:17:43              1.0   
1         1  2024-01-01 00:03:00   2024-01-01 00:09:36              1.0   
2         1  2024-01-01 00:17:06   2024-01-01 00:35:01              1.0   
3         1  2024-01-01 00:36:38   2024-01-01 00:44:56              1.0   
4         1  2024-01-01 00:46:51   2024-01-01 00:52:57              1.0   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           1.72         1.0                  N           186            79   
1           1.80         1.0                  N           140           236   
2           4.70         1.0                  N           236            79   
3           1.40         1.0                  N            79           211   
4           0.80         1.0                  N          

**Dask Approach (dd.read_parquet)**:

Dask is designed to handle large datasets that don't fit into memory.
Rather than reading everything at once, it splits the data into partitions (chunks) and processes them in parallel.

The DataFrame it creates is "lazy": no data is actually loaded until you trigger a computation, for example with .compute().

This lets Dask scale further than Pandas, because it can work through datasets larger than the available RAM.

**Analysis**:

- Time Taken: 10.94 seconds with Dask compared to 6.01 seconds with Pandas.

- DataFrame Shape: 13,069,067 rows × 19 columns, identical to the Pandas read.

- Data Sample: Data looks clean and matches the structure from the Pandas read.

**Observations**:

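Dask is slower than Pandas here. This may seem surprising, but it is not entirely unexpected, for these reasons:

- Dask shines when the dataset does not fit into memory.

- The dataset was small enough to fit in memory (the compressed Parquet files are small; the January file, for example, is 48 MB), so Pandas, which works entirely in memory, could load it faster.

- Dask is optimized for parallelism and out-of-core (larger-than-memory) computation, but in this case Pandas did not face any memory constraints.

- Calling .compute() immediately materializes the whole result as a single Pandas DataFrame, so this benchmark adds Dask's task-scheduling overhead on top of the same in-memory load that Pandas performs.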

**When Dask would probably do better**

If this were a 5 GB or 10 GB dataset, Dask would most likely outperform Pandas; once the data exceeds the available RAM, Pandas cannot load it in one go at all.

If we were performing complex operations (like groupby, joins, or merges) on large chunks, Dask's chunk-based parallel processing could be faster.
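For aggregations, chunk-based processing generally follows a map-reduce pattern: each chunk is reduced on its own, and only the small partial results are combined. The pandas-only sketch below illustrates that idea for a group-wise mean; it is not Dask's actual implementation, and `chunked_mean_by_key` is a hypothetical helper. A correct mean has to combine per-chunk sums and counts, not average the per-chunk means:

```python
import numpy as np
import pandas as pd


def chunked_mean_by_key(df, key, value, n_chunks=4):
    """Group-wise mean computed chunk by chunk, then combined (map-reduce style)."""
    size = max(1, -(-len(df) // n_chunks))  # ceiling division, at least 1 row per chunk
    partials = []
    # Map step: reduce each chunk independently to per-key sums and counts
    for start in range(0, len(df), size):
        chunk = df.iloc[start:start + size]
        partials.append(chunk.groupby(key)[value].agg(["sum", "count"]))
    # Reduce step: add up the partial sums and counts, then divide once
    combined = pd.concat(partials).groupby(level=0).sum()
    return combined["sum"] / combined["count"]


demo = pd.DataFrame({
    "PULocationID": [1, 2, 1, 3, 2, 1, 3, 3],
    "fare_amount": [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0],
})
result = chunked_mean_by_key(demo, "PULocationID", "fare_amount", n_chunks=3)
expected = demo.groupby("PULocationID")["fare_amount"].mean()
print(result)
print("matches single-pass mean:", np.allclose(result.values, expected.values))
```

Only the small per-chunk summaries need to be held together at the end, which is why this style of computation can work on data that never fits in memory as a whole.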

**DATA VALIDATION AND CLEANING**

- Checking for missing values

In [7]:
# Step 1: Check for Missing Values
print("🔍 Checking for Missing Values...")

# Calculate the number and percentage of missing values
missing_values = pandas_full_data.isna().sum()
missing_percentage = (missing_values / len(pandas_full_data)) * 100

# Combine into a DataFrame for better readability
missing_data = pd.DataFrame({
    'Missing Values': missing_values,
    'Percentage (%)': missing_percentage
}).sort_values(by='Missing Values', ascending=False)

# Display columns with missing values only
missing_data = missing_data[missing_data['Missing Values'] > 0]

# Output the result
if missing_data.empty:
    print("✅ No missing values found!")
else:
    print("⚠️ Missing values detected:")
    print(missing_data)


🔍 Checking for Missing Values...
⚠️ Missing values detected:
                      Missing Values  Percentage (%)
store_and_fwd_flag           1160538        8.880037
RatecodeID                   1160538        8.880037
passenger_count              1160538        8.880037
Airport_fee                  1160538        8.880037
congestion_surcharge         1160538        8.880037


All the columns with missing values have the same number of NaNs (1,160,538, or 8.88% of the dataset). This suggests that the gaps belong to one specific set of trips whose details were not recorded, rather than being scattered at random.

Investigating these rows further will clarify how the missing values should be handled.

In [8]:
# Filter rows with missing values; .copy() makes an independent DataFrame, so the
# column assignment further down does not raise pandas' SettingWithCopyWarning
missing_data_rows = pandas_full_data[pandas_full_data.isna().any(axis=1)].copy()

# Display basic info
print("Number of rows with missing values:", len(missing_data_rows))
print("\n--- Sample of rows with missing values ---")
display(missing_data_rows.head())

# Check distribution by month
print("\n🔍 Distribution by Month:")
missing_data_rows['tpep_pickup_datetime'] = pd.to_datetime(missing_data_rows['tpep_pickup_datetime'])
print(missing_data_rows['tpep_pickup_datetime'].dt.month.value_counts())

# Check distribution by VendorID
print("\n🔍 Distribution by VendorID:")
print(missing_data_rows['VendorID'].value_counts())

# Check distribution by PULocationID (pickup location)
print("\n🔍 Top 10 Pickup Locations with Missing Data:")
print(missing_data_rows['PULocationID'].value_counts().head(10))

# Check if all columns are missing at the same time
print("\n🔍 Are all these columns missing simultaneously?")
print(missing_data_rows[['store_and_fwd_flag', 'RatecodeID', 'passenger_count', 'Airport_fee', 'congestion_surcharge']].isna().all(axis=1).value_counts())


Number of rows with missing values: 1160538

--- Sample of rows with missing values ---


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2824462,2,2024-01-01 00:34:19,2024-01-01 00:51:22,,2.04,,,143,141,0,12.72,0.0,0.5,0.0,0.0,1.0,16.72,,
2824463,1,2024-01-01 00:14:31,2024-01-01 00:19:29,,1.6,,,236,238,0,9.3,1.0,0.5,2.86,0.0,1.0,17.16,,
2824464,1,2024-01-01 00:35:11,2024-01-01 01:13:40,,0.0,,,142,79,0,21.01,0.0,0.5,0.0,0.0,1.0,25.01,,
2824465,1,2024-01-01 00:33:37,2024-01-01 00:50:34,,0.0,,,237,4,0,17.79,0.0,0.5,0.0,0.0,1.0,21.79,,
2824466,1,2024-01-01 00:49:04,2024-01-01 01:01:16,,0.0,,,244,50,0,34.65,0.0,0.5,0.0,0.0,1.0,38.65,,



🔍 Distribution by Month:
tpep_pickup_datetime
3    426190
4    408576
2    185610
1    140162
Name: count, dtype: int64

🔍 Distribution by VendorID:
VendorID
2    857921
1    301604
6      1013
Name: count, dtype: int64

🔍 Top 10 Pickup Locations with Missing Data:
PULocationID
79     49534
236    37802
249    35310
239    35041
161    33311
234    30155
68     27512
170    27408
107    27300
262    27298
Name: count, dtype: int64

🔍 Are all these columns missing simultaneously?
True    1160538
Name: count, dtype: int64


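The output shows that the five columns are always missing together, in the same 1,160,538 rows, and that these rows occur in every month and across vendors. Since there is no obvious way to fill in five fields at once, the simplest option is to drop these rows.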
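`DataFrame.dropna(subset=...)` defaults to `how="any"`, which drops a row when *any* listed column is missing, whereas `how="all"` drops it only when *all* of them are. Because the check above showed that the five columns are always missing together, both settings remove exactly the same rows here. The synthetic example below (column names borrowed from the dataset, values made up) shows when the choice does matter:

```python
import numpy as np
import pandas as pd

cols = ["passenger_count", "RatecodeID"]
demo = pd.DataFrame({
    "passenger_count": [1.0, np.nan, np.nan, 2.0],
    "RatecodeID":      [1.0, np.nan, 1.0,    1.0],
    "fare_amount":     [10.0, 12.0, 9.0,     7.0],
})

# how="any" (the default): drop a row if at least one listed column is NaN
any_dropped = demo.dropna(subset=cols, how="any")
# how="all": drop a row only if every listed column is NaN
all_dropped = demo.dropna(subset=cols, how="all")
print(len(any_dropped), len(all_dropped))  # 2 3

# The two settings agree exactly when the NaNs are aligned across the columns
mask = demo[cols].isna()
aligned = bool((mask.any(axis=1) == mask.all(axis=1)).all())
print("NaNs aligned:", aligned)  # False for this demo frame
```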

In [9]:
# 🧹 Step 1: Remove rows where all five columns are missing
columns_to_check = [
    'passenger_count',
    'RatecodeID',
    'store_and_fwd_flag',
    'congestion_surcharge',
    'Airport_fee'
]

# Filter out rows where all specified columns are NaN (how='all'); because the five
# columns are always missing together, this removes the same rows as the default how='any'
cleaned_data = dask_full_data.dropna(subset=columns_to_check, how='all')

# 📝 Step 2: Validation
print(f"Original DataFrame Shape: {dask_full_data.shape}")
print(f"New DataFrame Shape after Cleanup: {cleaned_data.shape}")

# Checking for any remaining missing values
missing_summary = cleaned_data.isna().sum()
print("\n🔍 Remaining Missing Values after Cleanup:")
print(missing_summary[missing_summary > 0])

# Checking the distribution by month and VendorID
print("\n🔍 Distribution by Month after Cleanup:")
print(cleaned_data['tpep_pickup_datetime'].dt.month.value_counts())

print("\n🔍 Distribution by VendorID after Cleanup:")
print(cleaned_data['VendorID'].value_counts())


Original DataFrame Shape: (13069067, 19)
New DataFrame Shape after Cleanup: (11908529, 19)

🔍 Remaining Missing Values after Cleanup:
Series([], dtype: int64)

🔍 Distribution by Month after Cleanup:
tpep_pickup_datetime
3     3156421
4     3105706
1     2824459
2     2821923
12         19
5           1
Name: count, dtype: int64

🔍 Distribution by VendorID after Cleanup:
VendorID
2    9024022
1    2884507
Name: count, dtype: int64


Next, we validate the data and handle any anomalies.
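The range check in the next cell prints only the minimum and maximum of each column, which shows the extremes but not how many rows lie outside a plausible range. A sketch that counts rows failing simple plausibility rules; the bounds in `RULES` (for example, 1 to 6 passengers) are illustrative assumptions, not official TLC limits, and `count_rule_violations` is a hypothetical helper:

```python
import pandas as pd

# Hypothetical plausibility rules; the bounds are illustrative, not official TLC limits.
# Each rule returns True for rows that pass; NaN values fail both kinds of comparison.
RULES = {
    "passenger_count": lambda s: s.between(1, 6),
    "fare_amount": lambda s: s >= 0,
    "trip_distance": lambda s: s >= 0,
}


def count_rule_violations(df, rules=RULES):
    """Return, per column, the number of rows that fail its rule."""
    return pd.Series({col: int((~rule(df[col])).sum()) for col, rule in rules.items()})


# Synthetic rows: zero and seven passengers, a negative fare, a negative distance
demo = pd.DataFrame({
    "passenger_count": [1.0, 0.0, 7.0, 2.0],
    "fare_amount": [10.0, -5.0, 8.0, 12.0],
    "trip_distance": [1.2, 0.0, 3.4, -1.0],
})
violations = count_rule_violations(demo)
print(violations)
```

On the real data, `count_rule_violations(cleaned_data)` would show whether the out-of-range values affect a handful of rows or a sizeable share of them.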

In [10]:
# Step 1: Investigate the anomalies for May and December
anomalies = cleaned_data[cleaned_data['tpep_pickup_datetime'].dt.month.isin([5, 12])]
print("\n🔎 Anomalous Records (May and December):")
print(anomalies)

# Step 2: Data Type Validation
print("\n🔎 Data Types:")
print(cleaned_data.dtypes)

# Step 3: Range Checks
print("\n🔎 Range Check Summaries:")
print(f"Passenger Count: {cleaned_data['passenger_count'].min()} to {cleaned_data['passenger_count'].max()}")
print(f"Fare Amount: {cleaned_data['fare_amount'].min()} to {cleaned_data['fare_amount'].max()}")
print(f"Trip Distance: {cleaned_data['trip_distance'].min()} to {cleaned_data['trip_distance'].max()}")



🔎 Anomalous Records (May and December):
         VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
256             2  2023-12-31 23:56:46   2024-01-01 00:12:06              2.0   
369             2  2023-12-31 23:39:17   2023-12-31 23:42:00              2.0   
753             2  2023-12-31 23:41:02   2023-12-31 23:48:03              1.0   
2210            2  2023-12-31 23:57:17   2024-01-01 00:01:50              1.0   
2615            2  2023-12-31 23:56:45   2024-01-01 00:00:28              1.0   
2985            2  2023-12-31 23:49:12   2024-01-01 00:04:32              1.0   
3176            2  2023-12-31 23:47:28   2023-12-31 23:57:07              2.0   
4137            2  2023-12-31 23:58:35   2024-01-01 00:13:06              6.0   
4142            2  2023-12-31 23:58:37   2024-01-01 00:08:37              2.0   
8628            2  2023-12-31 23:54:27   2024-01-01 00:13:12              1.0   
53119           2  2002-12-31 22:59:39   2002-12-31 23:05:41     

From the output so far, we notice that May (month 5) has just 1 record and December (month 12) has only 19 records.

Compared with the other months (e.g., over 3 million records each for March and April), these are clearly not genuine monthly data. They are stray records inside the January to April files whose pickup timestamps fall outside the expected period, such as trips that started on 2023-12-31 just before midnight and one record dated 2002-12-31. They can be safely removed to keep this analysis meaningful.
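Filtering on the month number alone ignores the year, so a record with a bad year but an in-range month (say, January 2002) would survive a month-based removal. A stricter sketch compares full timestamps against the January 1 to May 1, 2024 window instead; the function name and the synthetic timestamps are assumptions for illustration:

```python
import pandas as pd


def keep_pickups_in_range(df, start, end, col="tpep_pickup_datetime"):
    """Keep rows whose pickup timestamp lies in the half-open window [start, end)."""
    ts = df[col]
    return df[(ts >= pd.Timestamp(start)) & (ts < pd.Timestamp(end))]


# Synthetic pickup times; the last one has the wrong year but an in-range month
demo = pd.DataFrame({"tpep_pickup_datetime": pd.to_datetime([
    "2023-12-31 23:56:46",
    "2024-01-01 00:57:55",
    "2024-04-30 23:59:59",
    "2024-05-01 00:00:10",
    "2002-01-15 10:00:00",
])})

# Month-only filter (as in the notebook) vs. full-timestamp filter
month_only = demo[~demo["tpep_pickup_datetime"].dt.month.isin([5, 12])]
by_range = keep_pickups_in_range(demo, "2024-01-01", "2024-05-01")
print(len(month_only), len(by_range))  # 3 2
```

The half-open window also keeps the whole of April 30 while excluding anything from May 1 onward, without needing an explicit `23:59:59` bound.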

In [11]:
# Remove records from May (5) and December (12)
cleaned_data = cleaned_data[~cleaned_data['tpep_pickup_datetime'].dt.month.isin([5, 12])]

# Confirm removal
print("\n✅ Distribution by Month after Removal:")
print(cleaned_data['tpep_pickup_datetime'].dt.month.value_counts().sort_index())



✅ Distribution by Month after Removal:
tpep_pickup_datetime
1    2824459
2    2821923
3    3156421
4    3105706
Name: count, dtype: int64


Next, we create a YAML schema containing:

- Column names

- File separator: |

and validate the DataFrame against this schema.

In [12]:
import yaml

# Define schema content
schema = {
    'separator': '|',
    'columns': list(cleaned_data.columns)
}

# Save schema to YAML file
with open('schema.yaml', 'w') as file:
    yaml.dump(schema, file, sort_keys=False)

print("\n✅ YAML schema file 'schema.yaml' created.")



✅ YAML schema file 'schema.yaml' created.


In [13]:
# Load schema from YAML
with open('schema.yaml', 'r') as file:
    loaded_schema = yaml.safe_load(file)

# Compare column names
df_columns = list(cleaned_data.columns)
schema_columns = loaded_schema['columns']

print("\n✅ Column Match:", df_columns == schema_columns)
if df_columns != schema_columns:
    print("⚠️ Mismatch detected!")
else:
    print("✅ Column names and order match the schema.")



✅ Column Match: True
✅ Column names and order match the schema.
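The schema above records only the separator and the column names, so a column that silently changed type (for example, an integer ID read back as float) would still pass the check. A sketch of a validator that also checks an optional `dtypes` mapping; the `dtypes` key is an assumed extension of the notebook's schema, and `validate_schema` is a hypothetical helper:

```python
import pandas as pd


def validate_schema(df, schema):
    """Return a list of problems found; an empty list means df matches the schema."""
    problems = []
    expected_cols = list(schema["columns"])
    if list(df.columns) != expected_cols:
        missing = [c for c in expected_cols if c not in df.columns]
        extra = [c for c in df.columns if c not in expected_cols]
        problems.append(f"columns differ from schema (missing={missing}, extra={extra})")
    # Optional dtype check: 'dtypes' is an assumed extension of the notebook's schema
    for col, expected in schema.get("dtypes", {}).items():
        if col in df.columns and str(df[col].dtype) != expected:
            problems.append(f"{col}: expected dtype {expected}, found {df[col].dtype}")
    return problems


demo = pd.DataFrame({
    "VendorID": pd.Series([1, 2], dtype="int32"),
    "fare_amount": [10.0, 12.5],
})
schema = {
    "separator": "|",
    "columns": ["VendorID", "fare_amount"],
    "dtypes": {"VendorID": "int32", "fare_amount": "float64"},
}
bad_schema = {**schema, "dtypes": {"fare_amount": "int64"}}

print(validate_schema(demo, schema))      # []
print(validate_schema(demo, bad_schema))  # one dtype problem
```

To populate the mapping for the real data, it could be built before `yaml.dump` with `{col: str(dtype) for col, dtype in cleaned_data.dtypes.items()}`.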


In [14]:
# Output file path
output_file = 'cleaned_data_output.txt.gz'

# Write to .gz file with pipe (|) separator
cleaned_data.to_csv(output_file, sep='|', index=False, compression='gzip')

print(f"\n✅ File saved as: {output_file}")




✅ File saved as: cleaned_data_output.txt.gz


In [15]:
import os

# File size in MB
file_size_mb = os.path.getsize(output_file) / (1024 * 1024)

# Summary
print("\n📊 File Summary:")
print(f"Total Rows: {cleaned_data.shape[0]}")
print(f"Total Columns: {cleaned_data.shape[1]}")
print(f"File Size: {file_size_mb:.2f} MB")



📊 File Summary:
Total Rows: 11908509
Total Columns: 19
File Size: 206.27 MB
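To confirm that the exported file really is pipe-separated and complete, it can be streamed back without loading it into a DataFrame. A standard-library sketch that checks the header and the number of fields in every record and counts the data rows; `verify_pipe_gzip` is a hypothetical helper, and the demo writes its own tiny synthetic file:

```python
import csv
import gzip
import os
import tempfile


def verify_pipe_gzip(path, expected_columns, sep="|"):
    """Stream a gzip-compressed delimited file, check its structure, and count data rows."""
    with gzip.open(path, "rt", newline="") as fh:
        reader = csv.reader(fh, delimiter=sep)  # handles quoted values containing sep
        header = next(reader)
        if header != list(expected_columns):
            raise ValueError(f"header does not match schema: {header}")
        n_rows = 0
        for record_no, row in enumerate(reader, start=2):
            if len(row) != len(header):
                raise ValueError(f"record {record_no}: expected {len(header)} fields, got {len(row)}")
            n_rows += 1
    return n_rows


# Demo on a tiny synthetic file
demo_path = os.path.join(tempfile.mkdtemp(), "demo.txt.gz")
with gzip.open(demo_path, "wt", newline="") as fh:
    writer = csv.writer(fh, delimiter="|")
    writer.writerow(["VendorID", "store_and_fwd_flag"])
    writer.writerow([2, "N"])
    writer.writerow([1, "a|b"])  # contains the separator, so csv quotes it
demo_rows = verify_pipe_gzip(demo_path, ["VendorID", "store_and_fwd_flag"])
print(demo_rows)  # 2
```

On the real export, `verify_pipe_gzip('cleaned_data_output.txt.gz', loaded_schema['columns'])` should return the same count as the Total Rows line above. Using `csv.reader` rather than splitting each line on `|` matters because `to_csv` quotes any value that contains the separator.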


In [16]:
from google.colab import files

# Download final data file
files.download("cleaned_data_output.txt.gz")

# Download YAML schema
files.download("schema.yaml")



In [17]:
requirements = """pandas
pyyaml
dask[dataframe]
modin[all]
ray
"""

with open("requirements.txt", "w") as f:
    f.write(requirements)
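The requirements file lists package names without versions, so a later install may pull newer releases that behave differently. A sketch that pins each package to the version currently installed, using the standard library's `importlib.metadata` (Python 3.8+); `pinned_requirements` is a hypothetical helper, and packages that are not installed are listed in a comment rather than pinned:

```python
from importlib import metadata


def pinned_requirements(packages):
    """Return requirements.txt text pinning each installed package to its current version.

    Extras such as "dask[dataframe]" are kept in the output; only the base name is
    used for the version lookup. Missing packages go into a trailing comment.
    """
    lines, missing = [], []
    for name in packages:
        base = name.split("[", 1)[0]  # strip extras for the metadata lookup
        try:
            lines.append(f"{name}=={metadata.version(base)}")
        except metadata.PackageNotFoundError:
            missing.append(name)
    if missing:
        lines.append("# not installed: " + ", ".join(missing))
    return "\n".join(lines) + "\n"


print(pinned_requirements(["pandas", "pyyaml", "dask[dataframe]", "modin[all]", "ray"]))
```

The returned text could be written to `requirements.txt` in place of the unpinned list.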


In [18]:
from google.colab import files
files.download("requirements.txt")



In [None]:
from google.colab import drive

# Attempt to mount Google Drive with authentication
drive.mount('/content/drive', force_remount=True)