In [1]:
%pip install polars
%pip install duckdb
%pip install pyarrow
%pip install numpy
%pip install pandas
import os
import httpx 
import polars as pl
import duckdb
import time
import shutil
import pyarrow
import numpy
import pandas


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
#Using httpx: streams in files chunk by chunk with some error checking
def download_file(url, filename):
    print(f"\nAttempting to download {filename}.......\n")
    timeout = httpx.Timeout(10.0)

    with httpx.Client(timeout = timeout) as client:
        with client.stream("GET", url) as response:
            response.raise_for_status()
            with open(filename, "wb") as file:
                for chunk in response.iter_bytes():
                    file.write(chunk)
    print("\n[Download Successful!....]\n")
    
    
    

In [3]:
#Downloads the necessary files
download_file("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet", "yellow_tripdata_2024-01.parquet")

download_file("https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv", "taxi_zone_lookup.csv")



Attempting to download yellow_tripdata_2024-01.parquet.......


[Download Successful!....]


Attempting to download taxi_zone_lookup.csv.......


[Download Successful!....]



In [4]:
#Verifies that all columns exist in the DataFrame
df_parquet = pl.read_parquet("yellow_tripdata_2024-01.parquet")

column_names = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 
                'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'tip_amount', 'total_amount', 'payment_type']

missing_columns = [col for col in column_names if col not in df_parquet.columns]
if missing_columns:
    raise ValueError(f"\nColumns{missing_columns} do not exist in the DataFrame.\n")

print("\n[...All Columns Present]\n")


[...All Columns Present]



In [5]:
#Ensures the Date columns are valid datetypes
for col in column_names:
    if col.endswith('_datetime'):
        if df_parquet[col].dtype != pl.Datetime:
            raise ValueError(f"Column {col} is not a valid datetime type")
print("\n[Date Columns were successfully varified]\n")


[Date Columns were successfully varified]



In [6]:
#Report total row count
print(f"\nThe DataFrame contains: {len(df_parquet)} rows\n")
print("\n [Data Validation was a success......]")


The DataFrame contains: 2964624 rows


 [Data Validation was a success......]


In [7]:
#Removing nulls and filtering off data according to specifications
filterd_df = df_parquet.drop_nulls(subset = ['tpep_pickup_datetime', 'tpep_dropoff_datetime','PULocationID', 
                'DOLocationID','fare_amount'])
print(f"There were {(len(df_parquet)-len(filterd_df))} rows removed because of the NULL values in critical columns")



filterd_df = filterd_df.filter((pl.col('trip_distance') > 0) & (pl.col('fare_amount') >= 0) & (pl.col('fare_amount') <= 500) &
                               ((pl.col('tpep_pickup_datetime') < pl.col('tpep_dropoff_datetime'))))

#Displaying how many rows were removed
deleted_rows_tripdistance = df_parquet.filter(
    (pl.col('trip_distance') <= 0)
)
print(f"\nThe following rows were deleted because trip distance was 0: {len(deleted_rows_tripdistance)} rows\n")

deleted_rows_fare = df_parquet.filter((pl.col('fare_amount') < 0) | (pl.col('fare_amount') > 500))
print(f"\nThe following rows were deleted because of an invalid fare: {len(deleted_rows_tripdistance)} rows\n")

deleted_rows_dropoff = df_parquet.filter(((pl.col('tpep_pickup_datetime') >= pl.col('tpep_dropoff_datetime'))))
print(f"\nThe following rows were deleted because drop off time was greater than pick up time: {len(deleted_rows_dropoff)} rows \n")




There were 0 rows removed because of the NULL values in critical columns

The following rows were deleted because trip distance was 0: 60371 rows


The following rows were deleted because of an invalid fare: 60371 rows


The following rows were deleted because drop off time was greater than pick up time: 870 rows 



In [8]:
#Creating the 4 derived columns 
filterd_df = filterd_df.with_columns((pl.col('tpep_dropoff_datetime') - pl.col('tpep_pickup_datetime')).alias('trip_duration_minutes'))

# Calculate trip duration in minutes and add it as a new column
filterd_df = filterd_df.with_columns(
    (pl.col('tpep_dropoff_datetime') - pl.col('tpep_pickup_datetime')).alias('trip_duration')
)

# Convert the Duration to seconds for easier division
filterd_df = filterd_df.with_columns(
    (pl.col("trip_duration").dt.total_seconds() / 60.0).alias("trip_duration_minutes")
)

