In [19]:
!pip install pandas psycopg2-binary python-dotenv

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 25.0.1 -> 25.2
[notice] To update, run: C:\Users\herme\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.12_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [20]:
# ##  Step 1. Setup and Import Libraries
import os
import pandas as pd
import psycopg2
from psycopg2 import extras
from dotenv import load_dotenv
from io import StringIO
from pathlib import Path

In [21]:
# ## Step 2. Load Environment Variables

# Populate os.environ from a .env file, then read the PostgreSQL connection
# parameters. os.getenv returns None for any variable that is not set.
load_dotenv()

DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT = (
    os.getenv(env_var)
    for env_var in ("PG_DBNAME", "PG_USER", "PG_PASSWORD", "PG_HOST", "PG_PORT")
)

In [22]:
# File settings: source CSV, rows per chunk, and target table.
CSV_PATH = os.getenv("CSV_PATH")
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "200000"))
TABLE_NAME = os.getenv("TABLE_NAME", "people")

# Fail fast with a clear message here rather than letting pd.read_csv(None, ...)
# raise a less obvious error in the ETL cell much later.
if not CSV_PATH:
    raise ValueError("CSV_PATH is not set; add it to your .env file or environment.")
if CHUNK_SIZE < 1:
    raise ValueError(f"CHUNK_SIZE must be a positive integer, got {CHUNK_SIZE}.")

print(" Environment variables loaded successfully!")


 Environment variables loaded successfully!


In [23]:
# ## Step 3. Define Database Table Schema

# DDL for the target table, assembled from (column, SQL type) pairs. Column
# names and order mirror `expected_columns` in transform_chunk. TABLE_NAME is
# interpolated unquoted, so it must be a trusted identifier from configuration.
# IF NOT EXISTS makes re-executing the statement a no-op for an existing table
# (it never alters that table's columns).
create_table_query = "\n".join([
    "",
    f"CREATE TABLE IF NOT EXISTS {TABLE_NAME} (",
    ",\n".join(
        f"    {column} {sql_type}"
        for column, sql_type in (
            ("row_index", "INTEGER"),
            ("user_id", "VARCHAR(50)"),
            ("first_name", "TEXT"),
            ("last_name", "TEXT"),
            ("sex", "VARCHAR(10)"),
            ("email", "TEXT"),
            ("phone", "TEXT"),
            ("date_of_birth", "DATE"),
            ("job_title", "TEXT"),
        )
    ),
    ");",
    "",
])


In [24]:
# ##  Step 4. Define ETL Functions

def extract_csv(csv_path, chunk_size):
    """Open ``csv_path`` as a lazy, chunked CSV reader.

    Every column is read as ``str`` (no type inference). Empty fields and the
    literals "NA" / "N/A" become NaN, in addition to pandas' default NA
    markers (``keep_default_na`` is left at its default).

    Parameters
    ----------
    csv_path : path or file-like
        Anything ``pd.read_csv`` accepts as its source.
    chunk_size : int
        Maximum number of rows per yielded DataFrame.

    Returns
    -------
    The reader returned by ``pd.read_csv(..., chunksize=...)``; iterating it
    yields DataFrames of at most ``chunk_size`` rows.
    """
    print(f"Extracting data from: {csv_path}")
    missing_markers = ["", "NA", "N/A"]
    reader = pd.read_csv(
        csv_path,
        dtype=str,
        na_values=missing_markers,
        chunksize=chunk_size,
    )
    return reader


def transform_chunk(chunk):
    """Clean and normalise one raw CSV chunk for loading into PostgreSQL.

    Steps:
      1. Rename the source CSV headers to the snake_case table columns.
      2. Parse ``date_of_birth`` to datetime; unparseable values become NaT
         (a warning with their count is printed).
      3. Fill missing first/last names with "Unknown" and missing job titles
         with "Unspecified".
      4. Keep only the known table columns, in table order. Expected columns
         that are absent from the chunk are skipped, not added.

    Parameters
    ----------
    chunk : pandas.DataFrame
        Raw chunk as read from the CSV. It is NOT modified.

    Returns
    -------
    pandas.DataFrame
        A new, transformed frame.
    """
    column_map = {
        "Index": "row_index",
        "User Id": "user_id",
        "First Name": "first_name",
        "Last Name": "last_name",
        "Sex": "sex",
        "Email": "email",
        "Phone": "phone",
        "Date of birth": "date_of_birth",
        "Job Title": "job_title"
    }
    expected_columns = [
        "row_index", "user_id", "first_name", "last_name",
        "sex", "email", "phone", "date_of_birth", "job_title"
    ]

    # rename() without inplace returns a new frame, so the caller's chunk is
    # left untouched (the previous inplace rename/fillna mutated it).
    transformed = chunk.rename(columns=column_map)

    if "date_of_birth" in transformed.columns:
        raw_dates = transformed["date_of_birth"]
        parsed_dates = pd.to_datetime(raw_dates, errors="coerce")
        # errors="coerce" hides bad input; surface how many values were lost.
        n_unparsed = int((parsed_dates.isna() & raw_dates.notna()).sum())
        if n_unparsed:
            print(f" Warning: {n_unparsed} date_of_birth values could not be parsed and were set to NaT")
        transformed["date_of_birth"] = parsed_dates

    # Keys for columns not present in the frame are ignored by fillna.
    transformed = transformed.fillna(
        {"first_name": "Unknown", "last_name": "Unknown", "job_title": "Unspecified"}
    )

    present_columns = [col for col in expected_columns if col in transformed.columns]
    transformed = transformed[present_columns]

    print(f" Transformed chunk with {len(transformed)} rows")
    return transformed


