## Load New York Taxi Data

In [6]:
import subprocess
import pandas as pd
from sqlalchemy import create_engine, text
from urllib.parse import quote_plus
from time import time

In [40]:
trip_data_file_path = "/var/tmp/output.parquet"
green_table_name = "green_tripdata_2019_09"
taxi_zones_data_file_path = "/workspaces/data-engineering-zoomcamp/data/taxi_zones.csv"
taxi_zones_table_name = "taxi_zones"

In [9]:
subprocess.run(["wget", "-O", trip_data_file_path, "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-09.parquet"])

--2025-01-06 08:59:59--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-09.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 143.204.102.120, 143.204.102.43, 143.204.102.123, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|143.204.102.120|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7692006 (7.3M) [binary/octet-stream]
Saving to: ‘/var/tmp/output.parquet’

     0K .......... .......... .......... .......... ..........  0% 1.33M 5s
    50K .......... .......... .......... .......... ..........  1%  589K 9s
   100K .......... .......... .......... .......... ..........  1% 1.05M 8s
   150K .......... .......... .......... .......... ..........  2% 3.37M 7s
   200K .......... .......... .......... .......... ..........  3% 21.7M 5s
   250K .......... .......... .......... .......... ..........  3% 3.65M 5s
   300K .......... .......... .......... .......... ..........  4% 27.

