# Stage 1: Synthetic Social Graph & Persona Generation.

In [9]:
import networkx as nx
import random
import json
import os

# --- Configuration ---
NUM_USERS = 25
PERSONAS = [
    "AI Researcher",
    "Data Scientist",
    "Web Developer",
    "Financial Analyst",
    "NBA Fanatic",
    "Indie Gamer",
    "Political Commentator",
    "World Traveler",
    "Aspiring Chef"
]
AVG_FOLLOWS_PER_USER = 8

# --- 1.1. & 1.2. Define Personas & Create Users ---

def create_users(num_users, personas):
    """
    Generates a list of user objects, each with a unique ID and
    a primary and secondary persona.
    """
    users = []
    for i in range(1, num_users + 1):
        primary_persona = random.choice(personas)
        secondary_persona = random.choice([p for p in personas if p != primary_persona])

        users.append({
            "user_id": f"user_{i}",
            "username": f"user_{i}",
            "personas": [primary_persona, secondary_persona]
        })
    return users

print(f"Creating {NUM_USERS} users...")
users = create_users(NUM_USERS, PERSONAS)

# --- 1.3. Build the Social Graph (NetworkX) ---

def create_social_graph(users, avg_follows):
    """
    Creates a NetworkX DiGraph (directed graph) to represent
    "follows" relationships.

    Users are more likely to follow others with shared personas.
    """
    G = nx.DiGraph()

    # Add all users as nodes to the graph
    user_ids = [u["user_id"] for u in users]
    G.add_nodes_from(user_ids)

    # Create "follows" (edges)
    for user in users:
        user_id = user["user_id"]
        user_personas = set(user["personas"])

        # Create a pool of potential users to follow (excluding self)
        potential_follows = [u for u in users if u["user_id"] != user_id]

        # Weight the pool: 5x more likely to follow someone with a shared persona
        weights = []
        for pf in potential_follows:
            pf_personas = set(pf["personas"])
            if user_personas.intersection(pf_personas):
                weights.append(5)  # Higher weight for shared interest
            else:
                weights.append(1)  # Normal weight

        # Decide how many users this person will follow
        num_to_follow = random.randint(avg_follows - 3, avg_follows + 3)

        # Select and add the edges
        if potential_follows: # Ensure the list is not empty
            followed_list = random.choices(potential_follows, weights=weights, k=num_to_follow)

            # Add edges to the graph
            for followed_user in followed_list:
                G.add_edge(user_id, followed_user["user_id"])

    return G

print("Building social graph...")
social_graph = create_social_graph(users, AVG_FOLLOWS_PER_USER)

# --- Verification & Output ---

print("\n--- Stage 1 Complete ---")
print(f"Total users created: {len(users)}")
print(f"Example user:\n{json.dumps(users[0], indent=2)}")

print("\nSocial Graph Stats:")
print(f"Total nodes (users): {social_graph.number_of_nodes()}")
print(f"Total edges (follows): {social_graph.number_of_edges()}")

# Example: Check who user_1 follows
user_1_follows = list(social_graph.successors("user_1"))
print(f"\nuser_1 follows {len(user_1_follows)} users: {user_1_follows[:5]}...")

# Example: Check who follows user_1
user_1_followed_by = list(social_graph.predecessors("user_1"))
print(f"user_1 is followed by {len(user_1_followed_by)} users: {user_1_followed_by[:5]}...")

# --- 1.4. Save Graph for later stages ---
# We can save the user list and the graph (as an edge list)
# to be used in the next stage.

# Create data directory if it doesn't exist
os.makedirs("data", exist_ok=True)

# Save users list
with open("data/users.json", "w") as f:
    json.dump(users, f, indent=2)

# Save the graph edges
nx.write_edgelist(social_graph, "data/follows.edgelist")

print("\nUsers list saved to 'data/users.json'")
print("Social graph saved to 'data/follows.edgelist'")

Creating 25 users...
Building social graph...

--- Stage 1 Complete ---
Total users created: 25
Example user:
{
  "user_id": "user_1",
  "username": "user_1",
  "personas": [
    "Web Developer",
    "NBA Fanatic"
  ]
}

