Part 1: Data Ingestion

In [7]:
#Downloading the data

import requests
import os 

#Folder that holds the downloaded raw files; created up front, and no error is raised if it already exists
raw_data_dir = "data/raw"
os.makedirs(raw_data_dir, exist_ok=True)

#We then make a function to call to download each file
def download_file(url, destination_path, timeout=30):
    """Stream the resource at ``url`` to ``destination_path``.

    The body is first written to ``destination_path + ".part"`` and only moved
    into place once the whole transfer has finished, so an interrupted download
    never leaves a truncated file at ``destination_path``.

    Args:
        url: Address of the file to fetch.
        destination_path: Path the downloaded bytes are saved to.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled server cannot hang the notebook indefinitely.

    Returns:
        True when the file was downloaded and saved, False when a requests
        error occurred (the error is printed, not raised).

    Raises:
        OSError: If the local file cannot be written or moved into place.
    """
    print(f"Beginning download of {url}...")
    partial_path = f"{destination_path}.part"
    try:
        #The context manager releases the streamed connection even when the transfer fails midway
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status() # Check if request was successful

            #Write to a temporary ".part" file so a broken transfer never looks like a finished download
            with open(partial_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)

        #Only a complete download is moved to the real destination
        os.replace(partial_path, destination_path)
        print(f"Download completed and saved to {destination_path}.")
        return True
    except requests.exceptions.RequestException as e:
        print(f"An error occurred while downloading {url}: {e}")
        return False
    finally:
        #After a failure the partial file is still around; remove it (on success it was already moved)
        if os.path.exists(partial_path):
            os.remove(partial_path)

In [None]:
#Using our previous download helper function, we download the two required files

yellow_taxi_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
taxi_zone_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

#Each source URL mapped to the local path it is saved under
downloads = {
    yellow_taxi_url: "data/raw/yellow_tripdata_2024-01.parquet",
    taxi_zone_url: "data/raw/taxi_zone_lookup.csv",
}

for source_url, local_path in downloads.items():
    download_file(source_url, local_path)

#download_file prints request errors instead of raising them, so confirm that every
#file really is on disk (and not empty) before announcing success
missing_files = [
    local_path
    for local_path in downloads.values()
    if not os.path.isfile(local_path) or os.path.getsize(local_path) == 0
]
if missing_files:
    raise RuntimeError(f"Download failed, missing or empty files: {missing_files}")

print("All files downloaded successfully.")

Beginning download of https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet...
Download completed and saved to data/raw/yellow_tripdata_2024-01.parquet.
Beginning download of https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv...
Download completed and saved to data/raw/taxi_zone_lookup.csv.
All files downloaded successfully.


In [29]:
#We now load the data in a polars dataframe for data validation

import polars as pl

#Read both raw files into dataframes: the trips from parquet, the zone lookup from CSV
trip_parquet_path = "data/raw/yellow_tripdata_2024-01.parquet"
zone_csv_path = "data/raw/taxi_zone_lookup.csv"

taxi_trip_df = pl.read_parquet(trip_parquet_path)
taxi_zone_df = pl.read_csv(zone_csv_path)

In [30]:
#Verifying all expected columns exist

#Columns the rest of the notebook relies on being present in the taxi trip dataset
expected_columns = {
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "tip_amount",
    "total_amount",
    "payment_type",
}

#Collect every missing column in one pass. The names are sorted because set iteration
#order is not stable between runs, which would otherwise make the error message vary.
missing_columns = sorted(expected_columns - set(taxi_trip_df.columns))
if missing_columns:
    raise ValueError(f"Missing column(s): {', '.join(missing_columns)} in the taxi trip dataset, aborting...")


In [31]:
#Checking date columns are of valid datetime type

date_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

#Datetime dtypes that count as valid: the generic Datetime type plus its microsecond, millisecond and nanosecond variants
accepted_datetime_dtypes = (pl.Datetime, pl.Datetime("us"), pl.Datetime("ms"), pl.Datetime("ns"))

for col in date_columns:
    #Abort on the first date column whose dtype is not one of the accepted datetime dtypes
    column_dtype = taxi_trip_df[col].dtype
    is_datetime = column_dtype in accepted_datetime_dtypes
    if not is_datetime:
        raise Exception(f"Column {col} is not of datetime type, aborting...")
    

In [32]:
#Build the validation report line by line, then print it to the console in order
summary_lines = [
    "Summary of Data Validation: \n",
    #Outcome of the column and dtype checks
    "All expected columns are present in the taxi trip dataset.",
    "Date columns are of valid datetime type in taxi trip dataset. \n",
    #Row counts of both datasets
    f"Total number of rows in taxi trip dataset: {len(taxi_trip_df)}",
    f"Total number of rows in taxi zone dataset: {len(taxi_zone_df)}\n",
    #Schemas of both datasets
    f"Taxi trip dataset schema: \n {taxi_trip_df.schema}",
    f"Taxi zone dataset schema: \n {taxi_zone_df.schema}\n",
    "Data validation completed successfully.",
]

for summary_line in summary_lines:
    print(summary_line)

Summary of Data Validation: 

All expected columns are present in the taxi trip dataset.
Date columns are of valid datetime type in taxi trip dataset. 

Total number of rows in taxi trip dataset: 2964624
Total number of rows in taxi zone dataset: 265

Taxi trip dataset schema: 
 Schema([('VendorID', Int32), ('tpep_pickup_datetime', Datetime(time_unit='ns', time_zone=None)), ('tpep_dropoff_datetime', Datetime(time_unit='ns', time_zone=None)), ('passenger_count', Int64), ('trip_distance', Float64), ('RatecodeID', Int64), ('store_and_fwd_flag', String), ('PULocationID', Int32), ('DOLocationID', Int32), ('payment_type', Int64), ('fare_amount', Float64), ('extra', Float64), ('mta_tax', Float64), ('tip_amount', Float64), ('tolls_amount', Float64), ('improvement_surcharge', Float64), ('total_amount', Float64), ('congestion_surcharge', Float64), ('Airport_fee', Float64)])
Taxi zone dataset schema: 
 Schema([('LocationID', Int64), ('Borough', String), ('Zone', String), ('service_zone', String)])

Part 2: Data Transformation & Analysis