In [5]:
import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import click
from tqdm.auto import tqdm

In [None]:
def run(engine, table, filepath, filename, filetype, dtype, parse_dates, chunksize):
    df_iter = None
    match filetype:
        case 'csv':
            df_iter = pd.read_csv(filepath + filename, dtype=dtype, parse_dates=parse_dates, chunksize=chunksize, iterator=True)
            first = True
            for df_chunk in tqdm(df_iter):
                if first:
                    df_chunk.head(0).to_sql(name=table, con=engine, if_exists='replace', index=False)
                    first = False
                    print('Table Created.')
                df_chunk.to_sql(name=table, con=engine, if_exists='append', index=False)
                print("Inserted:", len(df_chunk))
        case 'parquet':
            parquet_file = pq.ParquetFile(filepath + filename)
            first = True
            for df_chunk in tqdm(parquet_file.iter_batches(batch_size=chunksize)):
                df = df_chunk.to_pandas()
                if first:
                    df.head(0).to_sql(name=table, con=engine, if_exists='replace', index=False)
                    first = False
                    print('Table Created.')
                df.to_pandas().to_sql(name=table, con=engine, if_exists='append', index=False, method='multi')
                print("Inserted:", len(df))
        case _:
            df_iter = None
    
    if df_iter is None:        
        print('Please enter a valid filetype argument. Available options: csv or parquet.')

In [None]:
@click.command()
@click.option('--host', default='localhost', help='PostgreSQL host')
@click.option('--port', default=5432, help='PostgreSQL port')
@click.option('--user', default='root', help='PostgreSQL user')
@click.option('--password', default='root', help='PostgreSQL password')
@click.option('--db', default='ny-taxi-db', help='PostgreSQL database name')
@click.option('--table', required=True, help='PostgreSQL table name')
@click.option('--filepath', required=True, default='./', help='Data file path')
@click.option('--filename', required=True, help='Data filename')
@click.option('--filetype', required=True, type=click.Choice(['csv', 'parquet']), help='Data file Type - csv, parquet')
@click.option('--chunksize', required=True, default=100000, type=int, help='Chunk size for ingestion')
def main(host, port, user, password, db, table, filepath, filename, filetype, chunksize):
    """Ingest a CSV or Parquet data file into a PostgreSQL table."""
    # Local import keeps this block self-contained; only needed to build the URL.
    from urllib.parse import quote_plus

    # explicitly state the column datatypes as pandas might read them differently
    dtype = {
        "VendorID": "Int64",
        "passenger_count": "Int64",
        "trip_distance": "float64",
        "RatecodeID": "Int64",
        "store_and_fwd_flag": "string",
        "PULocationID": "Int64",
        "DOLocationID": "Int64",
        "payment_type": "Int64",
        "fare_amount": "float64",
        "extra": "float64",
        "mta_tax": "float64",
        "tip_amount": "float64",
        "tolls_amount": "float64",
        "improvement_surcharge": "float64",
        "total_amount": "float64",
        "congestion_surcharge": "float64"
    }

    # columns pandas should parse as datetimes when reading CSV input
    parse_dates = [
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime"
    ]

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # in a user name or password cannot corrupt the connection URL.
    engine = create_engine(
        f'postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}'
    )

    try:
        run(engine, table, filepath, filename, filetype, dtype, parse_dates, chunksize)
    finally:
        # Release pooled connections whether or not the load succeeded.
        engine.dispose()
    print('Insert complete.')

In [None]:
# Entry point: invoke the click command only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()