## Part 1: Data Ingestion


### 1) Programmatic Download

In [47]:
import polars as pl
import numpy as np
import duckdb
from urllib import request
from sys import exit
from datetime import datetime

import os
import shutil


validated = True
exceptionList = []

parq_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet'
lut_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

# Create the data/raw/ directory when ran

newpath = '.\\data\\raw'
if not os.path.exists(newpath):
    os.makedirs(newpath)

# Download and write the parquet file
with request.urlopen(parq_url) as parq_file, open('.\\data\\raw\\yellow_tripdata_2024-01.parquet', 'wb+') as out_file:
    shutil.copyfileobj(parq_file, out_file)
    
# Download and write the csv file
with request.urlopen(lut_url) as lut_file, open('.\\data\\raw\\taxi_zone_lookup.csv', 'wb+') as out_file:
    shutil.copyfileobj(lut_file, out_file)

df = pl.read_parquet('.\\data\\raw\\yellow_tripdata_2024-01.parquet')    
lut = pl.read_csv('.\\data\\raw\\taxi_zone_lookup.csv')

print(df)
print(lut)


shape: (2_964_624, 19)
┌──────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ VendorID ┆ tpep_pick ┆ tpep_drop ┆ passenger ┆ … ┆ improveme ┆ total_amo ┆ congestio ┆ Airport_f │
│ ---      ┆ up_dateti ┆ off_datet ┆ _count    ┆   ┆ nt_surcha ┆ unt       ┆ n_surchar ┆ ee        │
│ i32      ┆ me        ┆ ime       ┆ ---       ┆   ┆ rge       ┆ ---       ┆ ge        ┆ ---       │
│          ┆ ---       ┆ ---       ┆ i64       ┆   ┆ ---       ┆ f64       ┆ ---       ┆ f64       │
│          ┆ datetime[ ┆ datetime[ ┆           ┆   ┆ f64       ┆           ┆ f64       ┆           │
│          ┆ ns]       ┆ ns]       ┆           ┆   ┆           ┆           ┆           ┆           │
╞══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 2        ┆ 2024-01-0 ┆ 2024-01-0 ┆ 1         ┆ … ┆ 1.0       ┆ 22.7      ┆ 2.5       ┆ 0.0       │
│          ┆ 1         ┆ 1         ┆           ┆   ┆           ┆    

### 2) Validation checks

        a) Verify all expected columns exist in the dataset

        b) Check that date columns are valid datetime types

        c) Report total row count and print a summary to the console

        d) Raise an exception or exit with an error message if validation fails

### a)

In [48]:

#check for tpep_pickup_datetime
if(df.get_column('tpep_pickup_datetime', default = None) is None):
    exceptionList.append("The 'tpep_pickup_datetime' column was not found.")
    validated = False
else:
    print("The 'tpep_pickup_datetime' column exists.")

#check for tpep_dropoff_datetime
if(df.get_column('tpep_pickup_datetime', default = None) is None):
    exceptionList.append("The 'tpep_pickup_datetime' column was not found.")
    validated = False
else:
    print("The 'tpep_pickup_datetime' column exists.")

#check for PULocationID
if(df.get_column('PULocationID', default = None) is None):
    exceptionList.append("The 'PULocationID' column was not found.")
    validated = False
else:
    print("The 'PULocationID' column exists.")

#check for DOLocationID
if(df.get_column('DOLocationID', default = None) is None):
    exceptionList.append("The 'DOLocationID' column was not found.")
    validated = False
else:
    print("The 'DOLocationID' column exists.")
    
#check for passenger_count
if(df.get_column('passenger_count', default = None) is None):
    exceptionList.append("The 'passenger_count' column was not found.")
    validated = False
else:
    print("The 'passenger_count' column exists.")
    
#check for trip_distance
if(df.get_column('trip_distance', default = None) is None):
    exceptionList.append("The 'trip_distance' column was not found.")
    validated = False
else:
    print("The 'trip_distance' column exists.")
    
