# Hybrid Topic Discovery & Classification with AWS Integration

**Purpose**: Classify student questions against existing topics and discover new topics using a hybrid approach (similarity matching first, then clustering of the questions that did not match).

**Data Flow**:
1. Load topics from Google Sheets
2. Load student questions from Langfuse CSV
3. Similarity classification (threshold-based)
4. Clustering for new topic discovery
5. Output parquet files to AWS S3

**Key Features**:
- AWS S3 for embeddings cache and outputs
- Environment-responsive configuration
- Comprehensive error logging
- Analytics outputs for Streamlit dashboard

## Install Dependencies

In [None]:
!pip install -q openai pandas numpy scipy scikit-learn matplotlib seaborn tqdm umap-learn hdbscan bertopic backoff boto3 gspread oauth2client pyarrow fastparquet python-dotenv

## Configuration

In [None]:
# Processing settings
# EVAL_MODE selects whether a random sample or every loaded question is processed.
EVAL_MODE = "sample"  # "sample" or "all"
SAMPLE_SIZE = 1000  # upper bound on the sample size when EVAL_MODE == "sample"
SIMILARITY_THRESHOLD = 0.70  # minimum similarity for a question to count as an existing topic
# NOTE(review): verify that the output-generation cell honours this setting.
REPRESENTATIVE_QUESTION_METHOD = "centroid"  # "centroid" or "frequent"

# Model settings
EMBEDDING_MODEL = "text-embedding-3-small"
# Expected vector length; also used to build zero-vector placeholders.
# NOTE(review): must match the output size of EMBEDDING_MODEL - confirm when changing models.
EMBEDDING_DIMENSIONS = 1536
GPT_MODEL = "gpt-5-nano"  # chat model used to name newly discovered topics

# AWS S3 settings
S3_BUCKET = "byupathway-public"
S3_OUTPUT_PREFIX = "topic-modeling-data"  # key prefix for the generated output files
S3_CACHE_PREFIX = "embeddings-cache"  # key prefix for cached embedding pickles
S3_REGION = "us-east-1"

# Clustering settings
UMAP_N_COMPONENTS = 5  # target dimensionality of the UMAP reduction
HDBSCAN_MIN_CLUSTER_SIZE = 3  # smallest group HDBSCAN may report as a cluster
RANDOM_SEED = 42  # used for sampling and for UMAP

# Google Sheets URL (hardcoded; no fallback source is defined in this cell)
GOOGLE_SHEETS_URL = "https://docs.google.com/spreadsheets/d/1L3kOmE6mZEVotjUu2ZuLqir2rZ4c0yfNeeTuHwf3JQI/edit?gid=0#gid=0"

# Echo the key settings so each notebook run records what it used.
print("✅ Configuration loaded")
print(f"   Mode: {EVAL_MODE}, Threshold: {SIMILARITY_THRESHOLD}")
print(f"   S3 Bucket: {S3_BUCKET}")
print(f"   Embedding Model: {EMBEDDING_MODEL}")

## Environment Setup

In [None]:
import os
import pandas as pd
import numpy as np
from scipy.spatial.distance import cosine
import json
import pickle
from pathlib import Path
import time
from datetime import datetime
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict, Tuple, Optional, Any
import asyncio
import backoff
import re
import hashlib
import boto3
from botocore.exceptions import ClientError
import gspread
from oauth2client.service_account import ServiceAccountCredentials

# Detect environment: importing google.colab only succeeds inside Colab, so the
# ImportError branch is the local path, where a .env file is loaded instead.
try:
    import google.colab
    IN_COLAB = True
    from google.colab import userdata
    print("🔧 Running in Google Colab")
except ImportError:
    IN_COLAB = False
    from dotenv import load_dotenv
    load_dotenv()
    print("🔧 Running locally")

# Load credentials from Colab secrets or from environment variables.
# Locally, os.getenv returns None for any variable that is not set; nothing
# here validates the values, so a missing secret surfaces later at first use.
if IN_COLAB:
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    AWS_ACCESS_KEY = userdata.get('AWS_ACCESS_KEY_ID')
    AWS_SECRET_KEY = userdata.get('AWS_SECRET_ACCESS_KEY')
    GOOGLE_SERVICE_ACCOUNT = userdata.get('GOOGLE_SERVICE_ACCOUNT_JSON')
else:
    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
    AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
    AWS_SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
    GOOGLE_SERVICE_ACCOUNT = os.getenv('GOOGLE_SERVICE_ACCOUNT_JSON')

# Initialize OpenAI: a synchronous client (used for embeddings) and an
# asynchronous client (used for concurrent topic naming).
from openai import OpenAI, AsyncOpenAI
client = OpenAI(api_key=OPENAI_API_KEY)
async_client = AsyncOpenAI(api_key=OPENAI_API_KEY)

# Initialize AWS S3 with the explicit credentials loaded above.
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=S3_REGION
)

print("✅ Environment setup complete")

## Error Logger

In [None]:
class ErrorLogger:
    """In-memory collector for problems encountered during a pipeline run.

    Three independent lists are kept: errors, warnings and dropped rows.
    Every record carries an ISO timestamp and the stage that reported it;
    payloads are stored as ``str(payload)`` so the log is always JSON-safe.
    """

    def __init__(self):
        # Creation time of the logger, formatted for use in file names.
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.errors = []
        self.warnings = []
        self.rows_dropped = []

    @staticmethod
    def _now() -> str:
        """Return the current local time as an ISO-8601 string."""
        return datetime.now().isoformat()

    def log_error(self, stage: str, message: str, data: Any = None):
        """Record an error for ``stage`` and echo it to stdout."""
        self.errors.append({
            "timestamp": self._now(),
            "stage": stage,
            "message": message,
            "data": str(data),
        })
        print(f"❌ ERROR [{stage}]: {message}")

    def log_warning(self, stage: str, message: str, data: Any = None):
        """Record a warning for ``stage`` and echo it to stdout."""
        self.warnings.append({
            "timestamp": self._now(),
            "stage": stage,
            "message": message,
            "data": str(data),
        })
        print(f"⚠️  WARNING [{stage}]: {message}")

    def log_dropped_row(self, stage: str, reason: str, row_data: Any):
        """Record a discarded input row (not echoed to stdout)."""
        self.rows_dropped.append({
            "timestamp": self._now(),
            "stage": stage,
            "reason": reason,
            "row_data": str(row_data),
        })

    def get_summary(self):
        """Return counts plus the underlying record lists as one dict."""
        errors, warnings, dropped = self.errors, self.warnings, self.rows_dropped
        return {
            "total_errors": len(errors),
            "total_warnings": len(warnings),
            "total_dropped_rows": len(dropped),
            "errors": errors,
            "warnings": warnings,
            "dropped_rows": dropped,
        }

    def save_to_file(self, filename: str):
        """Write the summary to ``filename`` as indented JSON and return the name."""
        report = self.get_summary()
        with open(filename, 'w') as handle:
            json.dump(report, handle, indent=2)
        print(f"📝 Error log saved: {filename}")
        return filename