CompletedProcess(args=['wget', '-O', '/var/tmp/output.parquet', 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-09.parquet'], returncode=0)

In [11]:
df_first_rows = pd.read_parquet(trip_data_file_path)

In [12]:
df_first_rows.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-09-01 00:10:53,2019-09-01 00:23:46,N,1.0,65,189,5.0,2.0,10.5,0.5,0.5,2.36,0.0,,0.3,14.16,1.0,1.0,0.0
1,2,2019-09-01 00:31:22,2019-09-01 00:44:37,N,1.0,97,225,5.0,3.2,12.0,0.5,0.5,0.0,0.0,,0.3,13.3,2.0,1.0,0.0
2,2,2019-09-01 00:50:24,2019-09-01 01:03:20,N,1.0,37,61,5.0,2.99,12.0,0.5,0.5,0.0,0.0,,0.3,13.3,2.0,1.0,0.0
3,2,2019-09-01 00:27:06,2019-09-01 00:33:22,N,1.0,145,112,1.0,1.73,7.5,0.5,0.5,1.5,0.0,,0.3,10.3,1.0,1.0,0.0
4,2,2019-09-01 00:43:23,2019-09-01 00:59:54,N,1.0,112,198,1.0,3.42,14.0,0.5,0.5,3.06,0.0,,0.3,18.36,1.0,1.0,0.0


### Display data types

In [13]:
df_first_rows.dtypes

VendorID                          int64
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag               object
RatecodeID                      float64
PULocationID                      int64
DOLocationID                      int64
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                        object
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
dtype: object

### Change datatype for pickup and dropoff datetime

In [14]:
df_first_rows.lpep_pickup_datetime = pd.to_datetime(df_first_rows.lpep_pickup_datetime)
df_first_rows.lpep_dropoff_datetime = pd.to_datetime(df_first_rows.lpep_dropoff_datetime)

In [15]:
df_first_rows.dtypes

VendorID                          int64
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag               object
RatecodeID                      float64
PULocationID                      int64
DOLocationID                      int64
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                        object
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
dtype: object

In [108]:
df_first_rows.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-09-01 00:10:53,2019-09-01 00:23:46,N,1,65,189,5,2.0,10.5,0.5,0.5,2.36,0.0,,0.3,14.16,1,1,0.0
1,2,2019-09-01 00:31:22,2019-09-01 00:44:37,N,1,97,225,5,3.2,12.0,0.5,0.5,0.0,0.0,,0.3,13.3,2,1,0.0
2,2,2019-09-01 00:50:24,2019-09-01 01:03:20,N,1,37,61,5,2.99,12.0,0.5,0.5,0.0,0.0,,0.3,13.3,2,1,0.0
3,2,2019-09-01 00:27:06,2019-09-01 00:33:22,N,1,145,112,1,1.73,7.5,0.5,0.5,1.5,0.0,,0.3,10.3,1,1,0.0
4,2,2019-09-01 00:43:23,2019-09-01 00:59:54,N,1,112,198,1,3.42,14.0,0.5,0.5,3.06,0.0,,0.3,18.36,1,1,0.0


### Show query to create a table

In [16]:
encoded_password = quote_plus("P@ssw0rd!")
encoded_password

'P%40ssw0rd%21'

In [17]:
engine = create_engine(f"postgresql://postgres:{encoded_password}@db:5432/ny_taxi")

In [18]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7ff89f9a89b0>

In [19]:
print(pd.io.sql.get_schema(df_first_rows, name=green_table_name, con=engine))


CREATE TABLE green_tripdata_2019_09 (
	"VendorID" BIGINT, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee TEXT, 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




### Load Green Taxy 2019-09 data

### Create a table with the Green Taxy 2019-09 data

### Use iterator to upload data in chunks of 100000 rows

In [35]:
import pyarrow.parquet as pq
import pyarrow as pa

chunk_size = 100000
parquet_file = pq.ParquetFile(trip_data_file_path)
parquet_file.num_row_groups

1

In [38]:
""" `iter_batches` property iterate over parquet file data based
on chunk-size provided. """
first_row = True
for batch in parquet_file.iter_batches(batch_size=chunk_size):
    start = time()
    chunk_df = pa.Table.from_batches([batch]).to_pandas(split_blocks=True, self_destruct=True)

    if first_row:
        chunk_df.to_sql(green_table_name, engine, index=False, if_exists="replace")
        first_row = False
    else:
        chunk_df.to_sql(green_table_name, engine, index=False, if_exists="append")

    end = time()
    print(f"Chunk loaded in {end - start} seconds")


Chunk loaded in 19.99437713623047 seconds
Chunk loaded in 18.250382900238037 seconds
Chunk loaded in 18.961981296539307 seconds
Chunk loaded in 17.96063494682312 seconds
Chunk loaded in 7.536708354949951 seconds


In [39]:
pd.read_sql(f"SELECT COUNT(*) FROM {green_table_name}", engine)

Unnamed: 0,count
0,449063


### Load New York Taxi locations

In [41]:
dtypes = {
    "zone": "string",
    "LocationID": "Int64",
    "borough": "string"
}

ny_taxi_zones = pd.read_csv(taxi_zones_data_file_path, usecols=["zone", "LocationID", "borough"], dtype=dtypes)

In [42]:
ny_taxi_zones.head(20)

Unnamed: 0,zone,LocationID,borough
0,Newark Airport,1,EWR
1,Jamaica Bay,2,Queens
2,Allerton/Pelham Gardens,3,Bronx
3,Alphabet City,4,Manhattan
4,Arden Heights,5,Staten Island
5,Arrochar/Fort Wadsworth,6,Staten Island
6,Astoria,7,Queens
7,Astoria Park,8,Queens
8,Auburndale,9,Queens
9,Bloomingdale,24,Manhattan


In [43]:
ny_taxi_zones.dtypes

zone          string[python]
LocationID             Int64
borough       string[python]
dtype: object

In [44]:
ny_taxi_zones.to_sql(taxi_zones_table_name, engine, if_exists="replace", index=False)

263

In [45]:
query = f"""
SELECT * from {taxi_zones_table_name}
WHERE "borough" = 'Brooklyn'
  OR "borough" = 'Queens'
  OR "borough" = 'Manhattan'
  OR "borough" = 'Bronx'
  OR "borough" = 'Staten Island'
  OR "borough" = 'Brooklyn'
"""

pd.read_sql(query, engine)

Unnamed: 0,zone,LocationID,borough
0,Jamaica Bay,2,Queens
1,Allerton/Pelham Gardens,3,Bronx
2,Alphabet City,4,Manhattan
3,Arden Heights,5,Staten Island
4,Arrochar/Fort Wadsworth,6,Staten Island
...,...,...,...
257,Williamsburg (South Side),256,Brooklyn
258,Woodlawn/Wakefield,259,Bronx
259,Woodside,260,Queens
260,World Trade Center,261,Manhattan
