# **Sentiment Analyzer**

In [None]:
"""Advanced Sentiment Analysis System with Gradio Interface
Supports both traditional ML (TF-IDF + SVM) and Transformer-based approaches
"""

import pandas as pd
import numpy as np
import re
import pickle
import gradio as gr
from typing import Dict, List, Tuple, Optional
import matplotlib.pyplot as plt
import seaborn as sns
from io import BytesIO
import base64

# Traditional ML imports
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report
from sklearn.pipeline import Pipeline

# NLP preprocessing imports
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.sentiment import SentimentIntensityAnalyzer

# Transformer imports
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("Transformers not available. Install with: pip install transformers torch")

# Download required NLTK data
def download_nltk_data():
    """Make sure every NLTK resource this module relies on is installed.

    Each entry pairs the resource path that ``nltk.data.find`` looks up with
    the package id passed to ``nltk.download``. A package is downloaded only
    when the lookup raises ``LookupError``. Failures are reported as printed
    warnings and never raised.
    """
    required_data = [
        ('tokenizers/punkt', 'punkt'),
        ('tokenizers/punkt_tab', 'punkt_tab'),
        ('corpora/stopwords', 'stopwords'),
        ('corpora/wordnet', 'wordnet'),
        ('corpora/omw-1.4', 'omw-1.4'),
        # The VADER lexicon is installed under sentiment/, so looking up the
        # bare name never succeeded and forced a download attempt every time.
        ('sentiment/vader_lexicon.zip', 'vader_lexicon'),
    ]

    for data_path, data_name in required_data:
        try:
            nltk.data.find(data_path)
            continue  # already installed
        except LookupError:
            pass

        try:
            # nltk.download reports ordinary failures through its boolean
            # return value, so check it instead of relying on exceptions.
            if not nltk.download(data_name, quiet=True):
                print(f"Warning: Could not download {data_name}")
        except Exception as e:
            print(f"Warning: Could not download {data_name}: {e}")

# Download NLTK data once at import time; this runs before TextPreprocessor
# below is instantiated and tries to load the stopword list.
download_nltk_data()

class TextPreprocessor:
    """Clean, tokenize, and normalize raw text before it is vectorized."""

    # Compiled once at class creation; these run for every document.
    _URL_RE = re.compile(r'http\S+|www\S+')
    _MENTION_HASHTAG_RE = re.compile(r'@\w+|#\w+')
    _DISALLOWED_RE = re.compile(r'[^a-zA-Z0-9\s\.\!\?]')
    _WHITESPACE_RE = re.compile(r'\s+')

    # Used only when the NLTK stopwords corpus cannot be loaded.
    _FALLBACK_STOP_WORDS = frozenset({
        'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your',
        'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she',
        'her', 'hers', 'herself', 'it', 'its', 'itself', 'they', 'them', 'their',
        'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that',
        'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
        'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an',
        'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of',
        'at', 'by', 'for', 'with', 'through', 'during', 'before', 'after', 'above',
        'below', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again',
        'further', 'then', 'once',
    })

    def __init__(self):
        """Load the stopword set and a lemmatizer, degrading if data is missing."""
        # Set of lower-case tokens dropped by preprocess_text.
        try:
            self.stop_words = set(stopwords.words('english'))
        except LookupError:
            self.stop_words = set(self._FALLBACK_STOP_WORDS)

        # WordNetLemmatizer() does not touch the WordNet corpus when it is
        # constructed; a missing corpus only fails on first use. Probe it once
        # here so a missing corpus turns lemmatization off instead of breaking
        # every later preprocess_text call.
        try:
            lemmatizer = WordNetLemmatizer()
            lemmatizer.lemmatize('tests')
        except (LookupError, OSError):
            lemmatizer = None
        # WordNetLemmatizer instance, or None when lemmatization is unavailable.
        self.lemmatizer = lemmatizer

    def clean_text(self, text: str) -> str:
        """Strip URLs, mentions, hashtags, and unsupported characters.

        Args:
            text: Raw input. Anything that is not a ``str`` yields ``""``.

        Returns:
            The text with only letters, digits, whitespace and ``. ! ?`` left,
            whitespace runs collapsed to single spaces, and both ends trimmed.
        """
        if not isinstance(text, str):
            return ""

        text = self._URL_RE.sub('', text)
        text = self._MENTION_HASHTAG_RE.sub('', text)
        text = self._DISALLOWED_RE.sub('', text)

        # Collapse whitespace last: removing a character that sat between two
        # spaces ("good & bad") would otherwise leave a double space behind.
        text = self._WHITESPACE_RE.sub(' ', text)

        return text.strip()

    def preprocess_text(self, text: str, remove_stopwords: bool = True,
                       lemmatize: bool = True) -> str:
        """Run the full pipeline: clean, lower-case, tokenize, filter, lemmatize.

        Args:
            text: Raw input text.
            remove_stopwords: Drop tokens found in ``self.stop_words`` as well
                as single-character tokens (which includes lone punctuation).
            lemmatize: Lemmatize each token when a lemmatizer is available.

        Returns:
            The surviving tokens joined by single spaces.
        """
        text = self.clean_text(text).lower()

        try:
            tokens = word_tokenize(text)
        except LookupError:
            # Tokenizer data is missing; fall back to whitespace splitting.
            tokens = text.split()

        if remove_stopwords:
            # NOTE(review): the NLTK English list contains negations such as
            # "not"; dropping them can hide sentiment cues - worth revisiting.
            tokens = [
                token for token in tokens
                if len(token) > 1 and token not in self.stop_words
            ]

        if lemmatize and self.lemmatizer is not None:
            tokens = [self.lemmatizer.lemmatize(token) for token in tokens]

        return ' '.join(tokens)

