# Ingest NY GREEN trip data

In [2]:
import pandas as pd
import gzip

In [11]:
green_taxi_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-09.csv.gz"
df = pd.read_csv(green_taxi_url, dtype={'store_and_fwd_flag': "object"})
df

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2019-09-01 00:10:53,2019-09-01 00:23:46,N,1.0,65,189,5.0,2.00,10.50,0.50,0.5,2.36,0.00,,0.3,14.16,1.0,1.0,0.0
1,2.0,2019-09-01 00:31:22,2019-09-01 00:44:37,N,1.0,97,225,5.0,3.20,12.00,0.50,0.5,0.00,0.00,,0.3,13.30,2.0,1.0,0.0
2,2.0,2019-09-01 00:50:24,2019-09-01 01:03:20,N,1.0,37,61,5.0,2.99,12.00,0.50,0.5,0.00,0.00,,0.3,13.30,2.0,1.0,0.0
3,2.0,2019-09-01 00:27:06,2019-09-01 00:33:22,N,1.0,145,112,1.0,1.73,7.50,0.50,0.5,1.50,0.00,,0.3,10.30,1.0,1.0,0.0
4,2.0,2019-09-01 00:43:23,2019-09-01 00:59:54,N,1.0,112,198,1.0,3.42,14.00,0.50,0.5,3.06,0.00,,0.3,18.36,1.0,1.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
449058,,2019-09-30 23:13:00,2019-09-30 23:41:00,,,66,71,,7.84,43.25,2.75,0.5,0.00,0.00,,0.0,46.50,,,
449059,,2019-09-30 23:26:00,2019-09-30 23:46:00,,,55,26,,3.56,25.75,2.75,0.5,0.00,0.00,,0.0,29.00,,,
449060,,2019-09-30 23:15:00,2019-09-30 23:43:00,,,139,78,,18.47,50.47,2.75,0.5,0.00,6.12,,0.0,59.84,,,
449061,,2019-09-30 23:19:00,2019-10-01 00:06:00,,,242,188,,20.10,58.22,2.75,0.5,0.00,6.12,,0.0,67.59,,,


In [26]:
df.dtypes

VendorID                 float64
lpep_pickup_datetime      object
lpep_dropoff_datetime     object
store_and_fwd_flag        object
RatecodeID               float64
PULocationID               int64
DOLocationID               int64
passenger_count          float64
trip_distance            float64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
ehail_fee                float64
improvement_surcharge    float64
total_amount             float64
payment_type             float64
trip_type                float64
congestion_surcharge     float64
dtype: object

In [27]:
df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)

In [28]:
df.dtypes

VendorID                        float64
lpep_pickup_datetime     datetime64[ns]
lpep_dropoff_datetime    datetime64[ns]
store_and_fwd_flag               object
RatecodeID                      float64
PULocationID                      int64
DOLocationID                      int64
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
dtype: object

Number of taxi trip in September 18th 2019

In [29]:
start_day = '2019-09-18 00:00:00'
end_day = '2019-09-18 23:59:59'
taxi = df[(df['lpep_pickup_datetime'] >= start_day) & (df['lpep_dropoff_datetime'] <= end_day)]
len(taxi)

15612

pick up day with the largest trip distance

In [18]:
df.sort_values(by="trip_distance", ascending=False)[['lpep_pickup_datetime','trip_distance']]

Unnamed: 0,lpep_pickup_datetime,trip_distance
337292,2019-09-26 19:32:52,341.64
260941,2019-09-21 00:18:15,135.53
194647,2019-09-16 07:17:37,114.30
355914,2019-09-28 03:30:40,89.64
300408,2019-09-24 01:16:42,82.12
...,...,...
236791,2019-09-19 13:50:01,0.00
352177,2019-09-27 21:19:05,0.00
109394,2019-09-09 14:54:41,0.00
197868,2019-09-16 11:28:05,0.00


Read Zone data

In [12]:
zone_url = "https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv"
zone = pd.read_csv(zone_url)
zone.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [31]:
taxi_zone = pd.merge(df, zone, left_on="PULocationID", right_on="LocationID", how="inner")
taxi_zone

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,LocationID,Borough,Zone,service_zone
0,2.0,2019-09-01 00:10:53,2019-09-01 00:23:46,N,1.0,65,189,5.0,2.00,10.50,...,,0.3,14.16,1.0,1.0,0.0,65,Brooklyn,Downtown Brooklyn/MetroTech,Boro Zone
1,2.0,2019-09-01 00:31:22,2019-09-01 00:44:37,N,1.0,97,225,5.0,3.20,12.00,...,,0.3,13.30,2.0,1.0,0.0,97,Brooklyn,Fort Greene,Boro Zone
2,2.0,2019-09-01 00:50:24,2019-09-01 01:03:20,N,1.0,37,61,5.0,2.99,12.00,...,,0.3,13.30,2.0,1.0,0.0,37,Brooklyn,Bushwick South,Boro Zone
3,2.0,2019-09-01 00:27:06,2019-09-01 00:33:22,N,1.0,145,112,1.0,1.73,7.50,...,,0.3,10.30,1.0,1.0,0.0,145,Queens,Long Island City/Hunters Point,Boro Zone
4,2.0,2019-09-01 00:43:23,2019-09-01 00:59:54,N,1.0,112,198,1.0,3.42,14.00,...,,0.3,18.36,1.0,1.0,0.0,112,Brooklyn,Greenpoint,Boro Zone
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
449058,,2019-09-30 23:13:00,2019-09-30 23:41:00,,,66,71,,7.84,43.25,...,,0.0,46.50,,,,66,Brooklyn,DUMBO/Vinegar Hill,Boro Zone
449059,,2019-09-30 23:26:00,2019-09-30 23:46:00,,,55,26,,3.56,25.75,...,,0.0,29.00,,,,55,Brooklyn,Coney Island,Boro Zone
449060,,2019-09-30 23:15:00,2019-09-30 23:43:00,,,139,78,,18.47,50.47,...,,0.0,59.84,,,,139,Queens,Laurelton,Boro Zone
449061,,2019-09-30 23:19:00,2019-10-01 00:06:00,,,242,188,,20.10,58.22,...,,0.0,67.59,,,,242,Bronx,Van Nest/Morris Park,Boro Zone