# Single shared logger used by every later cell.
error_logger = ErrorLogger()
print("✅ Error logger initialized")

## AWS S3 Utilities

In [None]:
def _is_permanent_upload_error(error: Exception) -> bool:
    """Return True for upload failures that retrying cannot fix.

    Covers a missing local file, missing AWS credentials and any S3 response
    with a 4xx status (bad key, access denied, ...). The ClientError may be
    the exception itself or the context of a wrapper raised by boto3's
    transfer layer, so both are inspected.
    """
    from botocore.exceptions import NoCredentialsError

    if isinstance(error, (FileNotFoundError, NoCredentialsError)):
        return True
    cause = error if isinstance(error, ClientError) else error.__context__
    if isinstance(cause, ClientError):
        status = cause.response.get('ResponseMetadata', {}).get('HTTPStatusCode')
        return isinstance(status, int) and 400 <= status < 500
    return False


@backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=5,
    max_time=30,
    giveup=_is_permanent_upload_error
)
def _upload_file_with_retry(local_path: str, s3_key: str, extra_args: dict) -> None:
    """Perform one upload attempt; exceptions propagate so backoff can retry."""
    s3_client.upload_file(local_path, S3_BUCKET, s3_key, ExtraArgs=extra_args)


def upload_to_s3(local_path: str, s3_key: str, public: bool = True) -> bool:
    """Upload file to S3 with retry logic and exponential backoff

    The retry decorator lives on a private helper that lets exceptions escape.
    Catching them in the decorated function itself would hide every failure
    from backoff, so no retry would ever happen.

    Args:
        local_path: Local file path to upload
        s3_key: S3 key (path in bucket)
        public: Whether to set public-read ACL (default True)

    Returns:
        bool: True if successful, False otherwise (the failure is logged
        through error_logger once retries are exhausted or abandoned)
    """
    extra_args = {'ACL': 'public-read'} if public else {}
    try:
        _upload_file_with_retry(local_path, s3_key, extra_args)
    except Exception as e:
        error_logger.log_error("S3_Upload", f"Failed to upload {local_path} after retries", e)
        return False

    if public:
        url = f"https://{S3_BUCKET}.s3.amazonaws.com/{s3_key}"
        print(f"✅ Uploaded to S3: {url}")
    else:
        print(f"✅ Uploaded to S3: s3://{S3_BUCKET}/{s3_key}")

    return True

def _is_missing_object_error(error: Exception) -> bool:
    """Return True when S3 reported that the requested key does not exist."""
    if not isinstance(error, ClientError):
        return False
    return error.response.get('Error', {}).get('Code') in ('404', 'NoSuchKey')


def _is_permanent_download_error(error: Exception) -> bool:
    """Return True for download failures that retrying cannot fix.

    Missing credentials and any 4xx S3 response (not found, access denied, ...)
    are treated as permanent; everything else is considered transient.
    """
    from botocore.exceptions import NoCredentialsError

    if isinstance(error, NoCredentialsError):
        return True
    if isinstance(error, ClientError):
        status = error.response.get('ResponseMetadata', {}).get('HTTPStatusCode')
        if isinstance(status, int) and 400 <= status < 500:
            return True
        return _is_missing_object_error(error)
    return False


@backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=3,
    max_time=15,
    giveup=_is_permanent_download_error
)
def _download_file_with_retry(s3_key: str, local_path: str) -> None:
    """Perform one download attempt; exceptions propagate so backoff can retry."""
    s3_client.download_file(S3_BUCKET, s3_key, local_path)


def download_from_s3(s3_key: str, local_path: str) -> bool:
    """Download file from S3 with retry logic

    Transient failures are retried by the private helper. A missing object is
    an expected cache miss and returns False silently. Any other failure is
    logged through error_logger and also returns False, so callers never have
    to handle an exception.

    Args:
        s3_key: S3 key (path in bucket) to fetch
        local_path: Destination path on the local filesystem

    Returns:
        bool: True if the file was downloaded, False otherwise
    """
    try:
        _download_file_with_retry(s3_key, local_path)
        return True
    except Exception as e:
        if not _is_missing_object_error(e):
            error_logger.log_error("S3_Download", f"Failed to download {s3_key} after retries", e)
        return False

def delete_s3_folder(prefix: str):
    """Delete all objects with given prefix

    Listing is paginated, so prefixes holding more than one page of keys are
    emptied completely. Each page is deleted with its own request, which keeps
    every delete_objects call within the size of a single listing page.
    Failures never raise: they are logged through error_logger.

    Args:
        prefix: Key prefix to match. Matching is purely textual, so pass a
            trailing "/" to restrict the deletion to a single "folder".
    """
    try:
        paginator = s3_client.get_paginator('list_objects_v2')
        deleted_count = 0
        for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=prefix):
            objects = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
            if not objects:
                continue
            response = s3_client.delete_objects(Bucket=S3_BUCKET, Delete={'Objects': objects})
            # delete_objects reports per-key failures in the response body
            # instead of raising, so they must be checked explicitly.
            failures = response.get('Errors', [])
            if failures:
                error_logger.log_error(
                    "S3_Delete",
                    f"{len(failures)} objects under {prefix} could not be deleted",
                    failures[:5]
                )
            deleted_count += len(objects) - len(failures)
        if deleted_count:
            print(f"🗑️  Deleted {deleted_count} objects from s3://{S3_BUCKET}/{prefix}")
    except Exception as e:
        error_logger.log_error("S3_Delete", f"Failed to delete folder {prefix}", e)

print("✅ AWS S3 utilities ready (with retry logic)")

### ✨ Retry Logic Features

