In [1]:
# Libraries

import os
import warnings

import pandas as pd
import sqlalchemy


# Silence every warning category for the rest of the session.
# NOTE(review): this also hides pandas / SQLAlchemy deprecation notices.
warnings.filterwarnings(action='ignore')

# Record the library versions so the outputs below can be reproduced.
for label, version in (
    ('SQLAlchemy version', sqlalchemy.__version__),
    ('Pandas version', pd.__version__),
):
    print(label, version)

SQLAlchemy version 2.0.37
Pandas version 1.5.3


In [2]:
# Peek at the first 100 trips only; this is enough to inspect the columns and infer a schema.
df = pd.read_csv(
    'dataset/green_tripdata_2019-10.csv',
    nrows=100,
)
df

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-10-01 00:26:02,2019-10-01 00:39:58,N,1,112,196,1,5.88,18.0,0.50,0.5,0.00,0.0,,0.3,19.30,2,1,0.0
1,1,2019-10-01 00:18:11,2019-10-01 00:22:38,N,1,43,263,1,0.80,5.0,3.25,0.5,0.00,0.0,,0.3,9.05,2,1,0.0
2,1,2019-10-01 00:09:31,2019-10-01 00:24:47,N,1,255,228,2,7.50,21.5,0.50,0.5,0.00,0.0,,0.3,22.80,2,1,0.0
3,1,2019-10-01 00:37:40,2019-10-01 00:41:49,N,1,181,181,1,0.90,5.5,0.50,0.5,0.00,0.0,,0.3,6.80,2,1,0.0
4,2,2019-10-01 00:08:13,2019-10-01 00:17:56,N,1,97,188,1,2.52,10.0,0.50,0.5,2.26,0.0,,0.3,13.56,1,1,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2,2019-10-01 00:02:53,2019-10-01 00:14:32,N,1,126,74,1,3.10,12.0,0.50,0.5,0.00,0.0,,0.3,13.30,1,1,0.0
96,2,2019-10-01 00:18:45,2019-10-01 00:29:23,N,1,42,74,1,1.64,9.5,0.50,0.5,0.00,0.0,,0.3,10.80,2,1,0.0
97,2,2019-10-01 00:41:32,2019-10-01 00:52:51,N,1,75,42,1,3.17,11.5,0.50,0.5,1.50,0.0,,0.3,14.30,1,1,0.0
98,2,2019-10-01 00:36:54,2019-10-01 00:54:20,N,1,92,179,1,5.48,19.5,0.50,0.5,0.00,0.0,,0.3,20.80,2,1,0.0


In [3]:
# Ask pandas for the CREATE TABLE (DDL) statement it would emit for `df`.
# No connection is passed, so the type names are generic (TEXT / INTEGER / REAL),
# as the output below shows.
green_taxi_ddl = pd.io.sql.get_schema(df, name='green_taxi_data')
print(green_taxi_ddl)


CREATE TABLE "green_taxi_data" (
"VendorID" INTEGER,
  "lpep_pickup_datetime" TEXT,
  "lpep_dropoff_datetime" TEXT,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" INTEGER,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" INTEGER,
  "trip_type" INTEGER,
  "congestion_surcharge" REAL
)


In [4]:
# The pickup/dropoff columns are read as text; parse them so the generated DDL
# types them as TIMESTAMP instead of TEXT.
# (REAL is simply the generic float type pandas emits when no DB connection is given.)
for ts_col in ('lpep_dropoff_datetime', 'lpep_pickup_datetime'):
    df[ts_col] = pd.to_datetime(df[ts_col])

# Regenerate the DDL for the converted frame.
print(pd.io.sql.get_schema(df, name='green_taxi_data'))

CREATE TABLE "green_taxi_data" (
"VendorID" INTEGER,
  "lpep_pickup_datetime" TIMESTAMP,
  "lpep_dropoff_datetime" TIMESTAMP,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" INTEGER,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" INTEGER,
  "trip_type" INTEGER,
  "congestion_surcharge" REAL
)


## Load the green_taxi_data table into Postgres

In [3]:
# Connect to Postgres.
# Credentials are read from the environment (PG_USER, PG_PASSWORD, PG_HOST,
# PG_PORT, PG_DB) instead of being hard-coded in the notebook; the defaults
# reproduce the previous local connection settings.
# URL.create escapes special characters (e.g. '@' or ':') in the password,
# which a hand-built 'postgresql://user:pass@host/db' string would not.
pg_url = sqlalchemy.engine.URL.create(
    drivername='postgresql',
    username=os.environ.get('PG_USER', 'root'),
    password=os.environ.get('PG_PASSWORD', 'root'),
    host=os.environ.get('PG_HOST', 'localhost'),
    port=int(os.environ.get('PG_PORT', '5432')),
    database=os.environ.get('PG_DB', 'ny_taxi'),
)
engine = sqlalchemy.create_engine(pg_url)
connection = engine.connect()
connection

