In [1]:
import os
import time
from io import StringIO
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine
from tqdm.notebook import tqdm

In [2]:
# ------------------------
# Config
# ------------------------

# Monthly yellow-taxi CSVs hosted on the DataTalksClub nyc-tlc-data GitHub releases.
prefix = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/"
year, month = 2021, 1  # change these to ingest a different monthly file
file_url = f"{prefix}yellow_tripdata_{year}-{month:02d}.csv.gz"
table_name = "yellow_taxi_data"

In [3]:
chunksize = 10 ** 5  # rows per DataFrame chunk, i.e. per COPY batch

In [4]:
# ------------------------
# Data Types
# ------------------------

# Explicit column dtypes for read_csv. The capitalised "Int64" is pandas'
# nullable integer type, so integer columns with missing values stay integral.
dtype = dict(
    VendorID="Int64",
    passenger_count="Int64",
    trip_distance="float64",
    RatecodeID="Int64",
    store_and_fwd_flag="string",
    PULocationID="Int64",
    DOLocationID="Int64",
    payment_type="Int64",
    fare_amount="float64",
    extra="float64",
    mta_tax="float64",
    tip_amount="float64",
    tolls_amount="float64",
    improvement_surcharge="float64",
    total_amount="float64",
    congestion_surcharge="float64",
)

# Timestamp columns that read_csv should parse into datetimes.
parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

In [5]:
# ------------------------
# Create DB Engine
# ------------------------

# Connection settings come from the environment so credentials are not
# committed with the notebook; the defaults reproduce the original
# hardcoded local connection (root:root@localhost:5432/ny_taxi).
pg_user = os.environ.get("PG_USER", "root")
pg_password = os.environ.get("PG_PASSWORD", "root")
pg_host = os.environ.get("PG_HOST", "localhost")
pg_port = os.environ.get("PG_PORT", "5432")
pg_db = os.environ.get("PG_DB", "ny_taxi")

# quote_plus escapes characters such as '@' or ':' that would otherwise
# corrupt the URL when they appear in a user name or password.
engine = create_engine(
    f"postgresql://{quote_plus(pg_user)}:{quote_plus(pg_password)}"
    f"@{pg_host}:{pg_port}/{pg_db}"
)

In [6]:
# ------------------------
# Create Schema
# ------------------------

print("Creating table schema...")

# Read a small sample using the same dtype/parse_dates as the ingestion
# iterator, so to_sql creates typed columns (integer, float, timestamp, text).
# With nrows=0 and no dtypes every column would be object -> TEXT.
df_schema = pd.read_csv(
    file_url,
    nrows=100,
    dtype=dtype,
    parse_dates=parse_dates
)

# head(0) keeps the column types but no rows, so only the table definition is
# written. if_exists="replace" drops any existing table of the same name first.
df_schema.head(0).to_sql(
    name=table_name,
    con=engine,
    if_exists="replace",
    index=False
)

print("Schema created")

Creating table schema...
Schema created


In [7]:
# ------------------------
# Iterator
# ------------------------

# Stream the CSV lazily as DataFrames of `chunksize` rows, applying the same
# dtypes and datetime parsing to every chunk. The reader can only be consumed
# once: re-run this cell before re-running the ingestion loop.
df_iter = pd.read_csv(
    file_url,
    chunksize=chunksize,
    parse_dates=parse_dates,
    dtype=dtype,
)

In [8]:
# ------------------------
# COPY function
# ------------------------

def copy_to_postgres(df, table, conn):
    """Bulk-load a DataFrame into an existing Postgres table via ``COPY ... FROM STDIN``.

    The frame is serialised to an in-memory CSV (no header, no index) and
    streamed through the cursor's ``copy_expert``. The caller owns the
    transaction: this function does not commit.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to load. Column names must match the target table's column names.
    table : str
        Target table name, interpolated verbatim into the SQL. It must be a
        trusted identifier (optionally schema-qualified), never user input.
    conn :
        A DB-API connection whose cursors provide ``copy_expert`` (e.g. the
        psycopg2 connection returned by ``engine.raw_connection()``).
    """
    if df.empty:
        return  # nothing to send; skip the round trip

    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)

    # Name the columns explicitly so CSV fields are mapped by name rather
    # than by table position. Double-quote them so mixed-case names such as
    # VendorID are matched exactly instead of being folded to lower case.
    columns = ", ".join(
        '"' + str(col).replace('"', '""') + '"' for col in df.columns
    )
    sql = f"COPY {table} ({columns}) FROM STDIN WITH (FORMAT csv)"

    cursor = conn.cursor()
    try:
        cursor.copy_expert(sql, buffer)
    finally:
        # Release the cursor even if COPY fails (e.g. a type mismatch).
        cursor.close()


In [9]:
# ------------------------
# Ingestion Loop
# ------------------------

print("Starting ingestion...")

conn = engine.raw_connection()

start_time = time.perf_counter()
total_rows = 0

try:
    for i, df_chunk in enumerate(tqdm(df_iter, desc="Ingesting", unit="chunk")):

        chunk_start = time.perf_counter()

        copy_to_postgres(df_chunk, table_name, conn)

        # Commit per chunk so already-loaded chunks survive a later failure.
        conn.commit()

        rows = len(df_chunk)
        total_rows += rows

        # Clamp to avoid dividing by zero on a near-instant chunk.
        chunk_time = max(time.perf_counter() - chunk_start, 1e-9)

        # tqdm.write prints without breaking the progress bar.
        tqdm.write(
            f"Chunk {i+1} → {rows} rows "
            f"| {rows/chunk_time:.0f} rows/sec"
        )
except Exception:
    # Discard the partially-copied chunk; earlier committed chunks remain.
    conn.rollback()
    raise
finally:
    conn.close()

if total_rows == 0:
    # df_iter is single-use: a second run of this cell finds it exhausted.
    print("Warning: no rows ingested - re-run the Iterator cell before ingesting again.")

print(f"\nTotal rows inserted: {total_rows}")
print(f"Total time: {time.perf_counter() - start_time:.2f} sec")

Starting ingestion...


Ingesting: 0it [00:00, ?it/s]

Chunk 1 → 100000 rows | 60817 rows/sec
Chunk 2 → 100000 rows | 62579 rows/sec
Chunk 3 → 100000 rows | 58273 rows/sec
Chunk 4 → 100000 rows | 71364 rows/sec
Chunk 5 → 100000 rows | 59887 rows/sec
Chunk 6 → 100000 rows | 53738 rows/sec
Chunk 7 → 100000 rows | 46546 rows/sec
Chunk 8 → 100000 rows | 63567 rows/sec
Chunk 9 → 100000 rows | 66468 rows/sec
Chunk 10 → 100000 rows | 41934 rows/sec
Chunk 11 → 100000 rows | 64122 rows/sec
Chunk 12 → 100000 rows | 36806 rows/sec
Chunk 13 → 100000 rows | 40515 rows/sec
Chunk 14 → 69765 rows | 48296 rows/sec

Total rows inserted: 1369765
Total time: 43.09 sec