**Automatic Retry with Exponential Backoff:**
- **`upload_to_s3`**: Up to 5 attempts within a 30-second window for uploads
- **`download_from_s3`**: Up to 3 attempts within a 15-second window for downloads
- **Exponential backoff**: Waits longer between each retry attempt
- **Graceful degradation**: Cache failures don't break the main workflow

**What this fixes:**
- ✅ Handles temporary network glitches
- ✅ Manages AWS rate limiting
- ✅ Resolves race conditions from concurrent uploads
- ✅ Automatic cleanup of temp files

**Performance:**
- Cache uploads are non-blocking - failures are logged but don't stop processing
- Downloads check for 404 (file not found) and don't retry unnecessarily
- Main output file uploads get full retry protection

## Google Sheets Integration

In [None]:
def read_topics_from_google_sheets(sheet_url: str) -> pd.DataFrame:
    """Read topics from Google Sheets with flexible column handling

    The first worksheet is loaded and its headers are matched
    case-insensitively (singular or plural, surrounding whitespace ignored)
    to the columns 'topic', 'subtopic' and 'question'. Rows where any of the
    three is missing or blank are discarded.

    Args:
        sheet_url: Full URL of the Google Sheets document.

    Returns:
        DataFrame with exactly the columns 'topic', 'subtopic', 'question'.

    Raises:
        ValueError: If the service-account JSON is not configured or a
            required column is absent.
        Exception: Any authentication or API error is logged through
            error_logger and re-raised unchanged.
    """
    header_aliases = {
        'topic': 'topic', 'topics': 'topic',
        'subtopic': 'subtopic', 'subtopics': 'subtopic',
        'question': 'question', 'questions': 'question',
    }
    try:
        if not GOOGLE_SERVICE_ACCOUNT:
            # json.loads(None) would otherwise fail with an unhelpful TypeError.
            raise ValueError("GOOGLE_SERVICE_ACCOUNT_JSON credential is not set")

        creds_dict = json.loads(GOOGLE_SERVICE_ACCOUNT)
        scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
        creds = ServiceAccountCredentials.from_json_keyfile_dict(creds_dict, scope)
        gc = gspread.authorize(creds)

        sheet = gc.open_by_url(sheet_url)
        worksheet = sheet.get_worksheet(0)
        data = worksheet.get_all_records()
        df = pd.DataFrame(data)

        # Handle both uppercase and lowercase column names
        column_mapping = {}
        for col in df.columns:
            normalized = str(col).strip().lower()
            if normalized in header_aliases:
                column_mapping[col] = header_aliases[normalized]

        df = df.rename(columns=column_mapping)

        required = ['topic', 'subtopic', 'question']
        if not all(col in df.columns for col in required):
            raise ValueError(f"Missing required columns. Found: {list(df.columns)}")

        # Blank sheet cells arrive as empty strings rather than NaN, so
        # dropna() alone would keep incomplete rows; treat blanks as missing.
        topics = df[required]
        is_blank = topics.apply(lambda column: column.astype(str).str.strip().eq(''))
        incomplete = (topics.isna() | is_blank).any(axis=1)
        df = topics[~incomplete]

        print(f"✅ Loaded {len(df)} topics from Google Sheets")
        print(f"   Unique topics: {df['topic'].nunique()}, Unique subtopics: {df['subtopic'].nunique()}")
        return df

    except Exception as e:
        error_logger.log_error("GoogleSheets", "Failed to read topics", e)
        raise

print("✅ Google Sheets integration ready")

## Langfuse Data Cleaning

In [None]:
def _read_json_string_body(text: str, start: int) -> str:
    """Return ``text`` from ``start`` up to the next unescaped double quote.

    A backslash always consumes the following character, so ``\\"`` (escaped
    backslash, then closing quote) ends the string while ``\"`` does not.
    If no closing quote exists, the rest of ``text`` is returned.
    """
    pos = start
    while pos < len(text):
        char = text[pos]
        if char == '\\':
            pos += 2
            continue
        if char == '"':
            break
        pos += 1
    return text[start:pos]


def extract_data_from_kwargs(kwargs_str: str) -> tuple[str, str]:
    """Extract question and response from malformed kwargs JSON string

    The text is scanned rather than parsed because it is not reliably valid
    JSON. Only the part before the first '"request":' marker is considered:
    the last '"data":' section before it supplies the "question" field, and
    the first '"data":' section supplies the content of its last assistant
    message.

    Both values are returned exactly as they appear in the source text, i.e.
    JSON escape sequences such as \\" are not decoded.

    Args:
        kwargs_str: Raw text containing the kwargs payload.

    Returns:
        (question, response); either element is "" when it cannot be found.
        ("", "") is returned for non-string input or when the markers are
        absent. The function never raises.
    """
    if not isinstance(kwargs_str, str):
        return "", ""

    request_pos = kwargs_str.find('"request":')
    if request_pos == -1:
        return "", ""

    # Every "data": marker that starts before the request marker.
    data_positions = [m.start() for m in re.finditer(r'"data":', kwargs_str[:request_pos])]
    if not data_positions:
        return "", ""

    # Question: taken from the last "data" section (the one with "question").
    data_section = kwargs_str[data_positions[-1]:request_pos].strip()
    if data_section.endswith(','):
        data_section = data_section[:-1]
    # The value pattern skips over backslash escapes so an escaped quote
    # inside the question does not cut the match short.
    question_match = re.search(
        r'"question"\s*:\s*"((?:[^"\\]|\\.)*)"', data_section, flags=re.DOTALL
    )
    question = question_match.group(1) if question_match else ""

    # Response: taken from the first "data" section (the one with messages).
    next_data_pos = data_positions[1] if len(data_positions) > 1 else request_pos
    first_data_section = kwargs_str[data_positions[0]:next_data_pos].strip()
    if first_data_section.endswith(','):
        first_data_section = first_data_section[:-1]

    response = ""
    last_assistant_pos = first_data_section.rfind('"role": "assistant"')
    if last_assistant_pos != -1:
        content_marker = '"content": "'
        marker_pos = first_data_section.find(content_marker, last_assistant_pos)
        if marker_pos != -1:
            response = _read_json_string_body(
                first_data_section, marker_pos + len(content_marker)
            )

    return question, response

