In [2]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
import time

In [3]:
# Step 1: Extract - Read the NYC Taxi CSV file
start_time = time.time()
print("Extracting data...")
try:
    # Read only relevant columns to optimize memory usage
    df = pd.read_csv('/Users/rakshithreddykondra/Desktop/2017_Yellow_Taxi_Trip_Data_20250522.csv', 
                     usecols=['tpep_pickup_datetime', 'tpep_dropoff_datetime', 
                              'trip_distance', 'fare_amount', 'PULocationID'])
    print(f"Data extracted successfully. Rows: {len(df)}")
except FileNotFoundError:
    print("Error: CSV file not found. Please ensure '2017_Yellow_Taxi_Trip_Data_20250522.csv' is in the directory.")
    exit(1)
extract_time = time.time() - start_time
print(f"Extraction time: {extract_time:.2f} seconds")

Extracting data...
Data extracted successfully. Rows: 199689
Extraction time: 0.16 seconds


In [4]:
# Step 2: Transform - Clean and aggregate the data
print("Transforming data...")
# Convert datetime columns to proper format
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'], errors='coerce')
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'], errors='coerce')

# Handle missing values
df = df.dropna()

# Filter out invalid records (e.g., negative distances or fares, or unrealistic trips)
df = df[(df['trip_distance'] > 0) & 
        (df['fare_amount'] > 0) & 
        (df['trip_distance'] < 100) & 
        (df['fare_amount'] < 500)]

# Calculate trip duration in minutes
df['trip_duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60
df = df[df['trip_duration'] > 0]  # Remove invalid durations

# Aggregate data: Compute average trip distance and fare by pickup location
agg_data = df.groupby('PULocationID').agg({
    'trip_distance': 'mean',
    'fare_amount': 'mean',
    'PULocationID': 'count'
}).rename(columns={'trip_distance': 'avg_trip_distance', 
                  'fare_amount': 'avg_fare_amount',
                  'PULocationID': 'trip_count'}).reset_index()

print(f"Data transformed successfully. Aggregated rows: {len(agg_data)}")
transform_time = time.time() - start_time - extract_time
print(f"Transformation time: {transform_time:.2f} seconds")

Transforming data...


  df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'], errors='coerce')
  df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'], errors='coerce')


Data transformed successfully. Aggregated rows: 255
Transformation time: 10.19 seconds


In [9]:
# Step 3: Load - Connect to PostgreSQL and load the aggregated data
print("Loading data to PostgreSQL...")
try:
    # Connect to PostgreSQL 
    conn = psycopg2.connect(
        dbname="taxi_data",
        user="taxi_user",  #PostgreSQL username
        password="taxidriver321",  #PostgreSQL password
        host="localhost",
        port="5432"
    )
    cursor = conn.cursor()

    # Create table for aggregated data
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS taxi_aggregates (
            "PULocationID" INTEGER PRIMARY KEY,
            "avg_trip_distance" FLOAT,
            "avg_fare_amount" FLOAT,
            "trip_count" INTEGER
        );
    """)
    conn.commit()

    # Use SQLAlchemy for efficient data loading
    engine = create_engine('postgresql://taxi_user:taxidriver321@localhost:5432/taxi_data')
    agg_data.to_sql('taxi_aggregates', engine, if_exists='replace', index=False)

    # Create an index to optimize queries
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_pulocation ON taxi_aggregates ("PULocationID");')
    conn.commit()
    print("Data loaded successfully.")
except Exception as e:
    print(f"Error loading data to PostgreSQL: {e}")
    conn.rollback()
finally:
    cursor.close()
    conn.close()

load_time = time.time() - start_time - extract_time - transform_time
print(f"Load time: {load_time:.2f} seconds")

Loading data to PostgreSQL...
Data loaded successfully.
Load time: 955.89 seconds


In [10]:
# Step 4: Query and Validate - Run sample queries to verify data
print("Validating data with sample queries...")
try:
    conn = psycopg2.connect(
        dbname="taxi_data",
        user="taxi_user",
        password="taxidriver321",
        host="localhost",
        port="5432"
    )
    cursor = conn.cursor()

    # Query 1: Top 5 pickup locations by trip count
    start_query_time = time.time()
    cursor.execute("""
        SELECT "PULocationID", trip_count, avg_trip_distance, avg_fare_amount
        FROM taxi_aggregates
        ORDER BY trip_count DESC
        LIMIT 5;
    """)
    results = cursor.fetchall()
    print("Top 5 pickup locations by trip count:")
    for row in results:
        print(row)
    query_time = time.time() - start_query_time
    print(f"Query execution time: {query_time:.2f} seconds")

except Exception as e:
    print(f"Error querying data: {e}")
finally:
    cursor.close()
    conn.close()

print(f"Total ETL pipeline execution time: {time.time() - start_time:.2f} seconds")

Validating data with sample queries...
Top 5 pickup locations by trip count:
(132, 29077, 31.881795233345944, 124.72203734910755)
(138, 19499, 25.802251397507565, 114.89924765372584)
(230, 9022, 19.877104854799377, 81.36422301041898)
(161, 8353, 19.588861486890938, 81.76651741889141)
(163, 7579, 20.09293706293706, 81.73039319171394)
Query execution time: 0.00 seconds
Total ETL pipeline execution time: 1103.07 seconds