Social Graph Stats:
Total nodes (users): 25
Total edges (follows): 160

user_1 follows 6 users: ['user_24', 'user_21', 'user_6', 'user_8', 'user_7']...
user_1 is followed by 4 users: ['user_3', 'user_18', 'user_23', 'user_25']...

Users list saved to 'data/users.json'
Social graph saved to 'data/follows.edgelist'


# Stage 2: GenAI Content & Unique Feed Simulation

In [10]:
import json
import os
import random
import uuid
from datetime import datetime, timedelta
import networkx as nx
from tqdm import tqdm

# --- Configuration ---
DATA_DIR = "data"
# Generate a variable number of posts per user

AVG_POSTS_PER_USER = 40

# --- 1. Define Post Templates (Simulating GenAI) ---
# This dictionary mimics an LLM's ability to generate
# persona-based content.

POST_TEMPLATES = {
    "AI Researcher": [
        "Just read a fascinating paper on {topic}. The implications for {field} are huge.",
        "My hot take: {topic} is completely overhyped. The real breakthrough is still 5 years away.",
        "Anyone else attending the {conf} conference? Excited to see the talks on {topic}.",
        "Struggling with this new {library} implementation. Why is {detail} so non-intuitive?",
    ],
    "Data Scientist": [
        "Finished my analysis on {dataset}. Turns out the primary driver for {metric} is {finding}.",
        "Python's {library} is a lifesaver for {task}.",
        "Building a new {model} model to predict {metric}. So far the results are... interesting.",
        "Hot take: 90% of data science is just cleaning {dataset} data.",
    ],
    "Web Developer": [
        "Why did we ever use {old_tech}? {new_tech} is so much cleaner.",
        "Just deployed the new {feature} to prod. Fingers crossed!",
        "TIL about this weird CSS bug in {browser}. Nightmare fuel.",
        "Debating {framework_a} vs {framework_b} for the new project. Thoughts?",
    ],
    "Financial Analyst": [
        "{stock} is looking seriously {sentiment} after their earnings call.",
        "The {market_event} is going to have a major impact on {sector} stocks.",
        "My model predicts a {movement} for {stock} in Q{quarter}.",
        "Deep dive into {company}'s 10-K. Their {metric} looks suspicious.",
    ],
    "NBA Fanatic": [
        "Can you believe that {player} trade? The {team} got fleeced!",
        "{player} is the GOAT, I don't care what anyone says.",
        "That game last night was insane. {team} totally choked in the {quarter}.",
        "My prediction for the finals: {team} vs {team}.",
    ],
    "Indie Gamer": [
        "Just sank 40 hours into {game}. It's a masterpiece of {genre}.",
        "Stop playing {aaa_game} and go play {game}. You won't regret it.",
        "The art style in {game} is just breathtaking.",
        "Shoutout to the solo dev of {game}. Incredible achievement.",
    ],
    "Political Commentator": [
        "The new {policy} is a disaster for {group}.",
        "Can't believe what {politician} said about {issue}. Completely out of touch.",
        "The upcoming {election} is the most important one yet.",
        "Reading the latest poll on {issue}. The numbers are surprising.",
    ],
    "World Traveler": [
        "Just landed in {city}! The {food} is incredible.",
        "Back from {country}. My favorite part was definitely {activity}.",
        "Packing for {country}. Any tips for {activity}?",
        "That {airline} flight was rough, but the view of {landmark} was worth it.",
    ],
    "Aspiring Chef": [
        "Tonight's experiment: {dish} with a {ingredient} twist. It actually worked!",
        "Perfected my {technique} for {food}. The secret is {secret}.",
        "I will never buy {food} from a store again. Homemade is so much better.",
        "Failed attempt at {dish}. It was a {texture} mess.",
    ]
}

# --- 2. Define Topic Fillers ---
# These will be slotted into the templates above.