def clean_langfuse_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean Langfuse CSV with kwargs error handling

    For each row the question is expected in the 'input' column:
      * if 'input' contains a '"kwargs"' payload, the question (and, when
        'output' is empty, the response) is extracted from that text;
      * if 'input' is empty and a 'kwargs' column holds JSON, the content of
        its first message is used as the question;
      * rows that still have no 'input' are dropped and recorded through
        error_logger.
    Afterwards rows sharing the same 'timestamp' and 'input' are
    de-duplicated, keeping the first.

    Args:
        df: Raw Langfuse export.

    Returns:
        DataFrame of the surviving rows. It always has an 'input' column,
        even when every row was dropped.
    """
    print(f"🧹 Cleaning {len(df)} rows from Langfuse...")

    has_kwargs_column = 'kwargs' in df.columns
    cleaned_rows = []

    for idx, row in df.iterrows():
        try:
            raw_input = row.get('input')
            input_text = str(row.get('input', ''))

            if '"kwargs"' in input_text:
                # This is a kwargs row - extract data from the malformed JSON
                question, response = extract_data_from_kwargs(input_text)
                # NOTE(review): when no question can be extracted the raw
                # payload is kept as the input - confirm this is intended.
                if question:
                    row = row.copy()
                    row['input'] = question
                    if response and (pd.isna(row.get('output')) or row.get('output') == ''):
                        row['output'] = response
            elif (pd.isna(raw_input) or raw_input == '') and has_kwargs_column and pd.notna(row.get('kwargs')):
                try:
                    kwargs_data = json.loads(row['kwargs'])
                    messages = kwargs_data.get('data', {}).get('messages', [])
                    if messages:
                        content = messages[0].get('content', '')
                        row = row.copy()
                        row['input'] = content
                except (json.JSONDecodeError, KeyError, IndexError):
                    error_logger.log_dropped_row("LangfuseClean", "Invalid kwargs JSON", {"index": idx, "kwargs": str(row.get('kwargs', ''))[:100]})
                    continue

            # Validate required fields
            if pd.isna(row.get('input')) or row.get('input') == '':
                error_logger.log_dropped_row("LangfuseClean", "Missing input", {"index": idx})
                continue

            cleaned_rows.append(row)

        except Exception as e:
            error_logger.log_dropped_row("LangfuseClean", f"Unexpected error: {e}", {"index": idx})
            continue

    if cleaned_rows:
        cleaned_df = pd.DataFrame(cleaned_rows)
    else:
        # An empty row list would produce a frame without columns, and the
        # caller's ['input'] lookup would then fail with a KeyError.
        columns = list(df.columns)
        if 'input' not in columns:
            columns.append('input')
        cleaned_df = pd.DataFrame(columns=columns)

    # Remove duplicates (same timestamp and question)
    if 'timestamp' in cleaned_df.columns and 'input' in cleaned_df.columns:
        before_dedup = len(cleaned_df)
        cleaned_df = cleaned_df.drop_duplicates(subset=['timestamp', 'input'], keep='first')
        after_dedup = len(cleaned_df)
        if before_dedup > after_dedup:
            print(f"🗑️  Removed {before_dedup - after_dedup} duplicate rows")

    print(f"✅ Cleaned data: {len(cleaned_df)} rows ({len(df) - len(cleaned_df)} dropped)")
    return cleaned_df

print("✅ Langfuse cleaning utilities ready")

## Question Preprocessing

In [None]:
def clean_question(question: str) -> str:
    """Strip a leading "(ACM Question)" style prefix and surrounding whitespace.

    Non-string input is converted with ``str`` (``None`` becomes ""). If
    stripping would leave nothing, the original text is returned unchanged.
    """
    if question is None:
        return ""
    if not isinstance(question, str):
        return str(question)

    prefix_re = re.compile(r'^\s*\(ACMs?\s+[Qq]uestion\)\s*:?\s*', re.IGNORECASE)
    without_prefix = prefix_re.sub('', question).strip()
    return without_prefix or question

def preprocess_dataframe(df: pd.DataFrame, question_col: str) -> pd.DataFrame:
    """Return a copy of ``df`` whose ``question_col`` values went through clean_question.

    The input frame is left untouched.
    """
    result = df.copy()
    cleaned_column = result[question_col].apply(clean_question)
    result[question_col] = cleaned_column
    return result

print("✅ Question preprocessing ready")

## S3 Embeddings Cache

In [None]:
def get_cache_key(text: str, model: str) -> str:
    """Build the S3 key under which the embedding of ``text`` is cached.

    The key has the form ``<cache prefix>/<model>/<hash>.pkl`` where the hash
    is the first 12 hex digits of the MD5 digest of the encoded text.
    """
    digest = hashlib.md5(text.encode()).hexdigest()
    short_digest = digest[:12]
    return f"{S3_CACHE_PREFIX}/{model}/{short_digest}.pkl"

def load_embedding_from_s3(text: str, model: str) -> Optional[List[float]]:
    """Load cached embedding from S3

    The cache object is downloaded to a temporary file, unpickled and the
    temporary file is removed again in every case. Any failure (cache miss,
    download error, corrupted pickle) yields None so that a broken cache
    never interrupts the embedding workflow.

    Args:
        text: Text whose embedding is requested.
        model: Embedding model name (part of the cache key).

    Returns:
        The cached embedding, or None when it is unavailable.
    """
    cache_key = get_cache_key(text, model)
    local_path = f"/tmp/{cache_key.split('/')[-1]}"

    try:
        if not download_from_s3(cache_key, local_path):
            return None
        with open(local_path, 'rb') as f:
            # NOTE(security): unpickling executes arbitrary code if the file
            # was tampered with. This is only safe while write access to the
            # cache prefix is restricted to trusted principals - verify the
            # bucket policy, or switch the cache to a data-only format.
            return pickle.load(f)
    except Exception:
        # Treat every cache problem as a miss; the caller recomputes.
        return None
    finally:
        # Clean up the temp file whether or not it was read successfully.
        try:
            os.unlink(local_path)
        except OSError:
            pass

def save_embedding_to_s3(text: str, model: str, embedding: List[float]) -> bool:
    """Save embedding to S3 cache with retry logic

    The embedding is pickled to a temporary file, uploaded privately
    (no public ACL) and the temporary file is removed in every case.

    Args:
        text: Text the embedding belongs to.
        model: Embedding model name (part of the cache key).
        embedding: Vector to store.

    Returns:
        True if the upload succeeded, False otherwise. Failures are not
        raised because the cache is best-effort.
    """
    cache_key = get_cache_key(text, model)
    local_path = f"/tmp/{cache_key.split('/')[-1]}"

    try:
        # Write to local temp file
        with open(local_path, 'wb') as f:
            pickle.dump(embedding, f)

        # Upload to S3 (with retry logic from upload_to_s3)
        # Use public=False for cache files (no need for public access)
        return upload_to_s3(local_path, cache_key, public=False)
    except Exception:
        # Not logged here: a failed cache write only costs a later recompute.
        return False
    finally:
        # Clean up temp file on success and on failure alike.
        try:
            os.unlink(local_path)
        except OSError:
            pass

print("✅ S3 embeddings cache ready (with retry logic)")

## Embedding Generation

In [None]:
def get_embeddings_batch(texts: List[str], model: str = EMBEDDING_MODEL, batch_size: int = 1000) -> List[List[float]]:
    """Generate embeddings with S3 caching

    Texts are cleaned with clean_question and processed in batches. For each
    batch the S3 cache is consulted first; the remaining texts are embedded
    with a single API request and written back to the cache.

    Placeholder vectors of EMBEDDING_DIMENSIONS zeros are returned for
      * empty texts (the embeddings endpoint rejects empty strings, and one
        such string would otherwise fail the whole batch), and
      * every uncached text of a batch whose API request failed (the error
        is logged through error_logger).

    Args:
        texts: Input texts.
        model: Embedding model name.
        batch_size: Number of texts per API request.

    Returns:
        One embedding per input text, in input order.
    """
    cleaned_texts = [clean_question(t) for t in texts]
    embeddings = []
    cache_hits = 0
    api_calls = 0
    empty_texts = 0

    print(f"🔄 Processing {len(cleaned_texts)} texts...")

    for i in tqdm(range(0, len(cleaned_texts), batch_size), desc="Batches"):
        batch_texts = cleaned_texts[i:i+batch_size]
        batch_embeddings = []
        uncached_texts = []
        uncached_indices = []

        # Check S3 cache
        for j, text in enumerate(batch_texts):
            if not text:
                batch_embeddings.append([0.0] * EMBEDDING_DIMENSIONS)
                empty_texts += 1
                continue
            cached = load_embedding_from_s3(text, model)
            # Explicit None check: truthiness would misjudge an empty list
            # and is ambiguous for array-like cache contents.
            if cached is not None:
                batch_embeddings.append(cached)
                cache_hits += 1
            else:
                batch_embeddings.append(None)
                uncached_texts.append(text)
                uncached_indices.append(j)

        # Generate uncached embeddings
        if uncached_texts:
            try:
                response = client.embeddings.create(model=model, input=uncached_texts)
                new_embeddings = [d.embedding for d in response.data]
                api_calls += len(uncached_texts)

                for idx, emb in zip(uncached_indices, new_embeddings):
                    batch_embeddings[idx] = emb
                    save_embedding_to_s3(batch_texts[idx], model, emb)
            except Exception as e:
                error_logger.log_error("Embeddings", f"Batch failed", e)
                for idx in uncached_indices:
                    batch_embeddings[idx] = [0.0] * EMBEDDING_DIMENSIONS

        embeddings.extend(batch_embeddings)

    if empty_texts:
        error_logger.log_warning("Embeddings", f"{empty_texts} empty texts received zero-vector placeholders")

    # Guard the hit-rate computation against an empty input list.
    hit_rate = cache_hits / len(embeddings) * 100 if embeddings else 0.0
    print(f"✅ Complete! Cache: {cache_hits}/{len(embeddings)} ({hit_rate:.1f}%), API: {api_calls}")
    return embeddings

print("✅ Embedding generation ready")

## Load Data

In [None]:
# Load topics from Google Sheets and normalise their example questions
print("📊 Loading topics from Google Sheets...")
topics_df = read_topics_from_google_sheets(GOOGLE_SHEETS_URL)
topics_df = preprocess_dataframe(topics_df, 'question')

# Upload Langfuse CSV
# In Colab the user picks a file interactively and the first uploaded file is
# used; locally a fixed file name in the working directory is expected.
if IN_COLAB:
    from google.colab import files
    print("\n📂 Upload Langfuse CSV:")
    uploaded = files.upload()
    langfuse_filename = list(uploaded.keys())[0]
else:
    langfuse_filename = "langfuse_traces_10_08_25.csv"

# Load and clean Langfuse data
print(f"\n📊 Loading Langfuse data from {langfuse_filename}...")
langfuse_df = pd.read_csv(langfuse_filename)
langfuse_clean = clean_langfuse_data(langfuse_df)

# Create questions dataframe: only the cleaned 'input' text is carried forward,
# under the column name 'question', with a fresh default index.
questions_df = pd.DataFrame({'question': langfuse_clean['input'].tolist()})
questions_df = preprocess_dataframe(questions_df, 'question')

# Report what was loaded, including problems collected by the error logger.
print(f"\n📊 DATA LOADED:")
print(f"   Topics: {len(topics_df)} ({topics_df['topic'].nunique()} unique)")
print(f"   Questions: {len(questions_df)}")
print(f"   Errors: {error_logger.get_summary()['total_errors']}")
print(f"   Dropped rows: {error_logger.get_summary()['total_dropped_rows']}")

## Prepare Evaluation Dataset

In [None]:
# Select the questions to evaluate: a reproducible random sample in "sample"
# mode, otherwise every loaded question.
if EVAL_MODE != "sample":
    eval_df = questions_df.copy()
    print(f"📝 Full mode: {len(eval_df)} questions")
else:
    sample_count = min(SAMPLE_SIZE, len(questions_df))
    eval_df = questions_df.sample(n=sample_count, random_state=RANDOM_SEED).copy()
    print(f"📝 Sample mode: {len(eval_df)} questions")

# Cost estimate: a flat 50 tokens is assumed per text (topics and questions),
# priced at 0.02 per million tokens.
texts_to_embed = len(topics_df) + len(eval_df)
total_tokens = texts_to_embed * 50
embedding_cost = (total_tokens / 1_000_000) * 0.02
print(f"💰 Estimated cost: ${embedding_cost:.4f} (embeddings only)")

## Similarity Classification

In [None]:
def classify_by_similarity(questions_df: pd.DataFrame, topics_df: pd.DataFrame, threshold: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Classify questions by similarity to existing topics

    Every question is compared with every topic example question using cosine
    similarity of their embeddings. A question whose best similarity is
    positive and at least ``threshold`` is assigned to that topic; on ties
    the first topic row wins. All other questions are returned as
    "remaining" together with their embedding.

    Topic vectors are unit-normalised once and stacked into a matrix, so each
    question costs a single matrix-vector product instead of one Python-level
    distance call per topic.

    Args:
        questions_df: Frame with a 'question' column.
        topics_df: Frame with 'topic', 'subtopic' and 'question' columns.
        threshold: Minimum cosine similarity for a match.

    Returns:
        (similar_df, remaining_df). similar_df has the columns 'question',
        'matched_topic', 'matched_subtopic', 'similarity_score';
        remaining_df has 'question' and 'embedding'. Both keep these columns
        even when they are empty.
    """

    print(f"\n🎯 Similarity Classification (threshold: {threshold})")

    # Generate embeddings
    print(f"📊 Generating topic embeddings...")
    topic_embeddings = get_embeddings_batch(topics_df['question'].tolist())
    topics_df = topics_df.copy()
    topics_df['embedding'] = topic_embeddings

    print(f"📊 Generating question embeddings...")
    question_embeddings = get_embeddings_batch(questions_df['question'].tolist())

    # Keep only topic vectors of the right size and non-zero length; zero
    # placeholders (from failed embedding calls) can never be a best match.
    usable_rows = []
    unit_vectors = []
    for row_pos, t_emb in enumerate(topic_embeddings):
        if t_emb is None or len(t_emb) != EMBEDDING_DIMENSIONS:
            continue
        vector = np.asarray(t_emb, dtype=float)
        norm = np.linalg.norm(vector)
        if np.isfinite(norm) and norm > 0:
            usable_rows.append(row_pos)
            unit_vectors.append(vector / norm)
    topic_matrix = np.vstack(unit_vectors) if unit_vectors else None
    topic_labels = topics_df['topic'].tolist()
    subtopic_labels = topics_df['subtopic'].tolist()

    # Classify
    similar = []
    remaining = []

    print(f"🔍 Classifying {len(questions_df)} questions...")
    for question, q_emb in tqdm(zip(questions_df['question'], question_embeddings), total=len(questions_df)):
        if q_emb is None or len(q_emb) != EMBEDDING_DIMENSIONS:
            remaining.append({'question': question, 'embedding': [0.0]*EMBEDDING_DIMENSIONS})
            continue

        best_sim = 0.0
        best_row = None

        q_vector = np.asarray(q_emb, dtype=float)
        q_norm = np.linalg.norm(q_vector)
        if topic_matrix is not None and np.isfinite(q_norm) and q_norm > 0:
            sims = topic_matrix @ (q_vector / q_norm)
            top = int(np.argmax(sims))  # first maximum, i.e. first topic row on ties
            if sims[top] > 0:
                best_sim = float(sims[top])
                best_row = usable_rows[top]

        if best_row is not None and best_sim >= threshold:
            similar.append({
                'question': question,
                'matched_topic': topic_labels[best_row],
                'matched_subtopic': subtopic_labels[best_row],
                'similarity_score': best_sim
            })
        else:
            remaining.append({'question': question, 'embedding': q_emb})

    # Explicit columns keep the schema stable when a list is empty.
    similar_df = pd.DataFrame(similar, columns=['question', 'matched_topic', 'matched_subtopic', 'similarity_score'])
    remaining_df = pd.DataFrame(remaining, columns=['question', 'embedding'])

    total = len(questions_df)
    similar_pct = len(similar_df) / total * 100 if total else 0.0
    remaining_pct = len(remaining_df) / total * 100 if total else 0.0
    print(f"\n✅ Classification complete:")
    print(f"   Similar: {len(similar_df)} ({similar_pct:.1f}%)")
    print(f"   Remaining: {len(remaining_df)} ({remaining_pct:.1f}%)")

    return similar_df, remaining_df

