In [1]:
%pip install psycopg2

Collecting psycopg2
  Using cached psycopg2-2.9.10.tar.gz (385 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: psycopg2
  Building wheel for psycopg2 (setup.py) ... [?25ldone
[?25h  Created wheel for psycopg2: filename=psycopg2-2.9.10-cp312-cp312-linux_x86_64.whl size=168407 sha256=9d65e92462440c24b8c871987f8915bab19e1a65af54dd98ef048eda466f5791
  Stored in directory: /home/horeb/.cache/pip/wheels/ac/bb/ce/afa589c50b6004d3a06fc691e71bd09c9bd5f01e5921e5329b
Successfully built psycopg2
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10
Note: you may need to restart the kernel to use updated packages.


In [2]:
# Libraries used 
import pandas as pd
import requests 
from io import BytesIO 
from sqlalchemy import create_engine
from time import time

#check pandas version 
pd.__version__


'2.2.2'

## Import the parquet file from the site and convert it on csv to save 

In [2]:
# TO RUN ONCE !!!

# The database we need to import is on parquet form at https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"

# Dowload the file and get the csv 
response = requests.get(url)
print(response.content)
if response.status_code == 200: 
    parquet_file = BytesIO(response.content)
    df = pd.read_parquet(parquet_file, engine ="pyarrow")

    # Get the CSV 
    csv_file = "yellow_tripdata_2024-01.csv"
    df.to_csv(csv_file, index=False)
    print(f"File saved on : {csv_file}")

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



File saved on : yellow_tripdata_2024-01.csv


## Import the CSV file, get the schema and ingestion in PostgreSQL

### Try with the 100 first lines of the dataframe

In [3]:
# Import the first 100 lines of the database 
df = pd.read_csv('yellow_tripdata_2024-01.csv', nrows=100)

# When we display the schema, we note that date variables are not at the right format. 
# Let's right format them 
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

# Get the schema 
print(pd.io.sql.get_schema(df, name = 'yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


In [4]:
# Ingestion into the database
# Create engine to connect to PostgreSQL
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

# Upload DataFrame to PostgreSQL
pd.io.sql.get_schema(df, name="yellow_taxi_data", con=engine)
## This will create the schema that helps us to correctly upload the df in the PostrgreSQL database

'\nCREATE TABLE yellow_taxi_data (\n\t"VendorID" BIGINT, \n\ttpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, \n\ttpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, \n\tpassenger_count FLOAT(53), \n\ttrip_distance FLOAT(53), \n\t"RatecodeID" FLOAT(53), \n\tstore_and_fwd_flag TEXT, \n\t"PULocationID" BIGINT, \n\t"DOLocationID" BIGINT, \n\tpayment_type BIGINT, \n\tfare_amount FLOAT(53), \n\textra FLOAT(53), \n\tmta_tax FLOAT(53), \n\ttip_amount FLOAT(53), \n\ttolls_amount FLOAT(53), \n\timprovement_surcharge FLOAT(53), \n\ttotal_amount FLOAT(53), \n\tcongestion_surcharge FLOAT(53), \n\tairport_fee FLOAT(53)\n)\n\n'

### Try with chunks and iteratively 

In [5]:
### Create the iterative df
df_iter = pd.read_csv('yellow_tripdata_2024-01.csv', iterator=True, chunksize=100000)

# Get the current df and make formatting
df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

# Insert in database. The use of %time is just to time the execution and get some interesting information
%time df.to_sql(name="yellow_taxi_data", con=engine, if_exists="replace")

CPU times: user 7.52 s, sys: 269 ms, total: 7.78 s
Wall time: 12.8 s


1000

### Injesting of the whole dataframe

In [6]:
# Ingest the remain 
while True:
    try:
        # Démarrer le chronomètre
        t_start = time()

        # Lire la tranche suivante
        df = next(df_iter)

        # Corriger les types de colonnes
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

        # Ajouter les données dans la table existante
        df.to_sql(name="yellow_taxi_data", con=engine, if_exists="append")

        # Fin du chronomètre
        t_end = time()

        print('Inserted another chunk... took %.3f second(s)' % (t_end - t_start))

    except StopIteration:
        print("End of data importation.")
        break


Inserted another chunk... took 12.767 second(s)
Inserted another chunk... took 12.742 second(s)
Inserted another chunk... took 12.709 second(s)
Inserted another chunk... took 12.833 second(s)
Inserted another chunk... took 12.704 second(s)
Inserted another chunk... took 12.590 second(s)
Inserted another chunk... took 12.565 second(s)
Inserted another chunk... took 12.686 second(s)
Inserted another chunk... took 12.601 second(s)
Inserted another chunk... took 12.710 second(s)
Inserted another chunk... took 12.577 second(s)
Inserted another chunk... took 12.626 second(s)
Inserted another chunk... took 12.608 second(s)
Inserted another chunk... took 12.609 second(s)
Inserted another chunk... took 12.644 second(s)
Inserted another chunk... took 12.663 second(s)
Inserted another chunk... took 12.767 second(s)
Inserted another chunk... took 12.667 second(s)
Inserted another chunk... took 12.686 second(s)
Inserted another chunk... took 12.571 second(s)
Inserted another chunk... took 12.696 se

  df = next(df_iter)


Inserted another chunk... took 12.713 second(s)
Inserted another chunk... took 7.502 second(s)
End of data importation.


In [5]:
# Import and ingest the Taxi Zone Lookup file
df_zone = pd.read_csv('taxi_zone_lookup.csv')
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

# Ingestion
df_zone.to_sql(name='zones', con=engine, if_exists='replace')

265

In [6]:
print(df_zone.head())

   LocationID        Borough                     Zone service_zone
0           1            EWR           Newark Airport          EWR
1           2         Queens              Jamaica Bay    Boro Zone
2           3          Bronx  Allerton/Pelham Gardens    Boro Zone
3           4      Manhattan            Alphabet City  Yellow Zone
4           5  Staten Island            Arden Heights    Boro Zone