TOPIC_FILLERS = {
    "{topic}": ["RAG systems", "scaling laws", "GANs", "customer churn", "React hooks", "CSS grid", "inflation", "the playoffs", "Stardew Valley", "immigration policy", "Japan", "sourdough bread"],
    "{field}": ["NLP", "robotics", "e-commerce", "frontend dev", "macroeconomics"],
    "{conf}": ["NeurIPS", "ICLR", "WWDC", "JSConf"],
    "{library}": ["PyTorch", "Pandas", "React", "D3.js"],
    "{detail}": ["the data loader", "the async handling", "the state management"],
    "{dataset}": ["sales data", "user logs", "sensor data"],
    "{metric}": ["conversion rate", "user engagement", "stock price"],
    "{finding}": ["seasonal trends", "user location", "ad spend"],
    "{task}": ["ETL", "data viz", "model training"],
    "{model}": ["regression", "neural net", "prophet"],
    "{old_tech}": ["jQuery", "AngularJS", "legacy PHP"],
    "{new_tech}": ["Svelte", "Next.js", "FastAPI"],
    "{feature}": ["checkout page", "auth flow", "dashboard"],
    "{browser}": ["Safari", "Chrome", "Firefox"],
    "{framework_a}": ["React", "Vue"],
    "{framework_b}": ["Svelte", "Angular"],
    "{stock}": ["TSLA", "AAPL", "GOOGL", "AMZN"],
    "{sentiment}": ["undervalued", "overvalued", "volatile"],
    "{market_event}": ["Fed rate hike", "CPI report"],
    "{sector}": ["tech", "healthcare", "energy"],
    "{movement}": ["10% upside", "20% drop"],
    "{quarter}": ["1", "2", "3", "4"],
    "{company}": ["Enron", "Meta", "startup X"],
    "{player}": ["LeBron", "Jordan", "Curry", "Wemby"],
    "{team}": ["Lakers", "Celtics", "Knicks", "Bulls"],
    "{game}": ["Hades", "Hollow Knight", "Dave the Diver"],
    "{genre}": ["metroidvania", "roguelike", "farming sim"],
    "{aaa_game}": ["Call of Duty", "Assassin's Creed"],
    "{policy}": ["tax bill", "healthcare reform"],
    "{group}": ["small businesses", "students"],
    "{politician}": ["the president", "senator X"],
    "{issue}": ["climate change", "the economy"],
    "{election}": ["midterm", "primary"],
    "{city}": ["Tokyo", "Rome", "Bangkok"],
    "{food}": ["ramen", "pasta", "pad thai"],
    "{country}": ["Italy", "Thailand", "Argentina"],
    "{activity}": ["hiking", "scuba diving", "visiting museums"],
    "{airline}": ["Ryanair", "Spirit"],
    "{landmark}": ["the Alps", "the coastline"],
    "{dish}": ["beef bourguignon", "a souffle", "pho"],
    "{ingredient}": ["cardamom", "truffle oil"],
    "{technique}": ["sous-vide", "maillard reaction"],
    "{secret}": ["more butter", "patience"],
    "{texture}": ["soggy", "dense"]
}

# --- 3. Helper Functions ---

def get_random_timestamp():
    """Generates a random timestamp within the last 30 days."""
    now = datetime.utcnow()
    delta = timedelta(days=random.randint(0, 30),
                      hours=random.randint(0, 23),
                      minutes=random.randint(0, 59))
    return (now - delta).isoformat() + "Z"

def generate_post_content(personas):
    """
    Picks a persona (preferring the primary one) and
    generates a post by filling in a template.
    """
    # 70% chance to use primary persona, 30% for secondary
    persona = random.choices(personas, weights=[0.7, 0.3], k=1)[0]

    # Get a random template for that persona
    template = random.choice(POST_TEMPLATES[persona])

    # Fill in the blanks
    content = template
    # Find all placeholders in the template
    placeholders = [key for key in TOPIC_FILLERS.keys() if key in content]

    for key in placeholders:
        content = content.replace(key, random.choice(TOPIC_FILLERS[key]), 1)

    return content

# --- 4. Main Script Execution ---

print("--- Starting Stage 2 ---")

# Load users from Stage 1
users_path = f"{DATA_DIR}/users.json"
print(f"Loading users from '{users_path}'...")
try:
    with open(users_path, 'r') as f:
        users = json.load(f)
