In [10]:
import pandas as pd
import pyarrow.parquet as pq

from sqlalchemy import create_engine
from time import time

In [11]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7fbc111f55e0>

In [12]:
# test query
query = """
SELECT 1 as number;
"""

pd.read_sql(query, con=engine)

Unnamed: 0,number
0,1


In [69]:
#query the information schema to determine if any tables exist in our database
query = """
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND 
    schemaname != 'information_schema';
"""

pd.read_sql(query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,root,,True,False,False,False
1,public,zones,root,,True,False,False,False


In [14]:
query = """select count(*) from yellow_taxi_data;"""

pd.read_sql(query, con=engine)

Unnamed: 0,count
0,2463931


In [22]:
#How many taxi trips were there on January 15, 2022?
query = """select count(*) from yellow_taxi_data
            where tpep_pickup_datetime between '2022-01-01 00:00:00'
            and '2022-01-01 23:59:59' """

pd.read_sql(query, con=engine)

Unnamed: 0,count
0,63441


In [37]:
#Find the largest tip each day
query = """select date(tpep_pickup_datetime) as date
            , max(tip_amount) as max_tip
             from yellow_taxi_data
             where tpep_pickup_datetime <= '2022-01-31 23:59:59'
             group by date
             order by max_tip desc"""

pd.read_sql(query, con=engine).head()

Unnamed: 0,date,max_tip
0,2022-01-29,888.88
1,2022-01-15,303.0
2,2022-01-16,301.0
3,2022-01-23,250.0
4,2022-01-27,245.83


In [101]:
#Most popular destinations from Central Park on January 14?
query = """select 
            z."Zone"
            , count(*) as count
            from yellow_taxi_data as ytd
            join zones as z
                on ytd."DOLocationID"= z."LocationID"
            where tpep_pickup_datetime between '2022-01-14 0:00:00'
            and '2022-01-14 23:59:59'
            and ytd."PULocationID" = 43 
            group by z."Zone" 
            order by count desc"""

# query = """select * from yellow_taxi_data limit 5;"""

# query = """select * from zones where "Zone" = 'Central Park' """  #43

pd.read_sql(query, con=engine)

Unnamed: 0,Zone,count
0,Upper East Side South,108
1,Upper East Side North,100
2,Midtown Center,99
3,Lincoln Square East,94
4,Central Park,88
...,...,...
81,Woodside,1
82,,1
83,Brooklyn Heights,1
84,Borough Park,1


In [128]:
query = """select concat(coalesce(puzones."Zone", 'Unknown')
                ,'/'
                , coalesce(dozones."Zone", 'Unknown')) as pickup_dropoff
            , avg(total_amount) as avg_price_ride
            from yellow_taxi_data as taxi
            
            left join zones as puzones
                on taxi."PULocationID" = puzones."LocationID"
                
            left join zones as dozones
                on taxi."DOLocationID" = dozones."LocationID" 
                
            group by 1"""
pd.read_sql(query, con=engine)

Unnamed: 0,pickup_dropoff,avg_price_ride
0,Allerton/Pelham Gardens/Allerton/Pelham Gardens,55.985000
1,Allerton/Pelham Gardens/Alphabet City,49.000000
2,Allerton/Pelham Gardens/Co-Op City,14.650000
3,Allerton/Pelham Gardens/Eastchester,18.000000
4,Allerton/Pelham Gardens/East Harlem South,39.780000
...,...,...
21385,Yorkville West/Woodlawn/Wakefield,40.516667
21386,Yorkville West/Woodside,24.932931
21387,Yorkville West/World Trade Center,33.361667
21388,Yorkville West/Yorkville East,10.031180


In [None]:
"""since the above query returned 0 records, 
we need to download the parquet file from the NYC.gov
webpage. 
https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page """

import pyarrow.parquet as pq
trips = pq.read_table(r'/Users/martinpalkovic/Documents/DE_ZoomCamp/Week1_Setting_up_Docker_PostgreSQL_GCP/yellow_tripdata_2022-01.parquet')
df = trips.to_pandas()
# df = df.sample(n = 1000)

df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

df.to_csv('yellow_tripdata_2022-01.csv')

In [None]:
#print SQL DDL for the dataframe
# print(pd.io.sql.get_schema(df, 'yellow_taxi_data'))

In [None]:
df_iter = pd.read_csv('yellow_tripdata_2022-01.csv', iterator=True, chunksize=100000)
df = next(df_iter)
len(df)

In [None]:
while True:
    t_start = time()
    df = next(df_iter)

    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    df.to_sql(name = 'yellow_taxi_data', con = engine, if_exists = 'append')
    
    t_end = time()

    print('inserted another chunk, took %.3f seconds' % (t_end - t_start))

In [None]:
#only run this one time, it imports the SQL data to Postgres
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace', chunksize=100000)

In [None]:
query = """
select count(*) from yellow_taxi_data;
"""

pd.read_sql(query, con=engine)

In [None]:
query = """
select *
from pg_catalog.pg_tables
where schemaname != 'pg_catalog' AND 
    schemaname != 'information_schema';
"""

pd.read_sql(query, con=engine)

In [None]:
# query = """drop table yellow_taxi_data"""
# # pd.read_sql(query, con=engine)
# engine.execute(query)

In [6]:
df = pd.read_csv('taxi+_zone_lookup.csv')
df.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [7]:
%time df.to_sql(name='zones', con=engine, if_exists='replace', chunksize=100000)

CPU times: user 10 ms, sys: 1.88 ms, total: 11.9 ms
Wall time: 162 ms


265