In [1]:
import pandas as pd
from sqlalchemy import create_engine
from time import time

In [2]:
# Connecting to a database
engine = create_engine('postgresql://menace:****@localhost:5432/c5')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x2893b93a620>

In [4]:
data = pd.read_csv('yellow_tripdata_2016-02.csv',chunksize=100000)

In [5]:
# Viewing the first 5 rows in the first chunk
df_taxi = next(data)
df_taxi.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2016-02-25 17:24:20,2016-02-25 17:27:20,2,0.7,-73.94725,40.763771,1,N,-73.992012,40.73539,2,5.0,0.0,0.5,0.0,0.0,0.3,5.8
1,2,2016-02-25 23:10:50,2016-02-25 23:31:50,2,5.52,-73.983017,40.750992,1,N,-73.988586,40.758839,2,20.0,0.5,0.5,0.0,0.0,0.3,21.3
2,2,2016-02-01 00:00:01,2016-02-01 00:10:52,6,1.99,-73.99234,40.758202,1,N,-73.964355,40.757977,1,9.5,0.5,0.5,0.7,0.0,0.3,11.5
3,1,2016-02-01 00:00:04,2016-02-01 00:05:16,1,1.5,-73.981453,40.749722,1,N,-73.982323,40.763985,2,6.5,0.5,0.5,0.0,0.0,0.3,7.8
4,2,2016-02-01 00:00:05,2016-02-01 00:20:59,1,5.6,-74.000603,40.729755,1,N,-73.951324,40.669834,1,20.0,0.5,0.5,4.0,0.0,0.3,25.3


# I noticed the data is indexed incorrectly so i am going to sort it by "tpep_pickup_datetime",
meaning from the first trip recorded.

In [64]:
df_taxi['tpep_pickup_datetime'] = pd.to_datetime(df_taxi['tpep_pickup_datetime'])
sorted_df = df_taxi.sort_values(by='tpep_pickup_datetime')
sorted_df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
58,2,2016-02-01 00:00:00,2016-02-01 19:50:51,1,3.42,-73.953468,40.767311,1,N,0.0,0.0,2,12.0,1.3,0.5,0.0,0.0,0.0,13.8
57,1,2016-02-01 00:00:00,2016-02-01 00:06:47,1,0.9,-73.981636,40.728554,1,N,-73.980484,40.739548,1,6.5,0.5,0.5,2.2,0.0,0.3,10.0
2,2,2016-02-01 00:00:01,2016-02-01 00:10:52,6,1.99,-73.99234,40.758202,1,N,-73.964355,40.757977,1,9.5,0.5,0.5,0.7,0.0,0.3,11.5
59,1,2016-02-01 00:00:01,2016-02-01 00:03:16,3,0.0,-73.949776,40.662441,1,N,-73.94825,40.662582,3,2.5,0.5,0.5,0.0,0.0,0.3,3.8
61,1,2016-02-01 00:00:03,2016-02-01 00:03:59,1,0.8,-73.976952,40.747387,1,N,-73.986938,40.744812,1,5.0,0.5,0.5,1.0,0.0,0.3,7.3


# creating empty columns to enter into the database and confirm the connection

In [5]:
data_head = pd.read_csv('yellow_tripdata_2016-02.csv',nrows=0) 
data_head

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount


0

putting everything into a function

In [160]:
def etl_tripdata(chunk_size, connection):
    data = pd.read_csv('yellow_tripdata_2016-02.csv',chunksize=chunk_size)
    count = 1
    overall_start_time = time()
    for chunk in data:
        if count > 120:
            overall_end_time = time() - overall_start_time
            print(f'finished loading a total of {chunk_size*count:,} records in {overall_end_time} seconds')
            break;
        else:
            t_start = time()
            chunk['tpep_pickup_datetime'] = pd.to_datetime(chunk['tpep_pickup_datetime'])
            chunk['tpep_dropoff_datetime'] = pd.to_datetime(chunk['tpep_dropoff_datetime'])
            sorted_df = chunk.sort_values(by='tpep_pickup_datetime')
            chunk.to_sql(name='tripdata_showcase',con=connection,if_exists='append',index=False)
            count +=1
            t_end = time()
            print(f'{count}) loaded next chunk {t_end - t_start:.3f} seconds')

