In [1]:
import pandas as pd
import polars as pl
import sqlalchemy as sqla
from psycopg2 import sql
import pyarrow
import psycopg2.extras

In [2]:
df_pl = pl.read_parquet('yellow_tripdata_2021-01.parquet')

In [11]:
df_pd = pl.read_csv('yellow_tripdata_2019-01.csv')

In [3]:
engine = sqla.create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [12]:
print(pd.io.sql.get_schema(df_pd.to_pandas(), name='yellow_taxi_data_2019', con=engine))



CREATE TABLE yellow_taxi_data_2019 (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TEXT, 
	tpep_dropoff_datetime TEXT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge TEXT
)




In [9]:
def ingest_to_postgre(df):
    for col in df:
        if col.dtype == pl.Date:
            df = df.with_column(col.dt.to_pythone_datetime)
    
    # create sql identifiers for the column names
    # we do this to safely insert this into a sql query
    columns = sql.SQL(",").join(sql.Identifier(name) for name in df.columns)

    # create placeholders for the values. These will be filled later
    values = sql.SQL(",").join([sql.Placeholder() for _ in df.columns])

    table_id = "yellow_taxi_data_2019"
    
    create_stmt = sql.SQL("""
    
        CREATE TABLE yellow_taxi_data_2019 (
            "VendorID" BIGINT, 
            tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
            tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
            passenger_count FLOAT(53), 
            trip_distance FLOAT(53), 
            "RatecodeID" FLOAT(53), 
            store_and_fwd_flag TEXT, 
            "PULocationID" BIGINT, 
            "DOLocationID" BIGINT, 
            payment_type BIGINT, 
            fare_amount FLOAT(53), 
            extra FLOAT(53), 
            mta_tax FLOAT(53), 
            tip_amount FLOAT(53), 
            tolls_amount FLOAT(53), 
            improvement_surcharge FLOAT(53), 
            total_amount FLOAT(53), 
            congestion_surcharge FLOAT(53), 
            airport_fee FLOAT(53)
        )
    """
    )
    # prepare the insert query
    insert_stmt = sql.SQL("INSERT INTO {} ({}) VALUES({});").format(
        sql.Identifier(table_id), columns, values
    )

    # make a connection
    try:
        conn = psycopg2.connect(
                            host="localhost",
                            database="ny_taxi",
                            user="root",
                            password="root")
        print("Success to connect")
    except:
        print("Fail to connect")
        
    cur = conn.cursor()
    cur.execute(create_stmt)
    # do the insert
    psycopg2.extras.execute_batch(cur, insert_stmt, df.rows())
    conn.commit()


In [10]:
ingest_to_postgre(df)

In [13]:
ingest_to_postgre(df_2019)

DuplicateTable: relation "yellow_taxi_data" already exists


In [12]:
df.write_csv("yellow_tripdata_2021-01.csv")

In [26]:
df_pd = (pd.read_csv("yellow_tripdata_2021-01.csv", nrows=100)
            .assign(tpep_pickup_datetime=lambda df_pd: pd.to_datetime(df_pd.tpep_pickup_datetime), 
                    tpep_dropoff_datetime=lambda df_pd: pd.to_datetime(df_pd.tpep_dropoff_datetime))
        )
print(pd.io.sql.get_schema(df_pd, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)


