In [1]:
import os
from urllib.parse import quote_plus

import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine

In [None]:
## Fetch Taxi Data

# Dataset
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet --no-check-certificate

# Data dictionary
!wget https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf --no-check-certificate

In [29]:
# Input files read by the cells below.
file_pq = 'yellow_tripdata_2023-01.parquet'
file_csv = 'taxi_zone_lookup.csv'
# Alternative CSV input used previously:
# file_csv = 'green_tripdata_2019-09.csv'

In [3]:
## Parquet Reading

# Alternative (disabled): pull a single one-row record batch through pyarrow
# and convert just that batch to a DataFrame.
#
# parquet_file = pq.ParquetFile('yellow_tripdata_2023-01.parquet')
# batches = parquet_file.iter_batches(batch_size=1)
# first_batch = next(batches)
# dfp = first_batch.to_pandas()
# dfp.head()

# Read the parquet file named by file_pq into a DataFrame.
dfp = pd.read_parquet(path=file_pq)


In [None]:
## Parquet to CSV

# Read the parquet file named by file_pq into dfp
dfp = pd.read_parquet(file_pq)

## Process problematic columns
# NOTE(review): everything below is disabled exploratory code. It operates on
# a frame named `df`, not on `dfp` loaded above — presumably a CSV re-read of
# the same data; confirm which frame is intended before re-enabling.

# Inspect the column at position 6, drop the first column, and coerce the
# column at position 6 to str; then find rows whose value is neither "N" nor "Y".
# df.iloc[:,6].head()
# df.drop(df.columns[0], axis=1, inplace=True)
# df.iloc[:,6].dtype
# df.iloc[:, 6] = df.iloc[:, 6].astype(str)
# df.iloc[:, 6].astype(str).dtype
# print(df.iloc[:, 6].unique())
# condition = df[(df.iloc[:, 6] != "N") & (df.iloc[:, 6] != "Y")].index

# Drop rows that don't meet the condition
# df = df.drop(index=condition)

# Reset the index if needed, then compare value counts between df and dfp
# df = df.reset_index(drop=True)
# df.iloc[:, 6].value_counts()
# dfp.iloc[:, 6].value_counts()


## Save as CSV
# index=False so the DataFrame index is not written as the first CSV column
# df.to_csv('yellow_tripdata_2023-01.csv', index=False)

In [30]:
# Read only the first 100 rows of the CSV named by file_csv.
df = pd.read_csv(filepath_or_buffer=file_csv, nrows=100)

In [3]:
## Cleaning

# Pickup and drop-off datetimes should be "TIMESTAMP", not "TEXT", in the schema.
# Convert only the columns that are actually present: the configured CSV may
# not have them (e.g. taxi_zone_lookup.csv), and unguarded attribute access
# would raise AttributeError in that case.
for datetime_col in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    if datetime_col in df.columns:
        df[datetime_col] = pd.to_datetime(df[datetime_col])

In [5]:
# SQL Alchemy
# URL format: dialect://user:password@hostname:port/db_name
#
# Connection settings come from the standard PostgreSQL environment variables
# so real credentials do not have to be committed with the notebook. The
# fallbacks are the local development values this notebook was written against.
# User and password are URL-escaped so characters such as "@" or "/" do not
# break the connection URL.
db_user = quote_plus(os.environ.get('PGUSER', 'root'))
db_password = quote_plus(os.environ.get('PGPASSWORD', 'root'))
db_host = os.environ.get('PGHOST', 'localhost')
db_port = os.environ.get('PGPORT', '5432')
db_name = os.environ.get('PGDATABASE', 'ny_taxi')

engine = create_engine(
    f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
)

In [6]:
# Smoke-test the connection. The context manager closes the connection when
# the block exits instead of leaving it open for the life of the kernel.
with engine.connect() as connection:
    print(connection)

<sqlalchemy.engine.base.Connection at 0x13b290710>

In [11]:
# test connection with test query
dummy_query = """
SELECT 1 as number;
"""

