In [1]:
import pandas as pd

In [21]:
import time
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import logging

In [10]:
file_path = 'dataset/yellow_tripdata/yellow_tripdata_2021-01.csv'
batch_size = 10000

In [11]:
df_iter = pd.read_csv('dataset/yellow_tripdata/yellow_tripdata_2021-01.csv', chunksize=batch_size)

In [12]:
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x1b01420a8a0>

In [13]:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [14]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [7]:
print(pd.io.sql.get_schema(test_dt,name="yellow_taxi_data", con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TEXT, 
	tpep_dropoff_datetime TEXT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [27]:
string_columns = ["VendorID","RatecodeID","DOLocationID","payment_type"]

# Function to detect and convert datetime columns
def convert_datatype_columns(df):
    for col in df.columns:
        if df[col].dtype == 'object':  # Check if the column is of object type (likely strings)
            try:
                df[col] = pd.to_datetime(df[col])
                logging.info(f"Converted column '{col}' to datetime.")
            except (ValueError, TypeError):
                logging.info(f"Column '{col}' is not a datetime column.")
        if col in string_columns:
            df[col] = df[col].astype('object')
    return df


In [28]:
# Function to process and insert a chunk
def process_and_insert_chunk(df):
    try:
        df = convert_datatype_columns(df)  # Convert datetime columns dynamically
        df.to_sql(name="yellow_taxi_data", con=engine, if_exists="append", index=False)
        return True
    except SQLAlchemyError as e:
        logging.error(f"Error inserting chunk: {e}")
        return False



In [29]:
# Main ingestion loop
total_chunks = 0
successful_chunks = 0

for df in df_iter:
    total_chunks += 1
    t_start = time.time()
    
    if process_and_insert_chunk(df):
        successful_chunks += 1
        t_end = time.time()
        logging.info(f"Inserted chunk {total_chunks}, took {t_end - t_start:.3f} seconds")
    else:
        logging.warning(f"Failed to insert chunk {total_chunks}")

logging.info(f"Ingestion completed. Total chunks: {total_chunks}, Successful chunks: {successful_chunks}")

print("done")

2025-02-01 18:52:58,707 - INFO - Converted column 'tpep_pickup_datetime' to datetime.
2025-02-01 18:52:58,712 - INFO - Converted column 'tpep_dropoff_datetime' to datetime.
  df[col] = pd.to_datetime(df[col])
2025-02-01 18:52:58,717 - INFO - Column 'store_and_fwd_flag' is not a datetime column.
2025-02-01 18:53:00,315 - INFO - Inserted chunk 1, took 1.614 seconds
2025-02-01 18:53:00,347 - INFO - Converted column 'tpep_pickup_datetime' to datetime.
2025-02-01 18:53:00,352 - INFO - Converted column 'tpep_dropoff_datetime' to datetime.
  df[col] = pd.to_datetime(df[col])
2025-02-01 18:53:00,355 - INFO - Column 'store_and_fwd_flag' is not a datetime column.
2025-02-01 18:53:01,717 - INFO - Inserted chunk 2, took 1.378 seconds
2025-02-01 18:53:01,748 - INFO - Converted column 'tpep_pickup_datetime' to datetime.
2025-02-01 18:53:01,753 - INFO - Converted column 'tpep_dropoff_datetime' to datetime.
  df[col] = pd.to_datetime(df[col])
2025-02-01 18:53:01,755 - INFO - Column 'store_and_fwd_flag

done
