# Hybrid Topic Discovery & Classification with AWS Integration

**Purpose**: Classify questions against existing topics and discover new topics using a hybrid approach.

**Data Flow**:
1. Load topics from Google Sheets
2. Load student questions from Langfuse CSV
3. Similarity classification (threshold-based)
4. Clustering for new topic discovery
5. Output parquet files to AWS S3

**Key Features**:
- AWS S3 for embeddings cache and outputs
- Environment-responsive configuration
- Comprehensive error logging
- Analytics outputs for Streamlit dashboard

## Install Dependencies

In [None]:
!pip install -q openai pandas numpy scipy scikit-learn matplotlib seaborn tqdm umap-learn hdbscan bertopic backoff boto3 gspread oauth2client pyarrow fastparquet python-dotenv

[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/154.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m154.7/154.7 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/140.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m140.6/140.6 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.8/1.8 MB[0m [31m38.3 MB/s[0m eta [36m0:00:00[0m
[2K   [9

## Configuration

In [None]:
# Processing settings
# NOTE(review): SAMPLE_SIZE presumably only applies when EVAL_MODE == "sample" - confirm in the loading cell.
EVAL_MODE = "all"  # "sample" or "all"
SAMPLE_SIZE = 1000
# NOTE(review): presumably the minimum similarity for assigning a question to an existing topic - confirm in the classification cell.
SIMILARITY_THRESHOLD = 0.70
REPRESENTATIVE_QUESTION_METHOD = "centroid"  # "centroid" or "frequent"

# Model settings
# EMBEDDING_MODEL is the default model of get_embeddings_batch and is part of every embedding cache key/path.
EMBEDDING_MODEL = "text-embedding-3-small"
EMBEDDING_DIMENSIONS = 1536
GPT_MODEL = "gpt-5-nano"

# AWS S3 settings
# S3_BUCKET is the bucket used by upload_to_s3 / download_from_s3 / delete_s3_folder.
S3_BUCKET = "byupathway-public"
S3_OUTPUT_PREFIX = "topic-modeling-data"
# S3_CACHE_PREFIX is the first path component of the keys built by get_cache_key.
S3_CACHE_PREFIX = "embeddings-cache"
# S3_REGION is passed as region_name when the boto3 S3 client is created.
S3_REGION = "us-east-1"

# Embedding storage settings
# load_embedding / save_embedding raise ValueError for any value other than "s3" or "local".
EMBEDDING_STORAGE = "local"  # "s3" or "local"
LOCAL_CACHE_DIR = "./embedding_cache"  # Directory for local embedding storage

# Clustering settings
UMAP_N_COMPONENTS = 5
HDBSCAN_MIN_CLUSTER_SIZE = 3
RANDOM_SEED = 42

# Google Sheets URL (This should come from Elder Edwards)
GOOGLE_SHEETS_URL = "https://docs.google.com/spreadsheets/d/1aX7ILPVAU_9MsliuXMeDstzWz5DqPLAY5LNBfV6l_NQ/"

# Echo the key settings so the run log records which configuration was used.
print("‚úÖ Configuration loaded")
print(f"   Mode: {EVAL_MODE}, Threshold: {SIMILARITY_THRESHOLD}")
print(f"   S3 Bucket: {S3_BUCKET}")
print(f"   Embedding Model: {EMBEDDING_MODEL}")

‚úÖ Configuration loaded
   Mode: all, Threshold: 0.7
   S3 Bucket: byupathway-public
   Embedding Model: text-embedding-3-small


## Environment Setup

In [None]:
import os
import pandas as pd
import numpy as np
from scipy.spatial.distance import cosine
import json
import pickle
from pathlib import Path
import time
from datetime import datetime
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict, Tuple, Optional, Any
import asyncio
import backoff
import re
import hashlib
import boto3
from botocore.exceptions import ClientError
import gspread
from oauth2client.service_account import ServiceAccountCredentials

# Detect environment
# A successful import of google.colab is taken to mean the notebook runs in
# Colab; otherwise the notebook is treated as a local run and load_dotenv()
# is called before credentials are read from the process environment.
try:
    import google.colab
    IN_COLAB = True
    from google.colab import userdata
    print("üîß Running in Google Colab")
except ImportError:
    IN_COLAB = False
    from dotenv import load_dotenv
    load_dotenv()
    print("üîß Running locally")

# Load credentials
# The same four secrets are read in both environments: from Colab `userdata`
# or from environment variables. Nothing here validates that they are set;
# os.getenv returns None for a missing variable.
if IN_COLAB:
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    AWS_ACCESS_KEY = userdata.get('AWS_ACCESS_KEY_ID')
    AWS_SECRET_KEY = userdata.get('AWS_SECRET_ACCESS_KEY')
    GOOGLE_SERVICE_ACCOUNT = userdata.get('GOOGLE_SERVICE_ACCOUNT_JSON')
else:
    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
    AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
    AWS_SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
    GOOGLE_SERVICE_ACCOUNT = os.getenv('GOOGLE_SERVICE_ACCOUNT_JSON')

# Initialize OpenAI
# Both a synchronous and an asynchronous client are created with the same key.
from openai import OpenAI, AsyncOpenAI
client = OpenAI(api_key=OPENAI_API_KEY)
async_client = AsyncOpenAI(api_key=OPENAI_API_KEY)

# Initialize AWS S3
# Module-level client shared by the S3 helper functions in this notebook.
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=S3_REGION
)

print("‚úÖ Environment setup complete")

üîß Running in Google Colab
‚úÖ Environment setup complete


## Error Logger

In [None]:
class ErrorLogger:
    """In-memory collector for errors, warnings and dropped rows.

    Entries are plain dicts with an ISO timestamp; payloads are stored as
    ``str(...)`` so the whole log can be written out as JSON.
    """

    def __init__(self):
        # Run identifier captured once, when the logger is created.
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.errors = []
        self.warnings = []
        self.rows_dropped = []

    @staticmethod
    def _now_iso() -> str:
        """Return the current local time as an ISO-8601 string."""
        return datetime.now().isoformat()

    def log_error(self, stage: str, message: str, data: Any = None):
        """Record an error for *stage* and echo it to stdout."""
        self.errors.append({
            "timestamp": self._now_iso(),
            "stage": stage,
            "message": message,
            "data": str(data),
        })
        print(f"‚ùå ERROR [{stage}]: {message}")

    def log_warning(self, stage: str, message: str, data: Any = None):
        """Record a warning for *stage* and echo it to stdout."""
        self.warnings.append({
            "timestamp": self._now_iso(),
            "stage": stage,
            "message": message,
            "data": str(data),
        })
        print(f"‚ö†Ô∏è  WARNING [{stage}]: {message}")

    def log_dropped_row(self, stage: str, reason: str, row_data: Any):
        """Record a dropped row silently (nothing is printed)."""
        self.rows_dropped.append({
            "timestamp": self._now_iso(),
            "stage": stage,
            "reason": reason,
            "row_data": str(row_data),
        })

    def get_summary(self):
        """Return the counts followed by the full entry lists as one dict."""
        summary = {
            "total_errors": len(self.errors),
            "total_warnings": len(self.warnings),
            "total_dropped_rows": len(self.rows_dropped),
        }
        summary["errors"] = self.errors
        summary["warnings"] = self.warnings
        summary["dropped_rows"] = self.rows_dropped
        return summary

    def save_to_file(self, filename: str):
        """Write the summary to *filename* as indented JSON and return the path."""
        with open(filename, 'w') as handle:
            handle.write(json.dumps(self.get_summary(), indent=2))
        print(f"üìù Error log saved: {filename}")
        return filename

error_logger = ErrorLogger()
print("‚úÖ Error logger initialized")

‚úÖ Error logger initialized


## AWS S3 Utilities

In [None]:
def upload_to_s3(local_path: str, s3_key: str, public: bool = True) -> bool:
    """Upload file to S3 with retry logic and exponential backoff

    The retry wrapper is applied to the raw boto3 call only. Decorating this
    whole function (as before) never retried anything, because the function
    catches every exception itself and returns False, so backoff never saw a
    failure.

    Args:
        local_path: Local file path to upload
        s3_key: S3 key (path in bucket)
        public: Whether to set public-read ACL (default True)

    Returns:
        bool: True if successful, False if the upload still failed after the
        retries (the failure is recorded through error_logger).
    """
    extra_args = {'ACL': 'public-read'} if public else {}

    # KeyboardInterrupt/SystemExit are not Exception subclasses, so they were
    # never retried and need no giveup rule. A missing local file cannot be
    # fixed by retrying, so give up on it immediately.
    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=5,
        max_time=30,
        giveup=lambda e: isinstance(e, FileNotFoundError)
    )
    def _upload_with_retry() -> None:
        s3_client.upload_file(local_path, S3_BUCKET, s3_key, ExtraArgs=extra_args)

    try:
        _upload_with_retry()
    except Exception as e:
        error_logger.log_error("S3_Upload", f"Failed to upload {local_path} after retries", e)
        return False

    if public:
        url = f"https://{S3_BUCKET}.s3.amazonaws.com/{s3_key}"
        print(f"‚úÖ Uploaded to S3: {url}")
    else:
        print(f"‚úÖ Uploaded to S3: s3://{S3_BUCKET}/{s3_key}")

    return True

def download_from_s3(s3_key: str, local_path: str) -> bool:
    """Download file from S3 with retry logic

    The retry wrapper is applied to the raw boto3 call. Previously
    ClientError was caught inside the decorated function, so S3 errors were
    never retried even though the log message said "after retries".

    Args:
        s3_key: S3 key (path in bucket) to download
        local_path: Local path the object is written to

    Returns:
        bool: True if the file was downloaded. False if the key does not
        exist (not logged) or if S3 kept returning an error after the
        retries (logged through error_logger).

    Raises:
        Exception: Non-ClientError failures still propagate to the caller
        once the retries are exhausted, as they did before.
    """
    def _is_missing_key(exc: Exception) -> bool:
        # A missing object is reported as ClientError with a not-found code.
        if not isinstance(exc, ClientError):
            return False
        code = str(exc.response.get('Error', {}).get('Code', ''))
        return code in ('404', 'NoSuchKey', 'NotFound')

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3,
        max_time=15,
        giveup=_is_missing_key  # File doesn't exist - don't retry
    )
    def _download_with_retry() -> None:
        s3_client.download_file(S3_BUCKET, s3_key, local_path)

    try:
        _download_with_retry()
        return True
    except ClientError as e:
        if not _is_missing_key(e):
            error_logger.log_error("S3_Download", f"Failed to download {s3_key} after retries", e)
        return False

def delete_s3_folder(prefix: str):
    """Delete all objects with given prefix

    Listing is paginated: a single list_objects_v2 call returns at most 1000
    keys, so the previous one-shot version silently left everything beyond
    the first page in place.

    Args:
        prefix: Key prefix ("folder") whose objects are deleted. An empty
            prefix is refused because it would match the whole bucket.

    Returns:
        None. Failures are recorded through error_logger and never raised.
    """
    if not prefix:
        error_logger.log_error("S3_Delete", "Refusing to delete with an empty prefix", prefix)
        return

    try:
        paginator = s3_client.get_paginator('list_objects_v2')
        deleted_count = 0

        # Each page holds at most 1000 keys, which is also the per-request
        # limit of delete_objects, so one delete call per page is enough.
        for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=prefix):
            objects = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
            if not objects:
                continue

            response = s3_client.delete_objects(Bucket=S3_BUCKET, Delete={'Objects': objects})

            # delete_objects reports per-key failures in the response body
            # instead of raising, so surface them explicitly.
            failures = response.get('Errors', [])
            if failures:
                error_logger.log_error(
                    "S3_Delete",
                    f"Failed to delete {len(failures)} objects under {prefix}",
                    failures
                )
            deleted_count += len(objects) - len(failures)

        if deleted_count:
            print(f"üóëÔ∏è  Deleted {deleted_count} objects from s3://{S3_BUCKET}/{prefix}")
    except Exception as e:
        error_logger.log_error("S3_Delete", f"Failed to delete folder {prefix}", e)

print("‚úÖ AWS S3 utilities ready (with retry logic)")

‚úÖ AWS S3 utilities ready (with retry logic)


## Master File Sync System

In [None]:
# S3 Master file settings
S3_MASTER_KEY = "langfuse/langfuse_traces_master.parquet"

def sync_master_file(new_data_df: pd.DataFrame) -> pd.DataFrame:
    """
    Sync master file with S3 storage

    Flow:
    1. Download master file from S3 (if exists)
    2. Merge new data with master
    3. Remove duplicates (question/input + timestamp), preferring rows that
       carry feedback metadata
    4. Upload updated master back to S3

    Args:
        new_data_df: New data to merge with master. The caller's frame is
            not modified.

    Returns:
        pd.DataFrame: Complete master dataframe, sorted newest-first when a
        'timestamp' column exists. It is returned even if the upload fails;
        the failure is recorded through error_logger.
    """

    print("\nüîÑ Syncing master file with S3...")

    local_master_path = "/tmp/langfuse_traces_master.parquet"
    master_df = None

    # Work on a copy: column alignment below adds columns, and that must not
    # leak into the caller's dataframe.
    incoming_df = new_data_df.copy()

    # Step 1: Try to load from S3
    # NOTE(review): download_from_s3 returns False both for "no master yet"
    # and for a failed download, and an unreadable master is also treated as
    # absent. In all of these cases the upload in step 4 replaces the master
    # with the new data only - confirm that this is acceptable.
    print("üìÇ Checking S3 for master file...")
    if download_from_s3(S3_MASTER_KEY, local_master_path):
        try:
            master_df = pd.read_parquet(local_master_path)
        except Exception as e:
            error_logger.log_error("MasterSync", "Failed to read master from S3", e)
            master_df = None
        else:
            print(f"‚úÖ Loaded master from S3: {len(master_df)} rows")
            # Purely diagnostic: a missing or un-orderable timestamp column
            # must not throw away a master that was read successfully.
            if 'timestamp' in master_df.columns:
                try:
                    print(f"   Timestamp range: {master_df['timestamp'].min()} to {master_df['timestamp'].max()}")
                except TypeError as e:
                    error_logger.log_warning("MasterSync", "Could not compute master timestamp range", e)

    # Step 2: Merge new data with master
    if master_df is None:
        print("üìù No existing master file found - creating new one")
        merged_df = incoming_df
    else:
        print(f"üîÄ Merging new data ({len(incoming_df)} rows) with master ({len(master_df)} rows)...")

        # Ensure both have same columns
        for col in incoming_df.columns:
            if col not in master_df.columns:
                master_df[col] = None
        for col in master_df.columns:
            if col not in incoming_df.columns:
                incoming_df[col] = None

        # Combine
        merged_df = pd.concat([master_df, incoming_df], ignore_index=True)
        print(f"   Combined: {len(merged_df)} rows")

    # Step 3: Remove duplicates (question/input + timestamp REQUIRED, id optional)
    print("üßπ Removing duplicates (same question + timestamp)...")
    before_dedup = len(merged_df)

    # Determine question column name (could be 'question' or 'input')
    if 'question' in merged_df.columns:
        question_col = 'question'
    elif 'input' in merged_df.columns:
        question_col = 'input'
    else:
        question_col = None

    # Check for required columns
    if question_col is None or 'timestamp' not in merged_df.columns:
        print("   ‚ö†Ô∏è  Warning: Missing required columns (question/input and timestamp) for deduplication")
    else:
        # Prefer rows with richer feedback metadata when duplicate keys exist
        priority_cols = []
        for source_col, flag_col in (
            ('feedback_comment', '_has_feedback_comment'),
            ('user_feedback', '_has_user_feedback'),
        ):
            if source_col in merged_df.columns:
                merged_df[flag_col] = (
                    merged_df[source_col].notna()
                    & (merged_df[source_col].astype(str).str.strip() != '')
                )
                priority_cols.append(flag_col)

        if priority_cols:
            # A stable sort keeps the existing order (master rows first)
            # among rows of equal priority, so the surviving duplicate is
            # deterministic.
            merged_df = merged_df.sort_values(by=priority_cols, ascending=False, kind='stable')

        # Deduplicate by question/input + timestamp only
        # keep='first' now preserves the richest row after priority sort
        merged_df = merged_df.drop_duplicates(subset=[question_col, 'timestamp'], keep='first')

        # Cleanup temp priority columns
        merged_df = merged_df.drop(columns=priority_cols)

        after_dedup = len(merged_df)
        print(f"   Removed {before_dedup - after_dedup} duplicates")
        print(f"   Deduplication criteria: {question_col} + timestamp (preferring richer feedback metadata)")
        print(f"   Final: {len(merged_df)} rows")

    # Sort by timestamp (newest first)
    if 'timestamp' in merged_df.columns:
        merged_df = merged_df.sort_values('timestamp', ascending=False).reset_index(drop=True)

    # Step 4: Upload to S3
    print("‚òÅÔ∏è  Uploading to S3...")
    merged_df.to_parquet(local_master_path)
    s3_success = upload_to_s3(local_master_path, S3_MASTER_KEY, public=False)

    # Clean up temp file (best effort; only filesystem errors are ignored)
    try:
        os.unlink(local_master_path)
    except OSError:
        pass

    if s3_success:
        print(f"‚úÖ Master file synced: {len(merged_df)} total rows")
    else:
        print(f"‚ùå S3 upload failed - master file not backed up!")
        error_logger.log_error("MasterSync", "Failed to upload master file to S3", None)

    return merged_df

print("‚úÖ Master file sync system ready (S3-only)")
print(f"   Storage: s3://{S3_BUCKET}/{S3_MASTER_KEY}")

‚úÖ Master file sync system ready (S3-only)
   Storage: s3://byupathway-public/langfuse/langfuse_traces_master.parquet


## Google Sheets Integration

In [None]:
def read_topics_from_google_sheets(sheet_url: str) -> pd.DataFrame:
    """Read topics from Google Sheets with flexible column handling

    Reads the first worksheet, maps its headers onto 'topic', 'subtopic' and
    'question' (case-insensitive, singular or plural, plus
    'representative question(s)'), and drops rows where any of the three is
    missing or blank.

    Args:
        sheet_url: URL of the Google Sheets document.

    Returns:
        pd.DataFrame: Columns ['topic', 'subtopic', 'question'] with
        incomplete rows removed.

    Raises:
        ValueError: If a required column cannot be found.
        Exception: Any credential or API failure is logged through
            error_logger and re-raised.
    """
    try:
        creds_dict = json.loads(GOOGLE_SERVICE_ACCOUNT)
        scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
        creds = ServiceAccountCredentials.from_json_keyfile_dict(creds_dict, scope)
        gc = gspread.authorize(creds)

        sheet = gc.open_by_url(sheet_url)
        worksheet = sheet.get_worksheet(0)
        data = worksheet.get_all_records()
        df = pd.DataFrame(data)

        # Handle both uppercase and lowercase column names with flexible matching
        aliases = {
            'topics': 'topic',
            'topic': 'topic',
            'subtopics': 'subtopic',
            'subtopic': 'subtopic',
            'questions': 'question',
            'question': 'question',
            'representative question': 'question',
            'representative questions': 'question',
        }
        column_mapping = {}
        for col in df.columns:
            # str() guards against non-string headers (e.g. a numeric header)
            target = aliases.get(str(col).lower().strip())
            # First matching header wins, so two aliases of the same target
            # cannot both be renamed onto one column name.
            if target is not None and target not in column_mapping.values():
                column_mapping[col] = target

        df = df.rename(columns=column_mapping)
        # An unmapped header may already carry a canonical name; keep the first.
        df = df.loc[:, ~df.columns.duplicated()]

        required = ['topic', 'subtopic', 'question']
        if not all(col in df.columns for col in required):
            raise ValueError(f"Missing required columns. Found: {list(df.columns)}")

        df = df[required]
        # gspread returns empty cells as '' rather than NaN, so dropna() alone
        # never removed blank rows; treat whitespace-only cells as missing too.
        is_blank = df.isna() | df.astype(str).apply(lambda s: s.str.strip() == '')
        df = df[~is_blank.any(axis=1)]

        print(f"‚úÖ Loaded {len(df)} topics from Google Sheets")
        print(f"   Unique topics: {df['topic'].nunique()}, Unique subtopics: {df['subtopic'].nunique()}")
        return df

    except Exception as e:
        error_logger.log_error("GoogleSheets", "Failed to read topics", e)
        raise

print("‚úÖ Google Sheets integration ready")

‚úÖ Google Sheets integration ready


## Langfuse Data Cleaning

In [None]:
def filter_malformed_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Filter out malformed rows containing both 'kwargs' AND 'args' (case-insensitive)
    This is the FIRST step in data cleaning to remove Langfuse error rows

    'args' is only counted when it is not part of the word 'kwargs'. A plain
    substring test made the "AND" meaningless, because every value containing
    'kwargs' also contains 'args'.

    Args:
        df: Raw Langfuse rows; every column is inspected as text.

    Returns:
        pd.DataFrame: Copy of *df* without the malformed rows. Each dropped
        row index is recorded through error_logger.
    """
    print(f"üîç Filtering malformed rows (containing 'kwargs' AND 'args')...")
    before_filter = len(df)

    # Nothing to inspect; also avoids dividing by zero in the summary below.
    if before_filter == 0:
        print(f"‚úÖ Filtered 0 malformed rows (0.0%)")
        print(f"   Remaining: 0 rows")
        return df.copy()

    # Scan column by column (vectorized) instead of row by row.
    has_kwargs = np.zeros(before_filter, dtype=bool)
    has_args = np.zeros(before_filter, dtype=bool)
    for position in range(df.shape[1]):
        column_text = df.iloc[:, position].astype(str)
        has_kwargs |= column_text.str.contains(
            'kwargs', case=False, regex=False, na=False
        ).to_numpy(dtype=bool)
        # Negative lookbehind: 'args' that is not the tail of 'kwargs'.
        has_args |= column_text.str.contains(
            r'(?<!kw)args', case=False, regex=True, na=False
        ).to_numpy(dtype=bool)

    malformed_mask = has_kwargs & has_args

    # Keep only rows that are NOT malformed
    filtered_df = df[~malformed_mask].copy()

    after_filter = len(filtered_df)
    filtered_count = before_filter - after_filter

    print(f"‚úÖ Filtered {filtered_count} malformed rows ({filtered_count/before_filter*100:.1f}%)")
    print(f"   Remaining: {after_filter} rows")

    # Log dropped rows
    for idx in df.index[malformed_mask]:
        error_logger.log_dropped_row(
            "MalformedFilter",
            "Row contains both 'kwargs' and 'args' - likely Langfuse error",
            {"index": idx}
        )

    return filtered_df

def detect_acm_question(question: str) -> bool:
    """Return True when *question* is a string starting with an ACM prefix.

    The prefix looks like "(ACM Question)" or "(ACMs Question)", matched
    case-insensitively, optionally preceded by whitespace and followed by a
    colon. Non-string input is never an ACM question.
    """
    if isinstance(question, str):
        pattern = r'^\s*\(ACMs?\s+[Qq]uestion\)\s*:?\s*'
        # The pattern is anchored with ^, so matching from the start of the
        # string is the same as searching for it.
        return re.match(pattern, question, flags=re.IGNORECASE) is not None
    return False

print("‚úÖ Malformed row filter and ACM detector ready")


# Column order of a cleaned chat trace. Also used to build a correctly shaped
# empty frame when no row survives cleaning.
_CLEANED_COLUMNS = [
    'id', 'timestamp', 'input', 'is_acm_question', 'output',
    'user_feedback', 'feedback_comment', 'latency', 'total_cost',
    'session_id', 'user_id', 'tags', 'scores', 'release',
    'country', 'state', 'city', 'user_language', 'role', 'is_suspicious',
]

# Metadata-derived fields used when metadata is absent or cannot be parsed.
_EMPTY_METADATA_FIELDS = {
    'country': None,
    'state': None,
    'city': None,
    'user_language': None,
    'role': None,
    'is_suspicious': False,
}


def _value_or_none(value: Any) -> Any:
    """Return *value* unchanged, or None when pandas considers it missing."""
    return value if pd.notna(value) else None


def _float_or_none(value: Any) -> Optional[float]:
    """Convert *value* to float; None when missing or not convertible."""
    if pd.isna(value):
        return None
    try:
        return float(value)
    except (ValueError, TypeError):
        return None


def _has_text(value: Any) -> bool:
    """True when *value* is present and not blank after stripping."""
    return bool(pd.notna(value) and str(value).strip() != '')


def _list_text_or_none(value: Any) -> Optional[str]:
    """Return str(value) for a non-empty list-like text field, else None."""
    if pd.notna(value) and str(value).strip() not in ('', '[]'):
        return str(value)
    return None


def _iso_or_none(value: Any) -> Optional[str]:
    """Parse *value* as a datetime and return ISO text; None if it parses to NaT."""
    parsed = pd.to_datetime(value)
    # NaT.isoformat() yields the string 'NaT', which is not a real timestamp.
    return None if pd.isna(parsed) else parsed.isoformat()


def _parse_trace_timestamp(row: Any) -> Optional[str]:
    """Return the row's timestamp as ISO text, falling back to its 'kwargs' field."""
    raw_timestamp = getattr(row, 'timestamp', None)
    if pd.isna(raw_timestamp):
        return None

    try:
        return _iso_or_none(raw_timestamp)
    except (ValueError, TypeError):
        pass

    # Parsing failed: look for a "timestamp": "..." pair inside kwargs.
    kwargs_value = getattr(row, 'kwargs', None)
    if pd.isna(kwargs_value):
        return None
    ts_match = re.search(r'"timestamp"\s*:\s*"([^"]*)"', str(kwargs_value))
    if ts_match is None:
        return None
    try:
        return _iso_or_none(ts_match.group(1))
    except Exception:
        return None


def _extract_feedback(row: Any) -> Tuple[Optional[str], Optional[str]]:
    """Return (user_feedback, feedback_comment) from the new or legacy columns.

    New format: 'feedback_value' holds the label and 'feedback_comment' the
    reason. Legacy format: 'user_feedback' holds "label: reason".
    """
    label_raw = getattr(row, 'feedback_value', None)
    legacy_raw = getattr(row, 'user_feedback', None)
    comment_raw = getattr(row, 'feedback_comment', None)

    # Prefer feedback_value (Good/Bad) when available
    if _has_text(label_raw):
        label = str(label_raw).strip()
    elif _has_text(legacy_raw):
        legacy_text = str(legacy_raw).strip()
        label = legacy_text.split(':', 1)[0].strip() if ':' in legacy_text else legacy_text
    else:
        label = None

    # Feedback reason/comment (used for thumbs-down explanation)
    if _has_text(comment_raw):
        comment = str(comment_raw).strip()
    elif pd.notna(legacy_raw) and ':' in str(legacy_raw):
        legacy_reason = str(legacy_raw).split(':', 1)[1].strip()
        comment = legacy_reason if legacy_reason else None
    else:
        comment = None

    return label, comment


def _extract_metadata_fields(metadata_value: Any, row_number: int) -> Dict[str, Any]:
    """Return country/state/city/user_language/role/is_suspicious from metadata.

    Parse failures are logged through error_logger and yield the empty defaults.
    """
    if not (pd.notna(metadata_value) and metadata_value != ''):
        return dict(_EMPTY_METADATA_FIELDS)

    try:
        metadata = json.loads(metadata_value) if isinstance(metadata_value, str) else metadata_value

        # Geographic data is either nested under 'geo_data' or flat on metadata.
        geo_data = metadata.get('geo_data', {})
        geo_source = geo_data if isinstance(geo_data, dict) and geo_data else metadata

        security_val = metadata.get('security_validation', {})
        if isinstance(security_val, dict):
            is_suspicious = security_val.get('is_suspicious', False)
        else:
            is_suspicious = False

        return {
            'country': geo_source.get('country'),
            'state': geo_source.get('state'),
            'city': geo_source.get('city'),
            'user_language': metadata.get('user_language'),
            'role': metadata.get('role'),
            'is_suspicious': is_suspicious,
        }
    except (json.JSONDecodeError, TypeError, AttributeError) as e:
        error_logger.log_error("LangfuseClean", f"Failed to parse metadata at row {row_number}", e)
        return dict(_EMPTY_METADATA_FIELDS)


def clean_langfuse_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Clean and validate Langfuse data with comprehensive error handling.

    Filters out non-chat traces (e.g. general_feedback) and extracts all
    available fields including latency, cost, session_id, user_id, tags, scores.

    Rows without an input, and rows that raise an unexpected error, are
    dropped and recorded through error_logger. Rows are then de-duplicated on
    (timestamp, input) and the ACM prefix is removed from 'input';
    'is_acm_question' is computed before that removal.

    Args:
        df: Raw Langfuse export. When a 'name' column exists, rows whose name
            is set and is not 'chat' are treated as non-chat traces.

    Returns:
        Tuple of (cleaned_questions_df, general_feedback_df). The cleaned
        frame always has the full column set, even with zero rows; the
        feedback frame is a column-less empty frame when there is none.
    """

    print(f"üßπ Cleaning {len(df)} Langfuse rows...")

    # Separate general feedback from chat traces BEFORE cleaning
    general_feedback_rows = []

    if 'name' in df.columns:
        non_chat_mask = df['name'].notna() & (df['name'] != 'chat')
        non_chat_df = df[non_chat_mask]
        chat_df = df[~non_chat_mask]
        print(f"   Separated {len(non_chat_df)} non-chat traces (general feedback, etc.)")
        print(f"   Processing {len(chat_df)} chat traces...")

        # Collect general feedback entries
        feedback_fields = ('id', 'name', 'input', 'output', 'timestamp', 'user_id', 'session_id', 'tags')
        for _, row in non_chat_df.iterrows():
            general_feedback_rows.append({field: row.get(field, None) for field in feedback_fields})
    else:
        chat_df = df
        print("   No 'name' column found - processing all rows as chat traces")

    general_feedback_df = pd.DataFrame(general_feedback_rows) if general_feedback_rows else pd.DataFrame()
    if not general_feedback_df.empty:
        print(f"   üìù General feedback entries: {len(general_feedback_df)}")

    cleaned_rows = []

    # idx is the 1-based position within chat_df, not the dataframe index.
    for idx, row in enumerate(chat_df.itertuples(index=False), 1):
        try:
            cleaned_row = {}

            # 0. id (needed for deduplication) and 1. timestamp
            cleaned_row['id'] = _value_or_none(getattr(row, 'id', None))
            cleaned_row['timestamp'] = _parse_trace_timestamp(row)

            # 2. Extract input (malformed rows already filtered out)
            input_value = getattr(row, 'input', None)

            # Validate input is not empty
            if pd.isna(input_value) or str(input_value).strip() == '':
                error_logger.log_dropped_row("LangfuseClean", "Missing input", {"index": idx})
                continue

            cleaned_row['input'] = str(input_value).strip()

            # Detect if this is an ACM question (before cleaning the prefix)
            cleaned_row['is_acm_question'] = detect_acm_question(cleaned_row['input'])

            # 3. Extract output
            cleaned_row['output'] = _value_or_none(getattr(row, 'output', None))

            # 4. Feedback label and reason (new and legacy Langfuse formats)
            cleaned_row['user_feedback'], cleaned_row['feedback_comment'] = _extract_feedback(row)

            # 5-6. Latency (seconds) and total cost
            cleaned_row['latency'] = _float_or_none(getattr(row, 'latency', None))
            cleaned_row['total_cost'] = _float_or_none(getattr(row, 'total_cost', None))

            # 7-8. Session and user identifiers
            cleaned_row['session_id'] = _value_or_none(getattr(row, 'session_id', None))
            cleaned_row['user_id'] = _value_or_none(getattr(row, 'user_id', None))

            # 9-10. Tags and scores (stored as JSON string list)
            cleaned_row['tags'] = _list_text_or_none(getattr(row, 'tags', None))
            cleaned_row['scores'] = _list_text_or_none(getattr(row, 'scores', None))

            # 11. Extract release (git commit hash)
            cleaned_row['release'] = _value_or_none(getattr(row, 'release', None))

            # 12. Metadata JSON: country, state, city, language, role, is_suspicious
            cleaned_row.update(_extract_metadata_fields(getattr(row, 'metadata', None), idx))

            cleaned_rows.append(cleaned_row)

        except Exception as e:
            error_logger.log_dropped_row("LangfuseClean", f"Unexpected error: {e}", {"index": idx})
            continue

    # Passing the column list keeps the schema when cleaned_rows is empty, so
    # the steps and summary prints below no longer fail with a KeyError.
    cleaned_df = pd.DataFrame(cleaned_rows, columns=_CLEANED_COLUMNS)

    # Remove duplicates (same timestamp and input)
    before_dedup = len(cleaned_df)
    cleaned_df = cleaned_df.drop_duplicates(subset=['timestamp', 'input'], keep='first')
    after_dedup = len(cleaned_df)
    if before_dedup > after_dedup:
        print(f"üóëÔ∏è  Removed {before_dedup - after_dedup} duplicate rows (same timestamp + input)")

    # Clean question prefixes (ACM Question)
    cleaned_df['input'] = cleaned_df['input'].apply(lambda x: clean_question(x) if pd.notna(x) else x)

    print(f"‚úÖ Cleaned data: {len(cleaned_df)} rows ({len(chat_df) - len(cleaned_df)} dropped)")
    print(f"   Columns: {list(cleaned_df.columns)}")
    print(f"   Country data: {cleaned_df['country'].notna().sum()} rows with country info")
    print(f"   With latency: {cleaned_df['latency'].notna().sum()}, With cost: {cleaned_df['total_cost'].notna().sum()}")
    print(f"   With session_id: {cleaned_df['session_id'].notna().sum()}, With user_id: {cleaned_df['user_id'].notna().sum()}")

    return cleaned_df, general_feedback_df

print("‚úÖ Langfuse cleaning utilities ready")

‚úÖ Malformed row filter and ACM detector ready
‚úÖ Langfuse cleaning utilities ready


## Question Preprocessing

In [None]:
def clean_question(question: str) -> str:
    """Strip a leading "(ACM Question)" style prefix and surrounding whitespace.

    Non-string input is returned as ``str(question)``, or "" for None. If
    stripping leaves nothing, the original string is returned unchanged.
    """
    if isinstance(question, str):
        stripped = re.sub(
            r'^\s*\(ACMs?\s+[Qq]uestion\)\s*:?\s*',
            '',
            question,
            flags=re.IGNORECASE,
        ).strip()
        # Fall back to the untouched input rather than returning an empty string.
        return stripped or question

    return "" if question is None else str(question)

def preprocess_dataframe(df: pd.DataFrame, question_col: str) -> pd.DataFrame:
    """Return a copy of *df* with clean_question applied to *question_col*.

    The input dataframe is left untouched.
    """
    processed = df.copy()
    cleaned_questions = processed[question_col].apply(clean_question)
    processed[question_col] = cleaned_questions
    return processed

print("‚úÖ Question preprocessing ready")

‚úÖ Question preprocessing ready


## S3 Embeddings Cache

In [None]:
def get_cache_key(text: str, model: str) -> str:
    """Build the S3 key under which the embedding of *text* is cached.

    The key is "<S3_CACHE_PREFIX>/<model>/<hash>.pkl", where <hash> is the
    first 12 hex characters of the MD5 digest of the text.
    """
    # NOTE(review): a 12-character digest keeps keys short but means two
    # different texts could map to the same key - confirm this is acceptable.
    full_digest = hashlib.md5(text.encode()).hexdigest()
    short_digest = full_digest[:12]
    return f"{S3_CACHE_PREFIX}/{model}/{short_digest}.pkl"

def load_embedding_from_s3(text: str, model: str) -> Optional[List[float]]:
    """Load cached embedding from S3

    Downloads the cache object to a temp file, unpickles it and removes the
    temp file again, whether or not unpickling succeeded.

    Args:
        text: Text whose embedding is looked up.
        model: Embedding model name (part of the cache key).

    Returns:
        The cached embedding, or None when there is no usable cache entry
        (download failed, or the file could not be unpickled).
    """
    cache_key = get_cache_key(text, model)
    local_path = f"/tmp/{cache_key.split('/')[-1]}"

    if not download_from_s3(cache_key, local_path):
        return None

    try:
        # SECURITY NOTE: pickle.load executes code embedded in the file. This
        # is only safe while the cache bucket is writable by trusted parties.
        with open(local_path, 'rb') as f:
            return pickle.load(f)
    except Exception:
        # Corrupted or unreadable cache entry: treat as a cache miss.
        return None
    finally:
        # Clean up the temp file in every case (best effort).
        try:
            os.unlink(local_path)
        except OSError:
            pass

def save_embedding_to_s3(text: str, model: str, embedding: List[float]) -> bool:
    """Save embedding to S3 cache with retry logic

    Pickles the embedding to a temp file, uploads it privately through
    upload_to_s3 and removes the temp file again in every case.

    Args:
        text: Text the embedding belongs to.
        model: Embedding model name (part of the cache key).
        embedding: Embedding vector to cache.

    Returns:
        bool: True if the upload succeeded, False otherwise.
    """
    cache_key = get_cache_key(text, model)
    local_path = f"/tmp/{cache_key.split('/')[-1]}"

    try:
        # Write to local temp file
        with open(local_path, 'wb') as f:
            pickle.dump(embedding, f)

        # Upload to S3 (with retry logic from upload_to_s3)
        # Use public=False for cache files (no need for public access)
        return upload_to_s3(local_path, cache_key, public=False)
    except Exception:
        # Don't log - cache failures are expected and handled gracefully
        return False
    finally:
        # Clean up temp file; it may not exist if the write itself failed.
        try:
            os.unlink(local_path)
        except OSError:
            pass

def get_local_cache_path(text: str, model: str) -> str:
    """Return the local cache file path for *text* under *model*.

    The file name is "<model>_<hash>.pkl", where <hash> is the first 12 hex
    characters of the MD5 digest of the text. LOCAL_CACHE_DIR is created if
    it does not exist yet.
    """
    digest_prefix = hashlib.md5(text.encode()).hexdigest()[:12]
    # Make sure the directory exists before handing out a path inside it.
    os.makedirs(LOCAL_CACHE_DIR, exist_ok=True)
    cache_path = f"{LOCAL_CACHE_DIR}/{model}_{digest_prefix}.pkl"
    return cache_path

def load_embedding_from_local(text: str, model: str) -> Optional[List[float]]:
    """Load a cached embedding from local storage.

    Args:
        text: Text whose embedding is looked up.
        model: Embedding model name; part of the cache file name.

    Returns:
        The unpickled embedding, or ``None`` when no cache file exists or the
        file cannot be read/unpickled (in which case it is deleted so the
        entry is regenerated on the next save).
    """
    cache_path = get_local_cache_path(text, model)

    try:
        # Open directly instead of checking os.path.exists first: this avoids
        # the race where the file disappears between the check and the open.
        with open(cache_path, 'rb') as f:
            return pickle.load(f)
    except FileNotFoundError:
        # Plain cache miss.
        return None
    except Exception:
        # Corrupt or unreadable cache file: remove it, best effort.
        try:
            os.unlink(cache_path)
        except OSError:
            pass
        return None

def save_embedding_to_local(text: str, model: str, embedding: List[float]):
    """Save an embedding to the local cache.

    The pickle is written to a sibling temporary file and then moved over the
    final path with ``os.replace``, so an interrupted write cannot leave a
    truncated pickle at the final path.

    Args:
        text: Text the embedding belongs to.
        model: Embedding model name; part of the cache file name.
        embedding: The embedding vector to cache.

    Returns:
        ``True`` when the cache file was written, ``False`` on any failure
        (cache writes are best-effort and never raise to the caller).
    """
    cache_path = get_local_cache_path(text, model)
    # Include the PID so two processes writing the same entry do not share
    # a temp file.
    tmp_path = f"{cache_path}.{os.getpid()}.tmp"

    try:
        with open(tmp_path, 'wb') as f:
            pickle.dump(embedding, f)
        os.replace(tmp_path, cache_path)
        return True
    except Exception:
        # Do not leave a partial temp file behind.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        return False

def load_embedding(text: str, model: str) -> Optional[List[float]]:
    """Fetch a cached embedding from the backend named by EMBEDDING_STORAGE.

    Raises:
        ValueError: If EMBEDDING_STORAGE is neither "s3" nor "local".
    """
    backend = EMBEDDING_STORAGE
    if backend == "s3":
        return load_embedding_from_s3(text, model)
    if backend == "local":
        return load_embedding_from_local(text, model)
    # Neither known backend matched.
    raise ValueError(f"Unknown EMBEDDING_STORAGE: {EMBEDDING_STORAGE}")

def save_embedding(text: str, model: str, embedding: List[float]):
    """Store an embedding in the backend named by EMBEDDING_STORAGE.

    Returns the result of the backend-specific save function.

    Raises:
        ValueError: If EMBEDDING_STORAGE is neither "s3" nor "local".
    """
    backend = EMBEDDING_STORAGE
    if backend == "s3":
        return save_embedding_to_s3(text, model, embedding)
    if backend == "local":
        return save_embedding_to_local(text, model, embedding)
    # Neither known backend matched.
    raise ValueError(f"Unknown EMBEDDING_STORAGE: {EMBEDDING_STORAGE}")

# Cell status banner: confirm the cache helpers are defined and show which
# backend EMBEDDING_STORAGE currently selects.
print("‚úÖ Embeddings cache ready (with retry logic)")
print(f"   Storage: {EMBEDDING_STORAGE}")

✅ Embeddings cache ready (with retry logic)
   Storage: local


## Embedding Generation

In [None]:
def get_embeddings_batch(texts: List[str], model: str = EMBEDDING_MODEL, batch_size: int = 1000) -> List[List[float]]:
    """Generate embeddings with caching (S3 or local based on EMBEDDING_STORAGE).

    Each text is cleaned with ``clean_question`` and looked up in the cache;
    the texts missing from the cache are sent to the embeddings API one batch
    at a time and the results are written back to the cache.

    Args:
        texts: Raw texts to embed.
        model: Embedding model name passed to the API and used in cache keys.
        batch_size: Number of texts processed (and at most sent) per batch.

    Returns:
        One embedding per input text, in input order. When an API call for a
        batch fails, the error is logged and every uncached text of that batch
        gets a zero vector of length ``EMBEDDING_DIMENSIONS``.
    """
    cleaned_texts = [clean_question(t) for t in texts]
    embeddings = []
    cache_hits = 0
    api_calls = 0

    print(f"üîÑ Processing {len(cleaned_texts)} texts...")

    for i in tqdm(range(0, len(cleaned_texts), batch_size), desc="Batches"):
        batch_texts = cleaned_texts[i:i+batch_size]
        batch_embeddings = []
        uncached_texts = []
        uncached_indices = []

        # Check cache (S3 or local based on setting)
        for j, text in enumerate(batch_texts):
            cached = load_embedding(text, model)
            if cached:
                batch_embeddings.append(cached)
                cache_hits += 1
            else:
                # Placeholder keeps positions aligned with batch_texts.
                batch_embeddings.append(None)
                uncached_texts.append(text)
                uncached_indices.append(j)

        # Generate uncached embeddings
        if uncached_texts:
            try:
                response = client.embeddings.create(model=model, input=uncached_texts)
                new_embeddings = [d.embedding for d in response.data]
                # A short response would otherwise leave None placeholders in
                # the result; treat it as a failed batch instead.
                if len(new_embeddings) != len(uncached_texts):
                    raise ValueError(
                        f"Expected {len(uncached_texts)} embeddings, got {len(new_embeddings)}"
                    )
                api_calls += len(uncached_texts)

                for idx, emb in zip(uncached_indices, new_embeddings):
                    batch_embeddings[idx] = emb
                    save_embedding(batch_texts[idx], model, emb)
            except Exception as e:
                error_logger.log_error("Embeddings", "Batch failed", e)
                for idx in uncached_indices:
                    batch_embeddings[idx] = [0.0] * EMBEDDING_DIMENSIONS

        embeddings.extend(batch_embeddings)

    # Guard the percentage: an empty input list must not raise ZeroDivisionError.
    hit_rate = cache_hits / len(embeddings) * 100 if embeddings else 0.0
    print(f"‚úÖ Complete! Cache: {cache_hits}/{len(embeddings)} ({hit_rate:.1f}%), API: {api_calls}")
    return embeddings

# Cell status banner: get_embeddings_batch is now defined.
print("‚úÖ Embedding generation ready")

✅ Embedding generation ready


## Load Data

In [None]:
# Load topics from Google Sheets
print("üìä Loading topics from Google Sheets...")
topics_df = read_topics_from_google_sheets(GOOGLE_SHEETS_URL)
topics_df = preprocess_dataframe(topics_df, 'question')

# Upload Langfuse CSV (interactive upload in Colab, fixed path otherwise)
if IN_COLAB:
    from google.colab import files
    print("\nüìÇ Upload Langfuse CSV:")
    uploaded = files.upload()
    # Fail with a clear message when the upload dialog returned nothing,
    # instead of a bare IndexError.
    if not uploaded:
        raise ValueError("No Langfuse CSV was uploaded")
    langfuse_filename = list(uploaded.keys())[0]
else:
    langfuse_filename = "notebook/langfuse_traces.csv"

# Load Langfuse data
print(f"\nüìä Loading Langfuse data from {langfuse_filename}...")
langfuse_df = pd.read_csv(langfuse_filename)

# STEP 1: Filter malformed rows (FIRST STEP - remove kwargs/args errors)
langfuse_df = filter_malformed_rows(langfuse_df)

# STEP 2: Clean Langfuse data (also separates general feedback from chat traces)
langfuse_clean, general_feedback_df = clean_langfuse_data(langfuse_df)

# Store general feedback for later use (Streamlit Feedback page)
if not general_feedback_df.empty:
    print(f"\nüìù General feedback entries stored: {len(general_feedback_df)}")
else:
    print(f"\nüìù No general feedback entries found")

# STEP 3: Sync with master file (S3 storage)
master_df = sync_master_file(langfuse_clean)

# Create questions dataframe with all metadata columns preserved.
# Only the columns actually present in the master file are kept, so every
# per-column statistic below must tolerate a missing column.
required_cols = [
    'id', 'input', 'timestamp', 'country', 'state', 'city',
    'output', 'user_feedback', 'feedback_comment', 'user_language', 'is_suspicious',
    'is_acm_question', 'latency', 'total_cost', 'session_id', 'user_id',
    'tags', 'scores', 'release', 'role'
]
available_cols = [col for col in required_cols if col in master_df.columns]
questions_df = master_df[available_cols].copy()
questions_df = questions_df.rename(columns={'input': 'question'})
questions_df = preprocess_dataframe(questions_df, 'question')

print(f"\nüìä DATA LOADED:")
print(f"   Topics: {len(topics_df)} ({topics_df['topic'].nunique()} unique)")
print(f"   Questions: {len(questions_df)} (from master file)")
# 'country' and 'timestamp' are guarded like the other optional columns.
print(f"   With metadata: country={questions_df['country'].notna().sum() if 'country' in questions_df.columns else 'N/A'}, timestamp={questions_df['timestamp'].notna().sum() if 'timestamp' in questions_df.columns else 'N/A'}")
print(f"   ACM questions: {questions_df['is_acm_question'].sum() if 'is_acm_question' in questions_df.columns else 'N/A'}")
print(f"   With latency: {questions_df['latency'].notna().sum() if 'latency' in questions_df.columns else 'N/A'}")
print(f"   With cost: {questions_df['total_cost'].notna().sum() if 'total_cost' in questions_df.columns else 'N/A'}")
print(f"   With user_id: {questions_df['user_id'].notna().sum() if 'user_id' in questions_df.columns else 'N/A'}")
print(f"   With session_id: {questions_df['session_id'].notna().sum() if 'session_id' in questions_df.columns else 'N/A'}")
print(f"   With feedback reasons: {questions_df['feedback_comment'].notna().sum() if 'feedback_comment' in questions_df.columns else 'N/A'}")
print(f"   Errors: {error_logger.get_summary()['total_errors']}")
print(f"   Dropped rows: {error_logger.get_summary()['total_dropped_rows']}")

üìä Loading topics from Google Sheets...
‚úÖ Loaded 120 topics from Google Sheets
   Unique topics: 59, Unique subtopics: 118

üìÇ Upload Langfuse CSV:


Saving langfuse_traces_01_25_26.csv to langfuse_traces_01_25_26.csv

üìä Loading Langfuse data from langfuse_traces_01_25_26.csv...
üîç Filtering malformed rows (containing 'kwargs' AND 'args')...
‚úÖ Filtered 8 malformed rows (0.3%)
   Remaining: 2892 rows
üßπ Cleaning 2892 Langfuse rows...
‚úÖ Cleaned data: 2892 rows (0 dropped)
   Columns: ['id', 'timestamp', 'input', 'is_acm_question', 'output', 'user_feedback', 'country', 'state', 'city', 'ip_address', 'user_language', 'is_suspicious']
   Country data: 2826 rows with country info

üîÑ Syncing master file with S3...
üìÇ Checking S3 for master file...
‚úÖ Loaded master from S3: 23669 rows
   Timestamp range: 2025-07-01T00:00:00+00:00 to 2026-01-02T11:33:17.412000+00:00
üîÄ Merging new data (2892 rows) with master (23669 rows)...
   Combined: 26561 rows
üßπ Removing duplicates (same question + timestamp)...
   Removed 0 duplicates
   Deduplication criteria: input + timestamp
   Final: 26561 rows
‚òÅÔ∏è  Uploading to S3...
‚úÖ 

## Prepare Evaluation Dataset

In [None]:
# Pick the evaluation set: every question unless EVAL_MODE asks for a sample.
if EVAL_MODE != "sample":
    eval_df = questions_df.copy()
    print(f"üìù Full mode: {len(eval_df)} questions")
else:
    # Seeded sample, capped at the number of available questions.
    eval_df = questions_df.sample(
        n=min(SAMPLE_SIZE, len(questions_df)),
        random_state=RANDOM_SEED,
    ).copy()
    print(f"üìù Sample mode: {len(eval_df)} questions")

# Cost estimate
# NOTE(review): presumably 50 tokens per text and $0.02 per 1M tokens - confirm
# against current pricing for EMBEDDING_MODEL.
total_tokens = 50 * (len(topics_df) + len(eval_df))
embedding_cost = 0.02 * (total_tokens / 1_000_000)
print(f"üí∞ Estimated cost: ${embedding_cost:.4f} (embeddings only)")

📝 Full mode: 26561 questions
💰 Estimated cost: $0.0267 (embeddings only)


## Similarity Classification

In [None]:
def classify_by_similarity(questions_df: pd.DataFrame, topics_df: pd.DataFrame, threshold: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Classify questions by similarity to existing topics - preserves all metadata columns.

    Every question is compared (cosine similarity of embeddings) against every
    topic question. The best-scoring topic wins; a question is "similar" when
    that score is positive and at least ``threshold``.

    Args:
        questions_df: Questions to classify; must have a 'question' column.
        topics_df: Known topics; must have 'question', 'topic' and 'subtopic'
            columns. It is copied, not modified.
        threshold: Minimum cosine similarity for a match.

    Returns:
        ``(similar_df, remaining_df)``. ``similar_df`` rows carry the original
        columns plus 'matched_topic', 'matched_subtopic' and
        'similarity_score'. ``remaining_df`` rows carry the original columns
        plus 'embedding' (a zero vector when the question had no usable
        embedding).
    """

    print(f"\nüéØ Similarity Classification (threshold: {threshold})")

    # Generate embeddings
    print(f"üìä Generating topic embeddings...")
    topic_embeddings = get_embeddings_batch(topics_df['question'].tolist())
    topics_df = topics_df.copy()
    topics_df['embedding'] = topic_embeddings

    print(f"üìä Generating question embeddings...")
    question_embeddings = get_embeddings_batch(questions_df['question'].tolist())

    # Build a unit-normalised topic matrix once, so each question needs a
    # single matrix-vector product instead of a Python loop over all topics.
    # Topics with a missing, wrong-sized or zero-norm embedding can never be
    # the best match and are left out.
    usable_positions = []
    usable_vectors = []
    for pos, t_emb in enumerate(topic_embeddings):
        if t_emb and len(t_emb) == EMBEDDING_DIMENSIONS:
            t_vec = np.asarray(t_emb, dtype=float)
            t_norm = np.linalg.norm(t_vec)
            if t_norm > 0:
                usable_positions.append(pos)
                usable_vectors.append(t_vec / t_norm)
    topic_matrix = np.vstack(usable_vectors) if usable_vectors else None

    # Classify
    similar = []
    remaining = []

    print(f"üîç Classifying {len(questions_df)} questions...")
    for idx, (_, row) in enumerate(tqdm(questions_df.iterrows(), total=len(questions_df))):
        q_emb = question_embeddings[idx]

        if not q_emb or len(q_emb) != EMBEDDING_DIMENSIONS:
            row_data = row.to_dict()
            row_data['embedding'] = [0.0]*EMBEDDING_DIMENSIONS
            remaining.append(row_data)
            continue

        best_sim = 0
        best_match = None

        if topic_matrix is not None:
            q_vec = np.asarray(q_emb, dtype=float)
            q_norm = np.linalg.norm(q_vec)
            # A zero vector (the embedding fallback) has no direction and
            # therefore no match.
            if q_norm > 0:
                sims = topic_matrix @ (q_vec / q_norm)
                best_pos = int(np.argmax(sims))  # first maximum wins on ties
                # Clamp rounding overshoot so scores stay within [-1, 1].
                candidate = min(float(sims[best_pos]), 1.0)
                if candidate > best_sim:
                    best_sim = candidate
                    best_match = topics_df.iloc[usable_positions[best_pos]]

        if best_sim >= threshold and best_match is not None:
            row_data = row.to_dict()
            row_data['matched_topic'] = best_match['topic']
            row_data['matched_subtopic'] = best_match['subtopic']
            row_data['similarity_score'] = best_sim
            similar.append(row_data)
        else:
            row_data = row.to_dict()
            row_data['embedding'] = q_emb
            remaining.append(row_data)

    similar_df = pd.DataFrame(similar)
    remaining_df = pd.DataFrame(remaining)

    # Guard the percentages so an empty input does not raise ZeroDivisionError.
    total = len(questions_df)
    similar_pct = len(similar_df) / total * 100 if total else 0.0
    remaining_pct = len(remaining_df) / total * 100 if total else 0.0

    print(f"\n‚úÖ Classification complete:")
    print(f"   Similar: {len(similar_df)} ({similar_pct:.1f}%)")
    print(f"   Remaining: {len(remaining_df)} ({remaining_pct:.1f}%)")

    return similar_df, remaining_df

# Run the classification on the evaluation set; remaining_df feeds the
# clustering step below.
similar_df, remaining_df = classify_by_similarity(eval_df, topics_df, SIMILARITY_THRESHOLD)


üéØ Similarity Classification (threshold: 0.7)
üìä Generating topic embeddings...
üîÑ Processing 120 texts...


Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1/1 [00:02<00:00,  2.01s/it]


‚úÖ Complete! Cache: 0/120 (0.0%), API: 120
üìä Generating question embeddings...
üîÑ Processing 26561 texts...


Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 27/27 [00:52<00:00,  1.94s/it]


‚úÖ Complete! Cache: 4693/26561 (17.7%), API: 21868
üîç Classifying 26561 questions...


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 26561/26561 [16:13<00:00, 27.29it/s]



‚úÖ Classification complete:
   Similar: 3942 (14.8%)
   Remaining: 22619 (85.2%)


## Clustering for New Topics

In [None]:
# Defaults used by later cells when nothing is clustered.
clustered_df = None
topic_model = None

if len(remaining_df) > 0:
    print(f"\nüéØ Clustering {len(remaining_df)} remaining questions...")

    # Imported here so the heavy clustering libraries load only when needed.
    from umap import UMAP
    from hdbscan import HDBSCAN
    from bertopic import BERTopic

    # One row per remaining question, taken from the 'embedding' column.
    embeddings = np.array(remaining_df['embedding'].tolist())

    # UMAP: reduce the embeddings to UMAP_N_COMPONENTS dimensions (cosine metric, seeded)
    print(f"üîÑ UMAP reduction to {UMAP_N_COMPONENTS} dimensions...")
    umap_model = UMAP(n_components=UMAP_N_COMPONENTS, min_dist=0.0, metric='cosine', random_state=RANDOM_SEED)
    reduced = umap_model.fit_transform(embeddings)

    # HDBSCAN: cluster the reduced vectors; label -1 is treated as noise below
    print(f"üîÑ HDBSCAN clustering (min_size={HDBSCAN_MIN_CLUSTER_SIZE})...")
    hdbscan_model = HDBSCAN(min_cluster_size=HDBSCAN_MIN_CLUSTER_SIZE, metric="euclidean", cluster_selection_method="eom")
    clusters = hdbscan_model.fit_predict(reduced)

    # Count distinct non-noise labels and the number of noise points.
    n_clusters = len([c for c in np.unique(clusters) if c != -1])
    n_noise = sum(clusters == -1)
    print(f"‚úÖ Found {n_clusters} clusters, {n_noise} noise points")

    if n_clusters > 0:
        # BERTopic: fitted on the question texts with the precomputed embeddings
        # and the same UMAP/HDBSCAN model objects created above.
        # NOTE(review): fit_transform presumably re-fits both models, so
        # topic_id may not correspond one-to-one with cluster_id - confirm.
        topic_model = BERTopic(embedding_model=None, umap_model=umap_model, hdbscan_model=hdbscan_model, verbose=False)
        topics, probs = topic_model.fit_transform(remaining_df['question'].tolist(), embeddings)

        # Keep remaining_df's index and drop the HDBSCAN noise rows.
        clustered_df = remaining_df.copy()
        clustered_df['cluster_id'] = clusters
        clustered_df['topic_id'] = topics
        clustered_df = clustered_df[clustered_df['cluster_id'] != -1]

        print(f"‚úÖ Clustered {len(clustered_df)} questions into {n_clusters} topics")
else:
    print("\n‚úÖ All questions matched existing topics - no clustering needed")


üéØ Clustering 22619 remaining questions...


  $max \{ core_k(a), core_k(b), 1/\alpha d(a,b) \}$.


🔄 UMAP reduction to 5 dimensions...
🔄 HDBSCAN clustering (min_size=3)...
✅ Found 1694 clusters, 5174 noise points
✅ Clustered 17445 questions into 1694 topics


## Generate Topic Names with GPT

In [None]:
topic_names = {}

if clustered_df is not None and len(clustered_df) > 0:
    print(f"\nü§ñ Generating topic names with {GPT_MODEL}...")

    async def generate_topic_name(questions: List[str], keywords: str = "") -> str:
        """Generate a topic name using GPT-5 for a cluster of questions"""

        # Limit to top 10 questions for context (like insights)
        sample_questions = questions[:10]
        questions_text = "\n".join([f"- {q}" for q in sample_questions])

        prompt = f"""
Based on the following student questions and keywords, generate a concise, descriptive topic name.

QUESTIONS:
{questions_text}

KEYWORDS: {keywords}

Instructions:
- Your answer must be ONLY the topic name (2‚Äì8 words), no extra text.
- It should clearly describe the shared theme of the questions.
- Avoid generic labels like "General Questions" or "Miscellaneous."
- Do not include "Topic name:" or quotation marks.
- Use simple, natural English that sounds clear to a student or teacher.

Example:
Questions:
- When does registration open?
- What are the fall 2025 enrollment deadlines?
Keywords: registration, deadlines

Topic name: Fall 2025 Registration Deadlines

Now generate the topic name for the questions above:
"""

        try:
            messages = [
                {"role": "system", "content": "You are an expert at creating clear, descriptive topic names for student question categories."},
                {"role": "user", "content": prompt}
            ]

            # GPT-5 specific configuration (NO temperature parameter!)
            response = await async_client.chat.completions.create(
                model=GPT_MODEL,
                messages=messages,
                max_completion_tokens=1000  # Use max_completion_tokens for GPT-5, not max_tokens
            )

            topic_name = response.choices[0].message.content.strip()

            # Clean up the response
            topic_name = topic_name.replace("Topic name:", "").strip()
            topic_name = topic_name.strip('\"\'')

            if not topic_name:
                topic_name = f"Topic: {keywords[:50]}" if keywords else f"Question Group {hash(str(questions[:3])) % 1000}"

            return topic_name

        except Exception as e:
            error_logger.log_error("TopicNaming", f"GPT failed: {str(e)}", e)
            # Fallback to keyword-based name
            fallback_name = f"Topic: {keywords[:50]}" if keywords else f"Question Group {hash(str(questions[:3])) % 1000}"
            return fallback_name

    async def process_all_clusters():
        tasks = []
        cluster_ids = []

        for cluster_id, group in clustered_df.groupby('cluster_id'):
            questions = group['question'].tolist()
            # Extract keywords from BERTopic if available
            keywords = group['topic_keywords'].iloc[0] if 'topic_keywords' in group.columns else ""

            tasks.append(generate_topic_name(questions, keywords))
            cluster_ids.append(cluster_id)

        names = await asyncio.gather(*tasks)
        return dict(zip(cluster_ids, names))

    topic_names = await process_all_clusters()
    clustered_df['topic_name'] = clustered_df['cluster_id'].map(topic_names)

    print(f"‚úÖ Generated {len(topic_names)} topic names")
    for cid, name in list(topic_names.items())[:5]:
        count = len(clustered_df[clustered_df['cluster_id'] == cid])
        print(f"   {name} ({count} questions)")


🤖 Generating topic names with gpt-5-nano...
✅ Generated 1669 topic names
   TR letter grade meaning (34 questions)
   Transfer Credit and TR Grades (7 questions)
   Haitian Student Tuition and Scholarships (17 questions)
   PathwayConnect Absence Corrections and Global Services (20 questions)
   Inside Language Model: Forbidden Questions (8 questions)


## Generate Output Files

In [None]:
# One timestamp shared by every output file name of this run.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

print(f"\nüìÅ Generating output files...")

# File 1: Similar questions
file1 = f"similar_questions_{timestamp}.parquet"
if len(similar_df) == 0:
    # Empty frame with the expected schema.
    output1 = pd.DataFrame(columns=['question', 'existing_topic', 'existing_subtopic', 'similarity_score'])
else:
    # Keep the four report columns, rename the matched_* pair, best match first.
    output1 = (
        similar_df[['question', 'matched_topic', 'matched_subtopic', 'similarity_score']]
        .rename(columns={'matched_topic': 'existing_topic', 'matched_subtopic': 'existing_subtopic'})
        .sort_values('similarity_score', ascending=False)
    )

# Add metadata
output1.attrs['metadata'] = {
    'timestamp': timestamp,
    'threshold': SIMILARITY_THRESHOLD,
    'total_questions': len(output1),
    'default_visible_columns': ['question', 'existing_topic', 'existing_subtopic', 'similarity_score']
}
output1.to_parquet(file1)
print(f"‚úÖ {file1}: {len(output1)} rows")

# File 2: New topics
file2 = f"new_topics_{timestamp}.parquet"
if clustered_df is None or len(clustered_df) == 0:
    output2 = pd.DataFrame(columns=['topic_name', 'representative_question', 'question_count'])
else:
    # One row per cluster: its name, its first question, and its size.
    cluster_summary = clustered_df.groupby('cluster_id').agg(
        topic_name=('topic_name', 'first'),
        representative_question=('question', 'first'),
        question_count=('question', 'count'),
    ).reset_index()
    output2 = cluster_summary[['topic_name', 'representative_question', 'question_count']].sort_values('question_count', ascending=False)

output2.attrs['metadata'] = {
    'timestamp': timestamp,
    'total_topics': len(output2),
    'default_visible_columns': ['topic_name', 'representative_question', 'question_count']
}
output2.to_parquet(file2)
print(f"‚úÖ {file2}: {len(output2)} rows")

# File 3: All questions review WITH METADATA
file3 = f"pathway_questions_review_{timestamp}.parquet"
review_data = []

# Metadata columns to preserve (including new fields: feedback_comment, latency, cost, session/user tracking)
metadata_cols = [
    'timestamp', 'country', 'state', 'city', 'output', 'user_feedback', 'feedback_comment',
    'user_language', 'is_suspicious', 'is_acm_question',
    'latency', 'total_cost', 'session_id', 'user_id', 'tags', 'scores',
    'release', 'role'
]


def _review_record(row, topic_name, classification, confidence):
    """Build one review row: classification fields followed by the metadata columns."""
    record = {
        'question': row['question'],
        'topic_name': topic_name,
        'classification': classification,
        'confidence': confidence
    }
    # Metadata columns missing from the source row become None.
    for col in metadata_cols:
        record[col] = row.get(col, None)
    return record


# Questions matched to an existing topic.
if len(similar_df) > 0:
    for _, row in similar_df.iterrows():
        review_data.append(_review_record(
            row,
            f"{row['matched_topic']} | {row['matched_subtopic']}",
            'existing',
            row['similarity_score'],
        ))

# Questions assigned to a newly discovered cluster.
if clustered_df is not None and len(clustered_df) > 0:
    for _, row in clustered_df.iterrows():
        review_data.append(_review_record(row, row['topic_name'], 'new', 0.5))

# Everything left in remaining_df that did not end up in a cluster.
# Rows are selected by index, not by question text: clustered_df is a filtered
# copy of remaining_df and keeps its index, whereas matching on text drops
# unclustered rows whose text also occurs in some cluster.
if clustered_df is not None:
    uncategorized_df = remaining_df[~remaining_df.index.isin(clustered_df.index)]
else:
    uncategorized_df = remaining_df
for _, row in uncategorized_df.iterrows():
    review_data.append(_review_record(row, 'Other', 'uncategorized', 0.0))

# Explicit columns keep the schema stable even when there are no rows at all.
output3 = pd.DataFrame(
    review_data,
    columns=['question', 'topic_name', 'classification', 'confidence'] + metadata_cols,
)
output3.attrs['metadata'] = {
    'timestamp': timestamp,
    'total_questions': len(output3),
    'default_visible_columns': ['question', 'timestamp', 'country', 'state', 'topic_name', 'classification', 'is_acm_question']
}
output3.to_parquet(file3)
print(f"‚úÖ {file3}: {len(output3)} rows")
print(f"   Columns: {list(output3.columns)}")
print(f"   With country data: {output3['country'].notna().sum()} rows")
print(f"   ACM questions: {output3['is_acm_question'].sum() if 'is_acm_question' in output3.columns else 'N/A'}")
print(f"   With latency: {output3['latency'].notna().sum() if 'latency' in output3.columns else 'N/A'}")
print(f"   With cost: {output3['total_cost'].notna().sum() if 'total_cost' in output3.columns else 'N/A'}")
print(f"   With user_id: {output3['user_id'].notna().sum() if 'user_id' in output3.columns else 'N/A'}")
print(f"   With feedback reasons: {output3['feedback_comment'].notna().sum() if 'feedback_comment' in output3.columns else 'N/A'}")

# File 4: Topic distribution analytics
# Row count per (topic_name, classification) pair, largest first.
file4 = f"topic_distribution_{timestamp}.parquet"
topic_dist = (
    output3.groupby(['topic_name', 'classification'])
    .size()
    .reset_index(name='count')
    .sort_values('count', ascending=False)
)
topic_dist.attrs['metadata'] = {
    'timestamp': timestamp,
    'total_topics': len(topic_dist),
    'default_visible_columns': ['topic_name', 'classification', 'count']
}
topic_dist.to_parquet(file4)
print(f"‚úÖ {file4}: {len(topic_dist)} rows")

# File 5: General feedback (separate from questions)
file5 = f"general_feedback_{timestamp}.parquet"
if general_feedback_df.empty:
    # Create empty parquet with expected schema
    empty_fb = pd.DataFrame(columns=['id', 'name', 'input', 'output', 'timestamp', 'user_id', 'session_id', 'tags'])
    empty_fb.to_parquet(file5)
    print(f"‚úÖ {file5}: 0 rows (empty)")
else:
    general_feedback_df.to_parquet(file5)
    print(f"‚úÖ {file5}: {len(general_feedback_df)} rows")

# Error log: dump the logger's summary as indented JSON.
error_log_file = f"error_log_{timestamp}.json"
with open(error_log_file, 'w') as f:
    json.dump(error_logger.get_summary(), f, indent=2)
print(f"‚úÖ {error_log_file}: error summary")

# Create output_files list for S3 upload
output_files = [
    file1,
    file2,
    file3,
    file4,
    file5,
    error_log_file,
]

print(f"\nüì¶ OUTPUT FILES GENERATED:")
print(f"   1. {file1} - Questions matched to existing topics")
print(f"   2. {file2} - New topic clusters discovered")
print(f"   3. {file3} - Complete review with ALL METADATA")
print(f"   4. {file4} - Topic distribution analytics")
print(f"   5. {file5} - General feedback submissions")
print(f"   6. {error_log_file} - Error log")


üìÅ Generating output files...
‚úÖ similar_questions_20260126_104758.parquet: 3942 rows
‚úÖ new_topics_20260126_104758.parquet: 1669 rows
‚úÖ pathway_questions_review_20260126_104758.parquet: 26357 rows
   Columns: ['question', 'topic_name', 'classification', 'confidence', 'timestamp', 'country', 'state', 'city', 'output', 'user_feedback', 'ip_address', 'user_language', 'is_suspicious', 'is_acm_question']
   With country data: 14823 rows
   ACM questions: 792
‚úÖ topic_distribution_20260126_104758.parquet: 1750 rows
‚úÖ error_log_20260126_104758.json: error summary

üì¶ OUTPUT FILES GENERATED:
   1. similar_questions_20260126_104758.parquet - Questions matched to existing topics
   2. new_topics_20260126_104758.parquet - New topic clusters discovered
   3. pathway_questions_review_20260126_104758.parquet - Complete review with ALL METADATA
   4. topic_distribution_20260126_104758.parquet - Topic distribution analytics
   5. error_log_20260126_104758.json - Error log


## Upload to S3

In [None]:
# Upload every generated output file to S3 under S3_OUTPUT_PREFIX.
# Skip ACL if you don't have PutObjectAcl permission
# Use this if you can upload without ACL but not with public-read

print(f"\n‚òÅÔ∏è  Uploading to S3 (without public ACL)...")

# Delete old files
# NOTE(review): the previous outputs are removed before the new uploads are
# attempted, so a failed upload presumably leaves the prefix without a
# complete set of files - confirm this is acceptable for the dashboard.
try:
    delete_s3_folder(S3_OUTPUT_PREFIX)
except Exception as e:
    print(f"‚ö†Ô∏è  Could not delete old files: {e}")

# Upload new files WITHOUT public-read ACL
uploaded = []  # s3:// URLs of successful uploads
failed = []    # local paths that were missing or failed to upload

for filepath in output_files:
    # A missing local file is recorded as a failure and skipped.
    if not os.path.exists(filepath):
        print(f"‚ùå File not found: {filepath}")
        failed.append(filepath)
        continue

    file_size = os.path.getsize(filepath)
    print(f"üì§ Uploading {filepath} ({file_size:,} bytes)...")

    # The local file name is reused as the object name under the prefix.
    s3_key = f"{S3_OUTPUT_PREFIX}/{filepath}"

    try:
        # Use public=False to skip ACL (if you don't have PutObjectAcl permission)
        if upload_to_s3(filepath, s3_key, public=False):
            # Note: URL won't be publicly accessible without ACL
            url = f"s3://{S3_BUCKET}/{s3_key}"
            uploaded.append(url)
            print(f"   ‚úÖ Success: {url}")
        else:
            failed.append(filepath)
            print(f"   ‚ùå Failed: {filepath}")
    except Exception as e:
        # An exception from the upload counts as a failure; the loop continues.
        print(f"   ‚ùå Exception: {str(e)}")
        failed.append(filepath)

print(f"\nüìä UPLOAD SUMMARY:")
print(f"   ‚úÖ Successful: {len(uploaded)}/{len(output_files)}")
print(f"   ‚ùå Failed: {len(failed)}/{len(output_files)}")

if uploaded:
    print(f"\n‚úÖ Uploaded files (private - not publicly accessible):")
    for url in uploaded:
        print(f"   {url}")
    print(f"\nüí° TIP: Files are uploaded but not public. Your Streamlit app can access them with AWS credentials.")

if failed:
    print(f"\n‚ùå Failed files:")
    for f in failed:
        print(f"   {f}")


‚òÅÔ∏è  Uploading to S3 (without public ACL)...
üóëÔ∏è  Deleted 5 objects from s3://byupathway-public/topic-modeling-data
üì§ Uploading similar_questions_20260126_104758.parquet (153,128 bytes)...
‚úÖ Uploaded to S3: s3://byupathway-public/topic-modeling-data/similar_questions_20260126_104758.parquet
   ‚úÖ Success: s3://byupathway-public/topic-modeling-data/similar_questions_20260126_104758.parquet
üì§ Uploading new_topics_20260126_104758.parquet (112,022 bytes)...
‚úÖ Uploaded to S3: s3://byupathway-public/topic-modeling-data/new_topics_20260126_104758.parquet
   ‚úÖ Success: s3://byupathway-public/topic-modeling-data/new_topics_20260126_104758.parquet
üì§ Uploading pathway_questions_review_20260126_104758.parquet (6,862,887 bytes)...
‚úÖ Uploaded to S3: s3://byupathway-public/topic-modeling-data/pathway_questions_review_20260126_104758.parquet
   ‚úÖ Success: s3://byupathway-public/topic-modeling-data/pathway_questions_review_20260126_104758.parquet
üì§ Uploading topic_distrib

## Analysis & Visualization

In [None]:
# 2x2 overview figure: pipeline counts, similarity scores, cluster sizes,
# and the classification split of the review file.
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Hybrid Topic Discovery Analysis', fontsize=16, fontweight='bold')

# 1. Processing pipeline
pipeline = ['Total', 'Similar', 'New Topics', 'Uncategorized']
counts = [len(eval_df), len(similar_df)]
counts.append(len(clustered_df) if clustered_df is not None else 0)
# Uncategorized = total minus matched minus clustered.
counts.append(counts[0] - counts[1] - counts[2])
axes[0,0].bar(pipeline, counts, color=['lightblue', 'lightgreen', 'orange', 'lightcoral'])
axes[0,0].set_title('Processing Results')
axes[0,0].set_ylabel('Questions')
# Label each bar with its count and its share of the evaluation set.
for i, count in enumerate(counts):
    axes[0,0].text(i, count + max(counts)*0.01, f"{count}\n({count/len(eval_df)*100:.1f}%)", ha='center')

# 2. Similarity distribution
if len(similar_df) == 0:
    axes[0,1].text(0.5, 0.5, 'No similar questions', ha='center', va='center', transform=axes[0,1].transAxes)
else:
    axes[0,1].hist(similar_df['similarity_score'], bins=20, alpha=0.7, color='lightgreen', edgecolor='black')
    axes[0,1].axvline(SIMILARITY_THRESHOLD, color='red', linestyle='--', label=f'Threshold: {SIMILARITY_THRESHOLD}')
    axes[0,1].set_xlabel('Similarity Score')
    axes[0,1].set_ylabel('Count')
    axes[0,1].legend()
axes[0,1].set_title('Similarity Distribution')

# 3. Cluster sizes
if clustered_df is None or len(clustered_df) == 0:
    axes[1,0].text(0.5, 0.5, 'No clusters', ha='center', va='center', transform=axes[1,0].transAxes)
else:
    cluster_sizes = clustered_df['cluster_id'].value_counts().values
    axes[1,0].hist(cluster_sizes, bins=min(20, len(cluster_sizes)), alpha=0.7, color='orange', edgecolor='black')
    axes[1,0].set_xlabel('Cluster Size')
    axes[1,0].set_ylabel('Count')
axes[1,0].set_title('New Topic Sizes')

# 4. Topic distribution pie
pie_data = output3['classification'].value_counts()
if len(pie_data) == 0:
    axes[1,1].text(0.5, 0.5, 'No data', ha='center', va='center', transform=axes[1,1].transAxes)
else:
    axes[1,1].pie(pie_data.values, labels=pie_data.index, autopct='%1.1f%%', startangle=90)
axes[1,1].set_title('Classification Distribution')

plt.tight_layout()
plt.show()

# Text summary of the run.
print("\nüìä SUMMARY:")
print(f"   Total processed: {len(eval_df)}")
print(f"   Similar to existing: {len(similar_df)} ({len(similar_df)/len(eval_df)*100:.1f}%)")
print(f"   New topics: {len(topic_names)}")
print(f"   Errors: {error_logger.get_summary()['total_errors']}")
print(f"   Warnings: {error_logger.get_summary()['total_warnings']}")
print(f"\n‚úÖ COMPLETE! Files uploaded to S3.")