class SentimentAnalyzer:
    """Sentiment classifier with a TF-IDF pipeline and a transformer backend.

    Exactly one backend is active at a time; ``model_type`` records which one
    (``'traditional'``, ``'transformer'`` or ``None``) and ``predict``
    dispatches on it. Labels are always reported as ``'Negative'``,
    ``'Neutral'`` or ``'Positive'``.
    """

    # Raw label spellings recognized by _standardize_sentiment_label.
    # Lookup order is positive, negative, neutral.
    _POSITIVE_LABELS = frozenset(
        {'positive', 'pos', '1', 'good', 'happy', 'like', 'love', '4', '5'}
    )
    _NEGATIVE_LABELS = frozenset(
        {'negative', 'neg', '0', 'bad', 'sad', 'hate', 'dislike', '2'}
    )
    _NEUTRAL_LABELS = frozenset(
        {'neutral', 'neu', 'okay', 'ok', 'average', 'mixed', '3'}
    )

    # Alternative CSV column names accepted for the two required columns.
    _COLUMN_ALIASES = {
        'review': 'text',
        'comment': 'text',
        'message': 'text',
        'content': 'text',
        'label': 'sentiment',
        'class': 'sentiment',
        'category': 'sentiment',
        'rating': 'sentiment',
    }

    def __init__(self):
        """Create an analyzer with no model loaded."""
        self.preprocessor = TextPreprocessor()
        self.traditional_model = None   # fitted sklearn Pipeline, once trained
        self.transformer_model = None   # transformers pipeline, once loaded
        self.model_type = None          # 'traditional' | 'transformer' | None
        self.label_map = {0: 'Negative', 1: 'Neutral', 2: 'Positive'}
        self.reverse_label_map = {name: idx for idx, name in self.label_map.items()}

    @staticmethod
    def _resolve_path(file):
        """Return a filesystem path for ``file`` when one can be determined.

        Strings and path-like objects are returned as paths. Objects exposing
        a string ``name`` attribute (open files, upload wrappers) yield that
        name. Anything else is returned unchanged so callers can still hand a
        buffer to a reader that accepts one.
        """
        import os  # local import: keeps this class self-contained

        if isinstance(file, (str, os.PathLike)):
            return os.fspath(file)
        name = getattr(file, 'name', None)
        # str(file) on a real file object is its repr, not its path.
        return name if isinstance(name, str) else file

    def load_custom_data(self, file) -> Optional[pd.DataFrame]:
        """Load and normalize a labelled CSV dataset.

        Args:
            file: Path string, path-like object, or file/upload object.

        Returns:
            A DataFrame with string columns ``text`` and ``sentiment`` (values
            limited to Positive/Negative/Neutral), or ``None`` when the file
            cannot be read, the required columns are missing, or no rows
            remain. Problems are printed, not raised.
        """
        try:
            file_path = self._resolve_path(file)

            try:
                df = pd.read_csv(file_path)
            except Exception as e:
                print(f"Error reading CSV: {e}")
                return None

            required_columns = ['text', 'sentiment']
            if not all(col in df.columns for col in required_columns):
                # Rename at most one column per missing required name, so two
                # aliases (e.g. "review" and "comment") can never produce
                # duplicate "text" columns. Matching ignores case and
                # surrounding whitespace.
                claimed = {col for col in required_columns if col in df.columns}
                renames = {}
                for col in df.columns:
                    key = str(col).strip().lower()
                    target = key if key in required_columns else self._COLUMN_ALIASES.get(key)
                    if target is not None and target not in claimed:
                        renames[col] = target
                        claimed.add(target)
                df = df.rename(columns=renames)

                if not all(col in df.columns for col in required_columns):
                    available_cols = list(df.columns)
                    print(f"Required columns 'text' and 'sentiment' not found. Available columns: {available_cols}")
                    return None

            # Clean and validate data
            df = df.dropna(subset=required_columns)
            df['text'] = df['text'].astype(str)
            df['sentiment'] = df['sentiment'].astype(str)

            df['sentiment'] = df['sentiment'].apply(self._standardize_sentiment_label)

            # _standardize_sentiment_label only ever returns one of these
            # three values, so this filter is a safety net.
            valid_labels = {'Positive', 'Negative', 'Neutral'}
            df = df[df['sentiment'].isin(valid_labels)]

            if len(df) == 0:
                print("No valid data rows found after processing.")
                return None

            print(f"Successfully loaded {len(df)} rows of custom data.")
            print(f"Sentiment distribution: {df['sentiment'].value_counts().to_dict()}")

            return df

        except Exception as e:
            print(f"Error loading custom data: {e}")
            return None

    def _standardize_sentiment_label(self, label: str) -> str:
        """Map a raw label onto ``'Positive'``, ``'Negative'`` or ``'Neutral'``.

        Known spellings are matched first (case-insensitive, trimmed). Any
        other value that parses as a number is read as a rating: ``>= 4`` is
        Positive, ``<= 2`` is Negative, anything else Neutral. Unrecognized
        labels default to Neutral.

        NOTE(review): the bare string ``'1'`` is treated as Positive (the
        binary 0/1 convention) while ``'1.0'`` goes through the rating rule
        and becomes Negative. Confirm which convention the datasets use.
        """
        label = str(label).lower().strip()

        if label in self._POSITIVE_LABELS:
            return 'Positive'
        if label in self._NEGATIVE_LABELS:
            return 'Negative'
        if label in self._NEUTRAL_LABELS:
            return 'Neutral'

        try:
            rating = float(label)
        except ValueError:
            # Not numeric and not a known spelling.
            return 'Neutral'

        if rating >= 4:
            return 'Positive'
        if rating <= 2:
            return 'Negative'
        return 'Neutral'

    def load_sample_data(self) -> pd.DataFrame:
        """Return the built-in 20-row demo dataset (columns text, sentiment)."""
        sample_data = [
            ("I love this movie! It's absolutely fantastic.", "Positive"),
            ("This film is terrible. Waste of time.", "Negative"),
            ("The movie was okay, nothing special.", "Neutral"),
            ("Amazing acting and great storyline!", "Positive"),
            ("Boring and predictable plot.", "Negative"),
            ("The film has its moments but overall average.", "Neutral"),
            ("Brilliant cinematography and outstanding performances!", "Positive"),
            ("Poor direction and weak script.", "Negative"),
            ("It's an acceptable movie for passing time.", "Neutral"),
            ("Masterpiece! One of the best films ever made.", "Positive"),
            ("Completely disappointed with this movie.", "Negative"),
            ("The movie is fine, not bad but not great either.", "Neutral"),
            ("Exceptional storytelling and character development.", "Positive"),
            ("The worst movie I've ever seen.", "Negative"),
            ("Standard Hollywood production, nothing remarkable.", "Neutral"),
            ("This movie changed my perspective on life!", "Positive"),
            ("Overrated and overhyped garbage.", "Negative"),
            ("Decent enough for a single watch.", "Neutral"),
            ("Incredible visual effects and sound design.", "Positive"),
            ("Confusing plot and poor character development.", "Negative")
        ]

        return pd.DataFrame(sample_data, columns=['text', 'sentiment'])

    def prepare_data(self, df: pd.DataFrame) -> Tuple[List[str], List[int]]:
        """Turn a labelled DataFrame into preprocessed texts and numeric labels.

        Labels missing from ``reverse_label_map`` fall back to 1 (Neutral).
        """
        processed_texts = [self.preprocessor.preprocess_text(text) for text in df['text']]
        labels = [self.reverse_label_map.get(label, 1) for label in df['sentiment']]
        return processed_texts, labels

    def train_traditional_model(self, df: pd.DataFrame, model_type: str = 'svm') -> Dict:
        """Fit a TF-IDF + classifier pipeline and make it the active model.

        Args:
            df: DataFrame with ``text`` and ``sentiment`` columns.
            model_type: ``'svm'``, ``'rf'`` (random forest); any other value
                selects logistic regression.

        Returns:
            Dict with the fitted ``model``, its ``metrics`` on the held-out
            20% split, and that split's ``test_texts``, ``y_true``, ``y_pred``.
        """
        texts, labels = self.prepare_data(df)

        X_train, X_test, y_train, y_test = train_test_split(
            texts, labels, test_size=0.2, random_state=42, stratify=labels
        )

        if model_type == 'svm':
            model = SVC(kernel='rbf', C=1.0, probability=True, random_state=42)
        elif model_type == 'rf':
            model = RandomForestClassifier(n_estimators=100, random_state=42)
        else:
            model = LogisticRegression(random_state=42, max_iter=1000)

        # Deliberately not named "pipeline": that would shadow the transformers
        # pipeline() factory this module also imports.
        model_pipeline = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=5000, ngram_range=(1, 2))),
            ('classifier', model)
        ])

        model_pipeline.fit(X_train, y_train)

        y_pred = model_pipeline.predict(X_test)
        metrics = self._calculate_metrics(y_test, y_pred)

        self.traditional_model = model_pipeline
        self.model_type = 'traditional'

        return {
            'model': model_pipeline,
            'metrics': metrics,
            'test_texts': X_test,
            'y_true': y_test,
            'y_pred': y_pred
        }

    def load_transformer_model(self, model_name: str = "cardiffnlp/twitter-roberta-base-sentiment-latest"):
        """Load a pre-trained text-classification model and make it active.

        Returns:
            ``True`` on success, ``False`` when loading fails (error printed).

        Raises:
            ImportError: If the transformers library could not be imported.
        """
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("Transformers library not available")

        try:
            # NOTE(review): newer transformers releases appear to deprecate
            # return_all_scores in favor of top_k=None - confirm before upgrading.
            self.transformer_model = pipeline(
                "text-classification",
                model=model_name,
                tokenizer=model_name,
                return_all_scores=True
            )
            self.model_type = 'transformer'
            return True
        except Exception as e:
            print(f"Error loading transformer model: {e}")
            return False

    def _calculate_metrics(self, y_true: List[int], y_pred: List[int]) -> Dict:
        """Return accuracy plus weighted precision, recall and F1."""
        accuracy = accuracy_score(y_true, y_pred)
        # zero_division=0 yields the same values as the default but without a
        # warning when a class is never predicted on the small test split.
        precision, recall, f1, _ = precision_recall_fscore_support(
            y_true, y_pred, average='weighted', zero_division=0
        )

        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1
        }

    def predict_traditional(self, text: str) -> Dict:
        """Classify ``text`` with the trained TF-IDF pipeline.

        Returns:
            ``{'prediction', 'confidence', 'probabilities'}``, or
            ``{'error': ...}`` when no traditional model has been trained.
        """
        if self.traditional_model is None:
            return {"error": "Traditional model not trained"}

        processed_text = self.preprocessor.preprocess_text(text)
        prediction = self.traditional_model.predict([processed_text])[0]
        probabilities = self.traditional_model.predict_proba([processed_text])[0]

        # predict_proba columns follow the model's classes_, which omits any
        # class missing from the training data. Labelling by position would
        # then attach probabilities to the wrong sentiment.
        classes = getattr(self.traditional_model, 'classes_', None)
        if classes is None:
            classes = range(len(probabilities))

        return {
            'prediction': self.label_map[int(prediction)],
            'confidence': float(max(probabilities)),
            'probabilities': {
                self.label_map[int(cls)]: float(prob)
                for cls, prob in zip(classes, probabilities)
            }
        }

    def predict_transformer(self, text: str) -> Dict:
        """Classify ``text`` with the loaded transformer model.

        Returns:
            ``{'prediction', 'confidence', 'probabilities'}``, or
            ``{'error': ...}`` when no model is loaded or inference fails.
        """
        if self.transformer_model is None:
            return {"error": "Transformer model not loaded"}

        try:
            raw = self.transformer_model(text)
            # With all scores requested the output is nested per input; accept
            # a flat list of {label, score} dicts as well.
            results = raw[0] if raw and isinstance(raw[0], list) else raw

            # Keys are upper-case; model labels are upper-cased before lookup
            # because some checkpoints emit lower-case names ("negative").
            label_mapping = {
                'LABEL_0': 'Negative', 'NEGATIVE': 'Negative',
                'LABEL_1': 'Neutral', 'NEUTRAL': 'Neutral',
                'LABEL_2': 'Positive', 'POSITIVE': 'Positive'
            }

            mapped_results = [
                {
                    'label': label_mapping.get(str(result['label']).upper(), result['label']),
                    'score': result['score'],
                }
                for result in results
            ]

            best_result = max(mapped_results, key=lambda x: x['score'])

            return {
                'prediction': best_result['label'],
                'confidence': float(best_result['score']),
                'probabilities': {r['label']: float(r['score']) for r in mapped_results}
            }
        except Exception as e:
            return {"error": f"Prediction error: {str(e)}"}

    def predict(self, text: str) -> Dict:
        """Classify ``text`` with whichever backend ``model_type`` selects."""
        if self.model_type == 'traditional':
            return self.predict_traditional(text)
        if self.model_type == 'transformer':
            return self.predict_transformer(text)
        return {"error": "No model loaded"}

    def analyze_file(self, file_path: str) -> List[Dict]:
        """Classify every non-blank line of a UTF-8 text file.

        Args:
            file_path: Path string, path-like object, or file/upload object.

        Returns:
            One prediction dict per non-blank line, extended with
            ``line_number`` (1-based) and ``text``; or a single-element list
            holding an ``error`` dict when the file cannot be processed.
        """
        try:
            actual_path = self._resolve_path(file_path)

            results = []
            with open(actual_path, 'r', encoding='utf-8') as f:
                for line_number, raw_line in enumerate(f, start=1):
                    line = raw_line.strip()
                    if not line:
                        continue
                    result = self.predict(line)
                    result['line_number'] = line_number
                    result['text'] = line
                    results.append(result)

            return results
        except Exception as e:
            return [{"error": f"File processing error: {str(e)}"}]

