In [1]:
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
import glob
import numpy as np

collisions_df = pd.read_csv("../../input/collisions.csv")
geometry_df = gpd.read_file("../../input/taxi_zones/taxi_zones.shp")
collisions_df["crash_date"] = pd.to_datetime(collisions_df["crash_date"])
collisions_df = collisions_df[collisions_df['crash_date'].dt.year == 2024]


In [6]:
import glob
import pandas as pd
parquet_files = glob.glob("../../input/yellow_taxi_data/*2024*.parquet")
all_dfs = []
for file in parquet_files:
    df = pd.read_parquet(file)
    all_dfs.append(df)

taxi_df = pd.concat(all_dfs, ignore_index=True)

taxi_df['tpep_pickup_datetime'] = pd.to_datetime(taxi_df['tpep_pickup_datetime'], errors='coerce')

In [7]:
taxi_df.drop(columns=['store_and_fwd_flag'], inplace=True)

taxi_df.info()
taxi_df.head()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 41169720 entries, 0 to 41169719
Data columns (total 18 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int32         
 1   tpep_pickup_datetime   datetime64[us]
 2   tpep_dropoff_datetime  datetime64[us]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   PULocationID           int32         
 7   DOLocationID           int32         
 8   payment_type           int64         
 9   fare_amount            float64       
 10  extra                  float64       
 11  mta_tax                float64       
 12  tip_amount             float64       
 13  tolls_amount           float64       
 14  improvement_surcharge  float64       
 15  total_amount           float64       
 16  congestion_surcharge   float64       
 17  Airport_fee            float64       
dtypes: datetime64[us](2)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,1,2024-04-01 00:02:40,2024-04-01 00:30:42,0.0,5.2,1.0,161,7,1,29.6,3.5,0.5,8.65,0.0,1.0,43.25,2.5,0.0
1,2,2024-04-01 00:41:12,2024-04-01 00:55:29,1.0,5.6,1.0,264,264,1,25.4,1.0,0.5,10.0,0.0,1.0,37.9,0.0,0.0
2,2,2024-04-01 00:48:42,2024-04-01 01:05:30,1.0,3.55,1.0,186,236,1,20.5,1.0,0.5,5.1,0.0,1.0,30.6,2.5,0.0
3,2,2024-04-01 00:56:02,2024-04-01 01:05:09,1.0,1.06,1.0,137,164,2,10.0,1.0,0.5,0.0,0.0,1.0,15.0,2.5,0.0
4,1,2024-04-01 00:08:32,2024-04-01 00:10:24,1.0,0.7,1.0,236,263,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0


In [4]:
taxi_df.replace('NULL', np.nan, inplace=True)

In [None]:
collisions_df.replace('NULL', np.nan, inplace=True)
collisions_df.drop(columns=['location'], inplace=True)


In [7]:
cols = ['crash_date', 'crash_time', 'latitude', 'longitude', 'location',
       'on_street_name', 'number_of_persons_injured',
       'number_of_persons_killed', 'number_of_pedestrians_injured',
       'number_of_pedestrians_killed', 'number_of_cyclist_injured',
       'number_of_cyclist_killed', 'number_of_motorist_injured',
       'number_of_motorist_killed', 'contributing_factor_vehicle_1', 'collision_id','borough', 'zip_code',
       'cross_street_name', 'off_street_name']
collisions_df = collisions_df[cols]
collisions_df.info()
collisions_df.head()

# Replace 'Unspecified' in 'contributing_factor_vehicle_1' with NaN
collisions_df['contributing_factor_vehicle_1'].replace('Unspecified', np.nan, inplace=True)

# Drop rows where 'latitude' or 'longitude' are missing
collisions_df.dropna(subset=['latitude', 'longitude'], inplace=True)


<class 'pandas.core.frame.DataFrame'>
Index: 91264 entries, 200490 to 291753
Data columns (total 20 columns):
 #   Column                         Non-Null Count  Dtype         
---  ------                         --------------  -----         
 0   crash_date                     91264 non-null  datetime64[ns]
 1   crash_time                     91264 non-null  object        
 2   latitude                       84262 non-null  float64       
 3   longitude                      84262 non-null  float64       
 4   location                       84262 non-null  object        
 5   on_street_name                 65077 non-null  object        
 6   number_of_persons_injured      91264 non-null  int64         
 7   number_of_persons_killed       91264 non-null  int64         
 8   number_of_pedestrians_injured  91264 non-null  int64         
 9   number_of_pedestrians_killed   91264 non-null  int64         
 10  number_of_cyclist_injured      91264 non-null  int64         
 11  number_of_cycl

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  collisions_df['contributing_factor_vehicle_1'].replace('Unspecified', np.nan, inplace=True)


In [8]:
df = collisions_df
for col in df.columns:
    non_null_count = df[col].notnull().sum()
    unique_count = df[col].nunique(dropna=True)
    is_numeric = pd.api.types.is_numeric_dtype(df[col])

    # Skip numeric columns
    if is_numeric:
        continue

    # Print only if there are duplicate values (i.e., normalization candidates)
    if unique_count < non_null_count:
        print(f"{col}: {non_null_count} non-null values, {unique_count} unique values")
        print(" → Column is not purely numeric.")
        print(f" → {col} has duplicate values and could be normalized.")
        print("-" * 50)


crash_date: 84262 non-null values, 321 unique values
 → Column is not purely numeric.
 → crash_date has duplicate values and could be normalized.
--------------------------------------------------
crash_time: 84262 non-null values, 1440 unique values
 → Column is not purely numeric.
 → crash_time has duplicate values and could be normalized.
--------------------------------------------------
location: 84262 non-null values, 40961 unique values
 → Column is not purely numeric.
 → location has duplicate values and could be normalized.
--------------------------------------------------
on_street_name: 58601 non-null values, 5179 unique values
 → Column is not purely numeric.
 → on_street_name has duplicate values and could be normalized.
--------------------------------------------------
contributing_factor_vehicle_1: 62351 non-null values, 54 unique values
 → Column is not purely numeric.
 → contributing_factor_vehicle_1 has duplicate values and could be normalized.
---------------------

In [9]:
import pandas as pd

# 1. Create the lookup tables with IDs
def create_lookup_table(df, column_name, new_column_name):
    lookup_df = df[[column_name]].dropna().drop_duplicates().reset_index(drop=True)
    lookup_df.insert(0, f"{new_column_name}_id", range(1, len(lookup_df) + 1))
    return lookup_df

contributing_factor_lut = create_lookup_table(collisions_df, 'contributing_factor_vehicle_1', 'contributing_factor')
borough_lut = create_lookup_table(collisions_df, 'borough', 'borough')
cross_street_lut = create_lookup_table(collisions_df, 'cross_street_name', 'cross_street')
off_street_lut = create_lookup_table(collisions_df, 'off_street_name', 'off_street')

# 2. Map the original columns to their IDs
collisions_df = collisions_df.merge(contributing_factor_lut, how='left', left_on='contributing_factor_vehicle_1', right_on='contributing_factor_vehicle_1')
collisions_df = collisions_df.merge(borough_lut, how='left', left_on='borough', right_on='borough')
collisions_df = collisions_df.merge(cross_street_lut, how='left', left_on='cross_street_name', right_on='cross_street_name')
collisions_df = collisions_df.merge(off_street_lut, how='left', left_on='off_street_name', right_on='off_street_name')

# 3. Optional: Drop original text columns and rename *_id columns
collisions_df = collisions_df.drop(columns=[
    'contributing_factor_vehicle_1',
    'borough',
    'cross_street_name',
    'off_street_name'
])

collisions_df = collisions_df.rename(columns={
    'contributing_factor_id': 'contributing_factor_vehicle_1_id',
    'borough_id': 'borough_id',
    'cross_street_id': 'cross_street_name_id',
    'off_street_id': 'off_street_name_id'
})


In [None]:
contributing_factor_lut
borough_lut
cross_street_lut
off_street_lut
collisions_df
taxi_df
geometry_df


In [8]:
import psycopg2
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv(override=True)

RDS_HOST = os.getenv("DB_HOST")
RDS_PORT = os.getenv("DB_PORT")
RDS_USER = os.getenv("DB_USER")
RDS_PASSWORD = os.getenv("DB_PASSWORD")
RDS_DB = os.getenv("DB_NAME")

try:
    # Establish PostgreSQL connection
    conn = psycopg2.connect(
        host=RDS_HOST,
        user=RDS_USER,
        password=RDS_PASSWORD,
        dbname=RDS_DB,
        port=RDS_PORT,
        sslmode="require"  
    )
    print("✅ Connected to PostgreSQL RDS instance successfully!")

    # Create a cursor object
    cursor = conn.cursor()

    # Execute a test query
    cursor.execute("SELECT version();")
    version = cursor.fetchone()
    print("Database version:", version)

    # Close connection
    # cursor.close()
    # conn.close()


except Exception as e:
    print("Error connecting to RDS:", e)



✅ Connected to PostgreSQL RDS instance successfully!
Database version: ('PostgreSQL 17.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit',)


In [21]:
collisions_df.head()

Unnamed: 0,crash_date,crash_time,latitude,longitude,on_street_name,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,number_of_motorist_injured,number_of_motorist_killed,collision_id,zip_code,contributing_factor_vehicle_1_id,borough_id,cross_street_name_id,off_street_name_id
0,2024-01-01,17:07,40.665657,-73.888084,,0,0,0,0,0,0,0,0,4702082,11207.0,,1.0,1.0,
1,2024-01-01,10:00,40.730442,-73.91367,LONG ISLAND EXPRESSWAY,1,0,0,0,0,0,1,0,4691881,,1.0,,,
2,2024-01-01,21:56,40.66643,-73.882835,,2,0,0,0,0,0,2,0,4692079,11207.0,1.0,1.0,2.0,
3,2024-01-01,6:00,40.68592,-73.846924,97 AVENUE,0,0,0,0,0,0,0,0,4691952,11416.0,,2.0,,1.0
4,2024-01-01,5:57,40.672382,-73.78574,BAISLEY BOULEVARD,1,0,0,0,0,0,0,0,4691606,11434.0,2.0,2.0,,2.0


In [12]:
def create_table_from_df(df, table_name, cursor):
    # Generate column definitions
    columns = []
    for col_name, dtype in zip(df.columns, df.dtypes):
        if 'int' in str(dtype):
            col_type = 'INTEGER'
        elif 'float' in str(dtype):
            col_type = 'NUMERIC'
        elif 'datetime' in str(dtype):
            col_type = 'TIMESTAMP'
        elif 'bool' in str(dtype):
            col_type = 'BOOLEAN'
        else:
            col_type = 'TEXT'
        
        columns.append(f'"{col_name}" {col_type}')
    
    # Create table
    create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns)})"
    cursor.execute(create_table_query)
    print(f"Created table {table_name}")

