## Data Ingestion 
Extracted ny dataset in parquet format and ingested into postgres table in batches with proper data format

In [1]:
import pandas as pd

In [2]:
from time import time

In [3]:
%pip install pyarrow

Note: you may need to restart the kernel to use updated packages.


In [4]:
%pip install sqlalchemy

Note: you may need to restart the kernel to use updated packages.


In [5]:
%pip install psycopg2-binary 

Note: you may need to restart the kernel to use updated packages.


In [6]:
pd.__version__

'2.1.4'

In [7]:
df = pd.read_parquet('yellow_tripdata_2022-01.parquet',engine='pyarrow').head(10)

In [8]:
print(pd.io.sql.get_schema(df,'yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


In [9]:
from sqlalchemy import create_engine

In [10]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [11]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7efdd2996b90>

In [12]:
print(pd.io.sql.get_schema(df,'yellow_taxi_data',con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [13]:
import pyarrow.parquet as pq

In [14]:
parquet_file = pq.ParquetFile('yellow_tripdata_2022-01.parquet')

In [None]:
for batch in parquet_file.iter_batches():
    print("RecordBatch")
    batch_df = batch.to_pandas()
    print("batch_df:", batch_df)

In [15]:
batch = parquet_file.iter_batches()

In [16]:
batch_df = next(batch)
print(batch_df.to_pandas().head(0))

Empty DataFrame
Columns: [VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee]
Index: []


In [17]:
batch_df.to_pandas().head(0).to_sql(name='yellow_taxi_data',con=engine, if_exists='replace')

0

In [18]:
batch_df.to_pandas().tpep_pickup_datetime  = pd.to_datetime(batch_df.to_pandas().tpep_pickup_datetime)
batch_df.to_pandas().tpep_dropoff_datetime  = pd.to_datetime(batch_df.to_pandas().tpep_dropoff_datetime)

In [19]:
batch = parquet_file.iter_batches()
while True:
    try:
        t_start = time()
        batch_df = next(batch)
        print(batch_df.num_rows)
        batch_df.to_pandas().tpep_pickup_datetime  = pd.to_datetime(batch_df.to_pandas().tpep_pickup_datetime)
        batch_df.to_pandas().tpep_dropoff_datetime  = pd.to_datetime(batch_df.to_pandas().tpep_dropoff_datetime)
        batch_df.to_pandas().to_sql(name='yellow_taxi_data',con=engine, if_exists='append')
        t_end = time()
        print('Inserted batch.. %.3f seconds' %(t_end - t_start))
    except StopIteration:
        break

65536
Inserted batch.. 6.388 seconds
65536
Inserted batch.. 6.180 seconds
65536
Inserted batch.. 6.118 seconds
65536
Inserted batch.. 6.233 seconds
65536
Inserted batch.. 6.294 seconds
65536
Inserted batch.. 6.223 seconds
65536
Inserted batch.. 6.201 seconds
65536
Inserted batch.. 6.136 seconds
65536
Inserted batch.. 6.193 seconds
65536
Inserted batch.. 6.235 seconds
65536
Inserted batch.. 6.317 seconds
65536
Inserted batch.. 6.218 seconds
65536
Inserted batch.. 6.413 seconds
65536
Inserted batch.. 6.427 seconds
65536
Inserted batch.. 6.410 seconds
65536
Inserted batch.. 6.169 seconds
65536
Inserted batch.. 6.126 seconds
65536
Inserted batch.. 6.126 seconds
65536
Inserted batch.. 6.179 seconds
65536
Inserted batch.. 6.218 seconds
65536
Inserted batch.. 6.249 seconds
65536
Inserted batch.. 6.492 seconds
65536
Inserted batch.. 6.056 seconds
65536
Inserted batch.. 6.474 seconds
65536
Inserted batch.. 6.165 seconds
65536
Inserted batch.. 6.231 seconds
65536
Inserted batch.. 6.585 seconds
6

#### Inserted 2463931 rows to sql table