In [None]:
# !git clone https://github.com/Sharp-4rth/temporal_belief_analysis.git

In [None]:
# For colab:
from google.colab import drive
drive.mount('/content/drive')

In [2]:
import sys
import os

# Absolute path to src directory
src_path = os.path.abspath(os.path.join(os.getcwd(), '..', 'src'))
if src_path not in sys.path:
    sys.path.insert(0, src_path)

In [None]:
# Get latest version
%cd temporal_belief_analysis
!git pull

In [None]:
# For colab:
!pip install convokit

In [4]:
# For colab:
import unsloth
import unsloth_zoo
from convokit import Corpus, download
import convokit
from temporal_belief.models.bart_classifier import BARTZeroShotClassifier
from temporal_belief.utils.config import POLITICAL_TOPICS, ProjectConfig
corpus = Corpus(filename=download("subreddit-PoliticalDiscussion"))

Dataset already exists at /root/.convokit/saved-corpora/subreddit-PoliticalDiscussion


In [5]:
# Check dataset was loaded:
print(corpus.random_utterance().text)

If we are talking about true AI, the company or country that creates it first sure as hell is not going to sell it. I'm a little confused why you think they would? That type of tech is a game changer and the first to create it is going to be the winner. This isn't a cell phone. War, business, investing, data interpretation, an intelligence that can make a better version of it self and give even better results... You have an advantage the rest of the world could only dream of.


In [21]:
# For local:
from src.temporal_belief.models.bart_classifier import BARTZeroShotClassifier
from src.temporal_belief.utils.config import POLITICAL_TOPICS, ProjectConfig

In [36]:
"""Topic detection functionality for conversation analysis."""

import logging
from typing import List, Dict, Any, Optional
from tqdm import tqdm

logger = logging.getLogger(__name__)

class TopicDetector:
    """Detect topics in ConvoKit conversations using BART."""

    def __init__(self, topics: Optional[List[str]] = None,
                 config: ProjectConfig = None):
        """Initialize topic detector."""
        self.config = config or ProjectConfig()
        self.classifier = BARTZeroShotClassifier(self.config.bart_model_name)
        self.topics = topics or POLITICAL_TOPICS
        logger.info(f"Initialized topic detector with {len(self.topics)} topics")

    def detect_conversation_topic(self, conversation) -> Dict[str, Any]:
        """Detect topic for a single conversation."""
        utterances = list(conversation.iter_utterances())
        
        # Safe attribute access
        title = conversation.meta.get('title', '')
        
        # Safe utterance handling
        first_utterance = utterances[0] if utterances else None
        original_post = first_utterance.text if first_utterance else ''
        
        if not original_post and not title:
            logger.warning(f"No utterances or title found in conversation {conversation.id}")
            return {"topic": "unknown", "confidence": 0.0}

        # Truncate long texts to prevent memory issues
        combined_text = f"Title: {title}. Original Post: {original_post}"[:2000]
        result = self.classifier.classify_text(combined_text, self.topics)

        return {
            "topic": result["label"],
            "confidence": result["confidence"],
            "all_scores": result["all_scores"],
            "text_length": len(original_post),
            "num_utterances": len(utterances)
        }

    def process_corpus(self, corpus, batch_size: int = 50,  # Balanced batch size
                    save_path: Optional[str] = None) -> None:
        """Process entire corpus for topic detection."""
        conversations = list(corpus.iter_conversations())
        logger.info(f"Processing {len(conversations)} conversations for topic detection")

        for i in tqdm(range(0, len(conversations), batch_size),
                      desc="Processing conversations"):
            batch = conversations[i:i + batch_size]

            # Prepare all texts for batch processing
            batch_texts = []
            valid_conversations = []

            for conv in batch:
                try:
                    # Safe attribute access
                    title = conv.meta.get('title', '')
                    utterances = list(conv.iter_utterances())
                    
                    # Safe utterance handling
                    first_utterance = utterances[0] if utterances else None
                    original_post = first_utterance.text if first_utterance else ''
                    
                    if not original_post and not title:
                        logger.warning(f"No utterances or title found in conversation {conv.id}")
                        # Set metadata for empty conversations
                        conv.add_meta("detected_topic", "unknown")
                        conv.add_meta("topic_confidence", 0.0)
                        conv.add_meta("topic_scores", {})
                        continue

                    # Truncate long texts
                    combined_text = f"{title}. {original_post}"[:2000]
                    batch_texts.append(combined_text)
                    valid_conversations.append(conv)

                except Exception as e:
                    logger.error(f"Failed to prepare conversation {conv.id}: {e}")
                    conv.add_meta("detected_topic", "unknown")
                    conv.add_meta("topic_confidence", 0.0)
                    conv.add_meta("topic_scores", {})

            # Process entire batch at once
            if batch_texts:
                try:
                    print(f"🚀 Attempting batch of {len(batch_texts)} texts...")
                    import time
                    start = time.time()
                    
                    batch_results = self.classifier.classify_batch(batch_texts, self.topics)
                    
                    end = time.time()
                    print(f"✅ Batch completed in {end-start:.2f}s ({(end-start)/len(batch_texts):.3f}s per text)")

                    # Apply results back to conversations
                    for conv, result in zip(valid_conversations, batch_results):
                        conv.add_meta("detected_topic", result["label"])
                        conv.add_meta("topic_confidence", result["confidence"])
                        conv.add_meta("topic_scores", result["all_scores"])

                except Exception as e:
                    print(f"❌ Batch processing failed: {e}")
                    logger.error(f"Batch classification failed: {e}")
                    
                    # Fallback to individual processing
                    for conv in valid_conversations:
                        try:
                            topic_result = self.detect_conversation_topic(conv)
                            conv.add_meta("detected_topic", topic_result["topic"])
                            conv.add_meta("topic_confidence", topic_result["confidence"])
                            conv.add_meta("topic_scores", topic_result["all_scores"])
                        except Exception as e2:
                            logger.error(f"Individual fallback failed for {conv.id}: {e2}")
                            conv.add_meta("detected_topic", "unknown")
                            conv.add_meta("topic_confidence", 0.0)
                            conv.add_meta("topic_scores", {})

        if save_path:
            corpus.dump(save_path)
            logger.info(f"Saved processed corpus to {save_path}")

        logger.info("Topic detection processing complete")

