In [None]:
import os
from io import BytesIO

import click
import pandas as pd
import pyarrow.parquet as pq
import requests
from sqlalchemy import create_engine
from tqdm.auto import tqdm  # tracks ingestion progress

In [None]:
# Getting data
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-11.parquet;

In [None]:
!wget -P ./resources/ https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [4]:
# Parameters for sql engine
# Connection settings honour the standard libpq environment variables so that
# credentials do not have to live in the notebook; the fallbacks are the
# local development defaults.
pg_user = os.environ.get('PGUSER', 'postgres')
pg_pass = os.environ.get('PGPASSWORD', 'postgres')
pg_host = os.environ.get('PGHOST', 'localhost')
pg_port = int(os.environ.get('PGPORT', 5432))
pg_db = os.environ.get('PGDATABASE', 'ny_taxi')

# Which monthly trip file to ingest and where to put it
year = 2025
month = 11
target_table = 'green_taxi_data'
chunksize = 100000  # rows per write to the database

In [None]:
# Load the downloaded monthly trip file and preview the first rows
parquet_path = './resources/green_tripdata_2025-11.parquet'
df = pd.read_parquet(parquet_path)
df.head()

In [4]:
# (rows, columns) of the loaded trip data
df.shape

(46912, 21)

In [5]:
# Column dtypes as read from the Parquet file
df.dtypes

VendorID                          int32
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag               object
RatecodeID                      float64
PULocationID                      int32
DOLocationID                      int32
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object

In [57]:
# Expected dtype of every non-datetime column, in the same order as df.dtypes.
# Together with parse_dates this covers all 21 columns of the trip file.
dtype = {
    "VendorID": "int32",
    "store_and_fwd_flag": "object",
    "RatecodeID": "float64",
    "PULocationID": "int32",
    "DOLocationID": "int32",
    "passenger_count": "float64",
    "trip_distance": "float64",
    "fare_amount": "float64",
    "extra": "float64",
    "mta_tax": "float64",
    "tip_amount": "float64",
    "tolls_amount": "float64",
    "ehail_fee": "float64",
    "improvement_surcharge": "float64",
    "total_amount": "float64",
    "payment_type": "float64",
    "trip_type": "float64",
    "congestion_surcharge": "float64",
    "cbd_congestion_fee": "float64"
}

# Columns holding timestamps (datetime64 in the Parquet file)
parse_dates = [
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime"
]

In [None]:
# Show the distinct values of every column to spot unexpected categories or nulls
for column in df.columns:
    print(f'Column: {column} \n result: {df[column].unique()}')

In [18]:
# Build the download URL for the selected month.
# The prefix has no trailing slash because the f-string supplies the separator;
# having both produced ".../trip-data//green_tripdata_...".
prefix = 'https://d37ci6vzurychx.cloudfront.net/trip-data'
url = f'{prefix}/green_tripdata_{year}-{month:02d}.parquet'
print(url)

https://d37ci6vzurychx.cloudfront.net/trip-data//green_tripdata_2025-11.parquet


