Task 3: Store Cleaned Data in PostgreSQL

In [2]:
import psycopg2
from psycopg2 import sql
from datetime import date, datetime
import random
import os
import pandas as pd

In [None]:
# --- 1. PostgreSQL Connection Details ---
# Each setting may be overridden with the matching libpq-style environment
# variable; the literals below are only local-development defaults.
DB_NAME = os.environ.get("PGDATABASE", "bank_reviews")
DB_USER = os.environ.get("PGUSER", "postgres")
# NOTE: keep real credentials out of source control -- set PGPASSWORD instead.
DB_PASSWORD = os.environ.get("PGPASSWORD", "123456")
DB_HOST = os.environ.get("PGHOST", "localhost")
DB_PORT = os.environ.get("PGPORT", "5432")

# --- 2. CSV File Path (MUST BE A STRING, NOT A DATAFRAME) ---
# Override with REVIEWS_CSV_PATH when the data lives somewhere other than the
# original author's machine.
CSV_FILE_PATH = os.environ.get(
    "REVIEWS_CSV_PATH",
    r"E:\Customer Experience Analytics for Fintech Apps week_2\Customer-Experience-Analytics-for-Fintech-Apps-week_2\notebooks\data\processed\final_reviews_with_themes.csv",
)

# Columns expected from CSV
# NOTE(review): load_reviews_from_csv also reads "bank", "review_text",
# "rating", "review_date" and "source", and never reads "content" -- confirm
# which header names the processed CSV actually uses.
CSV_COLUMNS = [
    "review_id",
    "content",
    "sentiment_label",
    "sentiment_score",
]


# --- 3. Connect to PostgreSQL ---
def connect_db():
    """Open a PostgreSQL connection from the module-level DB_* settings.

    Returns:
        The psycopg2 connection on success, or None after printing the
        failure details when the connection attempt raises.
    """
    try:
        settings = {
            "dbname": DB_NAME,
            "user": DB_USER,
            "password": DB_PASSWORD,
            "host": DB_HOST,
            "port": DB_PORT,
        }
        connection = psycopg2.connect(**settings)
        print("Connected to database.")
        return connection
    except Exception as err:
        # Report and signal failure with None; callers check for it.
        print("\n❌ ERROR: Could not connect to PostgreSQL.")
        print(f"Details: {err}")
        return None


# --- 4. Get Bank ID by App Name ---
def get_bank_id_by_app_name(conn, app_name):
    """Look up the ``bank_id`` for *app_name* in ``bank_information.banks``.

    Args:
        conn: Open database connection (psycopg2-style: ``cursor()`` and
            ``rollback()``).
        app_name: Value matched against the ``app_name`` column.

    Returns:
        The first column of the first matching row, or None when no row
        matches or the query fails (the error is printed).
    """
    # Static statement with a bound parameter; no dynamic SQL composition needed.
    query = "SELECT bank_id FROM bank_information.banks WHERE app_name = %s;"

    cursor = conn.cursor()
    try:
        cursor.execute(query, (app_name,))
        result = cursor.fetchone()
    except Exception as e:
        print(f"❌ Query error for '{app_name}': {e}")
        # A failed statement leaves the PostgreSQL transaction aborted; without
        # a rollback every later query on this connection would fail as well.
        try:
            conn.rollback()
        except Exception as rollback_error:
            print(f"❌ Rollback failed after query error: {rollback_error}")
        return None
    finally:
        cursor.close()

    return result[0] if result else None


# --- 5. Load Reviews from CSV + Build Insert Tuples ---
def load_reviews_from_csv(conn, file_path):
    if not os.path.exists(file_path):
        print(f"❌ CSV not found at: {file_path}")
        return []

    print(f"\n--- Reading CSV: {file_path} ---")

    df = pd.read_csv(CSV_FILE_PATH)

    missing_cols = [col for col in CSV_COLUMNS if col not in df.columns]
    if missing_cols:
        print(f"❌ Missing columns in CSV: {missing_cols}")
        return []

    reviews_to_insert = []
    bank_id_cache = {}

    for i, row in df.iterrows():

        app_name = str(row["bank"]).strip()

        # Lookup bank_id
        if app_name not in bank_id_cache:
            bank_id = get_bank_id_by_app_name(conn, app_name)
            if bank_id is None:
                print(f"⚠️ Skipping row {i}: Bank not found → {app_name}")
                continue
            bank_id_cache[app_name] = bank_id
        else:
            bank_id = bank_id_cache[app_name]

        # Validate and clean values
        try:
            rating_val = float(row["rating"]) if pd.notnull(row["rating"]) else None
            sentiment_score_val = float(row["sentiment_score"]) if pd.notnull(row["sentiment_score"]) else None
            review_date_obj = pd.to_datetime(row["review_date"]).date()

            record = (
                bank_id,
                int(row["review_id"]),
                row["review_text"],
                rating_val,
                review_date_obj,
                row["sentiment_label"],
                sentiment_score_val,
                row["source"]
            )

            reviews_to_insert.append(record)

        except Exception as e:
            print(f"⚠️ Error processing row {i}: {e}")
            continue

    print(f"✔ Loaded {len(reviews_to_insert)} valid reviews.")
    return reviews_to_insert


# --- 6. Bulk Insert into PostgreSQL ---
def bulk_insert_reviews(conn, reviews_data):
    """Insert review tuples into ``bank_information.reviews`` in one transaction.

    Args:
        conn: Open psycopg2 connection.
        reviews_data: Sequence of 8-item tuples ordered as ``(bank_id,
            review_id, review_text, rating, review_date, sentiment_label,
            sentiment_score, source)``, i.e. the shape built by
            ``load_reviews_from_csv``.

    Returns:
        None. The outcome is printed; on a database error the transaction is
        rolled back and nothing is inserted.
    """
    if not reviews_data:
        print("❌ No data to insert.")
        return

    # One column per tuple element, in tuple order. The previous 5-column list
    # did not match the 8-field tuples, so every executemany call failed.
    columns = (
        "bank_id",
        "review_id",
        "review_text",
        "rating",
        "review_date",
        "sentiment_label",
        "sentiment_score",
        "source",
    )

    # Fail fast with a readable message instead of a driver formatting error.
    if any(len(record) != len(columns) for record in reviews_data):
        print(
            f"❌ Insert error: every record must have {len(columns)} values "
            f"({', '.join(columns)})."
        )
        return

    insert_query = sql.SQL("""
        INSERT INTO bank_information.reviews ({})
        VALUES ({})
    """).format(
        sql.SQL(", ").join(map(sql.Identifier, columns)),
        sql.SQL(", ").join(sql.Placeholder() * len(columns))
    )

    cursor = conn.cursor()
    try:
        cursor.executemany(insert_query, reviews_data)
        conn.commit()
        print(f"✔ Inserted {len(reviews_data)} reviews into database.")
    except Exception as e:
        # All-or-nothing: undo any rows already sent in this batch.
        conn.rollback()
        print(f"❌ Insert error: {e}")
    finally:
        cursor.close()


# --- 7. Main Runner ---
def main():
    """Run the pipeline: connect, load the CSV rows, bulk-insert, disconnect.

    Does nothing further when the connection could not be opened. The
    connection is always closed once it has been obtained.
    """
    connection = connect_db()
    if connection:
        try:
            rows = load_reviews_from_csv(connection, CSV_FILE_PATH)
            bulk_insert_reviews(connection, rows)
        finally:
            connection.close()
            print("\nDatabase connection closed.")


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