<sqlalchemy.engine.base.Connection at 0x1128ef910>

In [4]:
# Same DDL, but rendered for the connected Postgres dialect (BIGINT, FLOAT(53), ...).
postgres_ddl = pd.io.sql.get_schema(df, name='green_taxi_data', con=engine)
print(postgres_ddl)


CREATE TABLE green_taxi_data (
	"VendorID" BIGINT, 
	lpep_pickup_datetime TEXT, 
	lpep_dropoff_datetime TEXT, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" BIGINT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type BIGINT, 
	trip_type BIGINT, 
	congestion_surcharge FLOAT(53)
)




In [5]:
# Use an iterator to read the CSV in chunks of 100,000 rows.
# Only the first chunk is taken here; the next cell uses it to create the
# (empty) target table with the right column types.
df_iter = pd.read_csv('dataset/green_tripdata_2019-10.csv', iterator=True, chunksize=100000)
df = next(df_iter)
for ts_col in ('lpep_dropoff_datetime', 'lpep_pickup_datetime'):
    df[ts_col] = pd.to_datetime(df[ts_col])

In [6]:
# Create the table (columns and types only) without inserting any rows.
# Writing a zero-row frame with if_exists='replace' drops any existing table.
# to_sql's default index=True also adds an `index` column (visible in the check below).
empty_frame = df.head(n=0)
empty_frame.to_sql(name='green_taxi_data', con=engine, if_exists='replace')
print('successfully created table')

successfully created table


In [7]:
# Confirm the table exists: the query should return its columns and no rows yet.
check_query = sqlalchemy.text('select * from green_taxi_data limit 10;')
check_table = pd.read_sql_query(check_query, con=connection)
check_table

Unnamed: 0,index,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,...,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge


In [8]:
# Time it
from time import time

In [9]:
# Load data in batches of 100,000 rows.
# Only StopIteration (the reader being exhausted) ends the loop. Any other error,
# e.g. a database failure inside to_sql, now propagates instead of being swallowed
# by a bare `except:` and misreported as "No more data to insert".
# The `with` block closes the CSV file handle when the load finishes or fails.
with pd.read_csv('dataset/green_tripdata_2019-10.csv', iterator=True, chunksize=100000) as df_iter:
    while True:
        start = time()  # timing includes reading/parsing the chunk, as before
        try:
            df = next(df_iter)
        except StopIteration:
            break
        for ts_col in ('lpep_dropoff_datetime', 'lpep_pickup_datetime'):
            df[ts_col] = pd.to_datetime(df[ts_col])
        df.to_sql(name='green_taxi_data', con=engine, if_exists='append')
        end = time()
        print(f'Successfully inserted {len(df)} data for {end-start:.2f} seconds')
print("No more data to insert")

Successfully inserted 100000 data for 15.09 seconds
Successfully inserted 100000 data for 16.11 seconds
Successfully inserted 100000 data for 16.99 seconds
Successfully inserted 100000 data for 14.25 seconds
Successfully inserted 76386 data for 10.45 seconds
No more data to insert


In [10]:
# Check that every CSV row made it into Postgres.
csv_data = pd.read_csv('dataset/green_tripdata_2019-10.csv')
sql_data = pd.read_sql_query(sqlalchemy.text('select count(*) from green_taxi_data'), connection)
sql_row_count = int(sql_data.iloc[0, 0])
print(f'CSV data has {len(csv_data)} rows. SQL data has {sql_row_count} rows')
# Fail loudly on a mismatch so Restart & Run All catches a partial or duplicated load
# (printing alone would let the notebook "succeed" with bad data).
assert sql_row_count == len(csv_data), 'row count mismatch between CSV and green_taxi_data'

CSV data has 476386 rows. SQL data has 476386 rows


In [17]:
# Spot-check the first 10 loaded rows, ordered by the `index` column that to_sql wrote.
first_rows_query = sqlalchemy.text('select * from green_taxi_data order by index limit 10;')
sql_data = pd.read_sql_query(first_rows_query, connection)
sql_data