# Global analyzer instance shared by the callback functions below
# (train_model, predict_sentiment, analyze_file_content); it holds whichever
# model was most recently trained or loaded.
analyzer = SentimentAnalyzer()

def train_model(data_source: str, model_type: str, custom_file=None) -> str:
    """Train or load the selected model and return a human-readable report.

    Args:
        data_source: ``"Sample Data"`` uses the built-in examples; any other
            value expects ``custom_file``.
        model_type: ``"Traditional ML (TF-IDF + SVM)"`` or
            ``"Transformer (BERT-based)"``.
        custom_file: Uploaded CSV, required for custom data.

    Returns:
        A status message. This function always returns a string: failures,
        including an unrecognized ``model_type``, are reported in the text
        and never raised.
    """
    traditional_choice = "Traditional ML (TF-IDF + SVM)"
    transformer_choice = "Transformer (BERT-based)"

    # Reject unknown model types up front; previously they fell through every
    # branch and the function returned None.
    if model_type not in (traditional_choice, transformer_choice):
        return (
            f"Unknown model type: {model_type!r}. "
            f"Choose '{traditional_choice}' or '{transformer_choice}'."
        )

    try:
        if data_source == "Sample Data":
            df = analyzer.load_sample_data()
            data_info = f"Using sample data with {len(df)} examples."
        else:
            if custom_file is None:
                return "Please upload a CSV file for custom data training."

            df = analyzer.load_custom_data(custom_file)
            if df is None:
                return "Failed to load custom data. Please check your CSV format and ensure it has 'text' and 'sentiment' columns."

            data_info = f"Successfully loaded custom data with {len(df)} examples."

        sentiment_dist = df['sentiment'].value_counts().to_dict()
        dist_info = "\nSentiment Distribution:\n" + "\n".join(
            f"- {label}: {count}" for label, count in sentiment_dist.items()
        )

        if model_type == traditional_choice:
            metrics = analyzer.train_traditional_model(df, 'svm')['metrics']
            return "\n".join([
                f"{data_info}{dist_info}",
                "",
                "Model trained successfully!",
                "",
                "Performance Metrics:",
                f"- Accuracy: {metrics['accuracy']:.3f}",
                f"- Precision: {metrics['precision']:.3f}",
                f"- Recall: {metrics['recall']:.3f}",
                f"- F1-Score: {metrics['f1_score']:.3f}",
                "",
                "Model ready for predictions.",
            ])

        # Transformer branch: the pre-trained model is only loaded; the
        # dataset is reported but not used for training.
        if not analyzer.load_transformer_model():
            return "Failed to load transformer model. Please check your internet connection or try the Traditional ML option."

        return "\n".join([
            f"{data_info}{dist_info}",
            "",
            "Transformer model loaded successfully!",
            "",
            "Note: Pre-trained transformer models don't require training on your specific data,",
            "but they provide state-of-the-art performance out of the box.",
            "",
            "Model ready for predictions.",
        ])

    except Exception as e:
        return f"Training failed: {str(e)}\n\nPlease check your data format and try again."

