In [1]:
import os

import pandas as pd
from sqlalchemy import create_engine

In [2]:
# Base URL of the download location; the data file name is appended to it
# when the CSV is read.
prefix = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/"

In [3]:
# File name (relative to `prefix`) of the gzip-compressed CSV to ingest.
url = "yellow_tripdata_2021-01.csv.gz"

In [4]:
# Explicit column types handed to pd.read_csv so nothing is left to type
# inference. Identifier/count columns use pandas' nullable "Int64" so that a
# missing value does not force the whole column to float.
dtype = dict(
    VendorID="Int64",
    passenger_count="Int64",
    trip_distance="float64",
    RatecodeID="Int64",
    store_and_fwd_flag="string",
    PULocationID="Int64",
    DOLocationID="Int64",
    payment_type="Int64",
    fare_amount="float64",
    extra="float64",
    mta_tax="float64",
    tip_amount="float64",
    tolls_amount="float64",
    improvement_surcharge="float64",
    total_amount="float64",
    congestion_surcharge="float64",
)

# Columns that pd.read_csv should parse as datetimes instead of plain text.
parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

In [5]:
# Parse only the first 100 rows: a small sample is all that is needed to
# inspect the columns and derive the table schema further down.
df = pd.read_csv(prefix + url, dtype=dtype, parse_dates=parse_dates, nrows=100)
    

In [6]:
# Database connection used by every to_sql / get_schema call in this notebook.
# The URL is read from the NY_TAXI_DB_URL environment variable so that real
# credentials never have to be written into (and committed with) the notebook.
# When the variable is unset, the local development database is used.
engine = create_engine(
    os.environ.get(
        "NY_TAXI_DB_URL",
        "postgresql://root:root@localhost:5432/ny_taxi",
    )
)

In [7]:
# Show the CREATE TABLE statement pandas derives from the sample frame for
# the engine's SQL dialect. This only prints the DDL; the table itself is
# created in a later cell.
print(
    pd.io.sql.get_schema(
        df,
        name="yellow_taxi_data",
        con=engine,
    )
)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [8]:
# Create the target table without inserting data: a zero-row slice still
# carries the column names and dtypes, and if_exists="replace" discards any
# table of the same name that already exists.
# NOTE(review): to_sql defaults to index=True, so the frame's index is also
# written as a column that the get_schema preview does not list; confirm
# that is intended.
df.head(0).to_sql(
    name="yellow_taxi_data",
    con=engine,
    if_exists="replace",
)

0

In [18]:
# Open the full file as a chunked reader: instead of one large DataFrame,
# iterating over `df_iter` yields frames of up to 100000 rows each, using the
# same column types and date parsing as the sample read.
df_iter = pd.read_csv(
    prefix + url,
    chunksize=100000,
    iterator=True,
    parse_dates=parse_dates,
    dtype=dtype,
)

In [22]:
from time import time
from tqdm.auto import tqdm as fignushka

# Open a fresh chunked reader in this cell rather than reusing whatever
# `df_iter` an earlier cell left behind. The table is dropped and recreated
# just below, so the reader has to start at the first row every time this
# cell runs; a partially consumed iterator (for example after an interrupted
# run) would otherwise rebuild the table with its leading chunks silently
# missing, and a fully consumed one would fail with a bare StopIteration.
df_iter = pd.read_csv(
    prefix + url,
    dtype=dtype,
    parse_dates=parse_dates,
    iterator=True,
    chunksize=100000,
)

# The first chunk is taken by hand so that its columns can define the table.
first_chunk = next(df_iter)

# A zero-row slice keeps the column names and dtypes, so this recreates the
# table empty (if_exists="replace" discards any previous contents).
first_chunk.head(0).to_sql(
    name='yellow_taxi_data',
    con=engine,
    if_exists="replace",
)

print("Table created")

first_chunk.to_sql(
    name="yellow_taxi_data",
    con=engine,
    if_exists="append",
)
print("Inserted first chunk:", len(first_chunk))

# Numbering starts at 2 because chunk 1 was inserted above.
for i, df_chunk in enumerate(fignushka(df_iter), start=2):

    # Time only the database insert for this chunk.
    t_start = time()

    df_chunk.to_sql(
        name="yellow_taxi_data",
        con=engine,
        if_exists="append"
    )

    t_end = time()

    print(f'Chunk {i} inserted ({len(df_chunk)} rows), took {t_end - t_start:.3f} seconds')


[2K[2mResolved [1m120 packages[0m [2min 2.94s[0m[0m                                       [0m
[2K[2mPrepared [1m1 package[0m [2min 55ms[0m[0m                                               
         If the cache and target directories are on different filesystems, hardlinking may not be supported.
[2K[2mInstalled [1m1 package[0m [2min 14ms[0m[0m                                 [0m
 [32m+[39m [1mtqdm[0m[2m==4.67.1[0m
Table created
Inserted first chunk: 100000


0it [00:00, ?it/s]

Chunk 1 inserted (100000 rows), took 8.294 seconds
Chunk 2 inserted (100000 rows), took 8.337 seconds
Chunk 3 inserted (100000 rows), took 8.603 seconds
Chunk 4 inserted (100000 rows), took 9.059 seconds
Chunk 5 inserted (100000 rows), took 8.231 seconds
Chunk 6 inserted (100000 rows), took 8.060 seconds
Chunk 7 inserted (100000 rows), took 8.960 seconds
Chunk 8 inserted (100000 rows), took 8.430 seconds
Chunk 9 inserted (100000 rows), took 9.199 seconds
Chunk 10 inserted (100000 rows), took 8.905 seconds
Chunk 11 inserted (100000 rows), took 8.362 seconds
Chunk 12 inserted (100000 rows), took 8.224 seconds
Chunk 13 inserted (69765 rows), took 5.214 seconds