#check for fare amount
if(df.get_column('fare_amount', default = None) is None):
    exceptionList.append("The 'fare_amount' column was not found.")
    validated = False
else:
    print("The 'fare_amount' column exists.")
    
#check for tip amount
if(df.get_column('tip_amount', default = None) is None):
    exceptionList.append("The 'tip_amount' column was not found.")
    validated = False
else:
    print("The 'tip_amount' column exists.")\
    
#check for total amount
if(df.get_column('total_amount', default = None) is None):
    exceptionList.append("The 'total_amount' column was not found.")
    validated = False
else:
    print("The 'total_amount' column exists.")
        
#check for payment type
if(df.get_column('payment_type', default = None) is None):
    exceptionList.append("The 'payment_type' column was not found.")
    validated = False
else:
    print("The 'payment_type' column exists.")


The 'tpep_pickup_datetime' column exists.
The 'tpep_pickup_datetime' column exists.
The 'PULocationID' column exists.
The 'DOLocationID' column exists.
The 'passenger_count' column exists.
The 'trip_distance' column exists.
The 'fare_amount' column exists.
The 'tip_amount' column exists.
The 'total_amount' column exists.
The 'payment_type' column exists.


### b)

In [49]:

print(f'Columns types: {df['tpep_pickup_datetime', 'tpep_dropoff_datetime'].dtypes}')

if(df['tpep_pickup_datetime'].dtype == pl.Datetime):
    print("The 'tpep_pickup_datetime' column is the correct Datetime type.")
else:
    exceptionList.append("The 'tpep_pickup_datetime' column is not the correct Datetime type.")
    validated = False


if(df['tpep_dropoff_datetime'].dtype == pl.Datetime):
    print("The 'tpep_dropoff_datetime' column is the correct Datetime type.")
else:
    exceptionList.append("The 'tpep_dropoff_datetime' column is not the correct Datetime type. Aborting...")
    validated = False




Columns types: [Datetime(time_unit='ns', time_zone=None), Datetime(time_unit='ns', time_zone=None)]
The 'tpep_pickup_datetime' column is the correct Datetime type.
The 'tpep_dropoff_datetime' column is the correct Datetime type.


### c)

In [50]:
print("\n--STATISTICS--")
print(f'Number of rows: {df.height}\n')
print(f'Number of columns: {df.width}\n')
print(f'Columns: {df.columns}\n')
print(f'Data types: {df.dtypes}\n')
print(f'Shape: {df.shape}\n')
print(f'Head: {df.head()}\n')

#storage of initial set for use

init_size = df.height



--STATISTICS--
Number of rows: 2964624

Number of columns: 19

Columns: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee']

Data types: [Int32, Datetime(time_unit='ns', time_zone=None), Datetime(time_unit='ns', time_zone=None), Int64, Float64, Int64, String, Int32, Int32, Int64, Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64]

Shape: (2964624, 19)

Head: shape: (5, 19)
┌──────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ VendorID ┆ tpep_pick ┆ tpep_drop ┆ passenger ┆ … ┆ improveme ┆ total_amo ┆ congestio ┆ Airport_f │
│ ---      ┆ up_dateti ┆ off_datet ┆ _count    ┆   ┆ nt_surcha ┆ unt       ┆ n_surchar ┆ ee        │
│ i32      ┆ me

### d)

In [51]:
if(not validated):
    print("ERROR - The dataset is NOT valid. Here are the exceptions: ")
    for ex in exceptionList:
        print(f'    - {ex}')
    print("Aborting...")
    exit
    

## Part 2: Data Transformation & Analysis


### 4) Data Cleaning -

        

        

        

        

### e) Remove rows with null values in critical columns (pickup/dropoff times, locations, fare)

In [52]:
df = df.filter(
    pl.col('tpep_pickup_datetime').is_not_null(),
    pl.col('tpep_dropoff_datetime').is_not_null(),
    pl.col('PULocationID').is_not_null(),
    pl.col('DOLocationID').is_not_null(),
    pl.col('passenger_count').is_not_null(),
    pl.col('trip_distance').is_not_null(),
    pl.col('fare_amount').is_not_null(),
    pl.col('tip_amount').is_not_null(),
    pl.col('total_amount').is_not_null(),
    pl.col('payment_type').is_not_null(),
)