similar_df, remaining_df = classify_by_similarity(eval_df, topics_df, SIMILARITY_THRESHOLD)

## Clustering for New Topics

In [None]:
# Cluster the questions that did not match an existing topic.
# Both names stay None when there is nothing to cluster or no cluster is
# found; later cells test clustered_df for None.
clustered_df = None
topic_model = None

if len(remaining_df) > 0:
    print(f"\n🎯 Clustering {len(remaining_df)} remaining questions...")
    
    # Heavy dependencies are imported only when clustering actually runs.
    from umap import UMAP
    from hdbscan import HDBSCAN
    from bertopic import BERTopic
    
    # One row per remaining question, including any zero-vector placeholders
    # stored for questions whose embedding was unavailable.
    embeddings = np.array(remaining_df['embedding'].tolist())
    
    # UMAP
    # NOTE(review): very small inputs may be rejected by UMAP - confirm the
    # minimum number of remaining questions this cell should accept.
    print(f"🔄 UMAP reduction to {UMAP_N_COMPONENTS} dimensions...")
    umap_model = UMAP(n_components=UMAP_N_COMPONENTS, min_dist=0.0, metric='cosine', random_state=RANDOM_SEED)
    reduced = umap_model.fit_transform(embeddings)
    
    # HDBSCAN
    print(f"🔄 HDBSCAN clustering (min_size={HDBSCAN_MIN_CLUSTER_SIZE})...")
    hdbscan_model = HDBSCAN(min_cluster_size=HDBSCAN_MIN_CLUSTER_SIZE, metric="euclidean", cluster_selection_method="eom")
    clusters = hdbscan_model.fit_predict(reduced)
    
    # Label -1 is treated as noise: excluded from the cluster count here and
    # filtered out of clustered_df below.
    n_clusters = len([c for c in np.unique(clusters) if c != -1])
    n_noise = sum(clusters == -1)
    print(f"✅ Found {n_clusters} clusters, {n_noise} noise points")
    
    if n_clusters > 0:
        # BERTopic
        # The model receives the same UMAP/HDBSCAN objects and the precomputed
        # embeddings (embedding_model=None).
        # NOTE(review): presumably fit_transform re-fits both models, so
        # 'topic_id' need not use the same numbering as 'cluster_id' - confirm.
        topic_model = BERTopic(embedding_model=None, umap_model=umap_model, hdbscan_model=hdbscan_model, verbose=False)
        topics, probs = topic_model.fit_transform(remaining_df['question'].tolist(), embeddings)
        
        # Attach both labelings, then keep only rows HDBSCAN put in a cluster.
        # The row index of remaining_df is preserved.
        clustered_df = remaining_df.copy()
        clustered_df['cluster_id'] = clusters
        clustered_df['topic_id'] = topics
        clustered_df = clustered_df[clustered_df['cluster_id'] != -1]
        
        print(f"✅ Clustered {len(clustered_df)} questions into {n_clusters} topics")
