In [1]:
import os
from time import time
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

In [2]:
# Create the SQLAlchemy engine for the PostgreSQL server (running in Docker) used for data ingestion.
# Connection settings are read from the environment so credentials do not have to live in the
# notebook; the fallbacks reproduce the previous hardcoded local values.

pg_user = os.environ.get('PGUSER', 'root')
pg_password = os.environ.get('PGPASSWORD', 'root')
pg_host = os.environ.get('PGHOST', 'localhost')
pg_port = os.environ.get('PGPORT', '5432')
pg_database = os.environ.get('PGDATABASE', 'ny_taxi')

# User and password are URL-escaped so characters such as '@' or '/' cannot break the URL.
engine = create_engine(
    f'postgresql://{quote_plus(pg_user)}:{quote_plus(pg_password)}@{pg_host}:{pg_port}/{pg_database}'
)

# Connectivity check: open a connection and release it straight away, so an unreachable
# server fails here and no connection is left checked out of the pool.
with engine.connect():
    pass

<sqlalchemy.engine.base.Connection at 0x13a0e3750>

In [3]:
# Read the yellow-taxi trip data parquet file into a DataFrame.
df = pd.read_parquet(
    "data/yellow_tripdata_2023-01.parquet"
)

In [4]:
# Export the frame as CSV (without the row index); later cells re-read this file with read_csv.
df.to_csv(
    'data/yellow_tripdata_2023-01.csv',
    index=False,
)

In [5]:
# Re-load the CSV export. The store_and_fwd_flag column (position 6) is forced to str because
# of a pandas data interpretation error on that column. It is addressed by name rather than by
# position so the override keeps targeting the right column even if the column order changes.

df = pd.read_csv(
    'data/yellow_tripdata_2023-01.csv',
    dtype={'store_and_fwd_flag': 'str'},
)

In [6]:
# Parse the pickup and dropoff columns (plain strings after the CSV load) into datetime values.
# Bracket notation is used for the column assignment.

df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

In [7]:
# Build the CREATE TABLE statement pandas would use for this frame; only the SQL text is
# generated here, no table is created.
# Wrapping the string in print() renders its line breaks instead of showing escaped '\n'.

print(
    pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine)
)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [8]:
# Create the table from a zero-row slice of the frame: this writes the column names and
# data types but no data, so the structure can be checked first.
# if_exists='replace' replaces any existing table with the same name.
# NOTE: to_sql defaults to index=True, so the DataFrame index is written as an extra column
# that does not appear in the get_schema output.

df.head(n=0).to_sql(
    name='yellow_taxi_data',
    con=engine,
    if_exists='replace',
)

0

In [9]:
# Read the CSV as an iterator of 100,000-row chunks so the large file is ingested in
# manageable batches rather than loaded all at once.
# The store_and_fwd_flag column (position 6) is forced to str because of a pandas data
# interpretation error on that column. It is addressed by name rather than by position so
# the override keeps targeting the right column even if the column order changes.

df_iter = pd.read_csv(
    'data/yellow_tripdata_2023-01.csv',
    dtype={'store_and_fwd_flag': 'str'},
    iterator=True,
    chunksize=100000,
)

In [10]:
# Ingest the CSV chunk by chunk; the loop ends by itself once the reader is exhausted.
# Iterating over the reader directly (instead of calling next() inside a try/except that
# wrapped the whole body) means a StopIteration raised by anything other than the reader
# is no longer silently treated as "data transfer complete".

t_start = time()  # started before the chunk is fetched so the timing includes reading it
for df in df_iter:  # one chunk of the CSV per iteration
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')  # append this chunk to the table

    t_end = time()

    print(f'Inserted {len(df)} rows... took {t_end - t_start:.3f} seconds')

    t_start = time()  # restart the clock for the next read + insert

print("No more data to process.")


Inserted 100000 rows... took 4.723 seconds
Inserted 100000 rows... took 4.092 seconds
Inserted 100000 rows... took 4.068 seconds
Inserted 100000 rows... took 4.009 seconds
Inserted 100000 rows... took 4.757 seconds
Inserted 100000 rows... took 4.492 seconds
Inserted 100000 rows... took 4.225 seconds
Inserted 100000 rows... took 4.198 seconds
Inserted 100000 rows... took 4.254 seconds
Inserted 100000 rows... took 3.986 seconds
Inserted 100000 rows... took 4.148 seconds
Inserted 100000 rows... took 4.174 seconds
Inserted 100000 rows... took 4.888 seconds
Inserted 100000 rows... took 4.459 seconds
Inserted 100000 rows... took 4.690 seconds
Inserted 100000 rows... took 4.214 seconds
Inserted 100000 rows... took 5.028 seconds
Inserted 100000 rows... took 4.505 seconds
Inserted 100000 rows... took 4.247 seconds
Inserted 100000 rows... took 3.895 seconds
Inserted 100000 rows... took 4.077 seconds
Inserted 100000 rows... took 4.229 seconds
Inserted 100000 rows... took 4.166 seconds
Inserted 10

In [11]:
# Load the taxi zone lookup CSV, a second data set used in the analysis phase.

df_zones = pd.read_csv(
    'data/taxi+_zone_lookup.csv'
)

In [12]:
# Preview the first five rows of the zone lookup.
df_zones.head(n=5)

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [13]:
# Write the zone lookup to the PostgreSQL database as table 'zones',
# replacing the table if it already exists.

df_zones.to_sql(
    name='zones',
    con=engine,
    if_exists='replace',
)

265