In [23]:
def ingest_data(pg_user, pg_pass, pg_host, pg_port, pg_db, year, month, target_table, chunksize):
    """Download one month of green-taxi trip data and load it into PostgreSQL.

    The monthly Parquet file is fetched over HTTP, read into memory, split
    into record batches and written batch by batch to ``target_table``. An
    existing table with that name is replaced.

    NOTE(review): a function with the same name is defined again later in
    this notebook; whichever definition cell runs last is the one in effect.

    Args:
        pg_user: PostgreSQL user name.
        pg_pass: PostgreSQL password.
        pg_host: Database host.
        pg_port: Database port.
        pg_db: Database name.
        year: Year of the trip file.
        month: Month of the trip file as an integer (zero-padded in the URL).
        target_table: Destination table name.
        chunksize: Number of rows written per ``to_sql`` round trip.

    Raises:
        requests.HTTPError: If the download responds with an error status.
    """
    # Data source
    prefix = 'https://d37ci6vzurychx.cloudfront.net/trip-data'
    url = f'{prefix}/green_tripdata_{year}-{month:02d}.parquet'

    # Download parquet file
    response = requests.get(url, stream=True)
    response.raise_for_status()

    # Read the Parquet file from bytes
    table = pq.read_table(BytesIO(response.content))

    # Define sql engine and database
    engine = create_engine(f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}')

    first = True
    # The Table has to be split with to_batches(); iterating it directly does
    # not produce row batches.
    for batch in tqdm(table.to_batches()):
        df_chunk = batch.to_pandas()
        if first:
            # Create table schema (no data); index=False keeps the pandas
            # index from becoming an extra column.
            df_chunk.head(0).to_sql(
                name=target_table,
                con=engine,
                if_exists="replace",
                index=False
            )
            first = False
            print("Table created")

        # Insert chunk
        df_chunk.to_sql(
            name=target_table,
            con=engine,
            if_exists="append",
            chunksize=chunksize,
            index=False
        )

        print(f"Inserted {len(df_chunk)} rows")

In [None]:
# Run the ingestion with the notebook-level parameters
ingest_data(
    pg_user=pg_user,
    pg_pass=pg_pass,
    pg_host=pg_host,
    pg_port=pg_port,
    pg_db=pg_db,
    year=year,
    month=month,
    target_table=target_table,
    chunksize=chunksize,
)

In [67]:
def ingest_data(pg_user, pg_pass, pg_host, pg_port, pg_db, year, month, target_table, chunksize):
    """Download one month of green-taxi trip data and load it into PostgreSQL.

    The monthly Parquet file is fetched over HTTP, read into memory, split
    into record batches of at most ``chunksize`` rows and written batch by
    batch to ``target_table``. An existing table with that name is replaced.

    Args:
        pg_user: PostgreSQL user name.
        pg_pass: PostgreSQL password.
        pg_host: Database host.
        pg_port: Database port.
        pg_db: Database name.
        year: Year of the trip file.
        month: Month of the trip file as an integer (zero-padded in the URL).
        target_table: Destination table name.
        chunksize: Maximum rows per record batch and per ``to_sql`` write.

    Raises:
        requests.HTTPError: If the download responds with an error status.
        requests.Timeout: If the server does not respond within the timeout.
    """
    # Data source
    prefix = 'https://d37ci6vzurychx.cloudfront.net/trip-data'
    url = f'{prefix}/green_tripdata_{year}-{month:02d}.parquet'

    # Download parquet file in memory. The (connect, read) timeout keeps a
    # stalled connection from hanging the notebook; streaming is not requested
    # because the whole body is read immediately anyway.
    response = requests.get(url, timeout=(10, 120))
    response.raise_for_status()

    # Read the Parquet file from bytes
    table = pq.read_table(BytesIO(response.content))

    # Define SQL engine
    engine = create_engine(f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}')

    try:
        # Convert table to batches and iterate
        first = True
        batches = table.to_batches(max_chunksize=chunksize)
        for batch in tqdm(batches, desc="Processing batches"):
            df_chunk = batch.to_pandas()

            if first:
                # Create table schema (no data)
                df_chunk.head(0).to_sql(
                    name=target_table,
                    con=engine,
                    if_exists="replace",
                    index=False
                )
                first = False
                print("Table created")

            # Insert chunk
            df_chunk.to_sql(
                name=target_table,
                con=engine,
                if_exists="append",
                chunksize=chunksize,
                index=False
            )

            print(f"Inserted {len(df_chunk)} rows")
    finally:
        # Release pooled connections even when an insert fails, so repeated
        # runs in the same kernel do not accumulate open connections.
        engine.dispose()

In [None]:
# Run the ingestion with the notebook-level parameters
ingest_data(
    pg_user=pg_user,
    pg_pass=pg_pass,
    pg_host=pg_host,
    pg_port=pg_port,
    pg_db=pg_db,
    year=year,
    month=month,
    target_table=target_table,
    chunksize=chunksize,
)