In [129]:
etl_tripdata(100000,engine)

1) loaded next chunk 19.829 seconds
2) loaded next chunk 19.254 seconds
3) loaded next chunk 20.105 seconds
4) loaded next chunk 19.623 seconds
5) loaded next chunk 19.200 seconds
6) loaded next chunk 20.704 seconds
7) loaded next chunk 19.524 seconds
8) loaded next chunk 20.968 seconds
9) loaded next chunk 19.351 seconds
10) loaded next chunk 20.647 seconds
finished loading a total of 1,000,000 records in 203.44093775749207 seconds


# SQL ETL

I will be making different reports from my pre-processed data

In [161]:
# How many trips were recorded in the dataset?
total_trips= '''
SELECT COUNT(*) as total_trips
FROM tripdata_project
'''
pd.read_sql(total_trips,engine)

Unnamed: 0,total_trips
0,1000000


In [162]:
# What is the average trip distance for all trips?
avg_trip_distance= '''
SELECT avg(cast(trip_distance as numeric)) as avg_trip_distance
FROM tripdata_project
'''
pd.read_sql(avg_trip_distance,engine)

Unnamed: 0,avg_trip_distance
0,2.751403


In [163]:
# Which Vendor has the highest number of trips?
highest_trip_vendor= '''
SELECT "VendorID" as highest_trip_vendor
FROM 
(
SELECT "VendorID", count(*)
from tripdata_project
GROUP BY "VendorID"
ORDER BY "VendorID" DESC
LIMIT 1
) a
'''
pd.read_sql(highest_trip_vendor,engine)

Unnamed: 0,highest_trip_vendor
0,2


In [164]:
# Which Vendor has the lowest number of trips?
lowest_trip_vendor= '''
SELECT "VendorID" as lowest_trip_vendor
FROM 
(
SELECT "VendorID", count(*)
from tripdata_project
GROUP BY "VendorID"
ORDER BY "VendorID" ASC
LIMIT 1
) a
'''
pd.read_sql(lowest_trip_vendor,engine)

Unnamed: 0,lowest_trip_vendor
0,1


In [166]:
# What is the average passenger count per trip?
avg_passenger_count='''
SELECT avg(cast(passenger_count as numeric)) as avg_passenger_count
FROM tripdata_project
'''
pd.read_sql(avg_passenger_count,engine)

Unnamed: 0,avg_passenger_count
0,1.631115


# operations_and_performance report

In [167]:
report='''
SELECT CURRENT_DATE as ingestion_date,*
FROM
(SELECT COUNT(*) as total_trips
FROM tripdata_project) a,
(SELECT avg(cast(trip_distance as numeric)) as avg_trip_distance
FROM tripdata_project) b,
(SELECT "VendorID" as highest_trip_vendor
FROM 
(
SELECT "VendorID", count(*)
from tripdata_project
GROUP BY "VendorID"
ORDER BY "VendorID" DESC
LIMIT 1
) a) c,
(SELECT "VendorID" as lowest_trip_vendor
FROM 
(
SELECT "VendorID", count(*)
from tripdata_project
GROUP BY "VendorID"
ORDER BY "VendorID" ASC
LIMIT 1
) a) d,
(SELECT avg(cast(passenger_count as numeric)) as avg_passenger_count
FROM tripdata_project) e
'''

pd.read_sql(report,engine)
# Remind me to rewrite this in CTE

Unnamed: 0,ingestion_date,total_trips,avg_trip_distance,highest_trip_vendor,lowest_trip_vendor,avg_passenger_count
0,2023-08-18,1000000,2.751403,2,1,1.631115