def predict_sentiment(text: str) -> str:
    """Classify ``text`` with the active model and format the result.

    Args:
        text: Text to analyze. ``None``, empty, or whitespace-only input
            produces a prompt to enter text.

    Returns:
        The prediction, its confidence and the per-label probabilities as a
        multi-line string, or a message explaining why nothing was predicted.
    """
    # "not text" also covers None, which would otherwise raise on .strip().
    if not text or not text.strip():
        return "Please enter some text to analyze."

    if analyzer.model_type is None:
        return "No model is currently loaded. Please train a model first in the 'Model Training' tab."

    try:
        result = analyzer.predict(text)

        if 'error' in result:
            return f"Error: {result['error']}"

        lines = [
            f"Prediction: {result['prediction']}",
            f"Confidence: {result['confidence']:.3f}",
            "",
            "Probability Scores:",
        ]
        lines.extend(
            f"- {label}: {prob:.3f}"
            for label, prob in result['probabilities'].items()
        )
        return "\n".join(lines)

    except Exception as e:
        return f"Prediction failed: {str(e)}"

def analyze_file_content(file) -> str:
    """Classify each non-blank line of an uploaded text file and summarize.

    Args:
        file: The upload. Accepted forms are a path (plain string naming an
            existing file, string subclass carrying a ``name`` attribute, or
            path-like object), an object with a string ``name`` attribute, or
            - as a fallback - a value whose ``str()`` is the text itself.

    Returns:
        A multi-line summary (line count, average confidence, sentiment
        distribution, first five results) or a message describing why no
        analysis was produced. Lines whose prediction fails are skipped.
    """
    import os  # local import: only needed to recognize path inputs

    if file is None:
        return "Please upload a text file."

    if analyzer.model_type is None:
        return "No model is currently loaded. Please train a model first in the 'Model Training' tab."

    try:
        # Work out whether `file` denotes a path or is the content itself.
        if isinstance(file, str):
            # A plain string that names an existing file is a path; before,
            # the path text itself was analyzed as if it were the content.
            is_path = hasattr(file, 'name') or os.path.isfile(file)
            source_path = str(file) if is_path else None
        elif isinstance(file, os.PathLike):
            source_path = os.fspath(file)
        else:
            # str(file) on a file object is its repr, so use .name instead.
            name = getattr(file, 'name', None)
            source_path = name if isinstance(name, str) else None

        if source_path is not None:
            with open(source_path, 'r', encoding='utf-8') as f:
                content = f.read()
        else:
            content = str(file)

        results = []
        for line_number, raw_line in enumerate(content.split('\n'), start=1):
            line = raw_line.strip()
            if not line:
                continue
            result = analyzer.predict(line)
            if 'error' in result:
                continue
            results.append({
                'line': line_number,
                'text': line[:50] + "..." if len(line) > 50 else line,
                'sentiment': result['prediction'],
                'confidence': result['confidence']
            })

        if not results:
            return "No valid text found in the file."

        # Tally sentiments in first-seen order and average the confidences.
        sentiment_counts = {}
        for entry in results:
            sentiment_counts[entry['sentiment']] = sentiment_counts.get(entry['sentiment'], 0) + 1
        avg_confidence = sum(entry['confidence'] for entry in results) / len(results)

        report = [
            "File Analysis Complete!",
            "",
            f"Total lines analyzed: {len(results)}",
            f"Average confidence: {avg_confidence:.3f}",
            "",
            "Sentiment Distribution:",
        ]
        report.extend(
            f"- {sentiment}: {count} ({(count / len(results)) * 100:.1f}%)"
            for sentiment, count in sentiment_counts.items()
        )

        report.extend(["", "Sample Results:"])
        report.extend(
            f"Line {entry['line']}: {entry['text']} → {entry['sentiment']} ({entry['confidence']:.3f})"
            for entry in results[:5]  # show the first 5 results only
        )

        if len(results) > 5:
            report.append(f"... and {len(results) - 5} more lines.")

        return "\n".join(report)

    except Exception as e:
        return f"File analysis failed: {str(e)}"

