In [1]:
import os
from pathlib import Path

import click
import duckdb
import pandas as pd
from sqlalchemy import create_engine
from tqdm.auto import tqdm

In [5]:
# Show where query.sql is expected to live: next to the notebook's working directory.
Path.cwd() / "query.sql"

PosixPath('/home/lu-landru/zoocamp/module1/DE-ZoomCamp-Module1-Docker-SQL/query.sql')

In [None]:
### SCHEMA #########

# Source file for this cell. It is spelled out here so the cell runs on a
# fresh kernel; previously `url` was never defined at module level and the
# read below raised NameError. This is the same address init_batch builds
# from prefix / file_name / year / month.
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-11.parquet"

# Target pandas dtype for every non-timestamp column. The capitalised "Int64"
# is pandas' nullable integer dtype, so missing values survive the cast.
dtype = {
    'VendorID': 'Int64',
    'store_and_fwd_flag': 'object',
    'RatecodeID': 'Int64',
    'PULocationID': 'Int64',
    'DOLocationID': 'Int64',
    'passenger_count': 'Int64',
    'trip_distance': 'float64',
    'fare_amount': 'float64',
    'extra': 'float64',
    'mta_tax': 'float64',
    'tip_amount': 'float64',
    'tolls_amount': 'float64',
    'ehail_fee': 'float64',
    'improvement_surcharge': 'float64',
    'total_amount': 'float64',
    'payment_type': 'float64',
    'trip_type': 'Int64',
    'congestion_surcharge': 'float64',
    'cbd_congestion_fee': 'float64',
}

# Timestamp columns of the data set.
# NOTE(review): this list is not passed to anything in this cell
# (pd.read_parquet is called without it); it is kept only as a record of
# which columns hold datetimes.
parse_dates = [
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
]

# Load the whole file into memory and apply the explicit column dtypes.
df = pd.read_parquet(url).astype(dtype)

In [3]:

################
## VARIABLES###
##############

# Month of trip data to ingest.
year = 2025
month = 11

# Postgres connection settings.
port = 5432
user = "root"
database = "ny_taxi"
host = "localhost"
# Take the password from the environment when it is set, so a real secret
# never has to be saved in the notebook; "root" stays the local default.
password = os.environ.get("PGPASSWORD", "root")

# Number of rows requested per record batch (and therefore per insert).
batch_size = 10000

# Source file pieces: {prefix}{file_name}_{year}-{month:02d}.parquet
prefix = "https://d37ci6vzurychx.cloudfront.net/trip-data/"
file_name = "green_tripdata"

# NOTE(review): the source file is green trip data but the table is named
# "yellow_taxi_data". The name is left unchanged because other queries may
# already reference it - confirm and rename if that is not the case.
target_table = "yellow_taxi_data"



###############################################
## initialize batches to be ingested into db
###############################################


def init_batch(prefix: str, file_name: str, batch_size: int, year: int, month: int):
    """Open a batched reader over one month of trip data.

    The parquet address is built as
    ``{prefix}{file_name}_{year}-{month:02d}.parquet`` and queried through an
    in-memory DuckDB connection with the ``httpfs`` extension loaded. Every
    column is cast to an explicit type and aliased back to its original name.

    Args:
        prefix: Leading part of the file address, placed directly before
            ``file_name``.
        file_name: File stem that precedes ``_{year}-{month}``.
        batch_size: Passed to ``fetch_record_batch`` as ``rows_per_batch``.
        year: Year used in the file name.
        month: Month used in the file name, zero-padded to two digits.

    Returns:
        The object returned by DuckDB's ``fetch_record_batch``; iterating it
        yields one record batch at a time.
    """
    url = f"{prefix}{file_name}_{year}-{month:02d}.parquet"

    # The query is defined inline. An earlier statement here read and
    # formatted query.sql but threw the result away, which left a file handle
    # open and made this function fail whenever that file was missing.
    con = duckdb.connect()
    con.execute("INSTALL httpfs; LOAD httpfs;")
    record_batch_reader = con.execute(
        f"""
        SELECT VendorID::BIGINT as VendorID ,
               lpep_pickup_datetime::TIMESTAMP as lpep_pickup_datetime ,
               lpep_dropoff_datetime::TIMESTAMP as lpep_dropoff_datetime ,
               store_and_fwd_flag::VARCHAR as store_and_fwd_flag ,
               RatecodeID::BIGINT as RatecodeID ,
               PULocationID::BIGINT as PULocationID ,
               DOLocationID::BIGINT as DOLocationID ,
               passenger_count::BIGINT as passenger_count ,
               trip_distance::DOUBLE as trip_distance ,
               fare_amount::DOUBLE as fare_amount ,
               extra::DOUBLE as extra ,
               mta_tax::DOUBLE as mta_tax ,
               tip_amount::DOUBLE as tip_amount ,
               tolls_amount::DOUBLE as tolls_amount ,
               ehail_fee::DOUBLE as ehail_fee ,
               improvement_surcharge::DOUBLE as improvement_surcharge ,
               total_amount::DOUBLE as total_amount ,
               payment_type::DOUBLE as payment_type ,
               trip_type::BIGINT as trip_type ,
               congestion_surcharge::DOUBLE as congestion_surcharge ,
               cbd_congestion_fee::DOUBLE as cbd_congestion_fee 
        FROM '{url}'
    """).fetch_record_batch(rows_per_batch=batch_size)

    return record_batch_reader