print(f'df height after removing null values: {df.height}')
filter_nulls_size = df.height

df height after removing null values: 2824462


### f) Filter out invalid trips: trips with zero or negative distance, negative fares, or fares exceeding $500 

In [53]:
df = df.filter(
    pl.col('trip_distance') > 0.0,
    pl.col('total_amount') > 0.0,
    pl.col('total_amount') <= 500
)

print(f'df height after removing invalid trips: {df.height}')
filter_invalid_trips_size = df.height

df height after removing invalid trips: 2754651


### g) Remove trips with invalid datetimes

In [54]:
df = df.filter(
    pl.col('tpep_pickup_datetime') < pl.col('tpep_dropoff_datetime'),
    pl.col('tpep_pickup_datetime') >= datetime(2024, 1,1), 
    pl.col('tpep_pickup_datetime') <= datetime(2024, 1,31),
)

print(f'df height after removing invalid dates: {df.height}')
filter_invalid_time_size = df.height

df height after removing invalid dates: 2658164


### h) Document how many rows were removed and why (print summary to console) 


In [55]:
final_size = df.height

print("--POST CLEANING SUMMARY--")

print(f'\nInitial number of rows: {init_size}\n')
print(f'After removing all null values from the critical columns, there were {filter_nulls_size} rows.\n{init_size} - {filter_nulls_size} = {init_size - filter_nulls_size} rows were removed.\n')
print(f'After removing all invalid trips (negative distance, negative fares and fares exceeding $500), there were {filter_invalid_trips_size} rows.\n{filter_nulls_size} - {filter_invalid_trips_size} = {filter_nulls_size - filter_invalid_trips_size} rows were removed.\n')
print(f'After removing all rows with invalid dropoff and pickup times, there were {final_size} rows.\n{filter_invalid_trips_size} - {final_size} = {filter_invalid_trips_size - final_size} rows were removed.\n')
print(f'After all cleaning procedures, the number of rows removed are: {init_size} - {final_size} = {init_size - final_size} total rows removed during cleaning.')


--POST CLEANING SUMMARY--

Initial number of rows: 2964624

After removing all null values from the critical columns, there were 2824462 rows.
2964624 - 2824462 = 140162 rows were removed.

After removing all invalid trips (negative distance, negative fares and fares exceeding $500), there were 2754651 rows.
2824462 - 2754651 = 69811 rows were removed.

After removing all rows with invalid dropoff and pickup times, there were 2658164 rows.
2754651 - 2658164 = 96487 rows were removed.

After all cleaning procedures, the number of rows removed are: 2964624 - 2658164 = 306460 total rows removed during cleaning.


<font color='green'>(From the console output)</font>

--POST CLEANING SUMMARY--

Initial number of rows: 2964624

After removing all null values from the critical columns, there were 2824462 rows.
2964624 - 2824462 = 140162 rows were removed.

After removing all invalid trips (negative distance, negative fares and fares exceeding $500), there were 2754651 rows.
2824462 - 2754651 = 69811 rows were removed.

After removing all rows with invalid dropoff and pickup times, there were 2754593 rows.
2754651 - 2754593 = 58 rows were removed.

After all cleaning procedures, the number of rows removed are: 2964624 - 2754593 = 210031 total rows removed during cleaning.

### 5) Feature Engineering -

Create 4 new derived columns:

        

        

         

        

### i) trip_duration_minutes: calculated from pickup and dropoff timestamps 

In [56]:
df = df.with_columns(
    ((pl.col('tpep_dropoff_datetime') - pl.col('tpep_pickup_datetime'))
     .dt.total_seconds() / 60).alias('trip_duration_minutes')
)

print(df['trip_duration_minutes'])

shape: (2_658_164,)
Series: 'trip_duration_minutes' [f64]
[
	19.8
	6.6
	17.916667
	8.3
	6.1
	…
	13.25
	8.866667
	7.4
	8.966667
	2.45
]