def copy_from_stringio(conn, df, table):
    """Bulk-load a DataFrame into PostgreSQL with ``COPY ... FROM STDIN``.

    The frame is serialised to an in-memory CSV and streamed using CSV-format
    COPY, so values containing commas or quotes (which ``to_csv`` wraps in
    double quotes) load intact. Plain text-format COPY, which
    ``cursor.copy_from`` uses, does not understand CSV quoting. Empty unquoted
    fields, which is how ``to_csv`` writes NaN/None, are loaded as NULL.
    Columns are named explicitly from ``df.columns``, so a frame with fewer or
    reordered columns still lines up with the table.

    Parameters
    ----------
    conn : psycopg2 connection
    df : pandas.DataFrame
    table : str
        Target table name. It is interpolated into the SQL unquoted, so it
        must come from trusted configuration.

    Returns
    -------
    bool
        True if the data was committed. False if COPY failed; in that case the
        transaction is rolled back and the error is printed.
    """
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)

    # Quote each column name as an SQL identifier (embedded quotes doubled).
    column_list = ", ".join('"' + str(col).replace('"', '""') + '"' for col in df.columns)
    copy_sql = f"COPY {table} ({column_list}) FROM STDIN WITH (FORMAT csv)"

    cursor = conn.cursor()
    try:
        cursor.copy_expert(copy_sql, buffer)
        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        print(" Error during COPY:", e)
        return False
    finally:
        cursor.close()


def load_to_postgres(conn, chunk, table_name):
    """Load one transformed chunk into ``table_name`` via ``copy_from_stringio``.

    Prints a one-line status message and returns whether the load succeeded.
    If ``copy_from_stringio`` returns ``False``, a failure message is printed
    and ``False`` is returned. Any other return value, including ``None``, is
    treated as success.

    Parameters
    ----------
    conn : psycopg2 connection
    chunk : pandas.DataFrame
    table_name : str

    Returns
    -------
    bool
    """
    loaded = copy_from_stringio(conn, chunk, table_name) is not False
    if loaded:
        print(f" Loaded {len(chunk)} records into '{table_name}'")
    else:
        print(f" Failed to load {len(chunk)} records into '{table_name}'")
    return loaded


In [25]:
# ## Step 5. Execute the ETL Pipeline (Safe Bulk Insert)

import pandas as pd
import psycopg2
from psycopg2 import extras

conn = None  # set before `try` so `finally` only closes a connection opened by this run
try:
    # Connect to PostgreSQL
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )
    print(" Connected to PostgreSQL!")

    # Create table if not exists
    with conn.cursor() as cursor:
        cursor.execute(create_table_query)
    conn.commit()
    print("Table verified or created successfully.")

    # Number of the chunk whose insert failed, or None if every chunk committed.
    failed_chunk = None

    # Process CSV in chunks. Each chunk is committed on its own, so a failure
    # leaves the earlier chunks in the table.
    for i, chunk in enumerate(extract_csv(CSV_PATH, CHUNK_SIZE), start=1):
        print(f"\n Processing Chunk {i} with {len(chunk)} rows")

        # Transform the chunk
        transformed = transform_chunk(chunk)

        # psycopg2 does not map pandas' missing-value markers to NULL:
        # - float NaN is sent as 'NaN'::float, which ends up as the text "NaN"
        #   in TEXT/VARCHAR columns;
        # - NaT is sent as an invalid timestamp literal.
        # Cast to object and replace every missing value with None, which
        # psycopg2 sends as NULL.
        db_ready = transformed.astype(object).where(transformed.notna(), None)
        data_tuples = list(db_ready.itertuples(index=False, name=None))

        # Prepare insert query (column names quoted; TABLE_NAME comes from config)
        columns_str = ', '.join(f'"{col}"' for col in transformed.columns)
        insert_query = f"INSERT INTO {TABLE_NAME} ({columns_str}) VALUES %s"

        # Bulk insert using execute_values (safe for commas inside text)
        try:
            with conn.cursor() as cursor:
                extras.execute_values(cursor, insert_query, data_tuples, page_size=10_000)
            conn.commit()
            print(f" Chunk {i} inserted successfully.")
        except Exception as e:
            conn.rollback()
            print(f" Error inserting chunk {i}: {e}")
            failed_chunk = i
            break

    # Only claim success when no chunk failed.
    if failed_chunk is None:
        print("\n ETL Pipeline completed successfully!")
    else:
        print(f"\n ETL Pipeline stopped at chunk {failed_chunk}; earlier chunks remain committed.")

except Exception as e:
    print(" Error:", e)

finally:
    if conn is not None:
        conn.close()
        print(" Database connection closed.")

 Connected to PostgreSQL!
Table verified or created successfully.
Extracting data from: C:/Users/herme/Downloads/people-2000000/people-2000000.csv

 Processing Chunk 1 with 200000 rows
 Transformed chunk with 200000 rows
 Chunk 1 inserted successfully.

 Processing Chunk 2 with 200000 rows
 Transformed chunk with 200000 rows
 Chunk 2 inserted successfully.

 Processing Chunk 3 with 200000 rows
 Transformed chunk with 200000 rows
 Chunk 3 inserted successfully.

 Processing Chunk 4 with 200000 rows
 Transformed chunk with 200000 rows
 Chunk 4 inserted successfully.

 Processing Chunk 5 with 200000 rows
 Transformed chunk with 200000 rows
 Chunk 5 inserted successfully.

 Processing Chunk 6 with 200000 rows
 Transformed chunk with 200000 rows
 Chunk 6 inserted successfully.

 Processing Chunk 7 with 200000 rows
 Transformed chunk with 200000 rows
 Chunk 7 inserted successfully.

 Processing Chunk 8 with 200000 rows
 Transformed chunk with 200000 rows
 Chunk 8 inserted successfully.

 Pro