Which were the 3 pick up Boroughs that had a sum of total_amount superior to 50000?

In [32]:
a = taxi_zone.groupby("Borough").agg({"total_amount": "sum"})
a[a['total_amount'] >= 50000]

Unnamed: 0_level_0,total_amount
Borough,Unnamed: 1_level_1
Bronx,818158.06
Brooklyn,2619378.54
Manhattan,2427880.92
Queens,2460386.17


In [33]:
ast_sep = taxi_zone[(taxi_zone["Zone"] == "Astoria") & (taxi_zone["lpep_pickup_datetime"].dt.month == 9)]
ast_sep = ast_sep.sort_values("tip_amount", ascending=False)[['DOLocationID', 'tip_amount']]
ast_sep = pd.merge(ast_sep, zone, left_on="DOLocationID", right_on="LocationID", how="inner")
ast_sep

Unnamed: 0,DOLocationID,tip_amount,LocationID,Borough,Zone,service_zone
0,132,62.31,132,Queens,JFK Airport,Airports
1,260,30.00,260,Queens,Woodside,Boro Zone
2,137,28.00,137,Manhattan,Kips Bay,Yellow Zone
3,264,25.00,264,Unknown,NV,
4,239,20.00,239,Manhattan,Upper West Side South,Yellow Zone
...,...,...,...,...,...,...
18257,82,0.00,82,Queens,Elmhurst,Boro Zone
18258,7,0.00,7,Queens,Astoria,Boro Zone
18259,129,0.00,129,Queens,Jackson Heights,Boro Zone
18260,179,0.00,179,Queens,Old Astoria,Boro Zone


## Ingesting into PSQL

In [15]:
from sqlalchemy import create_engine

Generate DDL statements to create table in Postgres

In [16]:
engine = create_engine('postgresql://root:root@postgres:5432/ny_taxi') #need to chang localhost to postgres cause in docker network

engine.connect()

<sqlalchemy.engine.base.Connection at 0x7fe88feab210>

Connect to psql

In [50]:
print(pd.io.sql.get_schema(df, name="green_taxi_data", con=engine))


CREATE TABLE green_taxi_data (
	"VendorID" FLOAT(53), 
	lpep_pickup_datetime TEXT, 
	lpep_dropoff_datetime TEXT, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [13]:
chunk_size = 100_000
df = pd.read_csv(green_taxi_url, dtype={'store_and_fwd_flag': "string"}, chunksize=chunk_size)

In [17]:
from time import time
for i, chunk in enumerate(df):
    # Some simple cleaning
    chunk.lpep_pickup_datetime = pd.to_datetime(chunk.lpep_pickup_datetime)
    chunk.lpep_dropoff_datetime = pd.to_datetime(chunk.lpep_dropoff_datetime)

    # Push into psql
    try:
        start = time()
        chunk.to_sql(name='green_taxi_data', con=engine, if_exists='append')
        end = time()
        print(f"Chunk {i+1}: Successfully Ingesting {len(chunk)} into PSQL... take {round(end-start,2)} Sec")
    except Exception as e:
        print(e)
        print(f"Failed at chunk {i}, skip...")

Chunk 1: Successfully Ingesting 100000 into PSQL... take 9.42 Sec
Chunk 2: Successfully Ingesting 100000 into PSQL... take 9.4 Sec
Chunk 3: Successfully Ingesting 100000 into PSQL... take 10.14 Sec
Chunk 4: Successfully Ingesting 100000 into PSQL... take 12.97 Sec
Chunk 5: Successfully Ingesting 49063 into PSQL... take 4.64 Sec


In [19]:
df = pd.read_csv(zone_url, chunksize=chunk_size)
table = "zone"
for i, chunk in enumerate(df):
    # Push into psql
    try:
        start = time()
        chunk.to_sql(name=table, con=engine, if_exists='append')
        end = time()
        print(f"Chunk {i+1}: Successfully Ingesting {len(chunk)} into PSQL... take {round(end-start,2)} Sec")
    except Exception as e:
        print(e)
        print(f"Failed at chunk {i}, skip...")

Chunk 1: Successfully Ingesting 265 into PSQL... take 0.04 Sec


In [None]:
df