Unnamed: 0,index,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,...,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,0,2,2019-10-01 00:26:02,2019-10-01 00:39:58,N,1,112,196,1,5.88,...,0.5,0.5,0.0,0.0,,0.3,19.3,2,1.0,0.0
1,1,1,2019-10-01 00:18:11,2019-10-01 00:22:38,N,1,43,263,1,0.8,...,3.25,0.5,0.0,0.0,,0.3,9.05,2,1.0,0.0
2,2,1,2019-10-01 00:09:31,2019-10-01 00:24:47,N,1,255,228,2,7.5,...,0.5,0.5,0.0,0.0,,0.3,22.8,2,1.0,0.0
3,3,1,2019-10-01 00:37:40,2019-10-01 00:41:49,N,1,181,181,1,0.9,...,0.5,0.5,0.0,0.0,,0.3,6.8,2,1.0,0.0
4,4,2,2019-10-01 00:08:13,2019-10-01 00:17:56,N,1,97,188,1,2.52,...,0.5,0.5,2.26,0.0,,0.3,13.56,1,1.0,0.0
5,5,2,2019-10-01 00:35:01,2019-10-01 00:43:40,N,1,65,49,1,1.47,...,0.5,0.5,1.86,0.0,,0.3,11.16,1,1.0,0.0
6,6,1,2019-10-01 00:28:09,2019-10-01 00:30:49,N,1,7,179,1,0.6,...,0.5,0.5,1.0,0.0,,0.3,6.3,1,1.0,0.0
7,7,2,2019-10-01 00:28:26,2019-10-01 00:32:01,N,1,41,74,1,0.56,...,0.5,0.5,0.0,0.0,,0.3,5.8,2,1.0,0.0
8,8,2,2019-10-01 00:14:01,2019-10-01 00:26:16,N,1,255,49,1,2.42,...,0.5,0.5,0.0,0.0,,0.3,11.8,2,1.0,0.0
9,9,1,2019-10-01 00:03:03,2019-10-01 00:17:13,Y,1,130,131,1,3.4,...,0.5,0.5,2.85,0.0,,0.3,17.15,1,1.0,0.0


In [19]:
# Close the connection, then release the engine's pooled connections so no idle
# Postgres connection is kept open (the next section creates a new engine anyway).
# The engine object remains usable after dispose(); it simply opens fresh connections.
connection.close()
engine.dispose()

## Load the taxi_zone_lookup table into Postgres manually

In [24]:
# Reconnect to Postgres for the taxi_zone_lookup load.
# Credentials are read from the environment (PG_USER, PG_PASSWORD, PG_HOST,
# PG_PORT, PG_DB) instead of being hard-coded; the defaults reproduce the
# previous local connection settings. URL.create escapes special characters
# in the password.
pg_url = sqlalchemy.engine.URL.create(
    drivername='postgresql',
    username=os.environ.get('PG_USER', 'root'),
    password=os.environ.get('PG_PASSWORD', 'root'),
    host=os.environ.get('PG_HOST', 'localhost'),
    port=int(os.environ.get('PG_PORT', '5432')),
    database=os.environ.get('PG_DB', 'ny_taxi'),
)
engine = sqlalchemy.create_engine(pg_url)
# NOTE(review): the cells below pass `engine`, not `connection`; close this when done.
connection = engine.connect()

In [26]:
# Read a sample of the zone lookup file and show the Postgres DDL pandas would use for it.
df = pd.read_csv('dataset/taxi_zone_lookup.csv', nrows=100)
zone_ddl = pd.io.sql.get_schema(df, name='taxi_zone_lookup', con=engine)
print(zone_ddl)


CREATE TABLE taxi_zone_lookup (
	"LocationID" BIGINT, 
	"Borough" TEXT, 
	"Zone" TEXT, 
	service_zone TEXT
)




In [27]:
# Create (or replace) an empty taxi_zone_lookup table from the zero-row frame's column types.
empty_zones = df.head(n=0)
empty_zones.to_sql(name='taxi_zone_lookup', con=engine, if_exists='replace')
print('successfully created table')

successfully created table


In [29]:
# Load data in batches.
# Only StopIteration (the reader being exhausted) ends the loop. Any other error,
# e.g. a database failure inside to_sql, now propagates instead of being swallowed
# by a bare `except:` and misreported as "No more data to insert".
# The `with` block closes the CSV file handle when the load finishes or fails.
with pd.read_csv('dataset/taxi_zone_lookup.csv', iterator=True, chunksize=100000) as df_iter:
    while True:
        start = time()  # timing includes reading/parsing the chunk, as before
        try:
            df = next(df_iter)
        except StopIteration:
            break
        df.to_sql(name='taxi_zone_lookup', con=engine, if_exists='append')
        end = time()
        print(f'Successfully inserted {len(df)} data for {end-start:.2f} seconds')
print("No more data to insert")

Successfully inserted 265 data for 0.04 seconds
No more data to insert