else:
    print("\n✅ All questions matched existing topics - no clustering needed")

## Generate Topic Names with GPT

In [None]:
topic_names = {}

if clustered_df is not None and len(clustered_df) > 0:
    print(f"\n🤖 Generating topic names with {GPT_MODEL}...")
    
    async def generate_topic_name(questions: List[str]) -> str:
        sample = questions[:10]
        prompt = f"""Generate a concise topic name (2-8 words) for these student questions:

{chr(10).join([f'- {q}' for q in sample])}

Return ONLY the topic name, no explanation."""
        
        try:
            response = await async_client.chat.completions.create(
                model=GPT_MODEL,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=100
            )
            return response.choices[0].message.content.strip().strip('"')
        except Exception as e:
            error_logger.log_error("TopicNaming", "GPT failed", e)
            return "Unnamed Topic"
    
    async def process_all_clusters():
        tasks = []
        cluster_ids = []
        for cluster_id, group in clustered_df.groupby('cluster_id'):
            tasks.append(generate_topic_name(group['question'].tolist()))
            cluster_ids.append(cluster_id)
        
        names = await asyncio.gather(*tasks)
        return dict(zip(cluster_ids, names))
    
    topic_names = await process_all_clusters()
    clustered_df['topic_name'] = clustered_df['cluster_id'].map(topic_names)
    
    print(f"✅ Generated {len(topic_names)} topic names")
    for cid, name in list(topic_names.items())[:5]:
        count = len(clustered_df[clustered_df['cluster_id'] == cid])
        print(f"   {name} ({count} questions)")

