In [32]:
import pandas as pd

Retrieve the January 2021 yellow taxi trip data

In [33]:
# Upstream release of this month's data (gzipped CSV). It is kept for reference
# only: this cell reads a local Parquet file, not the URL.
prefix = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/'
yellow_data = 'yellow_tripdata_2021-01.csv.gz'
url = prefix + yellow_data

parquet_path = 'yellow_tripdata_2021-01.parquet'

print(f"Loading local file {parquet_path} (upstream source: {url})")
try:
    df = pd.read_parquet(parquet_path)
except Exception as e:
    print(f"Error loading the data\nError code: {e}")
    # Re-raise: swallowing the error would leave `df` undefined, so every later
    # cell would fail with an unrelated NameError instead of showing this cause.
    raise
print("Data loaded successfully!")

Downloading from https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
Data loaded successfully!


In [34]:
# Write the frame loaded from Parquet out as a CSV file (without the pandas
# index) so later cells can read it back with explicit dtypes.
df.to_csv(path_or_buf='yellow_tripdata_2021-01.csv', index=False)
print("data successfully converted to csv")

data successfully converted to csv


In [None]:
print(df.__class__)  # confirm the loader returned a DataFrame

In [None]:
df.head(n=5)  # first five rows as a quick look at columns and values

In [None]:
# Dimensions of the loaded frame as (rows, columns).
df.shape

In [None]:
# Column dtypes as read from the Parquet file, for comparison with the explicit
# dtype mapping defined in the next cell.
df.dtypes

In [None]:
# Columns that read_csv should parse into datetime values.
parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Explicit per-column dtypes for read_csv. "Int64" (capital I) is pandas'
# nullable integer dtype, so integer columns with missing values are not
# upcast to float; "string" is pandas' dedicated string dtype.
dtype = dict(
    VendorID="Int64",
    passenger_count="Int64",
    trip_distance="float64",
    RatecodeID="Int64",
    store_and_fwd_flag="string",
    PULocationID="Int64",
    DOLocationID="Int64",
    payment_type="Int64",
    fare_amount="float64",
    extra="float64",
    mta_tax="float64",
    tip_amount="float64",
    tolls_amount="float64",
    improvement_surcharge="float64",
    total_amount="float64",
    congestion_surcharge="float64",
    airport_fee="float64",
)

In [None]:
# Re-read the CSV with the explicit dtypes and date columns defined above, so the
# frame's schema (and the DDL generated from it later) does not depend on inference.
df = pd.read_csv('yellow_tripdata_2021-01.csv', dtype=dtype, parse_dates=parse_dates)

# A date column that fails to parse may be left as object dtype without an
# error; check explicitly so a bad schema is not pushed to the database.
not_datetime = [c for c in parse_dates if not pd.api.types.is_datetime64_any_dtype(df[c])]
assert not not_datetime, f"columns not parsed as datetimes: {not_datetime}"
print("Data successfully loaded from csv with correct data types!")

In [None]:
import os

from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Connection settings are read from the environment so credentials are not
# baked into the notebook. The defaults reproduce the original local setup
# (root:root@localhost:5432/ny_taxi). URL.create also escapes special
# characters in the password, unlike a hand-built URL string.
db_url = URL.create(
    # "+psycopg" selects the psycopg (v3) driver for the postgresql dialect
    drivername="postgresql+psycopg",
    username=os.environ.get("PG_USER", "root"),
    password=os.environ.get("PG_PASSWORD", "root"),
    host=os.environ.get("PG_HOST", "localhost"),
    port=int(os.environ.get("PG_PORT", "5432")),
    database=os.environ.get("PG_DB", "ny_taxi"),
)
engine = create_engine(db_url)

Check the generated Data Definition Language (DDL) statement

In [None]:
# Preview the CREATE TABLE statement pandas would emit for this frame. Use the
# same table name that the following cells create, so the preview describes
# the real table.
print(pd.io.sql.get_schema(df, name='yellow_taxi_table', con=engine))

Create the table

In [None]:
# Create (or replace) an empty table from the frame's column schema only.
# index=False keeps the pandas index out of the table, so its columns match the
# DDL previewed by get_schema, which does not include the index.
df.head(0).to_sql(name='yellow_taxi_table', con=engine, if_exists='replace', index=False)

Separate the data into chunks by reading the CSV 100,000 rows at a time

In [None]:
# Lazily read the CSV in chunks of 100,000 rows. This rebinds `df` to a
# TextFileReader (not a DataFrame) that can be iterated only once.
df = pd.read_csv(
    'yellow_tripdata_2021-01.csv',
    chunksize=100_000,
    iterator=True,
    parse_dates=parse_dates,
    dtype=dtype,
)

Ingest the data into the PostgreSQL database running in Docker

In [None]:
# import tqdm for monitoring progress
from tqdm import tqdm

CHUNK_SIZE = 100_000

# Open a fresh chunked reader here instead of reusing the `df` iterator from the
# previous cell. A TextFileReader can be consumed only once, so re-running this
# cell with the old reader would silently insert nothing. The `with` block also
# closes the underlying file when ingestion finishes or fails.
with pd.read_csv(
    'yellow_tripdata_2021-01.csv',
    dtype=dtype,
    parse_dates=parse_dates,
    chunksize=CHUNK_SIZE,
) as reader:
    # create the table then insert data into it
    first = True
    for df_chunk in tqdm(reader):
        if first:
            # Reset the table from the first chunk's column schema only.
            # index=False keeps the pandas index out of the table.
            df_chunk.head(0).to_sql(name='yellow_taxi_table', con=engine, if_exists='replace', index=False)
            print("Table created")
            first = False
        rows = len(df_chunk)
        print(f"Next batch:{rows}rows")
        df_chunk.to_sql(name='yellow_taxi_table', con=engine, if_exists='append', index=False)
        print(f"{rows}rows inserted")