In [138]:
# What is the average trip amount given by passengers?
query='''
SELECT AVG(CAST(tip_amount AS NUMERIC)) AS avg_tip_amount 
FROM tripdata_project
'''
pd.read_sql(query,engine)

Unnamed: 0,avg_tip_amount
0,1.825786


In [141]:
# What is the average trip distance by passengers?
query='''
SELECT AVG(cast(trip_distance as NUMERIC)) AS avg_trip_distance_by_passenger
FROM tripdata_project
'''
pd.read_sql(query,engine)

Unnamed: 0,avg_trip_distance_by_passenger
0,2.751403


In [145]:
# How many trips were flagged as 'store and forward'?
query='''
SELECT count(*) as store_and_forward_trips
FROM tripdata_project
WHERE store_and_fwd_flag = 'Y'
'''
pd.read_sql(query,engine)

Unnamed: 0,store_and_forward_trips
0,5918


In [144]:
# How many trips were shared rides (passenger count > 1)?
query='''
SELECT COUNT(*) as shared_ride_count
FROM tripdata_project
WHERE CAST(passenger_count as numeric) > 1
'''
pd.read_sql(query,engine)

Unnamed: 0,shared_ride_count
0,267128


# customer_demographics_and_preferences report

In [169]:
report='''
SELECT CURRENT_DATE as ingestion_date,*
FROM
(SELECT AVG(CAST(tip_amount AS NUMERIC)) AS avg_tip_amount 
FROM tripdata_project) a,
(SELECT AVG(cast(trip_distance as NUMERIC)) AS avg_trip_distance_by_passenger
FROM tripdata_project) b,
(SELECT count(*) as store_and_forward_trips
FROM tripdata_project
WHERE store_and_fwd_flag = 'Y') c,
(SELECT COUNT(*) as shared_ride_count
FROM tripdata_project
WHERE CAST(passenger_count as numeric) > 1) d
'''
pd.read_sql(report,engine)

Unnamed: 0,ingestion_date,avg_tip_amount,avg_trip_distance_by_passenger,store_and_forward_trips,shared_ride_count
0,2023-08-18,1.825786,2.751403,5918,267128


In [152]:
# What is the average fare amount per trip?
query='''
SELECT AVG(CAST(fare_amount as numeric)) as avg_fare_amount
FROM tripdata_project
'''
pd.read_sql(query,engine)

Unnamed: 0,avg_fare_amount
0,12.343774


In [155]:
# How much revenue was generated from tolls and surcharges combined?
query='''
SELECT SUM(CAST(tolls_amount as NUMERIC) + CAST(improvement_surcharge as NUMERIC)) as tolls_and_surcharges_revenue
FROM tripdata_project
'''
pd.read_sql(query,engine)

Unnamed: 0,tolls_and_surcharges_revenue
0,598262.8


In [159]:
# What is the average total amount paid by passengers?
query='''
SELECT AVG(CAST(total_amount as numeric)) as avg_total_amount
FROM tripdata_project
'''
pd.read_sql(query,engine)

Unnamed: 0,avg_total_amount
0,15.578133


# financial_performance report

In [170]:
report='''
SELECT CURRENT_DATE as ingestion_date,*
FROM
(SELECT AVG(CAST(fare_amount as numeric)) as avg_fare_amount
FROM tripdata_project) a,
(SELECT SUM(CAST(tolls_amount as NUMERIC) + CAST(improvement_surcharge as NUMERIC)) as tolls_and_surcharges_revenue
FROM tripdata_project) b,
(SELECT AVG(CAST(total_amount as numeric)) as avg_total_amount
FROM tripdata_project) c
'''
pd.read_sql(report,engine)

Unnamed: 0,ingestion_date,avg_fare_amount,tolls_and_surcharges_revenue,avg_total_amount
0,2023-08-18,12.343774,598262.8,15.578133
