In [1]:
import pandas as pd

In [2]:
pd.__version__

'2.2.2'

Working with the CSV file and SQL

In [3]:
# count rows parquet
df = pd.read_parquet('yellow_tripdata_2021-01.parquet')
print("number of rows: ", len(df))

number of rows:  1369769


In [4]:
# read the first 100 rows from the parquet file
df = pd.read_parquet('yellow_tripdata_2021-01.parquet', engine='pyarrow', columns=None, use_nullable_dtypes=False).head(100)

# display the first 5 rows
df.head()

  df = pd.read_parquet('yellow_tripdata_2021-01.parquet', engine='pyarrow', columns=None, use_nullable_dtypes=False).head(100)


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [5]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [2]:
from sqlalchemy import create_engine

In [3]:
engine=create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [4]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x74fca5f43bf0>

In [12]:
# specify the schema for the CSV to SQL (print statement DDL)
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [12]:
import numpy as np

# FOR CSV create an iterator to batch the file by chunksize
# df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=10000)
# df = next(df_iter)

# FOR PARQUET Read the parquet file
df = pd.read_parquet('yellow_tripdata_2021-01.parquet', engine='pyarrow', columns=None, use_nullable_dtypes=False)

# Split the dataframe into chunks
chunk_size = 100000
df_chunks = np.array_split(df, len(df) // chunk_size + 1)


  df = pd.read_parquet('yellow_tripdata_2021-01.parquet', engine='pyarrow', columns=None, use_nullable_dtypes=False)
  return bound(*args, **kwds)


In [13]:
# check the lenght of the dataframe
len(df)
len(df_chunks)

14

In [14]:
# fix the data format for these 2 itens
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [15]:
# add only the header to the database (checking purpose)
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

# root@localhost:ny_taxi> \dt
# +--------+------------------+-------+-------+
# | Schema | Name             | Type  | Owner |
# |--------+------------------+-------+-------|
# | public | yellow_taxi_data | table | root  |
# +--------+------------------+-------+-------+
# 
# root@localhost:ny_taxi> \d yellow_taxi_data



0

In [53]:
# add the chuncks of data to the table (runing with %time to check how much time it will take)
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

# root@localhost:ny_taxi> SELECT count(1) FROM yellow_taxi_data;
# +--------+
# | count  |
# |--------|
# | 100000 |
# +--------+
# SELECT 1
# Time: 0.029s


CPU times: user 285 ms, sys: 9.58 ms, total: 295 ms
Wall time: 967 ms


1000

In [16]:
from time import time

In [17]:
import numpy as np

# Process each chunk
for i, df_chunk in enumerate(df_chunks):
    # Convert datetime columns
    df_chunk.tpep_pickup_datetime = pd.to_datetime(df_chunk.tpep_pickup_datetime)
    df_chunk.tpep_dropoff_datetime = pd.to_datetime(df_chunk.tpep_dropoff_datetime)
    
    # Insert the chunk into the database
    df_chunk.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    
    # Print a message after inserting each chunk
    print(f"Inserted chunk {i+1}/{len(df_chunks)}")
    
    # Clean up memory
    del df_chunk

# Close the connection
engine.dispose()


Inserted chunk 1/14
Inserted chunk 2/14
Inserted chunk 3/14
Inserted chunk 4/14
Inserted chunk 5/14
Inserted chunk 6/14
Inserted chunk 7/14
Inserted chunk 8/14
Inserted chunk 9/14
Inserted chunk 10/14
Inserted chunk 11/14
Inserted chunk 12/14
Inserted chunk 13/14
Inserted chunk 14/14


In [29]:
query = """
SELECT count(1) FROM yellow_taxi_data;"""

pd.read_sql(query, con=engine)

Unnamed: 0,count
0,1359765


In [5]:
query = """
SELECT * FROM yellow_taxi_data LIMIT 10;"""

pd.read_sql(query, con=engine)

Unnamed: 0,index,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
5,5,1,2021-01-01 00:16:29,2021-01-01 00:24:30,1.0,1.6,1.0,N,224,68,1,8.0,3.0,0.5,2.35,0.0,0.3,14.15,2.5,
6,6,1,2021-01-01 00:00:28,2021-01-01 00:17:28,1.0,4.1,1.0,N,95,157,2,16.0,0.5,0.5,0.0,0.0,0.3,17.3,0.0,
7,7,1,2021-01-01 00:12:29,2021-01-01 00:30:34,1.0,5.7,1.0,N,90,40,2,18.0,3.0,0.5,0.0,0.0,0.3,21.8,2.5,
8,8,1,2021-01-01 00:39:16,2021-01-01 01:00:13,1.0,9.1,1.0,N,97,129,4,27.5,0.5,0.5,0.0,0.0,0.3,28.8,0.0,
9,9,1,2021-01-01 00:26:12,2021-01-01 00:39:46,2.0,2.7,1.0,N,263,142,1,12.0,3.0,0.5,3.15,0.0,0.3,18.95,2.5,