except FileNotFoundError:
    print(f"ERROR: '{users_path}' not found. Please run Stage 1 first.")
    exit() # Or handle error appropriately in a notebook

# Generate "Original Posts" (Simulated LLM)
all_posts = []
print(f"Generating synthetic posts for {len(users)} users...")

for user in tqdm(users, desc="Generating Posts"):
    user_id = user['user_id']
    personas = user['personas']

    # Each user gets a slightly different number of posts
    num_posts = random.randint(AVG_POSTS_PER_USER - 15, AVG_POSTS_PER_USER + 15)

    for _ in range(num_posts):
        post_content = generate_post_content(personas)

        all_posts.append({
            "post_id": str(uuid.uuid4()),
            "author_id": user_id,
            "content": post_content,
            "created_at": get_random_timestamp()
        })

# Save Posts to JSON
posts_path = f"{DATA_DIR}/posts.json"
print(f"\nSaving {len(all_posts)} posts to '{posts_path}'...")
with open(posts_path, "w") as f:
    json.dump(all_posts, f, indent=2)

# Convert 'follows.edgelist' to 'follows.json' for easier Spark ingestion
edgelist_path = f"{DATA_DIR}/follows.edgelist"
follows_json_path = f"{DATA_DIR}/follows.json"

print(f"Converting '{edgelist_path}' to '{follows_json_path}'...")
try:
    G = nx.read_edgelist(edgelist_path, create_using=nx.DiGraph())
    follows_list = []
    for follower, followed in G.edges():
        follows_list.append({
            "follower_id": follower,
            "followed_id": followed
        })

    with open(follows_json_path, "w") as f:
        json.dump(follows_list, f, indent=2)
    print(f"Successfully converted {len(follows_list)} follow relationships.")

except FileNotFoundError:
    print(f"ERROR: '{edgelist_path}' not found. Did Stage 1 run correctly?")

print("\n--- Stage 2 Complete ---")
print(f"Raw data landing zone '{DATA_DIR}' now contains:")
print(f"- users.json")
print(f"- posts.json")
print(f"- follows.json")

--- Starting Stage 2 ---
Loading users from 'data/users.json'...
Generating synthetic posts for 25 users...


  now = datetime.utcnow()
Generating Posts: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 25/25 [00:00<00:00, 2773.79it/s]


Saving 1024 posts to 'data/posts.json'...
Converting 'data/follows.edgelist' to 'data/follows.json'...
Successfully converted 160 follow relationships.

--- Stage 2 Complete ---
Raw data landing zone 'data' now contains:
- users.json
- posts.json
- follows.json





# Stage 3: PySpark ETL & PostgreSQL Ingestion

In [11]:
DB_NAME = "mmds"
DB_USER = "sadhu"
DB_PASS = "12345678"
DB_HOST = "localhost"
DB_PORT = "5432"

In [12]:
import psycopg2
from psycopg2 import sql

conn_string = f"dbname='{DB_NAME}' user='{DB_USER}' password='{DB_PASS}' host='{DB_HOST}' port='{DB_PORT}'"

# SQL statements to create our schema
create_tables_sql = """
CREATE TABLE IF NOT EXISTS users (
    user_id TEXT PRIMARY KEY,
    username TEXT,
    personas TEXT[]
);

CREATE TABLE IF NOT EXISTS follows (
    follower_id TEXT REFERENCES users(user_id),
    followed_id TEXT REFERENCES users(user_id),
    PRIMARY KEY (follower_id, followed_id)
);

CREATE TABLE IF NOT EXISTS posts (
    post_id TEXT PRIMARY KEY,
    author_id TEXT REFERENCES users(user_id),
    content TEXT,
    cleaned_content TEXT,
    created_at TIMESTAMP
);
"""

try:
    with psycopg2.connect(conn_string) as conn:
        with conn.cursor() as cur:
            cur.execute(create_tables_sql)
            conn.commit()
    print("--- Stage 3.A: Setup Complete ---")
    print("Successfully connected and created tables (if they didn't exist).")