### j) trip_speed_mph: distance divided by duration (handle division by zero)

In [57]:
df = df.with_columns(
    pl.when((pl.col('trip_duration_minutes')/60.0) > 0)        
    .then((pl.col('trip_distance') / (pl.col('trip_duration_minutes')/60)))
    .otherwise(0)   #if the duration is 0, then the speed would be zero
    .alias("trip_speed_mph")
)

print(df["trip_speed_mph"])

shape: (2_658_164,)
Series: 'trip_speed_mph' [f64]
[
	5.212121
	16.363636
	15.739535
	10.120482
	7.868852
	…
	18.792453
	13.87218
	8.756757
	15.055762
	9.55102
]


### k) pickup_hour: hour of day (0-23) extracted from pickup timestamp 

In [58]:
df = df.with_columns(
    (pl.col('tpep_pickup_datetime').dt.hour()).alias("pickup_hour")
)

print(df['tpep_pickup_datetime', 'pickup_hour'])

shape: (2_658_164, 2)
┌──────────────────────┬─────────────┐
│ tpep_pickup_datetime ┆ pickup_hour │
│ ---                  ┆ ---         │
│ datetime[ns]         ┆ i8          │
╞══════════════════════╪═════════════╡
│ 2024-01-01 00:57:55  ┆ 0           │
│ 2024-01-01 00:03:00  ┆ 0           │
│ 2024-01-01 00:17:06  ┆ 0           │
│ 2024-01-01 00:36:38  ┆ 0           │
│ 2024-01-01 00:46:51  ┆ 0           │
│ …                    ┆ …           │
│ 2024-01-30 23:56:51  ┆ 23          │
│ 2024-01-30 23:55:12  ┆ 23          │
│ 2024-01-30 23:57:20  ┆ 23          │
│ 2024-01-31 00:00:00  ┆ 0           │
│ 2024-01-30 23:55:53  ┆ 23          │
└──────────────────────┴─────────────┘


### l) pickup_day_of_week: day name (Monday-Sunday) extracted from pickup timestamp

In [59]:
df = df.with_columns(
    pl.col('tpep_pickup_datetime').dt.strftime("%A").alias('pickup_day_of_week')
)


print(df['tpep_pickup_datetime', 'pickup_hour', 'pickup_day_of_week'])

df.write_parquet('.\\data\\raw\\yellow_tripdata_2024-01.parquet')

shape: (2_658_164, 3)
┌──────────────────────┬─────────────┬────────────────────┐
│ tpep_pickup_datetime ┆ pickup_hour ┆ pickup_day_of_week │
│ ---                  ┆ ---         ┆ ---                │
│ datetime[ns]         ┆ i8          ┆ str                │
╞══════════════════════╪═════════════╪════════════════════╡
│ 2024-01-01 00:57:55  ┆ 0           ┆ Monday             │
│ 2024-01-01 00:03:00  ┆ 0           ┆ Monday             │
│ 2024-01-01 00:17:06  ┆ 0           ┆ Monday             │
│ 2024-01-01 00:36:38  ┆ 0           ┆ Monday             │
│ 2024-01-01 00:46:51  ┆ 0           ┆ Monday             │
│ …                    ┆ …           ┆ …                  │
│ 2024-01-30 23:56:51  ┆ 23          ┆ Tuesday            │
│ 2024-01-30 23:55:12  ┆ 23          ┆ Tuesday            │
│ 2024-01-30 23:57:20  ┆ 23          ┆ Tuesday            │
│ 2024-01-31 00:00:00  ┆ 0           ┆ Wednesday          │
│ 2024-01-30 23:55:53  ┆ 23          ┆ Tuesday            │
└─────────────────

### 6) SQL Analysis - 


### m) What are the top 10 busiest pickup zones by total number of trips? (Include zone names from lookup table)

In [60]:
con = duckdb.connect()

