In [None]:
# Connect to PostgreSQL, create the target table first,
# then insert the data in chunks

In [31]:
import pandas as pd
from sqlalchemy import create_engine
import pyarrow.parquet as pq
from sqlalchemy import text

In [2]:
# Display the installed pandas version
pd.__version__

'2.2.2'

In [36]:
# Read the whole parquet file into a DataFrame. This df is only used to generate
# the DDL (CREATE TABLE) statement; the actual chunked load is done with pyarrow
# further down.
df = pd.read_parquet('/Users/estelle/Downloads/yellow_tripdata_2021-01.parquet')

In [42]:
# Name of the destination table in Postgres
table_name = 'yellow_taxi_data'
# Number of rows written to the database per insert batch
chunk_size = 100_000

In [24]:
# Optional data type conversion; it can be omitted here
#pd.to_datetime(df.tpep_pickup_datetime)

In [15]:
# Connection URL format: postgresql://user:password@host:port/database_name
# NOTE(review): credentials are hardcoded in the URL; consider reading them from
# environment variables before sharing this notebook.
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [17]:
#test if engine works
#engine.connect()

<sqlalchemy.engine.base.Connection at 0x10da49110>

In [29]:
# pandas-only helper: uses the DataFrame's columns and dtypes to generate the DDL
# (CREATE TABLE) statement for the database behind `engine`.
# The table name comes from the shared `table_name` variable rather than a repeated
# string literal, so the generated DDL always targets the same table that the other
# cells drop and load into.
create_table_sql = pd.io.sql.get_schema(df, name=table_name, con=engine)

In [34]:
# (Re)create the target table in PostgreSQL.
# text() wraps each raw SQL string in a sqlalchemy.sql.text object so that
# SQLAlchemy can execute it.
drop_stmt = text(f"DROP TABLE IF EXISTS {table_name};")  # optional: start from a clean table
create_stmt = text(create_table_sql)

with engine.connect() as connection:
    for stmt in (drop_stmt, create_stmt):
        connection.execute(stmt)
    # Persist both statements before the connection is closed
    connection.commit()

In [41]:
# Load the data in chunks: open the parquet file with pyarrow and read the
# total row count from its metadata.
pf = pq.ParquetFile('/Users/estelle/Downloads/yellow_tripdata_2021-01.parquet')
total_rows = pf.metadata.num_rows
print(f'Total rows in file:{total_rows:,}') # the "," format spec adds thousands separators

Total rows in file:1,369,769


In [46]:
# Use pyarrow to load the data into Postgres in chunks of `chunk_size` rows.
# Read all row groups into a single PyArrow Table ONCE, before the loop. The read
# does not depend on the loop variable, so doing it inside the loop re-read the
# entire file for every chunk.
# pf.num_row_groups is the number of row groups; range(...) selects all of them.
full_table = pf.read_row_groups(range(pf.num_row_groups))

for i in range(0, total_rows, chunk_size):
    # slice(offset, length): up to chunk_size rows starting at row i
    table_chunk = full_table.slice(i, chunk_size)
    df_chunk = table_chunk.to_pandas()
    # Append to the existing table; index=False keeps the DataFrame index out of the table
    df_chunk.to_sql(table_name, engine, if_exists='append', index=False)
    print(f'Inserted rows {i+1} to {min(i + chunk_size, total_rows):,}')
print('Done loading all chunks.')

Inserted rows 1 to 100,000
Inserted rows 100001 to 200,000
Inserted rows 200001 to 300,000
Inserted rows 300001 to 400,000
Inserted rows 400001 to 500,000
Inserted rows 500001 to 600,000
Inserted rows 600001 to 700,000
Inserted rows 700001 to 800,000
Inserted rows 800001 to 900,000
Inserted rows 900001 to 1,000,000
Inserted rows 1000001 to 1,100,000
Inserted rows 1100001 to 1,200,000
Inserted rows 1200001 to 1,300,000
Inserted rows 1300001 to 1,369,769
Done loading all chunks.


769