In [None]:
import os
import psycopg2
import pandas as pd

from postgres_utilities import connect_db, close_db
from psycopg2.extras import execute_batch
from psycopg2 import sql

from io import StringIO
from typing import Tuple, List

In [None]:
def populate_product_data(conn: psycopg2.extensions.connection, csv_file: str
) -> None:
    """
    Populate the products table with data from the CSV file.

    Malformed CSV lines are skipped, rows with a missing value in any column
    are dropped, and the ``year`` column is cast to the nullable ``Int64``
    dtype before the rows are inserted in a single transaction.

    Errors raised while inserting are printed and the transaction is rolled
    back; they are not re-raised. Errors raised while reading or preparing
    the CSV (missing file, missing ``year`` column, ...) propagate to the
    caller.

    Args:
        conn (psycopg2.extensions.connection): PostgreSQL connection object.
        csv_file (str): Path to the CSV file containing product data.

    Raises:
        KeyError: If the CSV file has no ``year`` column.
    """
    # dropna() removes every row that has a missing value, so no NaN -> NULL
    # conversion is required afterwards. assign() builds a new frame instead
    # of writing a column into the result of dropna().
    products: pd.DataFrame = (
        pd.read_csv(csv_file, on_bad_lines="skip")
        .dropna()
        .assign(year=lambda frame: frame["year"].astype("Int64"))
    )

    print("Starting to populate products table")
    # One tuple of values per row, in column order
    tuples: List[Tuple] = [tuple(row) for row in products.to_numpy()]
    cols_list: List[str] = list(products.columns)
    # NOTE(review): column names are taken from the CSV header and spliced in
    # unquoted, so PostgreSQL folds them to lower case. They are left unquoted
    # on purpose: quoting would make mixed-case headers case-sensitive. The
    # CSV header is assumed to be trusted.
    query: sql.Composed = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        sql.Identifier("products"),
        sql.SQL(",".join(cols_list)),
        sql.SQL(",".join(["%s"] * len(cols_list))),  # one placeholder per column
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
    try:
        execute_batch(cursor, query, tuples)
        conn.commit()
    except Exception as error:
        # Best-effort load: report the failure and undo the partial insert
        print(f"Error while inserting data into PostgreSQL: {error}")
        conn.rollback()
    else:
        # Only report completion when the insert was actually committed
        print("Finished populating products table")
    finally:
        cursor.close()


In [None]:
def insert_dataframe(
    df: pd.DataFrame, table_name: str, connection: psycopg2.extensions.connection
) -> None:
    """
    Insert every row of a DataFrame into a PostgreSQL table.

    If the frame has an integer ``timestamp`` column, its values are
    interpreted as seconds since the Unix epoch and converted to datetimes
    before insertion. The caller's DataFrame is never modified.

    Errors raised while inserting are printed and the transaction is rolled
    back; they are not re-raised.

    Args:
        df (pd.DataFrame): DataFrame containing the data to be inserted. Its
            column names are used as the target column names.
        table_name (str): Name of the target table in PostgreSQL.
        connection (psycopg2.extensions.connection): PostgreSQL connection object.
    """
    has_epoch_timestamps: bool = (
        "timestamp" in df.columns
        and pd.api.types.is_integer_dtype(df["timestamp"])
    )
    if has_epoch_timestamps:
        # assign() returns a new frame, so the caller's DataFrame keeps its
        # original integer column.
        df = df.assign(timestamp=pd.to_datetime(df["timestamp"], unit="s"))

    # One tuple of values per row, in column order
    tuples: List[Tuple] = [tuple(row) for row in df.to_numpy()]
    cols_list: List[str] = list(df.columns)
    # NOTE(review): the table name is quoted via sql.Identifier, but column
    # names are spliced in unquoted (PostgreSQL folds them to lower case).
    # They are assumed to come from a trusted source.
    query: sql.Composed = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        sql.Identifier(table_name),
        sql.SQL(",".join(cols_list)),
        sql.SQL(",".join(["%s"] * len(cols_list))),  # one placeholder per column
    )
    cursor: psycopg2.extensions.cursor = connection.cursor()
    try:
        execute_batch(cursor, query, tuples)
        connection.commit()
    except Exception as error:
        # Best-effort load: report the failure and undo the partial insert
        print(f"Error while inserting data into PostgreSQL: {error}")
        connection.rollback()
    finally:
        cursor.close()

def populate_product_review_data(
    conn: psycopg2.extensions.connection, csv_file: str
) -> None:
    """
    Load product reviews from a CSV file into the product_review table.

    Only the user_id, product_id, rating, timestamp and review columns of the
    CSV are kept; malformed lines are skipped. Any exception raised while
    reading the file or handing the rows to ``insert_dataframe`` is printed
    rather than propagated.

    Args:
        conn (psycopg2.extensions.connection): PostgreSQL connection object.
        csv_file (str): Path to the CSV file containing product review data.
    """
    review_columns = ["user_id", "product_id", "rating", "timestamp", "review"]
    try:
        # Parse the whole file first, then narrow it to the columns we store
        raw_reviews = pd.read_csv(csv_file, on_bad_lines="skip")
        reviews: pd.DataFrame = raw_reviews[review_columns]
        insert_dataframe(reviews, "product_review", conn)
        print(
            "DataFrame successfully written to the 'product_review' table in the database."
        )
    except Exception as exc:
        # Connection-level failures get their own message; everything else
        # is reported as unexpected.
        if isinstance(exc, psycopg2.OperationalError):
            print(f"Error connecting to PostgreSQL: {exc}")
        else:
            print(f"An unexpected error occurred: {exc}")

In [None]:
# Load both CSV files into PostgreSQL over a single connection.
# `conn` is bound before the try block so that the except/finally clauses can
# still run if connect_db() itself raises (otherwise they would fail with a
# NameError on a fresh kernel and hide the real error).
conn = None
try:
    conn = connect_db()
    populate_product_data(conn, "./products.csv")  # Populate the products table
    populate_product_review_data(conn, "./product_reviews.csv")  # Populate the product_review table
except Exception as e:
    if conn is not None and not conn.closed:
        conn.rollback()
    err_msg = f"An error occurred and the transaction was rolled back: {e}"
    # Chain explicitly so the original traceback stays attached
    raise Exception(err_msg) from e
finally:
    # Nothing to close when the connection was never established
    if conn is not None:
        close_db(conn)