In [1]:
# pgcli -h localhost -p 5432 -U postgres -d ny_taxi
# pip install -r requirements.in

In [2]:
from getpass import getpass
from io import BytesIO, StringIO

import numpy as np
import pandas as pd
import psycopg2
import pyarrow.parquet as pq
import requests
from psycopg2 import sql as pg_sql # safe quoting of identifiers in COPY statements
from sqlalchemy import create_engine
from sqlalchemy.engine import URL # builds connection URLs with proper escaping
from sqlalchemy.engine.base import Engine # for type hinting


def downcast(df: pd.DataFrame, unique_thresh: float = 0.05) -> pd.DataFrame:
    '''Shrink a DataFrame's memory footprint by narrowing its column dtypes.

    The frame is first passed through ``convert_dtypes()``. Afterwards:

    * text columns ("string" / "object") whose ratio of distinct values to
      rows is below ``unique_thresh`` are converted to ``category``;
    * float columns are downcast with ``pd.to_numeric(..., downcast="float")``;
    * integer columns are downcast to an unsigned type when their minimum is
      non-negative, otherwise to a signed type.

    A one-line summary of the memory saved is printed.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame. It is not modified; the converted frame is returned.
    unique_thresh : float, default 0.05
        Distinct-to-row ratio below which a text column becomes ``category``.

    Returns
    -------
    pd.DataFrame
        The converted frame.
    '''
    mem_before = df.memory_usage(deep=True).sum()
    mem_before_mb = round(mem_before / (1024**2), 2)

    df = df.convert_dtypes()
    n_rows = len(df)

    for column in df.select_dtypes(["string", "object"]):
        # a zero-row frame has no cardinality ratio (0 / 0), so leave it alone
        if n_rows and (len(df[column].unique()) / n_rows) < unique_thresh:
            df[column] = df[column].astype("category")

    for column in df.select_dtypes(["float"]):
        df[column] = pd.to_numeric(df[column], downcast="float")

    for column in df.select_dtypes(["integer"]):
        col_min = df[column].min()
        if pd.isna(col_min):
            # empty or all-missing column: min() is NA and `NA >= 0` cannot be
            # used in an `if`, and there are no values to size the dtype by
            continue
        if col_min >= 0:
            df[column] = pd.to_numeric(df[column], downcast="unsigned")
        else:
            df[column] = pd.to_numeric(df[column], downcast="signed")

    mem_after = df.memory_usage(deep=True).sum()
    mem_after_mb = round(mem_after / (1024**2), 2)
    # guard the percentage against a zero-byte starting size
    compression = round(((mem_before - mem_after) / mem_before) * 100) if mem_before else 0

    print(
        f"DataFrame compressed by {compression}% from {mem_before_mb} MB down to {mem_after_mb} MB."
    )
    return df


def clean_columns(df: pd.DataFrame) -> pd.DataFrame:
    '''Normalise the column labels of ``df`` and return it.

    Each label is stripped of surrounding whitespace, lower-cased, and has its
    spaces replaced by underscores. Non-string labels (e.g. integers from a
    header-less frame) are converted with ``str()`` first so they no longer
    raise ``AttributeError``.

    Note: the labels are assigned on the frame that was passed in, so the
    caller's object is modified; the same object is returned for use with
    ``.pipe()``.
    '''
    df.columns = [str(col).strip().lower().replace(" ", "_") for col in df.columns]
    return df


def fast_insert(df: pd.DataFrame, table_name: str, engine: Engine, chunksize: int=100_000):
    '''Bulk-load ``df`` into a PostgreSQL table using ``COPY``.

    The target table is dropped/re-created from the frame's schema
    (``if_exists='replace'``), then the rows are streamed in chunks through
    in-memory CSV buffers. Each chunk is committed on its own, so chunks loaded
    before a failure remain in the table.

    Parameters
    ----------
    df : pd.DataFrame
        Data to load; the index is not written.
    table_name : str
        Name of the table to replace and fill.
    engine : Engine
        SQLAlchemy engine whose raw DBAPI connection exposes psycopg2's
        ``copy_expert``.
    chunksize : int, default 100_000
        Number of rows serialised and copied per round trip.

    Raises
    ------
    ValueError
        If ``chunksize`` is not a positive integer.
    '''
    if chunksize < 1:
        raise ValueError("chunksize must be a positive integer")

    # replace or create a new table with the same schema as the DataFrame
    df.head(0).to_sql(table_name, engine, if_exists='replace', index=False)

    # acquire the connection *before* the try block: if this call fails there
    # is nothing to close, and the real error is not masked by a NameError
    conn = engine.raw_connection() # direct, low-level connection to the database
    try:
        with conn.cursor() as cur:
            # CSV format matches what DataFrame.to_csv emits (quoted fields,
            # doubled quotes); COPY's text format would misread quoted fields
            # and treat backslashes as escapes. Unquoted empty fields are NULL.
            copy_stmt = (
                pg_sql.SQL("COPY {} FROM STDIN WITH (FORMAT csv)")
                .format(pg_sql.Identifier(table_name))
                .as_string(cur)
            )

            # iterate through the DataFrame in consecutive row slices
            for start in range(0, len(df), chunksize):
                buffer = StringIO() # fresh in-memory CSV file per chunk
                df.iloc[start:start + chunksize].to_csv(buffer, header=False, index=False)
                buffer.seek(0)

                # use COPY command to load the data into PostgreSQL
                cur.copy_expert(copy_stmt, buffer)
                conn.commit()
    finally:
        conn.close()

