In [1]:
# import libs
import os
from time import time
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

In [2]:
# import raw data from the NYC Taxi & Limousine Commission (csv format)
# csv links found here: https://github.com/DataTalksClub/nyc-tlc-data/releases/tag/yellow
# A plain read loads the whole file into df. The previous `iterator=True` loop
# (`for chunk in reader: df = chunk`) only kept the last chunk it saw. With no chunksize
# that happened to be the whole file, but with a chunksize it would silently drop data.
# gzip options such as compresslevel/mtime only matter when writing, so they are omitted.
df = pd.read_csv('https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz',
                 compression = 'gzip',
                 low_memory = False)

In [None]:
# alternative source: the same month from the Taxi & Limousine Commission (parquet format)
df = pd.read_parquet(
    'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet',
    engine='pyarrow',
)

In [None]:
# df EDA
# A cell only displays its LAST bare expression, so `df.shape; df.columns; df.dtypes`
# showed just the dtypes. df.info() prints the row count, column count, column names
# and dtypes in a single output.
df.info()

In [3]:
# parse the pickup/dropoff columns into datetimes so later steps (schema generation,
# to_sql) see them as timestamps rather than plain strings
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

In [None]:
# connect to postgres docker
# Connection settings are read from environment variables so credentials are not
# hardcoded in the notebook. The fallbacks are the local docker defaults used before.
user = os.environ.get('PG_USER', 'root')
pwd = os.environ.get('PG_PASSWORD', 'root')
host = os.environ.get('PG_HOST', 'localhost')
port = os.environ.get('PG_PORT', '5432')
db_name = os.environ.get('PG_DB', 'ny_taxi')

# quote_plus escapes characters such as '@', ':' or '/' in the password, which would
# otherwise be misparsed as part of the URL structure
engine = create_engine(f'postgresql://{user}:{quote_plus(pwd)}@{host}:{port}/{db_name}')
conn = engine.connect()

In [None]:
# preview the CREATE TABLE statement pandas derives from df for the target table
ddl = pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine)
print(ddl)

In [None]:
# push the first 100 rows of df to postgres
# 'replace' drops and recreates yellow_taxi_data before inserting, so re-running this
# cell leaves the same 100 rows instead of appending another duplicate copy each time
df.head(n=100).to_sql(name = 'yellow_taxi_data', con = engine, if_exists = 'replace')


In [None]:
# time how long it takes for a partition to be pushed into postgres 
%time df.head(n=100).to_sql(name = 'yellow_taxi_data', con = engine, if_exists = 'append')

In [None]:
# full pipeline: stream the csv in chunks and load every chunk into postgres
csv_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'
table_name = 'tbl_name'
chunk_size = 100000

# with `chunksize`, read_csv returns an iterator that yields one DataFrame per chunk
df_iter = pd.read_csv(csv_url,
                      chunksize = chunk_size,
                      compression = 'gzip',
                      low_memory = False)


def _parse_datetimes(chunk):
    """Convert the pickup/dropoff columns of `chunk` to datetimes (in place) and return it."""
    chunk['tpep_pickup_datetime'] = pd.to_datetime(chunk['tpep_pickup_datetime'])
    chunk['tpep_dropoff_datetime'] = pd.to_datetime(chunk['tpep_dropoff_datetime'])
    return chunk


# Chunk-insert into postgres. Every chunk, including the first, is parsed and
# inserted, and the loop stops cleanly when the iterator is exhausted instead of
# ending the cell with a StopIteration traceback. Writes go through `engine`, like
# the other cells.
table_created = False
while True:
    t1 = time()
    try:
        df = next(df_iter)
    except StopIteration:
        break

    _parse_datetimes(df)

    if not table_created:
        # Show the DDL pandas derives from the parsed chunk, then create an empty table
        # with that schema. 'replace' drops any previous table, so a re-run starts fresh
        # instead of duplicating data.
        print(pd.io.sql.get_schema(df, name = table_name, con = engine))
        df.head(n = 0).to_sql(name = table_name, con = engine, if_exists = 'replace')
        table_created = True

    df.to_sql(name = table_name, con = engine, if_exists = 'append')
    t2 = time()

    print(f'inserted chunk in {t2 - t1} seconds')

print(f'finished loading {table_name}')

### Helpful Python and shell commands

In [None]:
# checking pandas version within kernel:
# this is the version of the `pd` module imported into this notebook session
pd.__version__

In [None]:
# checking pandas version from command line/global environment
# NOTE: `!pip` runs whichever pip is first on the shell PATH, which may belong to a
# different environment than this kernel; `%pip` would query the kernel's own env.
# `grep pandas` also matches any other installed package whose name contains "pandas".
!pip freeze | grep pandas

In [None]:
# install wget via homebrew
brew install wget