In [13]:
def copy_df_to_postgres(df, table_name, cursor):
    from io import StringIO
    import csv  # Import csv for quoting options

    # Create a buffer
    buffer = StringIO()

    # Write the DataFrame to the buffer
    df.to_csv(buffer, index=False, header=False, na_rep='', quoting=csv.QUOTE_MINIMAL)
    buffer.seek(0)

    try:
        # Use COPY command for fast data loading
        column_list = ','.join([f'"{col}"' for col in df.columns])
        cursor.copy_expert(
            f"COPY {table_name} ({column_list}) FROM STDIN WITH CSV NULL ''",
            buffer
        )
        print(f"Uploaded {len(df)} rows to {table_name}")
    except Exception as e:
        print(f"Error uploading to {table_name}: {e}")

In [None]:
# List of dataframes to upload
dataframes = {
    'contributing_factor_lut': contributing_factor_lut,
    'borough_lut': borough_lut,
    'cross_street_lut': cross_street_lut,
    'off_street_lut': off_street_lut,
    'collisions_df': collisions_df,
    'taxi_df': taxi_df,
    'geometry_df': geometry_df
}

# Process each dataframe
for table_name, df in dataframes.items():
    print(f"🔄 Processing {table_name} with {len(df)} rows...")
    
    # Create the table
    create_table_from_df(df, table_name, cursor)
    
    # Process in chunks to handle large dataframes
    chunk_size = 50000
    total_chunks = (len(df) + chunk_size - 1) // chunk_size  # Ceiling division
    
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i+chunk_size]
        chunk_num = i // chunk_size + 1
        print(f"Uploading chunk {chunk_num}/{total_chunks} ({i} to {min(i+chunk_size, len(df))} rows)")
        copy_df_to_postgres(chunk, table_name, cursor)
        # Commit after each chunk to avoid long transactions
        conn.commit()

