In [1]:
import pandas as pd
from sqlalchemy import create_engine

In [2]:
taxi_dtypes = {
    'VendorID': pd.Int64Dtype(),
    'passenger_count': pd.Int64Dtype(),
    'trip_distance': float,
    'RatecodeID':pd.Int64Dtype(),
    'store_and_fwd_flag':str,
    'PULocationID':pd.Int64Dtype(),
    'DOLocationID':pd.Int64Dtype(),
    'payment_type': pd.Int64Dtype(),
    'fare_amount': float,
    'extra':float,
    'mta_tax':float,
    'tip_amount':float,
    'tolls_amount':float,
    'improvement_surcharge':float,
    'total_amount':float,
    'congestion_surcharge':float
}
parse_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

dfs = []
months = [10,11,12]
for month in months:
    url = f'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-{month}.csv.gz'
    dfs.append(pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates))    

df = pd.concat(dfs, axis=0, ignore_index=True)

In [5]:
import re

def _to_snake_case(camel_case_str: str)-> str:
    pre_treat = camel_case_str.replace('ID','Id').replace('PU', 'Pu').replace('DO','Do')
    snake_case = re.sub(r'(?<!^)(?=[A-Z])', '_', pre_treat).lower()
    return snake_case

rename_map = {col: _to_snake_case(col) for col in df.columns}
data = ( 
    df[(df.trip_distance != 0) & (df.passenger_count!=0)]
    .assign(
        lpep_pickup_date = lambda df: df.lpep_pickup_datetime.dt.date
    )
    .rename(columns=rename_map)
)

display(data.head())
assert "vendor_id" in data.columns, 'The column names must be in snake case'
assert (data.passenger_count==0).sum() == 0, 'The passenger_count must be different from zero'
assert (data.trip_distance==0).sum() == 0, 'The trip_distance must be different from zero'

Unnamed: 0,vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecode_id,pu_location_id,do_location_id,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,lpep_pickup_date
0,2,2020-10-01 00:31:19,2020-10-01 00:34:55,N,1,7,7,1,0.79,5.0,...,0.5,1.58,0.0,,0.3,7.88,1,1.0,0.0,2020-10-01
1,2,2020-10-01 00:42:12,2020-10-01 00:43:51,N,1,179,7,1,0.5,4.0,...,0.5,0.0,0.0,,0.3,5.3,2,1.0,0.0,2020-10-01
2,2,2020-10-01 00:53:09,2020-10-01 00:55:39,N,1,179,223,1,0.6,4.0,...,0.5,1.06,0.0,,0.3,6.36,1,1.0,0.0,2020-10-01
3,1,2020-10-01 00:12:29,2020-10-01 00:20:08,N,1,134,216,2,4.4,13.5,...,0.5,0.0,0.0,,0.3,14.8,2,1.0,0.0,2020-10-01
4,1,2020-10-01 00:32:38,2020-10-01 00:43:02,N,1,82,7,1,2.9,10.5,...,0.5,0.0,0.0,,0.3,11.8,2,1.0,0.0,2020-10-01


In [9]:
sorted(data.lpep_pickup_date.unique())

[datetime.date(2009, 1, 1),
 datetime.date(2020, 9, 30),
 datetime.date(2020, 10, 1),
 datetime.date(2020, 10, 2),
 datetime.date(2020, 10, 3),
 datetime.date(2020, 10, 4),
 datetime.date(2020, 10, 5),
 datetime.date(2020, 10, 6),
 datetime.date(2020, 10, 7),
 datetime.date(2020, 10, 8),
 datetime.date(2020, 10, 9),
 datetime.date(2020, 10, 10),
 datetime.date(2020, 10, 11),
 datetime.date(2020, 10, 12),
 datetime.date(2020, 10, 13),
 datetime.date(2020, 10, 14),
 datetime.date(2020, 10, 15),
 datetime.date(2020, 10, 16),
 datetime.date(2020, 10, 17),
 datetime.date(2020, 10, 18),
 datetime.date(2020, 10, 19),
 datetime.date(2020, 10, 20),
 datetime.date(2020, 10, 21),
 datetime.date(2020, 10, 22),
 datetime.date(2020, 10, 23),
 datetime.date(2020, 10, 24),
 datetime.date(2020, 10, 25),
 datetime.date(2020, 10, 26),
 datetime.date(2020, 10, 27),
 datetime.date(2020, 10, 28),
 datetime.date(2020, 10, 29),
 datetime.date(2020, 10, 30),
 datetime.date(2020, 10, 31),
 datetime.date(2020, 1

In [50]:
data.vendor_id.unique()

<IntegerArray>
[2, 1]
Length: 2, dtype: Int64

In [17]:
import os
from dotenv import load_dotenv
load_dotenv()
POSTGRES_DBNAME = os.environ.get("POSTGRES_DBNAME")
POSTGRES_SCHEMA = 'ny_taxi'
POSTGRES_USER = os.environ.get("POSTGRES_USER")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD")
HOST = 'localhost'
POSTGRES_PORT_HOST_MACHINE = os.environ.get("POSTGRES_PORT_HOST_MACHINE")

In [18]:
table_name = "yellow_cab_data"
engine = create_engine(f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{HOST}:{POSTGRES_PORT_HOST_MACHINE}')

In [19]:
with engine.connect() as con:

    rs = con.execute(f'SELECT * FROM {table_name}')

    for row in rs[:10]:
        print(row)

OperationalError: (psycopg2.OperationalError) FATAL:  database "ny_taxi" does not exist

(Background on this error at: https://sqlalche.me/e/14/e3q8)