# Calculate trip speed in miles per hour and add it as a new column and handling division by zero

filterd_df = filterd_df.with_columns(
    (pl.col('trip_distance') / pl.when(pl.col('trip_duration_minutes') > 0).then(pl.col('trip_duration_minutes')).otherwise(1)).alias('trip_speed_mph')
)

#Extract the hour from the pickup time stamp
filterd_df = filterd_df.with_columns(pl.col("tpep_pickup_datetime").dt.hour().alias("pickup_hour"))

#Extract the day from the pickup time stamp
filterd_df = filterd_df.with_columns(pl.col("tpep_pickup_datetime").dt.day().alias("pickup_day_of_week"))






In [9]:
#Joining to taxi zones to the cleaned DataFrame
df_taxi_zones = pl.read_csv("taxi_zone_lookup.csv")
filterd_df = (
    filterd_df.join(
        df_taxi_zones.rename({
            "LocationID": "PULocationID",
            "Borough": "PU_Borough",
            "Zone": "PU_Zone",
            "service_zone": "PU_service_zone"
        }),
        on="PULocationID",
        how="left"
    )
    .join(
        df_taxi_zones.rename({
            "LocationID": "DOLocationID",
            "Borough": "DO_Borough",
            "Zone": "DO_Zone",
            "service_zone": "DO_service_zone"
        }),
        on="DOLocationID",
        how="left"
    )
)



In [10]:
#Begin Sql Queries
con = duckdb.connect(':memory:')
con.register("filterd_df", filterd_df)




<_duckdb.DuckDBPyConnection at 0x110f030b0>

In [11]:
destination_folder = 'queries/' 
os.makedirs(destination_folder, exist_ok = True)

In [12]:
#Answers m
result = con.execute('''
        SELECT 
            PU_Zone,
            COUNT(*) as Number_Of_Trips
        FROM
            filterd_df
        GROUP BY
            PU_Zone
        ORDER BY
            COUNT(*) DESC
        Limit 10
        ''').pl()
print(result)
result.write_parquet("queries/ten_busiest_pickupzones.parquet")
    

shape: (10, 2)
┌──────────────────────────────┬─────────────────┐
│ PU_Zone                      ┆ Number_Of_Trips │
│ ---                          ┆ ---             │
│ str                          ┆ i64             │
╞══════════════════════════════╪═════════════════╡
│ Midtown Center               ┆ 140161          │
│ Upper East Side South        ┆ 140131          │
│ JFK Airport                  ┆ 138474          │
│ Upper East Side North        ┆ 133975          │
│ Midtown East                 ┆ 104353          │
│ Times Sq/Theatre District    ┆ 102972          │
│ Penn Station/Madison Sq West ┆ 102160          │
│ Lincoln Square East          ┆ 101800          │
│ LaGuardia Airport            ┆ 87714           │
│ Upper West Side South        ┆ 86473           │
└──────────────────────────────┴─────────────────┘


Answers: m) What are the top 10 busiest pickup zones by total number of trips? (Include zone
names from lookup table)

In [13]:
#Answers n
result = con.execute(''' 
    SELECT
        HOUR(tpep_dropoff_datetime) as hour, 
        AVG(fare_amount) as avg_fare
    FROM filterd_df
    GROUP BY hour 
''').pl()

print(result)
result.write_parquet("queries/averagefare_by_hour.parquet")

shape: (24, 2)
┌──────┬───────────┐
│ hour ┆ avg_fare  │
│ ---  ┆ ---       │
│ i64  ┆ f64       │
╞══════╪═══════════╡
│ 0    ┆ 21.142538 │
│ 1    ┆ 19.829628 │
│ 2    ┆ 17.630085 │
│ 3    ┆ 18.052616 │
│ 4    ┆ 22.054805 │
│ …    ┆ …         │
│ 19   ┆ 17.8154   │
│ 20   ┆ 18.504986 │
│ 21   ┆ 18.293999 │
│ 22   ┆ 18.950692 │
│ 23   ┆ 20.303346 │
└──────┴───────────┘


n) What is the average fare amount for each hour of the day? (Order by hour)

In [14]:
#Answer o 
result = con.execute('''
    SELECT
        payment_type, COUNT(*) * 100 / SUM(COUNT(*)) OVER() as Tip_Percentage
    FROM filterd_df
    GROUP BY payment_type
''').pl()
print(result)
result.write_parquet("queries/tippercent_by_paymenttype.parquet")

