In [1]:
import pandas as pd
import os
import warnings
from sqlalchemy import create_engine

In [2]:
# Sanity check: display the path pandas was imported from, to confirm which
# environment's installation is in use.
pd.__file__

'/workspaces/data-engineering-zoomcamp-2026/Module 1 - Containerization and Infrastructure as Code/docker-workshop/pipeline/.venv/lib/python3.13/site-packages/pandas/__init__.py'

In [3]:
warnings.filterwarnings('ignore')

## Explore Data

In [4]:
# Base URL of the release assets and the name of the gzipped CSV to download.
filepath, filename = (
    'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/',
    'yellow_tripdata_2021-01.csv.gz',
)

In [5]:
# Download and parse the whole file into a single DataFrame for exploration.
# The source is a URL, so it is assembled with "/" explicitly instead of
# os.path.join, which is a filesystem helper whose separator depends on the
# host operating system. rstrip keeps this correct with or without a trailing
# slash on `filepath`.
df = pd.read_csv(f"{filepath.rstrip('/')}/{filename}")

In [6]:
# Summarize the frame: number of entries, column names, non-null counts and dtypes.
df.info()

<class 'pandas.DataFrame'>
RangeIndex: 1369765 entries, 0 to 1369764
Data columns (total 18 columns):
 #   Column                 Non-Null Count    Dtype  
---  ------                 --------------    -----  
 0   VendorID               1271413 non-null  float64
 1   tpep_pickup_datetime   1369765 non-null  str    
 2   tpep_dropoff_datetime  1369765 non-null  str    
 3   passenger_count        1271413 non-null  float64
 4   trip_distance          1369765 non-null  float64
 5   RatecodeID             1271413 non-null  float64
 6   store_and_fwd_flag     1271413 non-null  str    
 7   PULocationID           1369765 non-null  int64  
 8   DOLocationID           1369765 non-null  int64  
 9   payment_type           1271413 non-null  float64
 10  fare_amount            1369765 non-null  float64
 11  extra                  1369765 non-null  float64
 12  mta_tax                1369765 non-null  float64
 13  tip_amount             1369765 non-null  float64
 14  tolls_amount           136976

In [7]:
# Preview the first rows; as the last expression in the cell, the result is
# rendered as the cell output.
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1.0,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2.0,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1.0,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2.0,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1.0,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1.0,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1.0,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1.0,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2.0,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1.0,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


## Connect to PostgreSQL Database

Execute the command below to run the Docker container based on the `postgres:18` image:
```bash
docker run -it --rm \
    -e POSTGRES_USER="root" \
    -e POSTGRES_PASSWORD="root" \
    -e POSTGRES_DB="ny_taxi" \
    -v ny_taxi_postgres_data:/var/lib/postgresql \
    -p 5432:5432 \
    postgres:18
```

In [8]:
# Connection settings for the PostgreSQL database. Each value can be overridden
# through an environment variable so that real credentials never have to be
# written into the notebook; the fallbacks are the values used by the
# `docker run` command documented in this notebook.
username = os.environ.get("POSTGRES_USER", "root")
password = os.environ.get("POSTGRES_PASSWORD", "root")
host = os.environ.get("POSTGRES_HOST", "localhost")
port = int(os.environ.get("POSTGRES_PORT", "5432"))  # kept as an int, as before
db_name = os.environ.get("POSTGRES_DB", "ny_taxi")

In [9]:
# Build the SQLAlchemy engine used to open connections to the PostgreSQL DB.
db_url = f'postgresql://{username}:{password}@{host}:{port}/{db_name}'
engine = create_engine(db_url)

## Load Data

In [10]:
# Column name -> pandas dtype applied while parsing the CSV.
dtype = dict(
    VendorID="Int64",
    passenger_count="Int64",
    trip_distance="float64",
    RatecodeID="Int64",
    store_and_fwd_flag="string",
    PULocationID="Int64",
    DOLocationID="Int64",
    payment_type="Int64",
    fare_amount="float64",
    extra="float64",
    mta_tax="float64",
    tip_amount="float64",
    tolls_amount="float64",
    improvement_surcharge="float64",
    total_amount="float64",
    congestion_surcharge="float64",
)

# Columns handed to read_csv's `parse_dates` so they are parsed as datetimes.
date_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Load data in batches of 100,000 rows.
# The source is a URL, so it is joined with "/" explicitly rather than with
# os.path.join (a filesystem helper whose separator is OS-dependent); rstrip
# keeps this correct with or without a trailing slash on `filepath`.
chunk_reader = pd.read_csv(
    f"{filepath.rstrip('/')}/{filename}",
    dtype=dtype,
    parse_dates=date_cols,
    chunksize=100_000,
)

# Materialize every batch into a list so that later cells can iterate over the
# batches more than once. A separate name is used for the reader so that
# `df_chunks` always refers to the list.
df_chunks = list(chunk_reader)

In [11]:
# Report the number of rows in every batch. The loop variable name `df_chunk`
# is kept as-is because later cells use the same name.
for df_chunk in df_chunks:
    print(df_chunk.shape[0])

100000
100000
100000
100000
100000
100000
100000
100000
100000
100000
100000
100000
100000
69765


## Create a Table in the PostgreSQL Database

In [12]:
# Preview the column layout only (zero rows). The first batch is referenced
# explicitly so this cell does not depend on a loop variable left bound by an
# earlier cell.
df_chunks[0].head(0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [13]:
# Create a table with headers only but no data
df_chunk.head(0).to_sql(
    name='yellow_taxi_data',
    con=engine,                 # Use the engine to connect to DB
    if_exists='replace',
)

0

Execute the command below to open a `pgcli` query session against the PostgreSQL database and verify that the new table exists.
```bash
uv run pgcli \
    -h localhost \
    -p 5432 \
    -u root \
    -d ny_taxi
```

## Insert Data

In [14]:
# Append every batch to the existing table and report progress per batch.
# Because of if_exists='append', re-running this cell inserts the rows again.
insert_options = dict(
    name='yellow_taxi_data',
    con=engine,
    if_exists='append',
)
for df_chunk in df_chunks:
    df_chunk.to_sql(**insert_options)
    print(f'{len(df_chunk)} records inserted.')

100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
100000 records inserted.
69765 records inserted.
