In [11]:
# In this Jupyter notebook, we write code to:
# - Download the CSV file
# - Read it in chunks with pandas
# - Convert datetime columns
# - Insert data into PostgreSQL using SQLAlchemy

import pandas as pd
from sqlalchemy import create_engine
from tqdm.auto import tqdm
import os

# Database connection settings are read from environment variables so that no
# credentials are hardcoded in the notebook.
DB_USER = os.getenv("DB_USER")
# The password has its own variable. When DB_PASSWORD is not set we fall back to
# DB_USER, which is the value this cell previously always sent as the password,
# so existing setups keep working unchanged.
DB_PASSWORD = os.getenv("DB_PASSWORD", DB_USER)
# Fall back to PostgreSQL's default port when DB_PORT is not set, instead of
# rendering the literal text "None" into the connection URL.
DB_PORT = os.getenv("DB_PORT", "5432")

# Fail early with a clear message rather than building a URL for user "None".
if not DB_USER:
    raise RuntimeError("DB_USER environment variable is not set")

# NOTE(review): the credentials are interpolated as-is; a password containing
# URL-reserved characters such as '@' or '/' would need escaping first.
engine = create_engine(f'postgresql://{DB_USER}:{DB_PASSWORD}@localhost:{DB_PORT}/ny_taxi')

# Base URL of the yellow-taxi release assets; file names are appended to it
prefix = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/'

# Explicit column types handed to read_csv so pandas does not have to infer them.
# "Int64" and "string" are pandas' nullable extension dtypes.
dtype = dict(
    VendorID="Int64",
    passenger_count="Int64",
    trip_distance="float64",
    RatecodeID="Int64",
    store_and_fwd_flag="string",
    PULocationID="Int64",
    DOLocationID="Int64",
    payment_type="Int64",
    fare_amount="float64",
    extra="float64",
    mta_tax="float64",
    tip_amount="float64",
    tolls_amount="float64",
    improvement_surcharge="float64",
    total_amount="float64",
    congestion_surcharge="float64",
)

# Columns read_csv should parse as datetimes
parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Load only the first 100 rows as a sample to inspect before the full ingest
sample_url = prefix + 'yellow_tripdata_2021-01.csv.gz'
df = pd.read_csv(sample_url, dtype=dtype, parse_dates=parse_dates, nrows=100)

# Quick look at the sample: first rows, then column dtypes, then (rows, columns)
for sample_view in (df.head(), df.dtypes, df.shape):
    print(sample_view)

# Print the DDL schema pandas derives for this frame on the target engine
table_ddl = pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine)
print(table_ddl)

# Create the table in the database from a zero-row slice of the sample:
# head(n=0) keeps the columns but no data, so only the schema is written.
# if_exists='replace' drops any existing table of the same name first.
schema_only = df.head(n=0)
schema_only.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

# Ingest the full file in chunks of 100000 rows so that it never has to fit in
# memory at once. The reader is used as a context manager so the underlying
# download/decompression handle is closed even if an insert fails part-way.
with pd.read_csv(
    prefix + 'yellow_tripdata_2021-01.csv.gz',
    dtype=dtype,
    parse_dates=parse_dates,
    iterator=True,
    chunksize=100000,
) as df_iter:
    first = True

    # Iterate over chunks and insert each one into the database
    for df_chunk in tqdm(df_iter):
        if first:
            # (Re)create the table from a zero-row slice: schema only, no data.
            # 'replace' drops an existing table, so re-running this cell starts
            # from an empty table instead of appending duplicates.
            df_chunk.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')
            first = False
            print("Table created.")

        # Insert the rows of this chunk
        df_chunk.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
        print("Inserted:", len(df_chunk))

   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         1  2021-01-01 00:30:10   2021-01-01 00:36:12                1   
1         1  2021-01-01 00:51:20   2021-01-01 00:52:19                1   
2         1  2021-01-01 00:43:30   2021-01-01 01:11:06                1   
3         1  2021-01-01 00:15:48   2021-01-01 00:31:01                0   
4         2  2021-01-01 00:31:49   2021-01-01 00:48:21                1   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           2.10           1                  N           142            43   
1           0.20           1                  N           238           151   
2          14.70           1                  N           132           165   
3          10.60           1                  N           138           132   
4           4.94           1                  N            68            33   

   payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  \


0it [00:00, ?it/s]

Table created.
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 69765
