In [1]:
import pandas as pd
import requests
from pathlib import Path
from sqlalchemy import create_engine
import psycopg2

In [4]:
# download the green taxi data
GREEN_TAXI_CSV_DATA_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-09.csv.gz"
GREEN_TAXI_DATA_GZ_FILE_PATH = Path("ny_taxi_pg_data/data/ny-taxi-green.gz")
GREEN_TAXI_CSV_FILE_PATH = Path("ny_taxi_pg_data/data/ny-taxi-green.csv")

try:
    download_response = requests.get(GREEN_TAXI_CSV_DATA_URL)

    if download_response.ok:
        with open(GREEN_TAXI_DATA_GZ_FILE_PATH, "wb") as gzip_file:
            gzip_file.write(download_response.content)

            if GREEN_TAXI_DATA_GZ_FILE_PATH.exists():
                print(f"{GREEN_TAXI_DATA_GZ_FILE_PATH.name} has been downloaded!")
            else:
                raise FileNotFoundError(
                    f"ERROR: The file {GREEN_TAXI_DATA_GZ_FILE_PATH.name} is not found. Kindly check your code!"
                )
except Exception as e:
    print(e)

ny-taxi-green.gz has been downloaded!


In [5]:
# download the zone data
TAXI_ZONE_LOOKUP_DATA_URL = "https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv"
TAXI_ZONE_LOOKUP_CSV_FILE_PATH = Path("ny_taxi_pg_data/data/taxi-zone-lookup.csv")

try:
    download_response = requests.get(TAXI_ZONE_LOOKUP_DATA_URL)

    if download_response.ok:
        with open(TAXI_ZONE_LOOKUP_CSV_FILE_PATH, "wb") as csv_file:
            csv_file.write(download_response.content)

            if TAXI_ZONE_LOOKUP_CSV_FILE_PATH.exists():
                print(f"{TAXI_ZONE_LOOKUP_CSV_FILE_PATH.name} has been downloaded!")
            else:
                raise FileNotFoundError(
                    f"ERROR: The file {TAXI_ZONE_LOOKUP_CSV_FILE_PATH.name} is not found. Kindly check your code!"
                )
except Exception as e:
    print(e)

taxi-zone-lookup.csv has been downloaded!


In [9]:
# read the green taxi data csv via pandas by directly reading from the gzip file
green_taxi_df = pd.read_csv(GREEN_TAXI_DATA_GZ_FILE_PATH, compression="gzip")

# convert the dates to datetime
green_taxi_df.lpep_pickup_datetime = pd.to_datetime(green_taxi_df.lpep_pickup_datetime)
green_taxi_df.lpep_dropoff_datetime = pd.to_datetime(
    green_taxi_df.lpep_dropoff_datetime
)

# green_taxi_df["VendorID"] = green_taxi_df["VendorID"].fillna(-1)

# convert VendorId column to integer
# green_taxi_df["VendorID"] = green_taxi_df["VendorID"].astype(int)

# # make column names lowercase and add _ to compound names
# green_taxi_df.rename(
#     columns={
#         "VendorID": "vendor_id",
#         "RatecodeID": "rate_code_id",
#         "PULocationID": "pu_lid",
#         "DOLocationID": "do_lid",
#     },
#     inplace=True,
# )

# print(green_taxi_df.dtypes)

print(green_taxi_df.isnull())

# print(green_taxi_df.head())

# save to csv file
# green_taxi_df.to_csv(GREEN_TAXI_CSV_FILE_PATH)

  green_taxi_df = pd.read_csv(GREEN_TAXI_DATA_GZ_FILE_PATH, compression="gzip")


        VendorID  lpep_pickup_datetime  lpep_dropoff_datetime  \
0          False                 False                  False   
1          False                 False                  False   
2          False                 False                  False   
3          False                 False                  False   
4          False                 False                  False   
...          ...                   ...                    ...   
449058      True                 False                  False   
449059      True                 False                  False   
449060      True                 False                  False   
449061      True                 False                  False   
449062      True                 False                  False   

        store_and_fwd_flag  RatecodeID  PULocationID  DOLocationID  \
0                    False       False         False         False   
1                    False       False         False         False   
2        

In [42]:
# save the csv files to postgres via pandas
# only the taxi zone lookup csv is copied by the following code since saving the green taxi data via pandas takes a considerable amount of time
CONN_STRING = "postgresql://root:root@localhost:5432/ny_taxi" # DONT DO THIS IN YOUR PROJECTS!!! Use env files or other safer methods for authentication
engine = create_engine(CONN_STRING)

tzlookup_df = pd.read_csv(TAXI_ZONE_LOOKUP_CSV_FILE_PATH)
tzlookup_df.rename(str.lower, axis="columns", inplace=True)
tzlookup_df.rename(columns={"locationid": "location_id"}, inplace=True)
print(tzlookup_df.head())

%time tzlookup_df.to_sql(con=CONN_STRING, name="tzlookup", if_exists="replace")
# %time green_taxi_df.head(n=0).to_sql(con=CONN_STRING, name='green_taxi_data', if_exists="replace")

   location_id        borough                     zone service_zone
0            1            EWR           Newark Airport          EWR
1            2         Queens              Jamaica Bay    Boro Zone
2            3          Bronx  Allerton/Pelham Gardens    Boro Zone
3            4      Manhattan            Alphabet City  Yellow Zone
4            5  Staten Island            Arden Heights    Boro Zone
CPU times: user 40.2 ms, sys: 16.7 ms, total: 56.9 ms
Wall time: 239 ms
CPU times: user 37.3 ms, sys: 3.9 ms, total: 41.2 ms
Wall time: 163 ms


0

In [43]:
# save data to postgres via COPY. Much faster than saving via pandas assuming the csv file has been prepared already
# only the green taxi data is copied by the following code

COPY_CSV_DML = '''
COPY green_taxi_data
FROM 'data/ny-taxi-green.csv'
DELIMITER ',' CSV HEADER;
'''

# create connection via psycopg2
pg_conn = psycopg2.connect(CONN_STRING)
cur = pg_conn.cursor()
cur.execute('TRUNCATE TABLE green_taxi_data')
%time cur.execute(COPY_CSV_DML)
pg_conn.commit()
cur.close()


CPU times: user 329 µs, sys: 1.8 ms, total: 2.13 ms
Wall time: 5.05 s