result = con.execute('''
    SELECT PULocationID, l.Zone,
    COUNT(*) AS total_trips 
    FROM df
    JOIN lut l
    ON PULocationID = l.LocationID
    GROUP BY PULocationID, l.Zone
    ORDER BY total_trips DESC
    LIMIT 10;
    ''').fetchdf()

print(result)

   PULocationID                          Zone  total_trips
0           132                   JFK Airport       134796
1           237         Upper East Side South       131820
2           161                Midtown Center       130421
3           236         Upper East Side North       124528
4           162                  Midtown East        98000
5           186  Penn Station/Madison Sq West        97443
6           230     Times Sq/Theatre District        96214
7           142           Lincoln Square East        95600
8           138             LaGuardia Airport        84654
9           239         Upper West Side South        79765


### n) What is the average fare amount for each hour of the day? (Order by hour) 

In [61]:
result = con.execute('''
    SELECT 
    EXTRACT(hour FROM tpep_pickup_datetime) AS pickup_hour,
    AVG(fare_amount) AS avg_fare
    FROM df
    GROUP BY EXTRACT(hour FROM tpep_pickup_datetime)
    ORDER BY pickup_hour;
    ''').fetchdf()

print(result)

    pickup_hour   avg_fare
0             0  19.644453
1             1  17.359600
2             2  16.357390
3             3  17.865052
4             4  22.875628
5             5  27.573195
6             6  22.136905
7             7  18.618964
8             8  17.614745
9             9  17.859923
10           10  17.987185
11           11  17.561081
12           12  17.726672
13           13  18.365737
14           14  19.210625
15           15  19.036270
16           16  19.412642
17           17  18.050646
18           18  16.943751
19           19  17.647684
20           20  18.122781
21           21  18.380656
22           22  19.192339
23           23  20.378185


### o) What percentage of trips use each payment type?

In [62]:
result = con.execute('''
    SELECT payment_type,
    COUNT(*) * 100.0 / (SELECT COUNT(*) FROM df) AS percentage
    FROM df
    GROUP BY payment_type
    ORDER BY percentage DESC;
    ''').fetchdf()

print(result)

   payment_type  percentage
0             1   83.356858
1             2   15.426099
2             4    0.831250
3             3    0.385793


### p) What is the average tip percentage (tip_amount/fare_amount) by day of week, for credit card payments only? 

In [63]:
result = con.execute('''
    SELECT 
    pickup_day_of_week,
    AVG(tip_amount / fare_amount) AS avg_tip_percentage
    FROM df
    WHERE payment_type = 1
    AND fare_amount > 0
    GROUP BY pickup_day_of_week
    ORDER BY pickup_day_of_week;
    ''').fetchdf()

print(result)

  pickup_day_of_week  avg_tip_percentage
0             Friday            0.255957
1             Monday            0.255141
2           Saturday            0.262940
3             Sunday            0.251011
4           Thursday            0.297342
5            Tuesday            0.257301
6          Wednesday            0.257425


### q) What are the top 5 most common pickup-dropoff zone pairs? (Include zone names)

In [64]:
result = con.execute('''
    SELECT PULocationID,
    p.Zone AS pickup_zone,
    DOLocationID,
    d.Zone AS dropoff_zone,
    COUNT(*) AS trip_count
    FROM df
    JOIN lut p
    ON PULocationID = p.LocationID
    JOIN lut d
    ON DOLocationID = d.LocationID
    GROUP BY 
    PULocationID, p.Zone,
    DOLocationID, d.Zone
    ORDER BY trip_count DESC
    LIMIT 5;
    ''').fetchdf()

print(result)

   PULocationID            pickup_zone  DOLocationID           dropoff_zone  \
0           237  Upper East Side South           236  Upper East Side North   
1           236  Upper East Side North           237  Upper East Side South   
2           236  Upper East Side North           236  Upper East Side North   
3           237  Upper East Side South           237  Upper East Side South   
4           161         Midtown Center           237  Upper East Side South   

   trip_count  
0       20478  
1       17996  
2       14334  
3       13333  
4        9572  


## Part 3: Dashboard Development



### Top 10 pickup zones by trip count

In [65]:
import altair as alt