[Yellow Taxi Data Dictionary](https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf)

***Tabula requires Java. It may be easier to view the PDF in a browser and copy the table manually, or simply load the parquet file included in this repo.***

In [4]:
# Optional recipe (needs tabula + Java): parse the data-dictionary PDF and
# save the result to the parquet file that is loaded further down.

# import tabula

# url = 'https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf'

# # Read & Parse the PDF Table
# trips_metadata = (
#     tabula.read_pdf(url, stream=True, pages=1, lattice=True)
#     .iloc[1:]
#     .drop(columns=['Unnamed: 2', 'Description', 'Description.1'])
#     .rename(columns={'Unnamed: 0': 'field', 'Field Name': 'description'})
#     .assign(
#         column=lambda df: df['field'].str.strip().str.lower().str.replace(' ', '_')
#         , description=lambda df: df['description'].str.strip()
#     )
# )

# trips_metadata.to_parquet('data/trips_metadata.parquet')

# load the saved copy of the field descriptions and preview the first rows
trips_metadata_path = 'data/trips_metadata.parquet'
trips_metadata = pd.read_parquet(trips_metadata_path)
trips_metadata.head(3)

Unnamed: 0,field,description
1,vendorid,A code indicating the TPEP provider that provi...
2,tpep_pickup_datetime,The date and time when the meter was engaged.
3,tpep_dropoff_datetime,The date and time when the meter was disengaged.


[NYC Yellow Taxi Data (Jan, 2022)](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

In [3]:
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet'

# download the file into memory; `.content` reads the whole body anyway, so
# streaming is not requested. The timeout keeps a stalled connection from
# hanging the kernel indefinitely.
r = requests.get(url, timeout=60)
r.raise_for_status() # fail on HTTP errors instead of handing an error page to the Parquet reader

# load the downloaded bytes into a BytesIO object (aka an in-memory, RAM not disk, file)
with BytesIO(r.content) as f:
    trips = (
        pq.read_table(f) # PyArrow is specifically designed to read/write Parquet data
        .to_pandas()
        .pipe(clean_columns)
        .pipe(downcast)
    )

# keep a small, reproducible sample on disk
trips.sample(10_000, random_state=42).to_parquet('data/trips_sample.parquet.gzip', compression='gzip')
trips

DataFrame compressed by 54% from 472.34 MB down to 218.53 MB.


Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2,3.8,1,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0
1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1,2.1,1,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0
2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1,0.97,1,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0
3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1,1.09,1,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0
4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1,4.3,1,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2463926,2,2022-01-31 23:36:53,2022-01-31 23:42:51,,1.32,,,90,170,0,8.0,0.0,0.5,2.39,0.0,0.3,13.69,,
2463927,2,2022-01-31 23:44:22,2022-01-31 23:55:01,,4.19,,,107,75,0,16.8,0.0,0.5,4.35,0.0,0.3,24.45,,
2463928,2,2022-01-31 23:39:00,2022-01-31 23:50:00,,2.1,,,113,246,0,11.22,0.0,0.5,2.0,0.0,0.3,16.52,,
2463929,2,2022-01-31 23:36:42,2022-01-31 23:48:45,,2.92,,,148,164,0,12.4,0.0,0.5,0.0,0.0,0.3,15.7,,


In [5]:
# connection settings; the keys match the keyword arguments of URL.create
db = {
    'drivername': 'postgresql'
    , 'username': 'postgres'
    , 'password': getpass() # better to use a .env file, but this is just a demo
    , 'host': 'localhost'
    , 'port': 5432
    , 'database': 'ny_taxi'
}
# unpack by key (not by dict order) so reordering `db` cannot silently swap values
drivername, username, password, host, port, database = (
    db[key] for key in ('drivername', 'username', 'password', 'host', 'port', 'database')
)

# build the URL from its components instead of an f-string: a password that
# contains characters such as '@', ':' or '/' would corrupt a hand-built URL
engine = create_engine(URL.create(**db)) # , echo=True

In [6]:
# (re)create the metadata table in Postgres; the cell displays to_sql's return value
trips_metadata.to_sql(
    name='yellow_taxi_trips_metadata',
    con=engine,
    if_exists='replace',
    index=False,
)

19

In [7]:
# preview the CREATE TABLE statement pandas generates for `trips`
trips_ddl = pd.io.sql.get_schema(trips, 'yellow_taxi_trips', con=engine).strip()
print(trips_ddl + ';', end='')

CREATE TABLE yellow_taxi_trips (
	vendorid SMALLINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count SMALLINT, 
	trip_distance FLOAT(53), 
	ratecodeid SMALLINT, 
	store_and_fwd_flag TEXT, 
	pulocationid INTEGER, 
	dolocationid INTEGER, 
	payment_type SMALLINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
);

[Use Psycopg2 to load data into Postgres faster than pd.DataFrame.to_sql()](https://stackoverflow.com/questions/23103962/how-to-write-dataframe-to-postgres-table)

In [8]:
drivername = 'postgresql+psycopg2'
# derive the new URL from the existing engine's URL object rather than pasting
# the raw password into a string again (special characters would break parsing);
# only the driver name changes, every other connection setting is carried over
engine = create_engine(engine.url.set(drivername=drivername))

# turned it into a function and added iteration to handle larger dataframes
fast_insert(trips, 'yellow_taxi_trips', engine)