### Libraries imports

In [1]:
import pandas as pd

from sqlalchemy import create_engine
from sqlalchemy.schema import DropTable

### Data Load

In [14]:
csv_name = './data/green_tripdata_2019-09.csv'
zones_csv_name = './data/taxi_zone_lookup.csv'
table_name = 'green_taxi_data'

df = pd.read_csv(csv_name)

zones_df = pd.read_csv(zones_csv_name)

  df = pd.read_csv(csv_name)


In [7]:
df.shape

(449063, 20)

In [72]:
df.head(5)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2019-09-01 00:10:53,2019-09-01 00:23:46,N,1.0,65,189,5.0,2.0,10.5,0.5,0.5,2.36,0.0,,0.3,14.16,1.0,1.0,0.0
1,2.0,2019-09-01 00:31:22,2019-09-01 00:44:37,N,1.0,97,225,5.0,3.2,12.0,0.5,0.5,0.0,0.0,,0.3,13.3,2.0,1.0,0.0
2,2.0,2019-09-01 00:50:24,2019-09-01 01:03:20,N,1.0,37,61,5.0,2.99,12.0,0.5,0.5,0.0,0.0,,0.3,13.3,2.0,1.0,0.0
3,2.0,2019-09-01 00:27:06,2019-09-01 00:33:22,N,1.0,145,112,1.0,1.73,7.5,0.5,0.5,1.5,0.0,,0.3,10.3,1.0,1.0,0.0
4,2.0,2019-09-01 00:43:23,2019-09-01 00:59:54,N,1.0,112,198,1.0,3.42,14.0,0.5,0.5,3.06,0.0,,0.3,18.36,1.0,1.0,0.0


### Data Transformation

In [73]:
df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)

### Data Upload

In [2]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [76]:
print(pd.io.sql.get_schema(df, name='green_taxi_data', con=engine))


CREATE TABLE green_taxi_data (
	"VendorID" FLOAT(53), 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [3]:
db = engine.connect()

In [78]:
query = """
SELECT 1 as number;
"""

pd.read_sql(query, con=engine)

Unnamed: 0,number
0,1


In [24]:
query = """
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND 
    schemaname != 'information_schema';
"""

pd.read_sql(query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,green_taxi_trips,root,,True,False,False,False
1,public,taxi_trips_zones,root,,True,False,False,False


In [35]:
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)

df = next(df_iter)

df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)

df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')

%time df.to_sql(name=table_name, con=engine, if_exists='append')

1000

In [101]:
from time import time

In [102]:
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)

while True: 
    try:
        t_start = time()
        
        df = next(df_iter)
    
        df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
        df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
    
        df.to_sql(name=table_name, con=engine, if_exists='append')
    
        t_end = time()
    
        print('inserted another chunk, took %.3f second' % (t_end - t_start))
    
    except StopIteration:
        print("Finished ingesting data into the postgres database")
        break

inserted another chunk, took 5.695 second
inserted another chunk, took 5.579 second
inserted another chunk, took 5.367 second


  df = next(df_iter)


inserted another chunk, took 5.637 second
inserted another chunk, took 2.295 second
Finished ingesting data into the postgres database


In [16]:
zones_df.LocationID.nunique()

265

In [19]:
zones_df.shape

(265, 4)

In [18]:
df['DOLocationID'].nunique()

260

In [20]:
zones_df.to_sql(name='taxi_trips_zones', con=engine, if_exists='append')

265

### Querying data

In [27]:
query = """
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'green_taxi_trips';
"""
pd.read_sql(query, con=engine)

Unnamed: 0,column_name,data_type
0,congestion_surcharge,double precision
1,VendorID,bigint
2,lpep_pickup_datetime,timestamp without time zone
3,lpep_dropoff_datetime,timestamp without time zone
4,index,bigint
5,RatecodeID,bigint
6,PULocationID,bigint
7,DOLocationID,bigint
8,passenger_count,bigint
9,trip_distance,double precision


In [33]:
query = """
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'taxi_trips_zones';
"""
pd.read_sql(query, con=engine)

Unnamed: 0,column_name,data_type
0,index,bigint
1,LocationID,bigint
2,Borough,text
3,Zone,text
4,service_zone,text


In [25]:
query = """
SELECT
    COUNT(*)
FROM green_taxi_trips
"""
pd.read_sql(query, con=engine)

Unnamed: 0,count
0,449063


In [35]:
query = """
SELECT
    ttz."Borough",
    gtt."total_amount"
FROM green_taxi_trips gtt
LEFT JOIN taxi_trips_zones ttz
    ON gtt."PULocationID" = ttz."LocationID"
WHERE gtt."lpep_pickup_datetime" = '2019-09-18' and gtt."total_amount" > 50000
ORDER BY gtt."total_amount" DESC
"""
pd.read_sql(query, con=engine)