sample = df.sample(n = 5000, seed=42)


con = duckdb.connect()
result = con.execute('''
SELECT PULocationID, l.Zone,
COUNT(*) AS total_trips 
FROM sample
JOIN lut l
ON PULocationID = l.LocationID
GROUP BY PULocationID, l.Zone
ORDER BY total_trips DESC
LIMIT 10;
                    
''').fetchdf()

# Query reused from the notebook


bar_chart = alt.Chart(result).mark_bar().encode(
    x = alt.X("Zone", sort = None),
    y = alt.X("total_trips", title = "Total Trips"),
).properties(
    height = 600,
    width = 600
)

text = bar_chart.mark_text(             # Added number above bars for clarity
    align='center',
    baseline='middle',
    dy=-10,                             # Raise the number
    color = 'grey'
).encode(
    text='total_trips:Q'
)

bar_chart + text

There seems to be more revenue as the Midtown Center gets closer.

### Average fare by hour of day (showing hourly patterns) 

In [66]:
result = con.execute('''
SELECT 
EXTRACT(hour FROM tpep_pickup_datetime) AS pickup_hour,
AVG(fare_amount) AS average_fare
FROM sample
GROUP BY EXTRACT(hour FROM tpep_pickup_datetime)
ORDER BY pickup_hour;
''').fetchdf()

# Query reused from the notebook

line_chart = alt.Chart(result).mark_line().encode(
    x = alt.X("pickup_hour", title="Pickup Hour", sort=None),
    y = alt.X("average_fare", title = "Average Fare")
).properties(
    height = 600,
    width = 600,
    
)

line_chart

People take longer trips in the morning.

### Distribution of trip distances (with appropriate binning) 

In [67]:
result = con.execute('''
SELECT 
trip_distance,
trip_duration_minutes
FROM sample
WHERE trip_distance < 500;
''').fetchdf()

# Used a second attribute to avoid some issues with the Series type. Just need trip_distance

hist_chart = alt.Chart(result).mark_bar().encode(
    x = alt.X("trip_distance:Q", title="Trip Distance", sort = None, bin=alt.Bin(extent = [0, 100],step = 2), axis=alt.Axis(values=list(range(0, 102, 2)))), 
    y = alt.Y("count()", title = "Total Trips"),
    tooltip = ["trip_distance","count()"]
    
).properties(
    height = 600,
    width = 1200
)

hist_chart


Major revenue is generated from short trips.

### Breakdown of payment types 

In [68]:
result = con.execute('''
SELECT 
payment_type,
fare_amount
FROM sample;
''').fetchdf()


bar_chart2 = alt.Chart(result).mark_bar(size = 40).encode(
    x = alt.X("payment_type:Q", title="Payment Types" ,sort = None, axis = alt.Axis(values=[1,2,3,4])),
    y = alt.X("count()", title = "Total Payments")
).properties(
    height = 600,
    width = 600
)

text2 = bar_chart2.mark_text(       # Added number above bars for clarity here as well
    align='center',
    baseline='middle',
    dy=-10,                         # Raise the number
    color = 'grey'
).encode(
    text='count():Q'
)

bar_chart2 + text2

Revenue is dependent on the availability of Payment Method 1.

### Trips by day of week and hour (showing weekly patterns)

In [69]:
result = con.execute('''
SELECT 
pickup_hour,
DATE_PART('dow', tpep_pickup_datetime) AS day_of_week,
COUNT(*) as trip_count,     
FROM sample
GROUP BY pickup_hour, day_of_week
ORDER BY pickup_Hour, day_of_week;
''').fetchdf()

heat_map = alt.Chart(result).mark_rect().encode(
    x=alt.X('pickup_hour:O', title='Pickup Hour'),
    y=alt.Y('day_of_week:O', title='Day of Week (Sunday is 0)'),
    color=alt.Color('trip_count:Q', title = 'Trip Count' ,scale=alt.Scale(scheme='redyellowgreen'))
).properties(
    height = 600,
    width = 600
)

heat_map

Major revenue is also made during the weekday evenings.