In [None]:
%pip install --upgrade pip

In [None]:
%pip install -q -r requirements.txt

In [None]:
import psycopg2
import pandas as pd
import random
from datetime import datetime
import os
from dotenv import load_dotenv

load_dotenv()

class PostgresPipeline:
    """Convenience wrapper around a single psycopg2 connection and cursor.

    Methods report database failures by printing a message rather than
    raising, so a notebook can keep running after an error.

    NOTE: table names, column names and column types are interpolated
    directly into the SQL text. Pass only trusted identifiers.
    """

    def __init__(self, db_config):
        """Store the connection settings and connect immediately.

        Args:
            db_config: Mapping of keyword arguments forwarded to
                ``psycopg2.connect``.
        """
        self.db_config = db_config
        self.conn = None
        self.cur = None
        self.connect()

    def connect(self):
        """Open a connection in autocommit mode and create a cursor.

        On failure the error is printed and both ``conn`` and ``cur`` are
        reset to ``None`` so later calls can detect the missing connection.
        """
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.conn.set_session(autocommit=True)
            self.cur = self.conn.cursor()
            print("Database connection established.")
        except psycopg2.Error as e:
            print(f"Error: Could not connect to the database. {e}")
            self.conn = None
            # Drop any cursor left over from an earlier, now unusable, session.
            self.cur = None

    def _has_cursor(self, action):
        """Return True if a cursor exists; otherwise print why *action* is skipped."""
        if self.cur is None:
            print(f"Error: No database connection; cannot {action}.")
            return False
        return True

    def create_table(self, table_name, columns, foreign_keys=None):
        """Create ``table_name`` if it does not exist yet.

        Args:
            table_name: Name of the table to create.
            columns: Mapping of column name to SQL type/definition.
            foreign_keys: Optional iterable of extra constraint clauses that
                are appended after the column definitions.
        """
        column_defs = [f"{col} {dtype}" for col, dtype in columns.items()]
        if foreign_keys:
            column_defs.extend(foreign_keys)
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            {', '.join(column_defs)}
        );
        """
        # Only claim success when the statement actually ran.
        if self.execute_query(create_table_sql):
            print(f"Table '{table_name}' created or already exists.")

    def alter_table(self, table_name, columns):
        """Add each column in ``columns`` to ``table_name`` if it is missing.

        Args:
            table_name: Name of the table to alter.
            columns: Mapping of column name to SQL type/definition.
        """
        if not self._has_cursor(f"alter table '{table_name}'"):
            return
        try:
            for col, dtype in columns.items():
                alter_query = f"""
                ALTER TABLE {table_name}
                ADD COLUMN IF NOT EXISTS {col} {dtype};
                """
                self.cur.execute(alter_query)
                print(f"✅ Column '{col}' added to '{table_name}' (if not exists).")

            # connect() enables autocommit, so this should be redundant; it is
            # kept so the method still persists changes if autocommit is off.
            self.conn.commit()

        except psycopg2.Error as e:
            print(f"❌ Error altering table '{table_name}': {e}")

    def load_csv_to_table(self, csv_path, table_name, columns):
        """Bulk-load a CSV file (with a header row) into ``table_name`` via COPY.

        Args:
            csv_path: Path of the CSV file to read.
            table_name: Destination table.
            columns: Iterable of destination column names, in CSV order.
        """
        try:
            with open(csv_path, 'r') as f:
                self.cur.copy_expert(
                    f"""
                    COPY {table_name} ({', '.join(columns)})
                    FROM STDIN
                    WITH (FORMAT CSV, HEADER TRUE);
                    """, f
                )
            print(f"Data loaded into table '{table_name}' from '{csv_path}'.")
        except Exception as e:
            # Deliberately broad: file errors, a missing cursor and database
            # errors are all reported the same way instead of raising.
            print(f"Error loading data into table '{table_name}': {e}")

    def execute_query(self, query):
        """Execute ``query`` and report the outcome.

        Returns:
            True if the statement ran without a database error, else False.
        """
        if not self._has_cursor("execute query"):
            return False
        try:
            self.cur.execute(query)
        except psycopg2.Error as e:
            print(f"Error executing query: {e}")
            return False
        print("Query executed successfully.")
        return True

    def fetch_all(self, query):
        """Execute ``query`` and return all rows, or ``[]`` on failure."""
        if not self._has_cursor("fetch data"):
            return []
        try:
            self.cur.execute(query)
            return self.cur.fetchall()
        except psycopg2.Error as e:
            print(f"Error fetching data: {e}")
            return []

    # Utility to generate a sample CSV file
    def generate_sample_csv(self, csv_path, num_rows, columns):
        """Write ``num_rows`` randomly sampled rows to ``csv_path``.

        Args:
            csv_path: Destination CSV path.
            num_rows: Number of data rows to generate.
            columns: Mapping of column name to the list of candidate values
                one of which is picked at random for each row.
        """
        rows = [
            {col: random.choice(vals) for col, vals in columns.items()}
            for _ in range(num_rows)
        ]
        # Passing the column names keeps the header even when there are no rows.
        df = pd.DataFrame(rows, columns=list(columns))
        df.to_csv(csv_path, index=False)
        print(f"Sample CSV file generated at '{csv_path}'.")

    def insert_data(self, table_name, columns, data):
        """Insert one row or several rows into ``table_name``.

        Args:
            table_name: Destination table.
            columns: Mapping whose keys are the destination column names.
            data: Either a list of rows (each a tuple/list with one value per
                column), a single row given as a tuple/list when there is
                more than one column, or a single value for a one-column
                insert.
        """
        if not self._has_cursor("insert data"):
            return
        try:
            column_names = ', '.join(columns.keys())
            placeholders = ', '.join(['%s' for _ in columns])
            insert_sql = f"""
            INSERT INTO {table_name} ({column_names})
            VALUES ({placeholders});
            """
            width = len(columns)
            is_row_list = (
                isinstance(data, list)
                and bool(data)
                and all(
                    isinstance(row, (tuple, list)) and len(row) == width
                    for row in data
                )
            )
            if is_row_list:
                # e.g. the list of tuples returned by create_chat_history_csv
                self.cur.executemany(insert_sql, [tuple(row) for row in data])
            elif width > 1 and isinstance(data, (tuple, list)):
                # One row: its values map one-to-one onto the placeholders.
                self.cur.execute(insert_sql, tuple(data))
            else:
                # Single column: the payload is the one value to bind.
                self.cur.execute(insert_sql, (data,))
            print("Data inserted successfully.")
        except psycopg2.Error as e:
            print(f"Error inserting data: {e}")

    def create_chat_history_csv(self, csv_path, num_rows):
        """Generate a sample chat-history CSV and return sample row tuples.

        The CSV rows and the returned rows are sampled independently from the
        same candidate values, so they are not guaranteed to match.

        Args:
            csv_path: Destination CSV path.
            num_rows: Number of rows to write and to return.

        Returns:
            A list of ``(user_id, message, response, timestamp)`` tuples.
        """
        columns = {
            'user_id': [1, 2, 3],
            'message': ['hello', 'hi', 'hey'],
            "response": ['hello', 'hi', 'hey'],
            'timestamp': [datetime.now().strftime('%Y-%m-%d %H:%M:%S') for _ in range(num_rows)]
        }
        self.generate_sample_csv(csv_path, num_rows, columns)

        # prepare data
        data = [(
            random.choice(columns['user_id']),
            random.choice(columns['message']),
            random.choice(columns['response']),
            random.choice(columns['timestamp'])
        )
                for _ in range(num_rows)]
        return data

    def close(self):
        """Close the cursor and the connection if they exist.

        The ``cur`` and ``conn`` attributes are left pointing at the closed
        objects; call ``connect()`` to obtain fresh ones.
        """
        if self.cur:
            self.cur.close()
        if self.conn:
            self.conn.close()
        print("Database connection closed.")

In [None]:
if __name__ == "__main__":
    # Connection settings are read from environment variables.
    DB_CONFIG = {
        "host": os.getenv("POSTGRES_HOST"),
        "port": os.getenv("POSTGRES_PORT"),
        "database": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
    }

    pipeline = PostgresPipeline(DB_CONFIG)

    # Columns to add to the movies table
    MOVIES_TABLE = "movies"
    MOVIES_COLUMNS = {
        "release_date": "DATE",
        "poster_url": "TEXT",
        "description": "TEXT",
        "vote_average": "FLOAT",
        "links": "TEXT",
    }

    # Add any missing columns; alter_table does not create the table itself.
    pipeline.alter_table(MOVIES_TABLE, MOVIES_COLUMNS)

    # Dump every row so the new columns can be inspected.
    result = pipeline.fetch_all(f"SELECT * FROM {MOVIES_TABLE}")
    # The label names the table actually queried (it previously said
    # "Chat History Data" although the rows come from the movies table).
    print("🔍 Movies Data:")
    for row in result:
        print(row)









In [None]:
# Connection lifecycle check: connect, close, inspect, then reconnect.
if __name__ == "__main__":
    # Database connection details
    DB_CONFIG = {
        "dbname": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
        "host": os.getenv("POSTGRES_HOST"),
        "port": os.getenv("POSTGRES_PORT"),
    }

    # Create instance of PostgresPipeline
    pipeline = PostgresPipeline(DB_CONFIG)

    # Close the connection
    pipeline.close()

    # Inspect the connection after close(); close() does not reset the
    # attribute, so this prints the connection object rather than None.
    print(pipeline.conn)

    # Inspect the cursor after close() in the same way
    print(pipeline.cur)

    # Reconnect
    pipeline.connect()

    # Test the connection
    print(pipeline.conn)



In [None]:
# Call each fetch_* helper first, then print the four results in the same order.
# NOTE(review): the fetch_* helpers are not defined in the visible part of this
# notebook; the second argument (5) is presumably a row limit — confirm against
# their definitions.
movies = fetch_movies(pipeline, 5)
ratings = fetch_ratings(pipeline, 5)
tags = fetch_tags(pipeline, 5)
links = fetch_links(pipeline, 5)

# One print call with a newline separator emits each result on its own line.
print(movies, ratings, tags, links, sep="\n")