In [7]:
corpus_small = Corpus(filename=download("reddit-corpus-small"))

Dataset already exists at /root/.convokit/saved-corpora/reddit-corpus-small


In [None]:
# Testing 'process_corpus()'
SAVE_PATH = "/workspace/temporal_belief_analysis/pd_corpus_with_topics"
topic_detector = TopicDetector()
topic_detector.process_corpus(corpus, save_path=SAVE_PATH)

In [32]:
corpus_small.dump("/workspace/temporal_belief_analysis/corpus_small_save_test")

In [None]:
# In a new cell - test if kernel responds
print("Kernel alive check")
import time
print(f"Current time: {time.time()}")

In [31]:
# Check if metadata gets added
conversations = list(corpus.iter_conversations())

# Check first conversation
first_conv = conversations[1]
print(f"First conversation ID: {first_conv.id}")
print(f"Has topic metadata: {'detected_topic' in first_conv.meta}")
if 'detected_topic' in first_conv.meta:
    print(f"Topic: {first_conv.meta['detected_topic']}")
    print(f"Confidence: {first_conv.meta['topic_confidence']}")

First conversation ID: nz1xu
Has topic metadata: True
Topic: media and political commentary
Confidence: 0.3874607980251312


In [None]:
# Testing 'detect_conversation_topic()' and 'dump()'
i = 0
convos_small = list(corpus_small.iter_conversations())
topic_detector = TopicDetector()
for i in range(3):
  utterances = list(convos_small[i].iter_utterances())
  title = convos_small[i].meta['title']
  og_post = utterances[0].text
  print(100*'-')
  print(f"Title: {title} \n")
  print(f"OG post: {og_post} \n")
  topic = topic_detector.detect_conversation_topic(convos_small[i])
  print(f"Detected topic: {topic['topic']} \n")
  print(f"Confidence: {topic['confidence']} \n")
  convos_small[i].add_meta("detected_topic", topic["topic"])
  convos_small[i].add_meta("topic_confidence", topic["confidence"])
  convos_small[i].add_meta("topic_scores", topic["all_scores"])
  i += 1

corpus_small.dump("/content/drive/MyDrive/MScProject/Corpora/corpus_small")
