In [2]:
import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

Creating connection to database:

In [3]:
# Read connection settings from the environment so credentials are not
# committed with the notebook. The defaults reproduce the previously
# hardcoded local database URL, so behaviour is unchanged when no
# variables are set.
PG_USER = os.environ.get("PG_USER", "root")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "root")
PG_HOST = os.environ.get("PG_HOST", "localhost")
PG_PORT = os.environ.get("PG_PORT", "5432")
PG_DB = os.environ.get("PG_DB", "ny_taxi")

# quote_plus escapes characters such as "@" or "/" in the user/password
# that would otherwise corrupt the connection URL.
engine = create_engine(
    f"postgresql://{quote_plus(PG_USER)}:{quote_plus(PG_PASSWORD)}"
    f"@{PG_HOST}:{PG_PORT}/{PG_DB}"
)
conn = engine.connect()
conn

<sqlalchemy.engine.base.Connection at 0x7f6980171dc0>

# Exploration

Reading a sample of the dataset (its first 100 rows) with pandas, directly from its URL:

In [3]:
# Only the first 100 rows are parsed (nrows=100): enough to inspect the
# columns and the dtypes pandas infers before ingesting the full file.
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
df = pd.read_csv(filepath_or_buffer=url, nrows=100)
df.head(n=5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


Casting text columns to datetime columns:

In [4]:
datetime_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Replace each text timestamp column with its parsed datetime version.
# The explicit format makes pandas reject any value that does not match it.
for col in datetime_cols:
    df[col] = df[col].pipe(pd.to_datetime, format="%Y-%m-%d %H:%M:%S")

df[datetime_cols].head()

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime
0,2021-01-01 00:30:10,2021-01-01 00:36:12
1,2021-01-01 00:51:20,2021-01-01 00:52:19
2,2021-01-01 00:43:30,2021-01-01 01:11:06
3,2021-01-01 00:15:48,2021-01-01 00:31:01
4,2021-01-01 00:31:49,2021-01-01 00:48:21


Getting DDL from Pandas DataFrame:

In [6]:
# No connection given: pandas uses its generic type mapping
# (INTEGER / REAL / TEXT / TIMESTAMP in the output below).
print(pd.io.sql.get_schema(frame=df, name="yellow_taxi_data"))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [7]:
# Passing the live connection makes pandas emit DDL in the connected
# database's dialect (BIGINT, FLOAT(53), TIMESTAMP WITHOUT TIME ZONE below).
print(pd.io.sql.get_schema(con=conn, frame=df, name="yellow_taxi_data"))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




# Ingestion

Downloading the dataset to the local filesystem:

In [3]:
!curl -Lso dataset.csv.gz https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz

Reading the file in chunks through a DataFrame iterator, so that larger files can be ingested without loading them into memory all at once:

In [4]:
# Data rows are parsed 100,000 at a time as the reader is iterated, rather
# than loading the whole compressed CSV at once.
# NOTE(review): iterating this reader emits a warning in the ingestion cell
# (its output is truncated); if it is a pandas DtypeWarning, consider
# passing an explicit `dtype=` here — TODO confirm.
df_iter = pd.read_csv("dataset.csv.gz", chunksize=100_000, iterator=True)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x7f697a514280>

In [5]:
def preprocess_in_place(
    df: pd.DataFrame,
    datetime_cols=("tpep_pickup_datetime", "tpep_dropoff_datetime"),
    datetime_format: str = "%Y-%m-%d %H:%M:%S",
) -> None:
    """Parse the text timestamp columns of a trip-data chunk, mutating ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Chunk read from the trip CSV; its timestamp columns are replaced
        in place.
    datetime_cols : iterable of str, optional
        Columns to convert. Defaults to the yellow-taxi pickup/dropoff
        columns, so existing ``preprocess_in_place(df)`` calls are unchanged.
    datetime_format : str, optional
        ``strftime``-style format passed to :func:`pandas.to_datetime`.

    Returns
    -------
    None
        The conversion happens in place.

    Raises
    ------
    KeyError
        If a listed column is missing from ``df``.
    ValueError
        If a value does not match ``datetime_format``.
    """
    # The tuple default avoids the shared-mutable-default-argument pitfall.
    for col in datetime_cols:
        df[col] = pd.to_datetime(df[col], format=datetime_format)


In [6]:
table_name = "yellow_taxi_data_from_notebook"

# Stream every chunk into Postgres. The first chunk uses if_exists="replace"
# so each run starts from a freshly created table; all remaining chunks are
# appended to it.
# NOTE(review): to_sql writes the DataFrame index as a column by default
# (index=True); pass index=False if that column is not wanted.
rows_count = 0
chunks_count = 0
for df in df_iter:
    preprocess_in_place(df)
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists="replace" if chunks_count == 0 else "append",
    )
    chunks_count += 1
    rows_count += len(df)

if chunks_count == 0:
    # df_iter is consumed by iteration, so re-running this cell without first
    # re-running the cell that creates it would otherwise ingest nothing.
    raise RuntimeError(
        "df_iter yielded no chunks; re-run the cell that creates it "
        "before ingesting."
    )

print(f"Total number of ingested records: {rows_count}")

  for df in df_iter:


Total number of ingested records: 1369765


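Note that you can empty the table if required, by replacing it with a zero-row frame that has the same columns (`if_exists="replace"` drops and recreates the table):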
```python
df.head(n=0).to_sql(name=table_name, con=engine, if_exists="replace")
```