Unnamed: 0,Borough,total_amount


In [38]:
query = """
SELECT
    gtt."PULocationID",
    ttz."Borough",
    gtt."total_amount"
FROM green_taxi_trips gtt
LEFT JOIN taxi_trips_zones ttz
    ON gtt."PULocationID" = ttz."LocationID"
"""
pd.read_sql(query, con=engine)

Unnamed: 0,PULocationID,Borough,total_amount
0,65,Brooklyn,14.16
1,97,Brooklyn,13.30
2,37,Brooklyn,13.30
3,145,Queens,10.30
4,112,Brooklyn,18.36
...,...,...,...
449058,66,Brooklyn,46.50
449059,55,Brooklyn,29.00
449060,139,Queens,59.84
449061,242,Bronx,67.59


In [56]:
# This query runs correctly, directly on PostgreSQL console 
query = """
SELECT
    gtt."lpep_pickup_datetime",
    gtt."DOLocationID",
    gtt."PULocationID",
    ttz."Borough",
	ttz2."Zone",
    gtt."tip_amount"
FROM green_taxi_trips gtt
LEFT JOIN taxi_trips_zones ttz
    ON gtt."PULocationID" = ttz."LocationID"
LEFT JOIN taxi_trips_zones ttz2
    ON gtt."DOLocationID" = ttz2."LocationID"
WHERE EXTRACT(YEAR FROM gtt."lpep_pickup_datetime") = '2019'
    AND EXTRACT(MONTH FROM gtt."lpep_pickup_datetime") = '9'
    AND ttz."Zone" LIKE '%Astoria%'
ORDER BY gtt."tip_amount" DESC;
"""

pd.read_sql(query, con=engine)

In [39]:
query = """
SELECT
    *
FROM green_taxi_trips gtt
ORDER BY gtt."total_amount" DESC	
"""
pd.read_sql(query, con=engine)

Unnamed: 0,index,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,...,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,337292,2.0,2019-09-26 19:32:52,2019-09-27 01:50:27,N,4.0,265,16,1.0,341.64,...,1.0,0.50,0.0,0.0,,0.3,1762.80,2.0,1.0,0.0
1,35116,2.0,2019-09-04 09:55:07,2019-09-06 14:09:12,N,1.0,193,193,1.0,0.30,...,0.0,0.50,0.0,0.0,,0.3,1600.80,2.0,1.0,0.0
2,321679,2.0,2019-09-25 18:04:33,2019-09-26 11:48:28,N,1.0,193,193,1.0,0.00,...,0.0,17.33,0.0,0.0,,0.0,595.14,2.0,1.0,0.0
3,90880,1.0,2019-09-08 03:18:45,2019-09-08 03:18:45,Y,5.0,80,264,1.0,0.00,...,0.0,0.00,0.0,0.0,,0.0,500.01,2.0,1.0,0.0
4,334389,2.0,2019-09-26 17:14:56,2019-09-27 09:44:33,N,1.0,193,193,1.0,0.00,...,1.0,0.50,0.0,0.0,,0.3,498.80,2.0,1.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
449058,184137,2.0,2019-09-15 05:21:54,2019-09-15 05:21:58,N,5.0,130,130,1.0,0.00,...,0.0,0.00,0.0,0.0,,0.0,-96.00,3.0,2.0,0.0
449059,184139,2.0,2019-09-15 05:24:15,2019-09-15 05:26:50,N,5.0,130,130,1.0,0.00,...,0.0,0.00,0.0,0.0,,0.0,-96.00,3.0,2.0,0.0
449060,258688,2.0,2019-09-20 21:06:18,2019-09-20 21:18:05,N,5.0,28,28,2.0,0.00,...,0.0,0.00,0.0,0.0,,0.0,-119.00,3.0,2.0,0.0
449061,124975,2.0,2019-09-10 22:41:04,2019-09-10 22:44:57,N,5.0,260,260,1.0,0.31,...,0.0,0.00,0.0,0.0,,0.0,-120.00,4.0,2.0,0.0


In [105]:
# Specify the date range (replace with your desired start and end dates)
start_date = '2019-09-18 00:00:00'
end_date = '2019-09-18 23:59:59'

query = f"""
SELECT
    COUNT(*)
FROM green_taxi_data
WHERE lpep_pickup_datetime BETWEEN '{start_date}' AND '{end_date}'
    AND lpep_dropoff_datetime BETWEEN '{start_date}' AND '{end_date}' 
"""

pd.read_sql(query, con=engine)

Unnamed: 0,count
0,15612


### Drop tables and delete records

In [63]:
df = pd.DataFrame()
df.to_sql('yellow_taxi_data', con=engine, if_exists='replace', index=False)

0

### Close connection

In [87]:
db.close()