###############################
## ingest batches into database
###############################


def ingest_data(batch_reader, user, password, host, port, database, target_table):
    """Load every batch from ``batch_reader`` into a Postgres table.

    On the first batch the target table is (re)created empty from the batch's
    columns and its CREATE TABLE statement is printed; every batch, including
    the first, is then appended.

    Args:
        batch_reader: Iterable of record batches; each item must provide
            ``to_pandas()``.
        user: Database user name.
        password: Database password.
        host: Database host.
        port: Database port.
        database: Database name.
        target_table: Name of the table to replace and fill.

    Returns:
        None.
    """
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{database}')
    first = True

    try:
        for batch in tqdm(batch_reader):
            chunk_df = batch.to_pandas()

            if first:
                # Zero rows with if_exists="replace" drops any previous table
                # and creates a fresh, empty one with this batch's columns.
                # index=False keeps the per-batch pandas row index (which
                # restarts at 0 for every batch) out of the table, so the
                # table matches the schema printed below.
                chunk_df.head(n=0).to_sql(
                    name=target_table, con=engine, if_exists="replace", index=False
                )
                print(pd.io.sql.get_schema(chunk_df, name=target_table, con=engine))
                first = False

            chunk_df.to_sql(name=target_table, con=engine, if_exists='append', index=False)
            print(f"Processing batch of {len(chunk_df)} rows")
    finally:
        # Release pooled connections even when a batch fails midway.
        engine.dispose()



# Step 1: open a batched reader over the configured month of trip data.
batch_reader = init_batch(
    prefix=prefix,
    file_name=file_name,
    batch_size=batch_size,
    year=year,
    month=month,
)

# Step 2: push every batch into the configured Postgres table.
ingest_data(
    batch_reader=batch_reader,
    user=user,
    password=password,
    host=host,
    port=port,
    database=database,
    target_table=target_table,
)



0it [00:00, ?it/s]


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" BIGINT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type BIGINT, 
	congestion_surcharge FLOAT(53), 
	cbd_congestion_fee FLOAT(53)
)


Processing batch of 10000 rows
Processing batch of 10000 rows
Processing batch of 10000 rows


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Processing batch of 10000 rows
Processing batch of 6912 rows


In [25]:
# Column dtypes of df, i.e. the result of the astype(dtype) cast applied when it was loaded.
df.dtypes

VendorID                          Int64
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag               object
RatecodeID                        Int64
PULocationID                      Int64
DOLocationID                      Int64
passenger_count                   Int64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                         Int64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object

In [37]:
# Preview the SQL template with a dummy source name. read_text() opens and
# closes the file itself, so no file handle is left open in the kernel.
print(Path("query.sql").read_text().format('tt'))

SELECT VendorID::BIGINT,
               lpep_pickup_datetime::TIMESTAMP,
               lpep_dropoff_datetime::TIMESTAMP,
               store_and_fwd_flag::VARCHAR,
               RatecodeID::BIGINT,
               PULocationID::BIGINT,
               DOLocationID::BIGINT,
               passenger_count::BIGINT,
               trip_distance::DOUBLE,
               fare_amount::DOUBLE,
               extra::DOUBLE,
               mta_tax::DOUBLE,
               tip_amount::DOUBLE,
               tolls_amount::DOUBLE,
               ehail_fee::DOUBLE,
               improvement_surcharge::DOUBLE,
               total_amount::DOUBLE,
               payment_type::DOUBLE,
               trip_type::BIGINT,
               congestion_surcharge::DOUBLE,
               cbd_congestion_fee::DOUBLE
        FROM 'tt'


In [35]:
# Fill the SQL template with a placeholder source name and show the result.
# The formatted string was previously computed inside the with-block and
# discarded, so the cell displayed nothing.
with open('query.sql', 'r') as q:
    rendered_query = q.read().format('table')
print(rendered_query)