print("All data uploaded successfully!")

# Close connections
cursor.close()
conn.close()
print("Connection closed.")

🔄 Processing contributing_factor_lut with 54 rows...
Created table contributing_factor_lut
Uploading chunk 1/1 (0 to 54 rows)
Uploaded 54 rows to contributing_factor_lut
🔄 Processing borough_lut with 5 rows...
Created table borough_lut
Uploading chunk 1/1 (0 to 5 rows)
Uploaded 5 rows to borough_lut
🔄 Processing cross_street_lut with 18027 rows...
Created table cross_street_lut
Uploading chunk 1/1 (0 to 18027 rows)
Uploaded 18027 rows to cross_street_lut
🔄 Processing off_street_lut with 5974 rows...
Created table off_street_lut
Uploading chunk 1/1 (0 to 5974 rows)
Uploaded 5974 rows to off_street_lut
🔄 Processing collisions_df with 84262 rows...
Created table collisions_df
Uploading chunk 1/2 (0 to 50000 rows)
Uploaded 50000 rows to collisions_df
Uploading chunk 2/2 (50000 to 84262 rows)
Uploaded 34262 rows to collisions_df
🔄 Processing taxi_df with 41169720 rows...
Created table taxi_df
Uploading chunk 1/824 (0 to 50000 rows)
Uploaded 50000 rows to taxi_df
Uploading chunk 2/824 (50000

NameError: name 'taxi_df' is not defined

In [19]:
# Drop all tables in the public schema
cursor.execute("""
    DO $$ 
    BEGIN
        EXECUTE (
            SELECT string_agg('DROP TABLE IF EXISTS "' || table_name || '" CASCADE;', ' ')
            FROM information_schema.tables
            WHERE table_schema = 'public'
        );
    END $$;
""")
conn.commit()

print("All tables dropped successfully!")

All tables dropped successfully!


In [4]:
cursor.execute("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
""")
tables = cursor.fetchall()

# Print the table names
print("Tables in the database:")
for table in tables:
    print(table[0])

cursor.execute("""
    SELECT COUNT(*) AS row_count
    FROM taxi_df
""")
row_count = cursor.fetchone()

# Print the number of rows
print(f"Number of rows in taxi_df: {row_count[0]}")

Tables in the database:
contributing_factor_lut
borough_lut
cross_street_lut
off_street_lut
taxi_df
collisions_df
Number of rows in taxi_df: 20500000


In [9]:
cursor.execute("SELECT COUNT(*) FROM taxi_df")
uploaded_rows = cursor.fetchone()[0]
print(f"Rows already uploaded: {uploaded_rows}")

Rows already uploaded: 20500000


In [11]:
start_row = uploaded_rows  # Start from the last uploaded row
chunk_size = 50000  # Adjust chunk size if needed

for i in range(start_row, len(taxi_df), chunk_size):
    chunk = taxi_df.iloc[i:i+chunk_size]
    chunk_num = i // chunk_size + 1
    print(f"Uploading chunk {chunk_num} ({i} to {min(i+chunk_size, len(taxi_df))} rows)")
    copy_df_to_postgres(chunk, 'taxi_df', cursor)
    conn.commit()  # Commit after each chunk

Uploading chunk 411 (20500000 to 20550000 rows)


NameError: name 'copy_df_to_postgres' is not defined