def create_interface():
    """Build the Gradio Blocks UI and return it without launching.

    The interface has four tabs: "Model Training" (wired to train_model),
    "Text Analysis" (wired to predict_sentiment), "File Analysis" (wired to
    analyze_file_content) and a static "About" page.

    Returns:
        The constructed ``gr.Blocks`` object; the caller is responsible for
        calling ``launch()`` on it.
    """

    with gr.Blocks(title="Advanced Sentiment Analyzer", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🎭 Advanced Sentiment Analysis System")
        gr.Markdown("Train models and analyze sentiment in text using traditional ML or transformer approaches.")

        with gr.Tab("Model Training"):
            gr.Markdown("## Train Your Sentiment Analysis Model")

            # The choice strings below are compared verbatim inside
            # train_model, so they must stay in sync with it.
            with gr.Row():
                data_source = gr.Dropdown(
                    choices=["Sample Data", "Upload Custom Data"],
                    value="Sample Data",
                    label="Data Source"
                )
                model_type = gr.Dropdown(
                    choices=["Traditional ML (TF-IDF + SVM)", "Transformer (BERT-based)"],
                    value="Traditional ML (TF-IDF + SVM)",
                    label="Model Type"
                )

            # Custom data upload section (hidden until "Upload Custom Data"
            # is selected; see update_visibility below)
            with gr.Row():
                custom_data_file = gr.File(
                    label="Upload Custom Dataset (CSV)",
                    file_types=[".csv"],
                    type="filepath",
                    visible=False
                )

            # Data format information (hidden together with the upload widget)
            data_format_info = gr.Markdown(
                """
                ### Custom Data Format Requirements:
                Your CSV file should have these columns:
                - **text** (or review/comment/message): The text to analyze
                - **sentiment** (or label/class): The sentiment label

                **Supported sentiment labels:**
                - Text: "Positive", "Negative", "Neutral" (or "pos", "neg", "neu")
                - Numeric: 1-5 scale (1-2=Negative, 3=Neutral, 4-5=Positive)
                - Binary: 0=Negative, 1=Positive

                **Example CSV format:**
                ```
                text,sentiment
                "I love this product!",Positive
                "This is terrible",Negative
                "It's okay",Neutral
                ```
                """,
                visible=False
            )

            def update_visibility(data_source):
                """Show the upload widget and format help only for custom data.

                Returns two ``gr.update`` objects, one for each component
                listed in the ``outputs`` of the ``change`` handler below.
                """
                if data_source == "Upload Custom Data":
                    return gr.update(visible=True), gr.update(visible=True)
                else:
                    return gr.update(visible=False), gr.update(visible=False)

            # Re-evaluate visibility whenever the data source dropdown changes.
            data_source.change(
                fn=update_visibility,
                inputs=data_source,
                outputs=[custom_data_file, data_format_info]
            )

            train_btn = gr.Button("Train Model", variant="primary")
            training_output = gr.Textbox(label="Training Results", lines=10)

            # Inputs are passed positionally to
            # train_model(data_source, model_type, custom_file).
            train_btn.click(
                fn=train_model,
                inputs=[data_source, model_type, custom_data_file],
                outputs=training_output
            )

        with gr.Tab("Text Analysis"):
            gr.Markdown("## Analyze Individual Text")

            text_input = gr.Textbox(
                label="Enter Text to Analyze",
                placeholder="Type your text here...",
                lines=3
            )
            analyze_btn = gr.Button("Analyze Sentiment", variant="primary")
            text_output = gr.Textbox(label="Analysis Results", lines=8)

            analyze_btn.click(
                fn=predict_sentiment,
                inputs=text_input,
                outputs=text_output
            )

            # Example inputs (each example fills text_input)
            gr.Examples(
                examples=[
                    ["I absolutely love this product! It exceeded all my expectations."],
                    ["This movie was terrible. Complete waste of time."],
                    ["The service was okay, nothing special but not bad either."],
                    ["Amazing customer support! They solved my problem quickly."],
                    ["I'm not sure how I feel about this new update."]
                ],
                inputs=text_input
            )

        with gr.Tab("File Analysis"):
            gr.Markdown("## Analyze Text Files")
            gr.Markdown("Upload a text file (.txt) with one sentence/review per line.")

            file_input = gr.File(
                label="Upload Text File",
                file_types=[".txt"],
                type="filepath"  # This ensures we get the file path
            )
            file_btn = gr.Button("Analyze File", variant="primary")
            file_output = gr.Textbox(label="File Analysis Results", lines=15)

            file_btn.click(
                fn=analyze_file_content,
                inputs=file_input,
                outputs=file_output
            )

        # Static description page.
        # NOTE(review): the text lists Random Forest and Logistic Regression,
        # but train_model always requests 'svm', so those two models are not
        # reachable from this UI.
        with gr.Tab("About"):
            gr.Markdown("""
            ## About This Sentiment Analyzer

            This advanced sentiment analysis system supports two approaches:

            ### Traditional ML Approach
            - **Features**: TF-IDF vectorization with n-grams
            - **Model**: Support Vector Machine (SVM) with RBF kernel
            - **Preprocessing**: NLTK-based text cleaning, tokenization, and lemmatization
            - **Performance**: Fast inference, lightweight model

            ### Transformer Approach
            - **Model**: Pre-trained RoBERTa-based sentiment classifier
            - **Features**: Contextual embeddings from transformer architecture
            - **Performance**: Higher accuracy, slower inference

            ### Features
            - ✅ Text preprocessing with NLTK
            - ✅ Multiple model support (SVM, Random Forest, Logistic Regression)
            - ✅ Transformer integration with HuggingFace
            - ✅ File batch processing
            - ✅ Performance metrics reporting
            - ✅ Confidence scores and probability distributions
            - ✅ Modular, extensible codebase

            ### Usage
            1. **Train**: Select data source and model type, then click "Train Model"
            2. **Analyze**: Enter text or upload files for sentiment analysis
            3. **Results**: View predictions with confidence scores and probabilities
            """)

    return interface

if __name__ == "__main__":
    # Create and launch the interface
    interface = create_interface()
    interface.launch(
        share=True,       # serves the app on a public gradio.live URL
        debug=True,       # keeps the cell running so errors and logs stay visible
        show_error=True
    )

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://b7676fe1e98f6d1dc9.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