## Generate Output Files

In [None]:
# One timestamp is shared by all output file names of this run.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

print(f"\n📁 Generating output files...")

has_clusters = clustered_df is not None and len(clustered_df) > 0


def _pick_representative_question(group: pd.DataFrame) -> str:
    """Choose the question that represents one cluster.

    Follows REPRESENTATIVE_QUESTION_METHOD:
      * "frequent": the most common question text in the cluster;
      * "centroid": the question whose embedding lies closest (Euclidean
        distance) to the mean embedding of the cluster.
    Any other value, or a missing 'embedding' column, falls back to the
    first question of the cluster.
    """
    if REPRESENTATIVE_QUESTION_METHOD == "frequent":
        return group['question'].value_counts().idxmax()
    if REPRESENTATIVE_QUESTION_METHOD == "centroid" and 'embedding' in group.columns:
        vectors = np.array(group['embedding'].tolist(), dtype=float)
        distances = np.linalg.norm(vectors - vectors.mean(axis=0), axis=1)
        return group['question'].iloc[int(np.argmin(distances))]
    return group['question'].iloc[0]


# File 1: Similar questions
file1 = f"similar_questions_{timestamp}.parquet"
if len(similar_df) > 0:
    output1 = similar_df[['question', 'matched_topic', 'matched_subtopic', 'similarity_score']].copy()
    output1.columns = ['question', 'existing_topic', 'existing_subtopic', 'similarity_score']
    output1 = output1.sort_values('similarity_score', ascending=False)
else:
    output1 = pd.DataFrame(columns=['question', 'existing_topic', 'existing_subtopic', 'similarity_score'])

# Add metadata
output1.attrs['metadata'] = {
    'timestamp': timestamp,
    'threshold': SIMILARITY_THRESHOLD,
    'total_questions': len(output1),
    'default_visible_columns': ['question', 'existing_topic', 'existing_subtopic', 'similarity_score']
}
output1.to_parquet(file1)
print(f"✅ {file1}: {len(output1)} rows")

# File 2: New topics (one row per cluster)
file2 = f"new_topics_{timestamp}.parquet"
new_topic_columns = ['topic_name', 'representative_question', 'question_count']
if has_clusters:
    cluster_summary = pd.DataFrame(
        [
            {
                'topic_name': group['topic_name'].iloc[0],
                'representative_question': _pick_representative_question(group),
                'question_count': len(group),
            }
            for _, group in clustered_df.groupby('cluster_id')
        ],
        columns=new_topic_columns,
    )
    output2 = cluster_summary.sort_values('question_count', ascending=False)
else:
    output2 = pd.DataFrame(columns=new_topic_columns)

output2.attrs['metadata'] = {
    'timestamp': timestamp,
    'total_topics': len(output2),
    'default_visible_columns': ['topic_name', 'representative_question', 'question_count']
}
output2.to_parquet(file2)
print(f"✅ {file2}: {len(output2)} rows")

# File 3: All questions review
file3 = f"pathway_questions_review_{timestamp}.parquet"
review_columns = ['question', 'topic_name', 'classification', 'confidence']
review_data = []

# Questions matched to an existing topic
if len(similar_df) > 0:
    review_data.extend(
        {
            'question': question,
            'topic_name': f"{topic} | {subtopic}",
            'classification': 'existing',
            'confidence': score
        }
        for question, topic, subtopic, score in zip(
            similar_df['question'],
            similar_df['matched_topic'],
            similar_df['matched_subtopic'],
            similar_df['similarity_score'],
        )
    )

# Questions assigned to a newly discovered topic
if has_clusters:
    review_data.extend(
        {
            'question': question,
            'topic_name': topic_name,
            'classification': 'new',
            'confidence': 0.5
        }
        for question, topic_name in zip(clustered_df['question'], clustered_df['topic_name'])
    )

# Remaining questions that ended up in no cluster. They are identified by row
# index (clustered_df keeps the index of remaining_df) rather than by text,
# so a noise question is not lost when an identical text was clustered.
if len(remaining_df) > 0:
    clustered_index = clustered_df.index if clustered_df is not None else pd.Index([])
    unclustered_questions = remaining_df.loc[~remaining_df.index.isin(clustered_index), 'question']
    review_data.extend(
        {
            'question': question,
            'topic_name': 'Other',
            'classification': 'uncategorized',
            'confidence': 0.0
        }
        for question in unclustered_questions
    )

