BERT AND LDA TOPIC MODELLING


In [None]:
#BERTopic Analysis with Dynamic Parameter Adjustment
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import files
import warnings
warnings.filterwarnings('ignore')

# Install required packages
!pip install bertopic umap-learn hdbscan sentence-transformers wordcloud plotly gensim

from bertopic import BERTopic
from sentence_transformers import SentenceTransformer
from umap import UMAP
from hdbscan import HDBSCAN
from wordcloud import WordCloud
from sklearn.feature_extraction.text import CountVectorizer
import plotly.express as px
from gensim.models import CoherenceModel
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')

class BERTopicAnalyzer:
    """Complete BERTopic analyzer with dynamic parameter adjustment"""

    def __init__(self):
        """Assemble the stopword vocabularies used by the vectorizer and coherence steps."""
        # Base list: NLTK's English stopwords.
        self.english_stopwords = set(stopwords.words('english'))

        # Kept under the "setswana" name; the set also carries a few English tokens.
        self.setswana_stopwords = {
            'hore', 'le', 'ka', 'ya', 'ba', 'e', 'sa', 'te',
            'di', 'ten', 'done', 'ga', 'bona', 'everyone', 'bo', 'went',
        }
        self.asr_artifacts = {'hartha', 'ore', 'tele', 'um', 'uh', 'ah'}
        self.custom_fillers = {'going', 'come', 'one', 'make', 'take', 'time', 'said'}

        # One combined lookup set covering all four groups.
        self.all_stopwords = set().union(
            self.english_stopwords,
            self.setswana_stopwords,
            self.asr_artifacts,
            self.custom_fillers,
        )

    def load_transcripts(self, file_path):
        """Load and clean transcript data with robust parsing"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        if "---" in content or "TRANSCRIPT" in content.upper():
            sections = re.split(r'\n(?=---|\w+\s*TRANSCRIPT)', content, flags=re.IGNORECASE)
            docs = [' '.join([line.strip() for line in section.split('\n')
                   if not line.strip().startswith('---') and 'TRANSCRIPT' not in line.upper()]).strip()
                   for section in sections if section.strip()]
        else:
            docs = [line.strip() for line in content.split('\n') if line.strip()]

        # Filter short documents and ensure minimum length
        return [doc for doc in docs if len(doc) > 20]

    def initialize_bertopic(self, n_docs):
        """Build an unfitted BERTopic model whose clustering scales with corpus size.

        Args:
            n_docs: Number of documents that will be passed to ``fit_transform``.

        Returns:
            A configured, unfitted ``BERTopic`` instance.
        """
        # 1. Embedding model (multilingual for better handling)
        embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")

        # 2. UMAP - neighbour count grows with the corpus, clamped to 5-15.
        umap_model = UMAP(
            n_neighbors=min(15, max(5, n_docs // 3)),
            n_components=5,
            min_dist=0.0,
            metric='cosine',
            random_state=42
        )

        # 3. HDBSCAN - minimum cluster size grows with the corpus, clamped to 5-25.
        hdbscan_model = HDBSCAN(
            min_cluster_size=max(5, min(25, n_docs // 5)),
            metric='euclidean',
            cluster_selection_method='eom',
            prediction_data=True
        )

        # 4. Vectorizer. BERTopic fits it on ONE merged document per topic, so
        # document-frequency cut-offs count topics, not transcripts. Thresholds
        # derived from n_docs (min_df up to 3, max_df below 1.0) made
        # scikit-learn raise "max_df corresponds to < documents than min_df"
        # whenever only a few topics emerged. Keep every term instead; c-TF-IDF
        # already down-weights terms shared across topics.
        vectorizer_model = CountVectorizer(
            stop_words=list(self.all_stopwords),
            ngram_range=(1, 2),
            min_df=1,
            max_df=1.0
        )

        return BERTopic(
            embedding_model=embedding_model,
            umap_model=umap_model,
            hdbscan_model=hdbscan_model,
            vectorizer_model=vectorizer_model,
            language="multilingual",
            calculate_probabilities=True,
            verbose=True
        )

    def calculate_coherence(self, model, docs):
        """Score the fitted topics with gensim's C_V, UMass and NPMI coherence.

        Args:
            model: Fitted BERTopic model.
            docs: Raw documents the model was fitted on.

        Returns:
            Dict with keys 'c_v', 'u_mass' and 'c_npmi'. A metric that cannot
            be computed is ``np.nan``; the method never raises.
        """
        nan_scores = {'c_v': np.nan, 'u_mass': np.nan, 'c_npmi': np.nan}
        try:
            # Imported locally: this cell only imports CoherenceModel from
            # gensim, so the bare name `corpora` used before raised NameError
            # and every score silently came back as NaN.
            from gensim.corpora import Dictionary

            processed_texts = [[word for word in doc.lower().split()
                                if word not in self.all_stopwords and len(word) > 2]
                               for doc in docs]

            dictionary = Dictionary(processed_texts)
            vocabulary = dictionary.token2id

            # Walk the real topic ids instead of range(len(info) - 1), which
            # assumed an outlier row (-1) always exists. Terms absent from the
            # whitespace-tokenized corpus (e.g. bigrams, blank padding) are
            # dropped so gensim only sees words it can look up.
            topic_words = []
            for topic_id, terms in sorted(model.get_topics().items()):
                if topic_id == -1:
                    continue
                known_words = [word for word, _ in terms if word in vocabulary]
                if known_words:
                    topic_words.append(known_words)

            if not topic_words:
                print("Coherence calculation failed: no topic terms occur in the tokenized documents")
                return dict(nan_scores)

            coherence_scores = {}
            for metric in ['c_v', 'u_mass', 'c_npmi']:
                # Each metric is isolated so one failure does not void the others.
                try:
                    cm = CoherenceModel(
                        topics=topic_words,
                        texts=processed_texts,
                        dictionary=dictionary,
                        coherence=metric
                    )
                    coherence_scores[metric] = cm.get_coherence()
                except Exception as e:
                    print(f"Could not calculate {metric}: {str(e)}")
                    coherence_scores[metric] = np.nan
            return coherence_scores
        except Exception as e:
            print(f"Coherence calculation failed: {str(e)}")
            return dict(nan_scores)

    def visualize_results(self, model, docs):
        """Draw every BERTopic figure, isolating failures per figure.

        Figures, in order: per-topic word clouds, a bar chart of documents per
        topic, the topic hierarchy and the topic similarity heatmap. Each one
        runs in its own try block so a failure in one neither suppresses the
        remaining figures nor discards the topic table.

        Args:
            model: Fitted BERTopic model.
            docs: Documents the model was fitted on (used for the hierarchy).

        Returns:
            The topic-info DataFrame without the outlier row (Topic == -1), or
            None when the topic table itself cannot be read.
        """
        try:
            topic_info = model.get_topic_info()
            topic_info = topic_info[topic_info.Topic != -1]
        except Exception as e:
            print(f"Visualization failed: {str(e)}")
            return None

        # Real topic ids; avoids assuming ids are exactly 0..n-1 with an
        # outlier row present.
        topic_ids = topic_info.Topic.tolist()

        def draw_word_clouds():
            if not topic_ids:
                return
            n_rows = -(-len(topic_ids) // 5)  # ceiling division, 5 clouds per row
            plt.figure(figsize=(16, max(6, len(topic_ids) * 2)))
            for position, topic_id in enumerate(topic_ids, start=1):
                # get_topic() may return a falsy value for an unknown id;
                # blank words and non-positive weights add nothing to a cloud.
                weights = {word: weight
                           for word, weight in (model.get_topic(topic_id) or [])
                           if word and weight > 0}
                if not weights:
                    continue
                plt.subplot(n_rows, 5, position)
                wordcloud = WordCloud(width=300, height=200,
                                      background_color='white',
                                      colormap='viridis').generate_from_frequencies(weights)
                plt.imshow(wordcloud)
                plt.axis("off")
                plt.title(f"Topic {topic_id}")
            plt.tight_layout()
            plt.show()

        def draw_distribution():
            plt.figure(figsize=(10, 6))
            sns.barplot(x='Topic', y='Count', data=topic_info, palette='viridis')
            plt.title('Document Distribution Across Topics')
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.show()

        def draw_hierarchy():
            hierarchical_topics = model.hierarchical_topics(docs)
            fig = model.visualize_hierarchy(hierarchical_topics=hierarchical_topics)
            fig.update_layout(width=800, height=600)
            fig.show()

        def draw_heatmap():
            fig = model.visualize_heatmap()
            fig.update_layout(width=800, height=800)
            fig.show()

        steps = (
            ("word clouds", draw_word_clouds),
            ("topic distribution", draw_distribution),
            ("topic hierarchy", draw_hierarchy),
            ("topic similarity", draw_heatmap),
        )
        for label, draw in steps:
            try:
                draw()
            except Exception as e:
                print(f"Visualization failed ({label}): {str(e)}")
                # Drop any half-drawn matplotlib figure before the next step.
                plt.close('all')

        return topic_info

    def run_analysis(self):
        """Run the interactive BERTopic workflow end to end.

        Prompts for a file upload, loads the documents, fits the model, prints
        coherence scores, draws the figures and prints the top terms per topic.

        Returns:
            ``(model, topic_info, coherence_scores)`` on success, or
            ``(None, None, None)`` when any step raises.
        """
        try:
            print("Upload your transcript file:")
            uploaded = files.upload()
            if not uploaded:
                raise ValueError("No file uploaded")

            # Only the first uploaded file is analysed.
            filename = next(iter(uploaded.keys()))
            docs = self.load_transcripts(filename)
            n_docs = len(docs)

            print(f"\nLoaded {n_docs} documents")
            if n_docs < 5:
                raise ValueError(f"Only {n_docs} documents found. Need at least 5.")

            print("Sample document:", docs[0][:100] + "...")

            # Model parameters depend on the corpus size.
            model = self.initialize_bertopic(n_docs)
            _topics, _probs = model.fit_transform(docs)

            coherence_scores = self.calculate_coherence(model, docs)
            print("\nCOHERENCE SCORES:")
            report_rows = (
                ("C_V", 'c_v', "Higher is better, >0.4 good"),
                ("UMass", 'u_mass', "Closer to 0 is better"),
                ("NPMI", 'c_npmi', "Positive is good"),
            )
            for label, key, hint in report_rows:
                print(f"• {label}: {coherence_scores.get(key, 'NA'):.4f} ({hint})")

            topic_info = self.visualize_results(model, docs)

            # Top ten terms for every non-outlier topic.
            if topic_info is not None:
                print("\nTOPIC DETAILS:")
                for _, row in topic_info.iterrows():
                    if row.Topic < 0:
                        continue
                    top_terms = [term for term, _ in model.get_topic(row.Topic)][:10]
                    print(f"\nTopic {row.Topic} ({row.Count} docs): {', '.join(top_terms)}")

            return model, topic_info, coherence_scores

        except Exception as e:
            message = str(e)
            print(f"\nERROR: {message}")
            print("\nTroubleshooting Tips:")
            if "max_df" in message:
                print("- Try uploading more documents (10+ recommended)")
                print("- Or edit vectorizer parameters in initialize_bertopic()")
            elif "documents" in message:
                print("- Check file format (one document per line or --- separated)")
                print("- Ensure documents have >20 characters of meaningful text")
            return None, None, None

# Run the analysis
# Cell entry point: builds the analyzer and starts the interactive workflow
# (file upload, model fitting, figures). run_analysis() returns
# (model, topic_info, coherence_scores), or (None, None, None) if a step raised.
print("Starting enhanced BERTopic analysis...")
analyzer = BERTopicAnalyzer()
bertopic_model, bertopic_info, coherence_scores = analyzer.run_analysis()

In [None]:
#LDA Analysis with Comprehensive Visualizations
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import files
import warnings
warnings.filterwarnings('ignore')

# Install required packages
!pip install gensim pyLDAvis wordcloud

import gensim
from gensim import corpora
from gensim.models import LdaModel, CoherenceModel
import pyLDAvis.gensim_models as gensimvis
import pyLDAvis
from wordcloud import WordCloud
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Download NLTK data
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)

class EnhancedLDAAnalyzer:
    """Optimized LDA analyzer with enhanced visualizations"""

    def __init__(self):
        self.initialize_stopwords()
        self.coherence_metrics = ['c_v', 'u_mass', 'c_npmi']

    def initialize_stopwords(self):
        """Populate the per-group stopword sets and their combined union."""
        # Base list: NLTK's English stopwords.
        self.english_stopwords = set(stopwords.words('english'))
        self.setswana_stopwords = {'hore', 'le', 'ka', 'ya', 'ba', 'e', 'sa'}
        self.asr_artifacts = {'hartha', 'ore', 'tele', 'um', 'uh', 'ah'}
        self.custom_fillers = {'going', 'come', 'one', 'make', 'take', 'time', 'said'}

        # One combined lookup set used when filtering tokens.
        self.all_stopwords = set().union(
            self.english_stopwords,
            self.setswana_stopwords,
            self.asr_artifacts,
            self.custom_fillers,
        )

    def load_transcripts(self, file_path):
        """Load and segment transcript files"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        if "---" in content or "TRANSCRIPT" in content.upper():
            sections = re.split(r'\n(?=---|\w+\s*TRANSCRIPT)', content, flags=re.IGNORECASE)
            return [' '.join([line.strip() for line in section.split('\n')
                    if not line.strip().startswith('---') and 'TRANSCRIPT' not in line.upper()]).strip()
                    for section in sections if section.strip()]
        else:
            return [line.strip() for line in content.split('\n') if line.strip() and len(line.strip()) > 20]

    def preprocess_text(self, texts):
        """Enhanced text cleaning pipeline"""
        processed_texts = []

        for text in texts:
            text = text.lower()
            text = re.sub(r'\[.*?\]|\(.*?\)', '', text)
            text = re.sub(r'[^\w\s]|\d+', ' ', text)
            text = re.sub(r'\s+', ' ', text).strip()

            try:
                tokens = word_tokenize(text)
            except:
                tokens = re.findall(r'\b[a-zA-Z]{3,}\b', text)

            tokens = [
                token for token in tokens
                if (token not in self.all_stopwords) and
                   (len(token) > 2) and
                   (token.isalpha())
            ]

            if len(tokens) > 3:
                processed_texts.append(tokens)

        return processed_texts

    def run_lda_analysis(self, texts, topic_range=[3, 5, 7]):
        """Complete LDA workflow with optimized settings"""
        dictionary = corpora.Dictionary(texts)
        dictionary.filter_extremes(
            no_below=3,
            no_above=0.7,
            keep_n=800
        )

        corpus = [dictionary.doc2bow(text) for text in texts]

        results = {}
        for num_topics in topic_range:
            lda_model = LdaModel(
                corpus=corpus,
                id2word=dictionary,
                num_topics=num_topics,
                random_state=42,
                passes=20,
                alpha='asymmetric',
                eta=0.01,
                iterations=400
            )

            coherence_scores = {}
            for metric in self.coherence_metrics:
                try:
                    cm = CoherenceModel(
                        model=lda_model,
                        texts=texts,
                        dictionary=dictionary,
                        coherence=metric
                    )
                    coherence_scores[metric] = cm.get_coherence()
                except Exception as e:
                    print(f"{metric.upper()} failed: {str(e)[:50]}")
                    coherence_scores[metric] = np.nan

            results[num_topics] = {
                'model': lda_model,
                'coherence': coherence_scores,
                'dictionary': dictionary,
                'corpus': corpus
            }

            print(f"\nTopics: {num_topics}")
            for metric, score in coherence_scores.items():
                if not np.isnan(score):
                    print(f"  {metric.upper()}: {score:.4f}")

        return results

    def create_coherence_plot(self, results):
        """Plot C_V and U_Mass coherence against the number of topics and print a short guide."""
        topic_counts = list(results.keys())
        series = {
            key: [entry['coherence'][key] for entry in results.values()]
            for key in ('c_v', 'u_mass')
        }

        plt.figure(figsize=(10, 5))
        plt.plot(topic_counts, series['c_v'], 'o-', color='#1f77b4', label='C_V Coherence')
        plt.plot(topic_counts, series['u_mass'], 's--', color='#ff7f0e', label='U_Mass Coherence')
        plt.title('Topic Model Coherence Scores', fontsize=14)
        plt.xlabel('Number of Topics', fontsize=12)
        plt.ylabel('Coherence Score', fontsize=12)
        plt.xticks(topic_counts)
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

        guide = (
            "\nCOHERENCE PLOT EXPLANATION:",
            "• Shows how topic quality changes with different numbers of topics",
            "• C_V (blue): Higher values (0.4+) indicate better topics",
            "• U_Mass (orange): Values closer to 0 are better",
            "• Optimal topic count is often at the 'elbow' point",
        )
        for line in guide:
            print(line)

    def create_word_clouds(self, model, num_topics):
        """Draw one word cloud per topic on a two-row grid and print a short guide.

        Args:
            model: Trained gensim LDA model.
            num_topics: Number of topics to draw (topics 0 .. num_topics - 1).
        """
        plt.figure(figsize=(15, 8))
        plt.suptitle('Topic Word Clouds', fontsize=16)
        for i in range(num_topics):
            plt.subplot(2, (num_topics+1)//2, i+1)
            # Top 30 (word, probability) pairs drive the word sizes.
            topic_words = dict(model.show_topic(i, 30))
            wordcloud = WordCloud(width=300, height=200,
                                background_color='white',
                                colormap='viridis').generate_from_frequencies(topic_words)
            plt.imshow(wordcloud)
            plt.axis("off")
            plt.title(f"Topic {i}", fontsize=10)
        plt.tight_layout()
        plt.show()

        print("\nWORD CLOUD EXPLANATION:")
        print("• Each cloud represents one topic")
        print("• Word size indicates importance in the topic")
        # The guide previously claimed colour intensity encodes frequency, but
        # WordCloud samples each word's colour at random from the colormap.
        print("• Colors are assigned at random and carry no meaning")
        print("• Helps quickly identify dominant themes")

    def create_topic_distribution(self, corpus, model):
        """Count documents by their highest-weighted topic, chart the counts and print a short guide."""
        topic_counts = np.zeros(model.num_topics)

        for bow in corpus:
            weights = model.get_document_topics(bow)
            if not weights:
                continue
            # Each entry is a (topic_id, weight) pair; keep the heaviest one.
            dominant_topic, _ = max(weights, key=lambda pair: pair[1])
            topic_counts[dominant_topic] += 1

        plt.figure(figsize=(10, 5))
        ax = sns.barplot(x=np.arange(model.num_topics), y=topic_counts, palette='viridis')
        plt.title('Document Distribution Across Topics', fontsize=14)
        plt.xlabel('Topic Number', fontsize=12)
        plt.ylabel('Number of Documents', fontsize=12)

        # Write each bar's count just above it.
        for bar in ax.patches:
            bar_height = bar.get_height()
            bar_centre = bar.get_x() + bar.get_width() / 2.
            ax.annotate(f"{int(bar_height)}",
                        (bar_centre, bar_height),
                        ha='center', va='center',
                        xytext=(0, 5),
                        textcoords='offset points')
        plt.show()

        guide = (
            "\nTOPIC DISTRIBUTION EXPLANATION:",
            "• Shows how documents are distributed across topics",
            "• Ideally should be relatively balanced",
            "• Very large/small bars may indicate:",
            "  - Topics that are too broad/narrow",
            "  - Need to adjust topic numbers",
        )
        for line in guide:
            print(line)

    def visualize_results(self, results):
        """Generate all visualizations with explanations"""
        best_num = max([(num, res['coherence']['c_v'])
                      for num, res in results.items()
                      if not np.isnan(res['coherence']['c_v'])])[0]
        best_model = results[best_num]['model']

        print("\n" + "="*60)
        print(f"VISUALIZING RESULTS FOR BEST MODEL ({best_num} TOPICS)")
        print("="*60)

        # 1. Coherence Plot
        print("\nGenerating coherence plot...")
        self.create_coherence_plot(results)

        # 2. Word Clouds
        print("\nGenerating word clouds...")
        self.create_word_clouds(best_model, best_num)

        # 3. Topic Distribution
        print("\nGenerating topic distribution...")
        self.create_topic_distribution(results[best_num]['corpus'], best_model)

        # 4. Interactive Visualization
        print("\nGenerating interactive visualization...")
        vis = gensimvis.prepare(
            best_model,
            results[best_num]['corpus'],
            results[best_num]['dictionary']
        )
        pyLDAvis.display(vis)

        print("\nINTERACTIVE VISUALIZATION GUIDE:")
        print("• Left panel: Shows most relevant terms for selected topic")
        print("• Right panel (Intertopic Distance Map):")
        print("  - Circle size = topic prevalence")
        print("  - Distance between circles = topic similarity")
        print("  - λ slider adjusts term relevance (1=common, 0=distinctive)")
        print("• Hover over elements for detailed information")

def run_enhanced_analysis():
    """Run the interactive LDA workflow: upload, preprocess, train and visualize.

    Prints an error and returns early when no file is uploaded or when fewer
    than five documents survive preprocessing. Returns nothing.
    """
    analyzer = EnhancedLDAAnalyzer()

    print("Please upload your transcript file (TXT format):")
    uploaded = files.upload()
    # A cancelled upload yields an empty mapping; indexing its first key
    # would raise IndexError.
    if not uploaded:
        print("Error: No file uploaded")
        return
    filename = next(iter(uploaded))

    print("\nLoading and preprocessing transcripts...")
    raw_texts = analyzer.load_transcripts(filename)
    processed_texts = analyzer.preprocess_text(raw_texts)

    print(f"\nInitial documents: {len(raw_texts)}")
    print(f"Valid documents after preprocessing: {len(processed_texts)}")

    if len(processed_texts) < 5:
        print("Error: Need at least 5 valid documents for analysis")
        return

    print("\nRunning enhanced LDA analysis...")
    results = analyzer.run_lda_analysis(
        processed_texts,
        topic_range=[3, 5, 7]
    )

    print("\nGenerating visualizations...")
    analyzer.visualize_results(results)

# Execute the analysis
# Runs the LDA workflow as soon as this cell executes; the function prompts
# for a file upload through google.colab and returns nothing.
run_enhanced_analysis()