shape: (5, 2)
┌──────────────┬────────────────┐
│ payment_type ┆ Tip_Percentage │
│ ---          ┆ ---            │
│ i64          ┆ f64            │
╞══════════════╪════════════════╡
│ 0            ┆ 4.015232       │
│ 2            ┆ 14.73513       │
│ 1            ┆ 80.081608      │
│ 4            ┆ 0.79706        │
│ 3            ┆ 0.37097        │
└──────────────┴────────────────┘


o) What percentage of tips use each payment type?

In [15]:
#Answer for p
result = con.execute('''
    SELECT 
        DAYNAME(tpep_dropoff_datetime) as day,
        AVG(tip_amount / fare_amount) * 100 as average_tip_amount
    FROM 
        filterd_df
    WHERE 
        fare_amount > 0 AND
        tip_amount IS NOT NULL AND
        fare_amount IS NOT NULL
    GROUP BY 
        day
''').pl()


print(result)
result.write_parquet("queries/averagetip_percent_by_dayoweek.parquet")

shape: (7, 2)
┌───────────┬────────────────────┐
│ day       ┆ average_tip_amount │
│ ---       ┆ ---                │
│ str       ┆ f64                │
╞═══════════╪════════════════════╡
│ Thursday  ┆ 24.445185          │
│ Tuesday   ┆ 20.968798          │
│ Saturday  ┆ 21.336866          │
│ Sunday    ┆ 20.272882          │
│ Wednesday ┆ 21.19645           │
│ Monday    ┆ 20.455245          │
│ Friday    ┆ 20.963301          │
└───────────┴────────────────────┘


p) What is the average tip percentage (tip_amount/fare_amount) by day of week, for
credit card payments only?

In [16]:
#Answer for q
result = con.execute(''' 
    SELECT
        PU_Zone,
        DO_Zone,
        COUNT(*) as Dropoff_Pickup_Combination_Count
    FROM 
        filterd_df
    GROUP BY
        PU_Zone, DO_Zone 
    ORDER BY
        Dropoff_Pickup_Combination_Count DESC
    LIMIT 5;
''').pl()


print(result)
result.write_parquet("queries/mostfrquent_pickup_dropoff_pairs.parquet")

shape: (5, 3)
┌───────────────────────┬───────────────────────┬─────────────────────────────────┐
│ PU_Zone               ┆ DO_Zone               ┆ Dropoff_Pickup_Combination_Cou… │
│ ---                   ┆ ---                   ┆ ---                             │
│ str                   ┆ str                   ┆ i64                             │
╞═══════════════════════╪═══════════════════════╪═════════════════════════════════╡
│ Upper East Side South ┆ Upper East Side North ┆ 21642                           │
│ Upper East Side North ┆ Upper East Side South ┆ 19199                           │
│ Upper East Side North ┆ Upper East Side North ┆ 15200                           │
│ Upper East Side South ┆ Upper East Side South ┆ 14116                           │
│ Midtown Center        ┆ Upper East Side South ┆ 10139                           │
└───────────────────────┴───────────────────────┴─────────────────────────────────┘


q) What are the top 5 most common pickup-dropoff zone pairs? (Include zone names)

In [17]:
#One extra query to make the heatmap
result = con.execute('''
    SELECT 
        DAYNAME(tpep_pickup_datetime) as day_of_week,
        HOUR(tpep_pickup_datetime) as hour,
        COUNT(*) as trip_count
    FROM filterd_df
    GROUP BY day_of_week, hour
    ORDER BY hour
''').pl()


result_heat.write_parquet("queries/trips_by_day_hour.parquet")

In [18]:
#Creates a cleaned and updated Trip Data file to use in the dashboard
filterd_df.write_parquet('Transformed_TripData.parquet')


In [19]:
#Moving files to be ignored 
source_file = 'yellow_tripdata_2024-01.parquet'
destination_folder = 'data/raw/'

os.makedirs(destination_folder, exist_ok = True)
shutil.move(source_file, os.path.join(destination_folder, os.path.basename(source_file)))

source_file = 'taxi_zone_lookup.csv'
shutil.move(source_file, os.path.join(destination_folder, os.path.basename(source_file)))

file = open("data/raw/.gitignore", "w")
file.close()





In [20]:
#Creating the .gitignore files
file = open("data/.gitignore", "w")
file.close()