except psycopg2.OperationalError as e:
    print("\n--- ðŸš¨ ERROR: Could not connect to PostgreSQL ---")
    print(f"Error details: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

--- Stage 3.A: Setup Complete ---
Successfully connected and created tables (if they didn't exist).


In [13]:
import psycopg2

conn_string = f"dbname='{DB_NAME}' user='{DB_USER}' password='{DB_PASS}' host='{DB_HOST}' port='{DB_PORT}'"

# TRUNCATE tables in the correct order
truncate_sql = """
TRUNCATE TABLE posts, follows;
TRUNCATE TABLE users RESTART IDENTITY CASCADE;
"""

print("--- Starting Stage 3.B.1: Pre-ETL Table Truncation ---")

try:
    with psycopg2.connect(conn_string) as conn:
        with conn.cursor() as cur:
            cur.execute(truncate_sql)
            conn.commit()
    print("Successfully truncated tables: 'posts', 'follows', and 'users'.")

except Exception as e:
    print(f"--- ðŸš¨ ERROR: Could not truncate tables ---")
    print(f"Error details: {e}")

--- Starting Stage 3.B.1: Pre-ETL Table Truncation ---
Successfully truncated tables: 'posts', 'follows', and 'users'.


In [14]:
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

print("\n--- Starting Stage 3.B.2: PySpark ETL (SQL-Native Functions Only) ---")

# --- 1. Initialize SparkSession ---
spark = (
    SparkSession.builder
    .appName("SocialMediaETL")
    .master("local[*]")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")
    .getOrCreate()
)

print("SparkSession initialized.")

# --- 2. JDBC Config ---
jdbc_url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"
properties = {"user": DB_USER, "password": DB_PASS, "driver": "org.postgresql.Driver"}
DATA_DIR = "data"

# --- 3. Run ETL ---
try:
    # --- USERS ---
    print("Processing 'users' data...")
    users_df = spark.read.option("multiline", "true").json(f"{DATA_DIR}/users.json")
    users_df.write.jdbc(jdbc_url, "users", mode="append", properties=properties)
    print(f"Successfully loaded {users_df.count()} users into 'users' table.")

    # --- FOLLOWS ---
    print("Processing 'follows' data...")
    follows_df = spark.read.option("multiline", "true").json(f"{DATA_DIR}/follows.json")
    follows_df.write.jdbc(jdbc_url, "follows", mode="append", properties=properties)
    print(f"Successfully loaded {follows_df.count()} relationships into 'follows' table.")

    # --- POSTS ---
    print("Processing 'posts' data (SQL-native functions)...")
    posts_df = spark.read.option("multiline", "true").json(f"{DATA_DIR}/posts.json")

    # --- This is the new, 100% stable cleaning logic ---
    # It uses only Spark's built-in functions. No Python worker is needed.
    cleaned_col = F.lower(F.col("content"))
    cleaned_col = F.regexp_replace(cleaned_col, "http\\S+", "")  # Remove URLs
    cleaned_col = F.regexp_replace(cleaned_col, "[^a-z0-9\\s]", "") # Remove non-alpha
    cleaned_col = F.regexp_replace(cleaned_col, "\\s+", " ")      # Collapse whitespace
    cleaned_col = F.trim(cleaned_col)
    cleaned_col = F.when(cleaned_col == "", None).otherwise(cleaned_col)
    # ----------------------------------------------------

    posts_df_final = posts_df.withColumn("cleaned_content", cleaned_col) \
                              .withColumn("created_at", F.to_timestamp(F.col("created_at"))) \
                              .select("post_id", "author_id", "content", "cleaned_content", "created_at")

    posts_df_final.write.jdbc(jdbc_url, "posts", mode="append", properties=properties)
    print(f"Successfully loaded {posts_df_final.count()} posts into 'posts' table.")

    print("\n--- PySpark ETL Job Complete! ---")

except Exception as e:
    print(f"\n--- ðŸš¨ ERROR during Spark ETL ---")
    print(f"Error details: {e}")

finally:
    spark.stop()
    print("SparkSession stopped.")


--- Starting Stage 3.B.2: PySpark ETL (SQL-Native Functions Only) ---
SparkSession initialized.
Processing 'users' data...
Successfully loaded 25 users into 'users' table.
Processing 'follows' data...
Successfully loaded 160 relationships into 'follows' table.
Processing 'posts' data (SQL-native functions)...
Successfully loaded 1024 posts into 'posts' table.

--- PySpark ETL Job Complete! ---
SparkSession stopped.


# Stage 4: Backend AI Core (Analysis Pipeline)

In [15]:
import psycopg2
# ------------------------------

conn_string = f"dbname='{DB_NAME}' user='{DB_USER}' password='{DB_PASS}' host='{DB_HOST}' port='{DB_PORT}'"

# SQL statements to create our new results tables
create_results_tables_sql = """
CREATE TABLE IF NOT EXISTS user_topics (
    topic_id SERIAL PRIMARY KEY,
    user_id TEXT REFERENCES users(user_id),
    topic_name TEXT,
    summary TEXT,
    post_count INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS post_topic_mapping (
    post_id TEXT REFERENCES posts(post_id),
    topic_id INTEGER REFERENCES user_topics(topic_id),
    PRIMARY KEY (post_id, topic_id)
);
"""

try:
    with psycopg2.connect(conn_string) as conn:
        with conn.cursor() as cur:
            cur.execute(create_results_tables_sql)
            conn.commit()
    print("--- Stage 4.B: Setup Complete ---")
    print("Successfully created 'user_topics' and 'post_topic_mapping' tables.")

except Exception as e:
    print(f"--- ðŸš¨ ERROR: Could not create results tables ---")
    print(f"Error details: {e}")

--- Stage 4.B: Setup Complete ---
Successfully created 'user_topics' and 'post_topic_mapping' tables.


In [16]:
import psycopg2
import pandas as pd
from bertopic import BERTopic
from transformers import pipeline
from psycopg2.extras import execute_values
from tqdm import tqdm


# --- 1. Database Connector Module ---
def get_db_connection():
    """Establishes a new connection to the PostgreSQL database."""
    conn_string = f"dbname='{DB_NAME}' user='{DB_USER}' password='{DB_PASS}' host='{DB_HOST}' port='{DB_PORT}'"
    return psycopg2.connect(conn_string)

def clear_previous_analysis(conn, user_id):
    """
    Clears out old analysis results for a user to prevent duplicates.
    Uses ON DELETE CASCADE from the foreign keys.
    """
    with conn.cursor() as cur:
        # This is the "parent" table. Deleting from here will cascade
        # and delete all mappings from 'post_topic_mapping'.
        cur.execute("DELETE FROM user_topics WHERE user_id = %s", (user_id,))
        conn.commit()
    print(f"Cleared old analysis results for {user_id}.")

# --- 2. Feed Retrieval Logic ---
def get_feed_for_user(conn, user_id):
    """
    Fetches the unique feed for a given user.
    Returns a DataFrame of post_ids and their cleaned_content.
    """
    print(f"Fetching feed for {user_id}...")

    # This query finds all posts written by people the user_id follows
    sql_query = """
    SELECT p.post_id, p.cleaned_content
    FROM posts p
    JOIN follows f ON p.author_id = f.followed_id
    WHERE f.follower_id = %s
      AND p.cleaned_content IS NOT NULL
      AND p.cleaned_content != '';
    """

    # Use pandas to read SQL results directly into a DataFrame
    with conn.cursor() as cur:
        cur.execute(sql_query, (user_id,))
        rows = cur.fetchall()

    df = pd.DataFrame(rows, columns=['post_id', 'cleaned_content'])

    if df.empty:
        print(f"No posts found for {user_id}'s feed.")
        return None

    print(f"Found {len(df)} posts in {user_id}'s feed.")
    return df

# --- 3. AI Core (Topic Modeling & Summarization) ---
def run_analysis_pipeline(user_id):
    """
    Main function to run the full AI analysis pipeline for a user.
    """
    conn = None
    try:
        conn = get_db_connection()

        # Step 0: Clear old data
        clear_previous_analysis(conn, user_id)

        # Step 1: Get the user's feed
        feed_df = get_feed_for_user(conn, user_id)
        if feed_df is None or len(feed_df) < 10: # Min posts for BERTopic
            print(f"Not enough posts ({len(feed_df)}) to analyze. Aborting.")
            return

        posts_list = feed_df['cleaned_content'].tolist()
        post_id_list = feed_df['post_id'].tolist()

        # Step 2: Initialize AI Models
        # BERTopic: Use a simpler model for speed in our notebook
        print("Initializing BERTopic model...")
        topic_model = BERTopic(
            embedding_model="all-MiniLM-L6-v2", # Fast & small
            min_topic_size=5,                     # Find topics with at least 5 posts
            verbose=False
        )

        # Summarizer: Use a small, fast model
        print("Initializing Summarization model (this may take a moment)...")
        summarizer = pipeline(
            "summarization",
            model="sshleifer/distilbart-cnn-6-6", # Small & fast
            truncation=True
        )

        # Step 3: Run Topic Modeling
        print("Running BERTopic.This may take a few minutes...")
        topics, _ = topic_model.fit_transform(posts_list)

        # Add topic assignments back to our DataFrame
        feed_df['topic'] = topics

        # Get topic info (names, post counts)
        # We'll analyze topics -1 (outliers), 0, 1, 2...
        topic_info_df = topic_model.get_topic_info()
        print(f"BERTopic found {len(topic_info_df) - 1} topics.")

        # --- Step 4: Summarize and Save Each Topic ---
        print("Summarizing topics and saving results...")

        # We'll collect all data to insert in batches
        all_post_topic_mappings = []

        # Use tqdm for a nice progress bar
        for _, topic_row in tqdm(topic_info_df.iterrows(), total=topic_info_df.shape[0]):
            topic_num = topic_row['Topic']
            topic_name = topic_row['Name']

            # Skip the outlier topic (-1)
            if topic_num == -1:
                continue

            # Get all posts for this topic
            topic_posts_df = feed_df[feed_df['topic'] == topic_num]
            topic_posts = topic_posts_df['cleaned_content'].tolist()
            post_count = len(topic_posts)

            # Combine all posts into one document for the summarizer
            doc_to_summarize = " . ".join(topic_posts)

            # Ensure doc is not too long for the model (e.g., ~1024 tokens)
            # A simple way is to cap by character length
            max_chars = 4000
            if len(doc_to_summarize) > max_chars:
                doc_to_summarize = doc_to_summarize[:max_chars]

            # Run summarization
            summary_result = summarizer(doc_to_summarize, min_length=15, max_length=50)
            summary_text = summary_result[0]['summary_text']

            # Save topic to DB and get its new primary key (topic_id)
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO user_topics (user_id, topic_name, summary, post_count)
                    VALUES (%s, %s, %s, %s)
                    RETURNING topic_id;
                    """,
                    (user_id, topic_name, summary_text, post_count)
                )
                new_topic_id = cur.fetchone()[0]
                conn.commit()

            # Collect the post_id <-> topic_id mappings
            for post_id in topic_posts_df['post_id'].tolist():
                all_post_topic_mappings.append((post_id, new_topic_id))

        # Step 5: Batch-insert all post-topic mappings
        print(f"Saving {len(all_post_topic_mappings)} post-topic mappings...")
        with conn.cursor() as cur:
            execute_values(
                cur,
                "INSERT INTO post_topic_mapping (post_id, topic_id) VALUES %s",
                all_post_topic_mappings
            )
            conn.commit()

        print("\n--- Stage 4: Analysis Complete! ---")
        print(f"Successfully analyzed and saved results for {user_id}.")

    except (Exception, psycopg2.Error) as error:
        print(f"--- ðŸš¨ ERROR in Stage 4 ---")
        print(f"Error details: {error}")
        if conn:
            conn.rollback() # Roll back any failed transactions

    finally:
        if conn:
            conn.close()
            print("Database connection closed.")

# --- 4.D: Example Usage ---
# Pick a user_id to analyze. 'user_1' is a good test.
USER_TO_ANALYZE = 'user_1'
run_analysis_pipeline(USER_TO_ANALYZE)

Cleared old analysis results for user_1.
Fetching feed for user_1...
Found 257 posts in user_1's feed.
Initializing BERTopic model...
Initializing Summarization model (this may take a moment)...


Device set to use cuda:0


Running BERTopic.This may take a few minutes...
BERTopic found 23 topics.
Summarizing topics and saving results...


 46%|â–ˆâ–ˆâ–ˆâ–ˆâ–Œ     | 11/24 [00:04<00:04,  2.84it/s]You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset
100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 24/24 [00:08<00:00,  2.76it/s]

Saving 254 post-topic mappings...

--- Stage 4: Analysis Complete! ---
Successfully analyzed and saved results for user_1.
Database connection closed.





In [17]:
import psycopg2
from IPython.display import clear_output, display
import pandas as pd

def get_db_connection():
    """Establishes a new connection to the PostgreSQL database."""
    conn_string = f"dbname='{DB_NAME}' user='{DB_USER}' password='{DB_PASS}' host='{DB_HOST}' port='{DB_PORT}'"
    return psycopg2.connect(conn_string)

def list_all_users():
    """Fetches and displays all users from the database."""
    print("--- ðŸ‘¤ All Users ---")
    conn = None
    try:
        conn = get_db_connection()
        # Use pandas to pretty-print the user list
        df = pd.read_sql("SELECT user_id, username, personas FROM users ORDER BY user_id", conn)
        display(df) # Use display() for nice notebook formatting
    except Exception as e:
        print(f"ðŸš¨ Error listing users: {e}")
    finally:
        if conn:
            conn.close()

def view_results_for_user(user_id):
    """Fetches and displays the last analysis results for a user."""
    print(f"\n--- ðŸ“ˆ Viewing Analysis for: {user_id} ---")
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT topic_name, summary, post_count
                FROM user_topics
                WHERE user_id = %s
                ORDER BY post_count DESC
                """,
                (user_id,)
            )
            results = cur.fetchall()

            if not results:
                print(f"No results found for {user_id}.")
                print("Please run an analysis (Option 2) for this user first.")
                return

            for i, (topic_name, summary, post_count) in enumerate(results):
                print("\n" + "=" * 40)
                print(f"   TOPIC {i}: {topic_name.split('_', 1)[1]}")
                print(f"   POSTS: {post_count}")
                print(f"   SUMMARY: {summary}")
            print("=" * 40)

    except Exception as e:
        print(f"ðŸš¨ Error viewing results: {e}")
    finally:
        if conn:
            conn.close()

print("Stage 5.A: UI Helper functions defined.")

Stage 5.A: UI Helper functions defined.


In [19]:
# This cell runs the main application loop.
# It requires the functions from Stage 4.C and Stage 5.A to be in memory.

def main_menu():
    """The main interactive console loop."""
    while True:
        clear_output(wait=True) # Clears the output for a clean menu
        print("=" * 50)
        print("  ðŸ“Š Social Media Topic & Summary Tool ðŸ“Š")
        print("=" * 50)
        print("1. List All Users")
        print("2. Run New Analysis for a User")
        print("3. View Last Analysis for a User")
        print("4. Exit")
        print("-" * 50)

        choice = input("Enter your choice (1-4): ")

        if choice == '1':
            list_all_users()
            input("\nPress Enter to return to the menu...")

        elif choice == '2':
            user_id = input("Enter User ID to analyze (e.g., user_1): ")
            try:
                # This calls the function from your Stage 4 cell
                print(f"Starting new analysis for {user_id}...")
                run_analysis_pipeline(user_id)
                print(f"\nâœ… Analysis complete for {user_id}!")
            except Exception as e:
                print(f"ðŸš¨ An error occurred during analysis: {e}")
            input("\nPress Enter to return to the menu...")

        elif choice == '3':
            user_id = input("Enter User ID to view (e.g., user_1): ")
            view_results_for_user(user_id)
            input("\nPress Enter to return to the menu...")

        elif choice == '4':
            print("Exiting. Goodbye!")
            break

        else:
            input("Invalid choice. Press Enter to try again...")

# --- Run the application ---
main_menu()

KeyboardInterrupt: Interrupted by user