In [1]:

import getpass
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text

# --- Configuration ---
CLEAN_DIR = "../data_clean"
DATA_FILE = "cleaned_nyc_trips_q1.csv"
TABLE_NAME = "taxi_trips"

# PostgreSQL connection details.
# Each setting is read from the standard libpq environment variable when it is
# set, falling back to a local-development default. The password has NO default:
# it comes from PGPASSWORD or an interactive prompt, so it is never stored in the
# notebook source, its saved outputs, or version-control history.
DB_USER = os.environ.get("PGUSER", "postgres")
DB_HOST = os.environ.get("PGHOST", "127.0.0.1")
DB_PORT = os.environ.get("PGPORT", "5432")
DB_NAME = os.environ.get("PGDATABASE", "nyc_taxi")
DB_PASS = os.environ.get("PGPASSWORD") or getpass.getpass(f"PostgreSQL password for '{DB_USER}': ")

# SQLAlchemy connection string. User name and password are percent-encoded
# (safe='' also encodes '/') so characters such as '@', ':' or '/' in them
# cannot break the URL structure.
DATABASE_URL = (
    f"postgresql://{quote(DB_USER, safe='')}:{quote(DB_PASS, safe='')}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Create the SQLAlchemy engine.
# create_engine() loads the DBAPI driver (psycopg2 for a plain postgresql:// URL)
# but does not open a connection, so success here does not yet prove the server
# is reachable or that the credentials are valid.
try:
    engine = create_engine(DATABASE_URL)
    print("✅ Successfully created database engine.")
except ImportError:
    print("❌ ERROR: Ensure psycopg2-binary is installed: pip install psycopg2-binary")
    # Re-raise instead of calling exit(): inside a Jupyter kernel exit() shuts the
    # kernel down, whereas an exception simply stops "Run All" at this cell.
    raise

✅ Successfully created database engine.


In [2]:
# --- Define Schema for taxi_trips Table ---
# The DROP + CREATE pair runs inside one transaction, so re-running this cell
# always leaves an EMPTY table with exactly this schema. Run the load cell
# afterwards to repopulate it.
#
# Column notes:
#   * All column names are lowercase; the load cell lowercases the CSV headers
#     so that pandas.to_sql() matches them.
#   * Location IDs (pulocationid / dolocationid) are VARCHAR(10) for
#     compatibility, though INTEGER would also work. payment_type is INTEGER.
#   * Float columns use DOUBLE PRECISION for accuracy.

create_table_sql = f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (
    tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE,
    tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE,
    passenger_count INTEGER,
    trip_distance DOUBLE PRECISION,
    pulocationid VARCHAR(10),     -- All lowercase
    dolocationid VARCHAR(10),     -- All lowercase
    payment_type INTEGER,
    fare_amount DOUBLE PRECISION,
    tip_amount DOUBLE PRECISION,
    total_amount DOUBLE PRECISION,
    congestion_surcharge DOUBLE PRECISION,
    airport_fee DOUBLE PRECISION, -- The one remaining airport_fee column
    pickup_date DATE,
    pickup_hour INTEGER,
    pickup_dayofweek INTEGER,
    trip_duration DOUBLE PRECISION,
    max_temp DOUBLE PRECISION,
    min_temp DOUBLE PRECISION,
    precipitation DOUBLE PRECISION,
    rain_day INTEGER              -- All lowercase
);
"""

print(f"Creating table '{TABLE_NAME}' in PostgreSQL...")
# engine.begin() commits when the block exits normally and rolls back if
# either statement raises, replacing the manual connect() + commit() pair.
with engine.begin() as connection:
    connection.execute(text(create_table_sql))
print(f"✅ Table '{TABLE_NAME}' created successfully.")

Creating table 'taxi_trips' in PostgreSQL...
✅ Table 'taxi_trips' created successfully.


In [3]:
# --- Load Data from CSV IN CHUNKS ---
# Streams the cleaned CSV into PostgreSQL in fixed-size chunks so the whole file
# never has to fit in memory. Rows are APPENDED: run the schema cell first (it
# drops and recreates the table); re-running only this cell loads every row twice.

CHUNK_SIZE = 500_000  # rows per chunk; lower it if memory is tight
DATETIME_COLUMNS = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "pickup_date"]


def _prepare_chunk(chunk_df: pd.DataFrame) -> pd.DataFrame:
    """Return a new frame with one CSV chunk's columns aligned to the SQL schema.

    - Lowercases every column name to match the lowercase table columns.
    - Merges columns whose names collide after lowercasing (e.g. a CSV that still
      carries both 'Airport_fee' and 'airport_fee'): per row, the first non-null
      value across the colliding columns (in original column order) is kept.
    - Parses the timestamp/date columns listed in DATETIME_COLUMNS.

    Raises KeyError if any column in DATETIME_COLUMNS is missing.
    """
    prepared = chunk_df.rename(columns=str.lower)

    duplicated_names = prepared.columns[prepared.columns.duplicated()].unique()
    for name in duplicated_names:
        same_name = prepared.loc[:, prepared.columns == name]
        # bfill across columns, then take the first: first non-null per row.
        merged = same_name.bfill(axis=1).iloc[:, 0]
        prepared = prepared.drop(columns=name)  # drops every column with this label
        prepared[name] = merged

    for column in DATETIME_COLUMNS:
        prepared[column] = pd.to_datetime(prepared[column])
    return prepared


file_path = os.path.join(CLEAN_DIR, DATA_FILE)
if not os.path.exists(file_path):
    print(f"❌ ERROR: Clean data file not found at: {file_path}")
    # Raise instead of exit(): exit() would shut the Jupyter kernel down.
    raise FileNotFoundError(file_path)

print(f"Starting chunked data load from {DATA_FILE} to PostgreSQL.")

chunk_num = 0  # number of chunks successfully written so far
try:
    # The reader is a context manager, so the file handle is closed even if a
    # chunk fails part-way through the load.
    with pd.read_csv(file_path, chunksize=CHUNK_SIZE, low_memory=False) as reader:
        for raw_chunk in reader:
            print(f"Processing chunk #{chunk_num + 1}...")
            chunk_df = _prepare_chunk(raw_chunk)

            # --- Push Chunk to PostgreSQL ---
            # if_exists='append' adds rows to the table created by the schema cell.
            chunk_df.to_sql(TABLE_NAME, engine, if_exists='append', index=False)

            print(f"✅ Chunk #{chunk_num + 1} loaded successfully.")
            chunk_num += 1

    print(f"\n🎉 Chunked data load complete. Total chunks loaded: {chunk_num}.")

except Exception as e:
    print(f"❌ ERROR during chunked data load: {e}")
    print(f"   {chunk_num} chunk(s) were written before the failure; the table is only partially loaded.")
    # Re-raise so "Run All" stops here instead of validating a partial load.
    raise

Starting chunked data load from cleaned_nyc_trips_q1.csv to PostgreSQL.
Processing chunk #1...
✅ Chunk #1 loaded successfully.
Processing chunk #2...
✅ Chunk #2 loaded successfully.
Processing chunk #3...
✅ Chunk #3 loaded successfully.
Processing chunk #4...
✅ Chunk #4 loaded successfully.
Processing chunk #5...
✅ Chunk #5 loaded successfully.
Processing chunk #6...
✅ Chunk #6 loaded successfully.
Processing chunk #7...
✅ Chunk #7 loaded successfully.
Processing chunk #8...
✅ Chunk #8 loaded successfully.
Processing chunk #9...
✅ Chunk #9 loaded successfully.
Processing chunk #10...
✅ Chunk #10 loaded successfully.
Processing chunk #11...
✅ Chunk #11 loaded successfully.
Processing chunk #12...
✅ Chunk #12 loaded successfully.
Processing chunk #13...
✅ Chunk #13 loaded successfully.
Processing chunk #14...
✅ Chunk #14 loaded successfully.
Processing chunk #15...
✅ Chunk #15 loaded successfully.
Processing chunk #16...
✅ Chunk #16 loaded successfully.
Processing chunk #17...
✅ Chunk #1

In [4]:
# --- Validate the Load ---
# Reuses the `engine` from the configuration cell; creating a second engine here
# would only open another connection pool without disposing of the first.

print("Starting data validation checks...")

try:
    with engine.connect() as connection:

        # 1. Row Count Check
        # Compare this against the 'Final merged dataset size' printed by the
        # cleaning script.
        count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME};")
        row_count = connection.execute(count_query).scalar()
        print(f"✅ Total rows loaded in PostgreSQL table '{TABLE_NAME}': {row_count:,}")

        # 2. Sample rows
        sample_query = text(f"SELECT * FROM {TABLE_NAME} LIMIT 5;")
        sample_df = pd.read_sql(sample_query, connection)

        print("\n✅ First 5 rows of data:")
        print(sample_df)

        # 3. Schema Check
        # Read the actual column types from the PostgreSQL catalog.
        # sample_df.dtypes would only show pandas' inference from 5 rows.
        schema_query = text(
            "SELECT column_name, data_type FROM information_schema.columns "
            "WHERE table_schema = current_schema() AND table_name = :table_name "
            "ORDER BY ordinal_position;"
        )
        schema_df = pd.read_sql(schema_query, connection, params={"table_name": TABLE_NAME})

        print("\n✅ Column names and data types (PostgreSQL schema):")
        print(schema_df.to_string(index=False))

        # 4. Basic Data Integrity Check: trips with zero OR negative distance.
        integrity_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME} WHERE trip_distance <= 0;")
        invalid_trips = connection.execute(integrity_query).scalar()

        print(f"\n✅ Invalid trips (distance <= 0): {invalid_trips:,}")
        # Should be 0 if the cleaning step filtered these rows out.
        if invalid_trips:
            print("⚠️ Expected 0 invalid trips; check the distance filter in the cleaning step.")

except Exception as e:
    print(f"❌ ERROR during data validation: {e}")
    # Re-raise so a failed validation is not mistaken for a passing one.
    raise


Starting data validation checks...
✅ Total rows loaded in PostgreSQL table 'taxi_trips': 8,772,953

✅ First 5 rows of data:
  tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  \
0  2023-01-01 00:32:10   2023-01-01 00:40:36                1           0.97   
1  2023-01-01 00:55:08   2023-01-01 01:01:27                1           1.10   
2  2023-01-01 00:25:04   2023-01-01 00:37:49                1           2.51   
3  2023-01-01 00:10:29   2023-01-01 00:21:19                1           1.43   
4  2023-01-01 00:50:34   2023-01-01 01:02:52                1           1.84   

  pulocationid dolocationid  payment_type  fare_amount  tip_amount  \
0          161          141             2          9.3        0.00   
1           43          237             1          7.9        4.00   
2           48          238             1         14.9       15.00   
3          107           79             1         11.4        3.28   
4          161          137             1    