# Explicit columns keep the schema intact when there is nothing to review;
# the distribution below groups by these columns.
output3 = pd.DataFrame(review_data, columns=review_columns)
output3.attrs['metadata'] = {
    'timestamp': timestamp,
    'total_questions': len(output3),
    'default_visible_columns': ['question', 'topic_name', 'classification', 'confidence']
}
output3.to_parquet(file3)
print(f"✅ {file3}: {len(output3)} rows")

# File 4: Topic distribution analytics
file4 = f"topic_distribution_{timestamp}.parquet"
topic_dist = output3.groupby(['topic_name', 'classification']).size().reset_index(name='count')
topic_dist = topic_dist.sort_values('count', ascending=False)
topic_dist.attrs['metadata'] = {'timestamp': timestamp, 'purpose': 'streamlit_analytics'}
topic_dist.to_parquet(file4)
print(f"✅ {file4}: {len(topic_dist)} rows")

# File 5: Error log
file5 = f"error_log_{timestamp}.json"
error_logger.save_to_file(file5)

output_files = [file1, file2, file3, file4, file5]
print(f"\n✅ Generated {len(output_files)} output files")

## Upload to S3

In [None]:
print(f"\n☁️  Uploading to S3...")

# Upload new files first. Outputs of earlier runs are removed only after
# every upload succeeded, so a failed run never leaves the bucket empty.
uploaded = []
new_keys = set()
for filepath in output_files:
    s3_key = f"{S3_OUTPUT_PREFIX}/{filepath}"
    if upload_to_s3(filepath, s3_key):
        uploaded.append(f"https://{S3_BUCKET}.s3.amazonaws.com/{s3_key}")
        new_keys.add(s3_key)

if len(new_keys) == len(output_files):
    # Delete old files: everything inside the output folder that is not part
    # of this run. The trailing "/" keeps sibling prefixes that merely start
    # with the same text out of scope.
    output_folder = f"{S3_OUTPUT_PREFIX}/"
    try:
        stale_objects = []
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=output_folder):
            stale_objects.extend(
                {'Key': obj['Key']}
                for obj in page.get('Contents', [])
                if obj['Key'] not in new_keys
            )
        # Delete in chunks of at most 1000 keys per request.
        for start in range(0, len(stale_objects), 1000):
            s3_client.delete_objects(
                Bucket=S3_BUCKET,
                Delete={'Objects': stale_objects[start:start + 1000]}
            )
        if stale_objects:
            print(f"🗑️  Deleted {len(stale_objects)} objects from s3://{S3_BUCKET}/{output_folder}")
    except Exception as e:
        error_logger.log_error("S3_Delete", f"Failed to delete old files under {output_folder}", e)
else:
    error_logger.log_warning(
        "S3_Upload",
        f"Only {len(new_keys)} of {len(output_files)} files uploaded; old files were kept"
    )

print(f"\n✅ Uploaded {len(uploaded)} files to S3:")
for url in uploaded:
    print(f"   {url}")

## Analysis & Visualization

In [None]:
# Four-panel overview of the run: pipeline counts, similarity scores,
# new-topic sizes and the overall classification split.
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Hybrid Topic Discovery Analysis', fontsize=16, fontweight='bold')
ax_pipeline, ax_similarity = axes[0,0], axes[0,1]
ax_clusters, ax_pie = axes[1,0], axes[1,1]

# 1. Processing pipeline
n_total = len(eval_df)
n_similar = len(similar_df)
n_clustered = len(clustered_df) if clustered_df is not None else 0
pipeline = ['Total', 'Similar', 'New Topics', 'Uncategorized']
counts = [n_total, n_similar, n_clustered, n_total - n_similar - n_clustered]
ax_pipeline.bar(pipeline, counts, color=['lightblue', 'lightgreen', 'orange', 'lightcoral'])
ax_pipeline.set_title('Processing Results')
ax_pipeline.set_ylabel('Questions')
# Annotate each bar with its count and its share of all evaluated questions.
for bar_pos, bar_count in enumerate(counts):
    ax_pipeline.text(bar_pos, bar_count + max(counts)*0.01, f"{bar_count}\n({bar_count/len(eval_df)*100:.1f}%)", ha='center')

# 2. Similarity distribution
if len(similar_df) == 0:
    ax_similarity.text(0.5, 0.5, 'No similar questions', ha='center', va='center', transform=ax_similarity.transAxes)
    ax_similarity.set_title('Similarity Distribution')
else:
    ax_similarity.hist(similar_df['similarity_score'], bins=20, alpha=0.7, color='lightgreen', edgecolor='black')
    ax_similarity.axvline(SIMILARITY_THRESHOLD, color='red', linestyle='--', label=f'Threshold: {SIMILARITY_THRESHOLD}')
    ax_similarity.set_xlabel('Similarity Score')
    ax_similarity.set_ylabel('Count')
    ax_similarity.set_title('Similarity Distribution')
    ax_similarity.legend()

# 3. Cluster sizes
if clustered_df is None or len(clustered_df) == 0:
    ax_clusters.text(0.5, 0.5, 'No clusters', ha='center', va='center', transform=ax_clusters.transAxes)
    ax_clusters.set_title('New Topic Sizes')
else:
    cluster_sizes = clustered_df['cluster_id'].value_counts().values
    ax_clusters.hist(cluster_sizes, bins=min(20, len(cluster_sizes)), alpha=0.7, color='orange', edgecolor='black')
    ax_clusters.set_xlabel('Cluster Size')
    ax_clusters.set_ylabel('Count')
    ax_clusters.set_title('New Topic Sizes')

# 4. Topic distribution pie
pie_data = output3['classification'].value_counts()
if len(pie_data) == 0:
    ax_pie.text(0.5, 0.5, 'No data', ha='center', va='center', transform=ax_pie.transAxes)
    ax_pie.set_title('Classification Distribution')
else:
    ax_pie.pie(pie_data.values, labels=pie_data.index, autopct='%1.1f%%', startangle=90)
    ax_pie.set_title('Classification Distribution')

plt.tight_layout()
plt.show()

# Text summary of the run.
run_summary = error_logger.get_summary()
print("\n📊 SUMMARY:")
print(f"   Total processed: {len(eval_df)}")
print(f"   Similar to existing: {len(similar_df)} ({len(similar_df)/len(eval_df)*100:.1f}%)")
print(f"   New topics: {len(topic_names)}")
print(f"   Errors: {run_summary['total_errors']}")
print(f"   Warnings: {run_summary['total_warnings']}")
print(f"\n✅ COMPLETE! Files uploaded to S3.")