# describe tables query; won't work because it's psql specific.
# Raw string: "\d" is not a valid Python escape sequence, so a regular string
# literal triggers an invalid-escape warning. The value is unchanged.
psqlQuery = r"""
\dt
"""

# ## \dt as generic SQL: list tables outside the system schemas
query = """
select * from pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';
"""

pd.read_sql(query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity


In [31]:
## Show df in DDL (Data Definition Language)
table_name = 'taxi_zone_lookup'

# get_schema returns the table definition for df as SQL text.
# Without `con` the statement is generic SQL, which may or may not work
# with Postgres:
# print(pd.io.sql.get_schema(df, name=table_name))

# Passing the engine yields the definition for Postgres.
print(pd.io.sql.get_schema(frame=df, name=table_name, con=engine))


CREATE TABLE taxi_zone_lookup (
	"LocationID" BIGINT, 
	"Borough" TEXT, 
	"Zone" TEXT, 
	service_zone TEXT
)


In [32]:
# Read the CSV lazily, 100,000 rows per chunk, instead of all at once
df_iter = pd.read_csv(file_csv, chunksize=100_000, iterator=True)

In [16]:
# Pull the next chunk from the chunked CSV reader into df
df = next(df_iter)

In [24]:
# Preprocess chunk
# The datetime columns are named tpep_* or lpep_* depending on the input
# (lpep_* in the 2019 file). Convert whichever are present so this cell works
# for either naming and is a no-op on inputs without them
# (e.g. taxi_zone_lookup.csv), instead of raising AttributeError.
for datetime_col in (
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
):
    if datetime_col in df.columns:
        df[datetime_col] = pd.to_datetime(df[datetime_col])

In [33]:
# Chunked load: write a zero-row frame first so the table is (re)created
# with just the column headers
df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')

0

In [19]:
# insert a chunk, and time it
%time df.to_sql(name=table_name,con=engine, if_exists='append')

CPU times: user 8.46 s, sys: 584 ms, total: 9.05 s
Wall time: 17.7 s


1000

In [20]:
from time import time

In [34]:
## insert all chunks iteratively

# Datetime columns to parse when the chunk has them (tpep_* or lpep_* naming).
datetime_cols = (
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
)

# Start the clock before the first read so each timing covers both reading
# the chunk and inserting it.
t_start = time()
for df in df_iter:
    # Preprocess chunk: parse datetime columns so later chunks are inserted
    # with the same types as the first one; chunks without these columns
    # are left untouched.
    for datetime_col in datetime_cols:
        if datetime_col in df.columns:
            df[datetime_col] = pd.to_datetime(df[datetime_col])

    # insert
    df.to_sql(name=table_name, con=engine, if_exists='append')
    t_end = time()

    print('inserted another chunk... %.3f seconds' % (t_end - t_start))

    # restart the clock for the next chunk
    t_start = time()

# The for loop ends when the reader is exhausted.
print("Finished inserting all chunks.")

inserted another chunk... 0.187 seconds
Finished inserting all chunks.


In [35]:
# Sanity check: read back up to 10 rows from the table named by table_name
query = f"""
select * from {table_name} LIMIT 10;
"""

# Alternative (disabled): sample a different table
# query = """
# select * from yellow_taxi_data LIMIT 10;
# """

pd.read_sql(sql=query, con=engine)

Unnamed: 0,index,LocationID,Borough,Zone,service_zone
0,0,1,EWR,Newark Airport,EWR
1,1,2,Queens,Jamaica Bay,Boro Zone
2,2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,3,4,Manhattan,Alphabet City,Yellow Zone
4,4,5,Staten Island,Arden Heights,Boro Zone
5,5,6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
6,6,7,Queens,Astoria,Boro Zone
7,7,8,Queens,Astoria Park,Boro Zone
8,8,9,Queens,Auburndale,Boro Zone
9,9,10,Queens,Baisley Park,Boro Zone
