# Ingesting NYC taxi data into PostgreSQL database

## Load packages and NYC taxi data from January 2021.

Load packages.

In [1]:
import os
import re
from pathlib import Path

import pandas as pd
import sqlalchemy as sa

Load NYC taxi data from January 2021.

In [2]:
# Root of the local data-engineering-zoomcamp-2024 checkout. Every other path in
# this notebook is derived from it, so running the notebook on another machine
# only requires setting the DE_ZOOMCAMP_ROOT environment variable.
ZOOMCAMP_ROOT = Path(
    os.environ.get(
        "DE_ZOOMCAMP_ROOT",
        "/home/fmerino/Documents/data-engineering-zoomcamp-2024",
    )
)

# Directory holding the PostgreSQL setup (password files, postgresql.conf, certificates).
POSTGRES_PATH = ZOOMCAMP_ROOT / "01-docker-terraform" / "02-docker-sql"

# Directory holding the prepared NYC taxi Parquet files.
DATA_PATH = POSTGRES_PATH / "data-nyc-taxi"

# NOTE(review): CERTS_PATH is not referenced by the visible cells (certificates
# are located through POSTGRES_PATH); kept for backward compatibility.
CERTS_PATH = ZOOMCAMP_ROOT

In [3]:
# Prepared yellow taxi trips for January 2021, read from the local data directory.
trips_file = DATA_PATH / "yellow_tripdata_2021-01_prepared.parquet"
nyc_taxi = pd.read_parquet(trips_file)

nyc_taxi

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,dt,trip_distance,avg_speed,PULocationID,DOLocationID,RatecodeID,passenger_count,total_amount,fare_amount,tip_amount,tolls_amount,extra,mta_tax,improvement_surcharge,congestion_surcharge,airport_fee,payment_type,VendorID
0,2021-01-01 00:30:10,2021-01-01 00:36:12,0 days 00:06:02,2.10,20.883978,142,43,1,1,11.80,8.0,0.00,0.00,3.0,0.5,0.3,2.5,,2,1
1,2021-01-01 00:43:30,2021-01-01 01:11:06,0 days 00:27:36,14.70,31.956522,132,165,1,1,51.95,42.0,8.65,0.00,0.5,0.5,0.3,0.0,,1,1
2,2021-01-01 00:31:49,2021-01-01 00:48:21,0 days 00:16:32,4.94,17.927419,68,33,1,1,24.36,16.5,4.06,0.00,0.5,0.5,0.3,2.5,,1,2
3,2021-01-01 00:16:29,2021-01-01 00:24:30,0 days 00:08:01,1.60,11.975052,224,68,1,1,14.15,8.0,2.35,0.00,3.0,0.5,0.3,2.5,,1,1
4,2021-01-01 00:00:28,2021-01-01 00:17:28,0 days 00:17:00,4.10,14.470588,95,157,1,1,17.30,16.0,0.00,0.00,0.5,0.5,0.3,0.0,,2,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1192563,2021-01-31 23:39:26,2021-01-31 23:54:36,0 days 00:15:10,6.85,27.098901,137,42,1,3,24.30,20.5,0.00,0.00,0.5,0.5,0.3,2.5,,2,2
1192564,2021-01-31 23:07:54,2021-01-31 23:19:42,0 days 00:11:48,3.81,19.372881,113,141,1,1,16.30,12.5,0.00,0.00,0.5,0.5,0.3,2.5,,2,2
1192565,2021-01-31 23:30:45,2021-01-31 23:35:13,0 days 00:04:28,1.32,17.731343,233,237,1,1,9.80,6.0,0.00,0.00,0.5,0.5,0.3,2.5,,2,2
1192566,2021-01-31 23:09:52,2021-01-31 23:51:56,0 days 00:42:04,10.56,15.061807,56,68,1,2,44.92,37.5,0.00,6.12,0.5,0.5,0.3,0.0,,1,2


## Define PostgreSQL schema for the table storing NYC taxi data

Define the SQLAlchemy engine that this notebook (the client) uses to communicate with our PostgreSQL server over an SSL connection.

In [4]:
def _read_first_line(path):
    """Return the first line of the text file at ``path`` without trailing whitespace.

    The ``with`` block guarantees the file handle is closed after reading.
    """
    with open(path) as fh:
        return fh.readline().rstrip()


def _read_pg_port(conf_path, default="5432"):
    """Return the last active ``port`` setting of a ``postgresql.conf`` file, as a string.

    Accepts ``port = 5432``, ``port=5432``, ``port 5432`` and quoted values, and
    ignores a trailing ``# ...`` comment. Commented-out lines such as
    ``#port = 5432`` do not match. As in PostgreSQL, the last occurrence wins.
    ``default`` (PostgreSQL's default port) is returned when no active line exists.
    """
    pattern = re.compile(r"^\s*port\s*=?\s*'?(\d+)'?\s*(?:#.*)?$")
    port_value = default
    with open(conf_path) as fh:
        for line in fh:
            match = pattern.match(line)
            if match:
                port_value = match.group(1)
    return port_value


username = "fmerinocasallo_writer"
passwd = _read_first_line(POSTGRES_PATH/"pg-server/passwds/pg-fmerinocasallo_writer-passwd.txt")

hostname = "172.19.0.70"
port = _read_pg_port(POSTGRES_PATH/"pg-server/conf/postgresql.conf")

database = "de_zoomcamp"
schema = "nyc_taxi"

# URL.create escapes special characters in the password ('@', ':', '/', ...),
# which would otherwise corrupt a hand-built f-string URL. Its repr also hides
# the password.
url = sa.engine.URL.create(
    drivername="postgresql",
    username=username,
    password=passwd,
    host=hostname,
    database=database,
)
# Port and SSL/TLS settings are handed straight to the DB-API driver.
connect_args = {
    "port": port,
    "sslmode": "verify-full",
    "sslrootcert": POSTGRES_PATH/"pg-server/certs/ca/server/server-ca.crt",
    "sslcert": POSTGRES_PATH/"host/certs/client/writer/fmerinocasallo_writer.crt",
    "sslkey": POSTGRES_PATH/"host/certs/client/writer/fmerinocasallo_writer.key",
}

# echo=True logs every SQL statement the engine emits (see the outputs below).
engine = sa.create_engine(url=url, connect_args=connect_args, echo=True)

Check pandas' suggested SQL statement for creating a new table to store the processed data.

In [5]:
# Render (without executing) the CREATE TABLE statement pandas infers from the frame's dtypes.
suggested_ddl = pd.io.sql.get_schema(nyc_taxi, "nyc_taxi")
print(suggested_ddl)

CREATE TABLE "nyc_taxi" (
"tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "dt" INTEGER,
  "trip_distance" REAL,
  "avg_speed" REAL,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "RatecodeID" INTEGER,
  "passenger_count" INTEGER,
  "total_amount" REAL,
  "fare_amount" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "improvement_surcharge" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL,
  "payment_type" INTEGER,
  "VendorID" INTEGER
)


  print(pd.io.sql.get_schema(nyc_taxi, "nyc_taxi"))


Define the schema for a new PostgreSQL table storing NYC taxi data associated with trips during January 2021. According
to [PostgreSQL's official documentation](https://www.postgresql.org/docs/current/datatype-numeric.html):

1. On all currently supported platforms, the `REAL` type has a range of around 1E-37 to 1E+37 with a
precision of at least 6 decimal digits. The `DOUBLE PRECISION` type has a range of around 1E-307 to 1E+308 with a
precision of at least 15 digits.
2. The type `INTEGER` is the common choice, as it offers the best balance between range, storage size, and performance.
The `SMALLINT` type is generally only used if disk space is at a premium. The `BIGINT` type is designed to be used when
the range of the `INTEGER` type is insufficient.

For our specific use case, we assume `REAL` and `INTEGER` to be the most suitable data types for all the numerical
columns/attributes except `dt`. The `dt` column/attribute originally stored the duration of each trip as `timedelta`
values in our Parquet file; pandas writes these values to the database as integer counts of nanoseconds. We therefore
opt for `BIGINT`: the largest `INTEGER` value is 2,147,483,647, i.e., only about 2.1 seconds expressed in nanoseconds,
whereas a 15-minute trip already equals 9E+11 ns. Using these inexact data types instead of `NUMERIC`/`DECIMAL` will
offer noticeable performance gains at the expense of negligible precision losses, as monetary amounts in this sector
are stored with only 2 decimal digits at most (e.g., an amount such as 1234.56 still fits within the 6 significant
digits guaranteed by `REAL`).

Define the PostgreSQL schema for the table storing the NYC taxi data associated with the trips from January 2021.

In [6]:
table_name = "yellow_taxi_trips"
schema_name = "nyc_taxi"

# Explicit SQL type for every DataFrame column, following the rationale above:
# TIMESTAMP for datetimes, BIGINT for the nanosecond trip duration, REAL for
# distances/speeds/amounts and INTEGER for IDs, codes and counts.
schema_dtypes = dict(
    [
        ("tpep_pickup_datetime", sa.types.TIMESTAMP),
        ("tpep_dropoff_datetime", sa.types.TIMESTAMP),
        ("dt", sa.types.BIGINT),
        ("trip_distance", sa.types.REAL),
        ("avg_speed", sa.types.REAL),
        ("PULocationID", sa.types.INTEGER),
        ("DOLocationID", sa.types.INTEGER),
        ("RatecodeID", sa.types.INTEGER),
        ("passenger_count", sa.types.INTEGER),
        ("total_amount", sa.types.REAL),
        ("fare_amount", sa.types.REAL),
        ("tip_amount", sa.types.REAL),
        ("tolls_amount", sa.types.REAL),
        ("extra", sa.types.REAL),
        ("mta_tax", sa.types.REAL),
        ("improvement_surcharge", sa.types.REAL),
        ("congestion_surcharge", sa.types.REAL),
        ("airport_fee", sa.types.REAL),
        ("payment_type", sa.types.INTEGER),
        ("VendorID", sa.types.INTEGER),
    ]
)

## Ingesting NYC taxi data from January 2021 to our PostgreSQL database

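Create a new, empty table `yellow_taxi_trips`: writing zero rows (`head(n=0)`) only creates the table with the column types defined above, replacing any existing table with the same name.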

In [7]:
# Zero rows: only the table definition is written; if_exists="replace" drops and
# re-creates the table if it already exists.
empty_frame = nyc_taxi.head(n=0)
empty_frame.to_sql(
    name=table_name,
    con=engine,
    schema=schema_name,
    if_exists="replace",
    index=False,
    dtype=schema_dtypes,
)

2024-09-02 16:59:29,001 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2024-09-02 16:59:29,002 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-09-02 16:59:29,004 INFO sqlalchemy.engine.Engine select current_schema()
2024-09-02 16:59:29,005 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-09-02 16:59:29,006 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2024-09-02 16:59:29,006 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-09-02 16:59:29,007 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-09-02 16:59:29,011 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_namespace.nspname = %(nspname_1)s
2024-09-02 16:59:29,012 INFO sqlalchemy.engine.Engine [g

0

Ingest the NYC taxi data for January 2021 trips into the newly created PostgreSQL table `yellow_taxi_trips`.

In [8]:
# Append every row to the table created above; the DataFrame index is not stored.
nyc_taxi.to_sql(
    name=table_name,
    con=engine,
    schema=schema_name,
    if_exists="append",
    index=False,
    dtype=schema_dtypes,
)

2024-09-02 16:59:29,033 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-09-02 16:59:29,036 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_namespace.nspname = %(nspname_1)s
2024-09-02 16:59:29,037 INFO sqlalchemy.engine.Engine [cached since 0.02599s ago] {'table_name': 'yellow_taxi_trips', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'nyc_taxi'}
2024-09-02 16:59:42,878 INFO sqlalchemy.engine.Engine INSERT INTO nyc_taxi.yellow_taxi_trips (tpep_pickup_datetime, tpep_dropoff_datetime, dt, trip_distance, avg_speed, "PULocationID", "DOLocationID", "RatecodeID", passenger_count, total_amount, fare_amount, tip_amount, tolls_amou

568

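Grant the `SELECT` (read-only) privilege on the newly created table `nyc_taxi.yellow_taxi_trips` to the `reader` role. Otherwise, members of `reader` won't be able to query it. This must be repeated whenever the table is re-created with `if_exists="replace"`, since dropping a table also drops its privileges.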

In [9]:
query = f"GRANT SELECT ON TABLE {schema_name}.{table_name} TO reader"

# engine.begin() opens a transaction that commits on successful exit and rolls
# back if the statement raises.
with engine.begin() as conn:
    conn.execute(sa.text(query))

2024-09-02 17:01:31,126 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-09-02 17:01:31,128 INFO sqlalchemy.engine.Engine GRANT SELECT ON TABLE nyc_taxi.yellow_taxi_trips TO reader
2024-09-02 17:01:31,128 INFO sqlalchemy.engine.Engine [generated in 0.00283s] {}
2024-09-02 17:01:31,130 INFO sqlalchemy.engine.Engine COMMIT
