In [2]:
!pip install google-api-python-client nltk contractions emoji tensorflow transformers gradio seaborn



In [4]:
# YouTube Comments Sentiment Analysis Tool
import os
import re
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.layers import (Embedding, LSTM, Dense, Dropout,
                                    Bidirectional, Input, concatenate)
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import contractions
import emoji
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
import gradio as gr
import pickle
import warnings
warnings.filterwarnings('ignore')

# Fetch the NLTK data packages used by the preprocessing pipeline
for _resource in ('punkt', 'stopwords', 'wordnet'):
    nltk.download(_resource)

# Regex-based sentence splitter used as a stand-in for NLTK's punkt tokenizer
def custom_sent_tokenize(text, language='english'):
    """Split ``text`` into sentences at whitespace that follows '.', '!' or '?'.

    ``language`` is accepted so the signature matches
    ``nltk.tokenize.sent_tokenize``; it is not used.

    Returns the pieces in order, dropping any piece that is empty or
    whitespace-only. Pieces are returned as split (not stripped).
    """
    pieces = []
    for chunk in re.split(r'(?<=[.!?])\s+', text):
        if chunk.strip():
            pieces.append(chunk)
    return pieces

# Make sure sentence tokenization works even when the punkt data is missing.
# nltk.download() reports a failed download by returning False instead of
# raising, so wrapping it in try/except never activated the fallback. Probe
# the tokenizer instead: NLTK raises LookupError when a required resource is
# not installed, and only then is the regex-based splitter swapped in.
import nltk.tokenize

nltk.download('punkt_tab')
try:
    nltk.tokenize.sent_tokenize("Probe sentence. Another one.")
except LookupError:
    nltk.tokenize.sent_tokenize = custom_sent_tokenize

# Load external configurations
def load_config(file_path):
    """Read a UTF-8 JSON file and return the decoded object.

    A missing file or a file that is not valid JSON is reported on stdout
    and yields an empty dict instead of raising.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            raw = handle.read()
        parsed = json.loads(raw)
    except FileNotFoundError:
        print(f"Warning: {file_path} not found.")
        parsed = {}
    except json.JSONDecodeError:
        print(f"Error: {file_path} is not valid JSON.")
        parsed = {}
    return parsed

# Initialize components
# Shared lemmatizer instance; preprocess_text calls lemmatizer.lemmatize on each token.
lemmatizer = WordNetLemmatizer()
# Slang -> replacement mapping applied in preprocess_text. load_config returns
# an empty dict when the file is missing or is not valid JSON.
indian_slangs = load_config('indian_slangs.json')

# YouTube API setup
# The Data API key is read from the environment so that no credential is kept
# in the notebook source. A key that was previously committed in this cell
# must be treated as leaked and revoked in the Google Cloud console.
API_KEY = os.environ.get('YOUTUBE_API_KEY')
if not API_KEY:
    raise RuntimeError(
        "YouTube API key not configured: set the YOUTUBE_API_KEY environment variable."
    )
youtube = build('youtube', 'v3', developerKey=API_KEY)

def extract_video_id(url):
    """Extract the 11-character video id from a YouTube URL.

    Recognised forms: ``watch?v=ID`` (``v`` anywhere in the query string),
    ``youtu.be/ID``, ``/embed/ID``, ``/shorts/ID``, ``/live/ID`` and ``/v/ID``.

    Args:
        url: The URL text to inspect.

    Returns:
        The video id, or ``None`` when ``url`` is not a string or contains
        none of the recognised forms.
    """
    if not isinstance(url, str):
        return None

    # Each prefix names a concrete URL form. A bare "/" prefix is deliberately
    # not accepted: it would turn any path segment that starts with 11 id
    # characters (channel ids, arbitrary sites) into a bogus "video id".
    prefixes = [
        r'[?&]v=',
        r'youtu\.be/',
        r'/embed/',
        r'/shorts/',
        r'/live/',
        r'/v/',
    ]
    # The negative lookahead rejects runs longer than 11 id characters instead
    # of silently truncating them.
    id_pattern = r'([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])'

    for prefix in prefixes:
        match = re.search(prefix + id_pattern, url)
        if match:
            return match.group(1)
    return None

def get_video_title(video_id):
    """Look up a video's title through the YouTube Data API.

    Args:
        video_id: The YouTube video id to query.

    Returns:
        The title string; ``"Title not found"`` when the response carries no
        usable item (for example an unknown id, for which the API returns an
        empty ``items`` list); or an ``"Error retrieving title: ..."`` message
        when the request raises ``HttpError``.
    """
    try:
        response = youtube.videos().list(part='snippet', id=video_id).execute()
    except HttpError as e:
        return f"Error retrieving title: {str(e)}"

    # 'items' can be present but empty, so test for content rather than
    # for the key before indexing into it.
    items = response.get('items') or []
    if not items:
        return "Title not found"
    return items[0].get('snippet', {}).get('title', "Title not found")

def get_video_comments(video_id, max_comments=100):
    """Collect up to ``max_comments`` top-level comments for a video.

    Pages through ``commentThreads().list`` requesting plain-text comments,
    at most 100 per request, until enough comments were gathered or no next
    page token is returned. An ``HttpError`` is printed and ends the paging;
    whatever was gathered so far is returned.

    Returns:
        A list of dicts with the keys ``author``, ``text``, ``likes`` and
        ``published_at``.
    """
    collected = []
    page_token = None

    while len(collected) < max_comments:
        still_needed = max_comments - len(collected)
        try:
            page = youtube.commentThreads().list(
                part='snippet',
                videoId=video_id,
                maxResults=min(100, still_needed),
                pageToken=page_token,
                textFormat='plainText'
            ).execute()

            for thread in page['items']:
                top_level = thread['snippet']['topLevelComment']['snippet']
                entry = {
                    'author': top_level['authorDisplayName'],
                    'text': top_level['textDisplay'],
                    'likes': top_level['likeCount'],
                    'published_at': top_level['publishedAt']
                }
                collected.append(entry)

            page_token = page.get('nextPageToken')
        except HttpError as e:
            print(f"Error fetching comments: {e}")
            break

        # Stop when the API has no further page or the quota is filled
        if not page_token or len(collected) >= max_comments:
            break

    return collected

def preprocess_text(text):
    """Normalise a raw comment into a space-joined string of cleaned tokens.

    Steps, in order: lowercase, expand contractions, strip HTML tags, turn
    emoji into their textual names, replace configured slang, remove URLs,
    @mentions and #hashtags, replace punctuation/symbols with spaces, remove
    digits, tokenize, drop English stop words and one-character tokens, and
    lemmatize what remains.

    Args:
        text: The raw comment. ``None`` is treated as an empty comment; other
            non-string values are converted with ``str``.

    Returns:
        The cleaned tokens joined by single spaces (possibly ``''``).
    """
    import unicodedata

    if text is None:
        return ''

    text = str(text).lower()
    text = contractions.fix(text)
    text = re.sub(r'<.*?>', '', text)
    text = emoji.demojize(text)

    for slang, meaning in indian_slangs.items():
        if not slang:
            continue
        # The slang key is data, not a regex: escape it so characters such as
        # '.' or '+' match literally. Lookarounds replace \b so that keys
        # ending in a non-word character can still match. The key is
        # lowercased because the text was lowercased above.
        pattern = r'(?<!\w)' + re.escape(slang.lower()) + r'(?!\w)'
        # A callable replacement inserts the meaning verbatim; a plain string
        # would be parsed as a template and break on backslashes.
        text = re.sub(pattern, lambda _match, _repl=str(meaning): _repl, text)

    text = re.sub(r'http\S+|www\S+|https\S+', '', text)
    text = re.sub(r'@\w+', '', text)
    text = re.sub(r'#\w+', '', text)

    # Keep letters, combining marks, numbers, '_' and whitespace. Python's \w
    # does not match combining marks, so r'[^\w\s]' would strip Devanagari
    # vowel signs and corrupt Hindi words. Everything else becomes a space
    # (not '') so that "good,bad" or ":red_heart::red_heart:" do not fuse
    # into a single token.
    text = ''.join(
        ch if (ch == '_' or ch.isspace() or unicodedata.category(ch)[0] in 'LMN') else ' '
        for ch in text
    )
    text = re.sub(r'\d+', '', text)

    tokens = nltk.word_tokenize(text)
    stop_words = set(stopwords.words('english'))
    tokens = [lemmatizer.lemmatize(word) for word in tokens if word not in stop_words and len(word) > 1]

    return ' '.join(tokens)

def load_training_videos(file_path='training_videos.json'):
    """Build the list of training videos described in a JSON config file.

    Each entry under the config's ``training_videos`` key contributes one
    record when a video id can be extracted from its ``url``; entries whose
    URL yields no id are skipped. ``language`` defaults to ``'hindi'`` and
    ``category`` to ``'general'``.

    Returns:
        A list of ``{'id': ..., 'metadata': {'language': ..., 'category': ...}}``
        dicts; empty when the config is empty or could not be loaded.
    """
    config = load_config(file_path)
    if not config:
        return []

    collected = []
    for entry in config.get('training_videos', []):
        vid = extract_video_id(entry['url'])
        if not vid:
            continue
        meta = {
            'language': entry.get('language', 'hindi'),
            'category': entry.get('category', 'general')
        }
        collected.append({'id': vid, 'metadata': meta})
    return collected

def generate_training_data():
    """Fetch comments for the training videos and weak-label them by keyword.

    Videos come from ``load_training_videos()``; when none are configured a
    warning is printed and three built-in video ids are used. Up to 100
    comments are fetched per video, blank comments are dropped, every
    remaining comment is passed through ``preprocess_text`` and labelled with
    a keyword lexicon selected by the video's language and category.

    Returns:
        A tuple ``(texts, labels, metadata_features)``:
          * ``texts`` - list of preprocessed comment strings,
          * ``labels`` - list of ints, 0 = negative, 1 = neutral, 2 = positive,
          * ``metadata_features`` - integer array of shape ``(len(texts), 3)``
            with the columns [language is hindi, category in comic/music,
            category in horror/geopolitics].
    """
    training_videos = load_training_videos()

    if not training_videos:
        print("Warning: No training videos configured, using fallback defaults")
        training_videos = [{'id': vid} for vid in [
            'KVh4KtUSW3A', 'JKa05nyUmuQ', 'H9154xIoYTA'
        ]]

    all_comments = []
    for video in training_videos:
        video_meta = video.get('metadata', {})
        for comment in get_video_comments(video['id'], max_comments=100):
            if comment['text'].strip():
                all_comments.append({
                    'text': comment['text'],
                    'metadata': video_meta
                })

    processed_comments = [preprocess_text(comment['text']) for comment in all_comments]

    def label_comment(text, metadata):
        """Return 2/1/0 (positive/neutral/negative) from lexicon hits in ``text``."""
        lang = metadata.get('language', 'hindi')
        category = metadata.get('category', 'general')

        # Language-specific sentiment words
        if lang == 'hindi':
            positive_words = {'mast', 'badhiya', 'shandaar', 'awesome', 'best', 'अच्छा', 'बेहतरीन'}
            negative_words = {'bakwas', 'ganda', 'worst', 'boring', 'खराब', 'बेकार'}
        else:
            positive_words = {'great', 'awesome', 'love', 'best', 'excellent'}
            negative_words = {'bad', 'worst', 'hate', 'awful', 'poor'}

        # Category-specific adjustments
        if category == 'comic':
            positive_words |= {'funny', 'हंसी', 'मजाक'}
            negative_words |= {'unfunny', 'बोरिंग'}
        elif category == 'horror':
            positive_words |= {'scary', 'डरावना', 'thrilling'}
            negative_words |= {'boring', 'unscary'}

        # Match whole tokens, not substrings: with substring matching
        # 'unfunny' also counted as 'funny' and 'badhiya' as 'bad'. Using
        # sets also means a word listed for both the language and the
        # category (e.g. 'boring') is counted once.
        tokens = set(text.lower().split())
        pos_score = len(positive_words & tokens)
        neg_score = len(negative_words & tokens)

        # Additional weight for strong sentiment indicators
        if 'masterpiece' in tokens or 'बेहतरीन' in tokens:
            pos_score += 2
        if 'trash' in tokens or 'बेकार' in tokens:
            neg_score += 2

        if pos_score > neg_score:
            return 2  # positive
        if neg_score > pos_score:
            return 0  # negative
        return 1  # neutral

    labels = [
        label_comment(text, comment['metadata'])
        for text, comment in zip(processed_comments, all_comments)
    ]

    # Create metadata features. The language default mirrors the one used by
    # label_comment so that videos without metadata are treated consistently.
    metadata_features = []
    for comment in all_comments:
        meta = comment['metadata']
        metadata_features.append([
            1 if meta.get('language', 'hindi') == 'hindi' else 0,
            1 if meta.get('category') in ('comic', 'music') else 0,
            1 if meta.get('category') in ('horror', 'geopolitics') else 0
        ])

    # reshape keeps the (n, 3) contract even when no comment was collected
    features = np.array(metadata_features, dtype=int).reshape(-1, 3)
    return processed_comments, labels, features

class EnhancedSentimentModel:
    """Three-class sentiment classifier with two interchangeable backends.

    * A locally trained Keras model (BiLSTM over token ids, concatenated with
      a 3-value metadata vector) together with its pickled Keras ``Tokenizer``.
    * A pretrained Hugging Face sequence-classification model, used by
      ``load()`` when no usable local model is available.

    ``predict`` returns ``(label, confidence)`` pairs with labels taken from
    ``self.labels`` (index 0 = negative, 1 = neutral, 2 = positive).
    """

    # Checkpoint file used by train() when no model_path was given
    DEFAULT_MODEL_PATH = 'enhanced_sentiment_model.h5'
    # Where train() pickles the fitted tokenizer and load() reads it back
    TOKENIZER_PATH = 'enhanced_tokenizer.pickle'
    # Pretrained fallback model
    TRANSFORMER_NAME = "cardiffnlp/twitter-roberta-base-sentiment"
    # Explicit truncation limit for the transformer tokenizer; without it
    # truncation=True is ignored ("Default to no truncation").
    TRANSFORMER_MAX_TOKENS = 512

    def __init__(self, model_path=None):
        """Create an empty wrapper; call ``load()`` or ``build_model()`` + ``train()``.

        Args:
            model_path: Optional path of a saved Keras model. ``load()`` reads
                it and ``train()`` writes its best checkpoint to it.
        """
        self.tokenizer = None
        self.model = None
        self.max_length = 100
        self.vocab_size = 10000
        self.model_path = model_path
        self.labels = ['negative', 'neutral', 'positive']

    def build_model(self, vocab_size=10000, embedding_dim=128):
        """Build and compile the two-input Keras model and store it on ``self.model``.

        Args:
            vocab_size: Size of the embedding vocabulary. It is remembered so
                that ``train()`` fits its tokenizer with the same limit.
            embedding_dim: Width of the embedding vectors.

        Returns:
            The compiled Keras model.
        """
        self.vocab_size = vocab_size

        # Text input branch
        text_input = Input(shape=(self.max_length,), name='text_input')
        x = Embedding(vocab_size, embedding_dim)(text_input)
        x = Bidirectional(LSTM(64, return_sequences=True))(x)
        x = Bidirectional(LSTM(32))(x)

        # Metadata input branch
        meta_input = Input(shape=(3,), name='metadata_input')

        # Merge branches
        merged = concatenate([x, meta_input])

        # Dense layers
        x = Dense(64, activation='relu')(merged)
        x = Dropout(0.5)(x)
        output = Dense(len(self.labels), activation='softmax')(x)

        self.model = Model(inputs=[text_input, meta_input], outputs=output)
        self.model.compile(
            loss='categorical_crossentropy',
            optimizer='adam',
            metrics=['accuracy']
        )
        return self.model

    def train(self, texts, labels, metadata, epochs=10, batch_size=32):
        """Fit the Keras model on preprocessed texts and persist model + tokenizer.

        Args:
            texts: Preprocessed comment strings.
            labels: Integer class ids (indices into ``self.labels``).
            metadata: Array-like with one 3-value feature row per text.
            epochs: Maximum number of epochs (early stopping may end sooner).
            batch_size: Mini-batch size.

        Returns:
            The Keras ``History`` object returned by ``fit``.
        """
        if self.model is None:
            # Allow train() without a prior explicit build_model() call
            self.build_model(vocab_size=self.vocab_size)

        # Same vocabulary limit as the embedding layer, so token ids always
        # stay inside the embedding table.
        self.tokenizer = Tokenizer(num_words=self.vocab_size, oov_token="<OOV>")
        self.tokenizer.fit_on_texts(texts)

        sequences = self.tokenizer.texts_to_sequences(texts)
        padded_sequences = pad_sequences(sequences, maxlen=self.max_length, padding='post', truncating='post')
        categorical_labels = tf.keras.utils.to_categorical(labels, num_classes=len(self.labels))

        X_train, X_val, m_train, m_val, y_train, y_val = train_test_split(
            padded_sequences, np.asarray(metadata), categorical_labels,
            test_size=0.2, random_state=42
        )

        # Save to the configured path so that a later load() finds the model
        checkpoint_path = self.model_path or self.DEFAULT_MODEL_PATH
        callbacks = [
            EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
            ModelCheckpoint(checkpoint_path, monitor='val_accuracy',
                            save_best_only=True, mode='max')
        ]

        history = self.model.fit(
            [X_train, m_train], y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=([X_val, m_val], y_val),
            callbacks=callbacks
        )

        with open(self.TOKENIZER_PATH, 'wb') as handle:
            pickle.dump(self.tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

        return history

    def _load_saved_tokenizer(self):
        """Return the pickled Keras tokenizer, or ``None`` if it cannot be used."""
        try:
            with open(self.TOKENIZER_PATH, 'rb') as handle:
                # NOTE(security): unpickling executes code embedded in the
                # file; only load tokenizer files written by train().
                tokenizer = pickle.load(handle)
        except (OSError, pickle.UnpicklingError, EOFError, AttributeError, ImportError) as exc:
            print(f"Warning: Could not load tokenizer ({exc})")
            return None
        if not hasattr(tokenizer, 'texts_to_sequences'):
            print("Warning: Could not load tokenizer (unexpected object in pickle)")
            return None
        return tokenizer

    def load(self):
        """Load the saved Keras model + tokenizer, else the pretrained transformer.

        The local model is used only when both the model file and its
        tokenizer can be loaded; a model without its tokenizer cannot
        predict, so in that case the pretrained transformer is loaded instead.
        Errors raised while reading the model file or while fetching the
        pretrained model propagate to the caller.
        """
        if self.model_path and os.path.exists(self.model_path):
            model = load_model(self.model_path)
            tokenizer = self._load_saved_tokenizer()
            if tokenizer is not None:
                self.model = model
                self.tokenizer = tokenizer
                return
            print("Warning: saved model has no usable tokenizer; falling back to the pretrained transformer")

        self.tokenizer = AutoTokenizer.from_pretrained(self.TRANSFORMER_NAME)
        self.model = TFAutoModelForSequenceClassification.from_pretrained(self.TRANSFORMER_NAME)

    def _uses_keras_tokenizer(self):
        """Tell the two backends apart by the tokenizer's interface.

        The model's type is not a reliable discriminator: Hugging Face
        TensorFlow models are Keras ``Model`` subclasses as well.
        """
        return hasattr(self.tokenizer, 'texts_to_sequences')

    def _predict_keras(self, texts, metadata):
        """Predict with the locally trained Keras model."""
        processed_texts = [preprocess_text(text) for text in texts]
        sequences = self.tokenizer.texts_to_sequences(processed_texts)
        padded_sequences = pad_sequences(sequences, maxlen=self.max_length, padding='post', truncating='post')

        # Default metadata if not provided
        if metadata is None:
            metadata = np.zeros((len(texts), 3))  # Neutral defaults
        else:
            metadata = np.asarray(metadata)

        predictions = self.model.predict([padded_sequences, metadata])
        predicted_classes = np.argmax(predictions, axis=1)
        confidence = np.max(predictions, axis=1)

        # Post-processing for strong sentiment indicators: a neutral verdict
        # is overridden when the raw text contains a strong keyword.
        final_results = []
        for text, class_index, conf in zip(texts, predicted_classes, confidence):
            pred = self.labels[class_index]
            text_lower = text.lower()
            if 'masterpiece' in text_lower and pred == 'neutral':
                final_results.append(('positive', min(1.0, conf + 0.3)))
            elif 'trash' in text_lower and pred == 'neutral':
                final_results.append(('negative', min(1.0, conf + 0.3)))
            else:
                final_results.append((pred, conf))
        return final_results

    def _predict_transformer(self, texts):
        """Predict with the pretrained transformer, one text at a time."""
        results = []
        for text in texts:
            inputs = self.tokenizer(
                preprocess_text(text),
                return_tensors="tf",
                padding=True,
                truncation=True,
                max_length=self.TRANSFORMER_MAX_TOKENS
            )
            outputs = self.model(inputs)
            predictions = tf.nn.softmax(outputs.logits, axis=-1)
            predicted_class = int(tf.argmax(predictions, axis=1).numpy()[0])
            confidence = tf.reduce_max(predictions, axis=1).numpy()[0]
            results.append((self.labels[predicted_class], confidence))
        return results

    def predict(self, texts, metadata=None):
        """Classify one text or a sequence of texts.

        Args:
            texts: A single string or an iterable of raw comment strings.
            metadata: Optional array-like with one 3-value row per text; only
                used by the Keras backend (zeros when omitted).

        Returns:
            A list of ``(label, confidence)`` tuples, one per text; empty for
            empty input.

        Raises:
            RuntimeError: If no model/tokenizer has been loaded or trained.
        """
        if isinstance(texts, str):
            texts = [texts]
        texts = list(texts)
        if not texts:
            return []

        if self.model is None or self.tokenizer is None:
            raise RuntimeError("No model available: call load() or build_model() and train() first.")

        if self._uses_keras_tokenizer():  # Our custom model
            return self._predict_keras(texts, metadata)
        return self._predict_transformer(texts)  # Transformer model

# Analyzing comments and visualizing results
def analyze_comments(comments):
    """Classify the sentiment of fetched comments and aggregate the results.

    Loads a model through ``EnhancedSentimentModel.load()``; if loading raises
    an ordinary exception, a new model is trained on generated training data
    instead. Every comment is then classified with "Hindi / general"
    metadata.

    Args:
        comments: List of dicts with at least ``author``, ``text`` and ``likes``.

    Returns:
        A dict with the keys:
          * ``results`` - one dict per comment (author, text, likes,
            sentiment, confidence),
          * ``top_comments`` - the ten most liked entries of ``results``,
          * ``sentiment_counts`` - counts keyed positive / neutral / negative,
          * ``sentiment_percentages`` - the same keys as percentages (0 when
            there are no comments).
    """
    from collections import Counter

    sentiments = ('positive', 'neutral', 'negative')

    if not comments:
        # Nothing to classify: skip the expensive model load entirely
        return {
            'results': [],
            'top_comments': [],
            'sentiment_counts': {name: 0 for name in sentiments},
            'sentiment_percentages': {name: 0 for name in sentiments}
        }

    analyzer = EnhancedSentimentModel('enhanced_sentiment_model.h5')
    try:
        analyzer.load()
        print("Loaded enhanced model successfully.")
    except Exception as exc:
        # Exception (not a bare except) so Ctrl-C / SystemExit still
        # propagate, and the reason for retraining is visible in the log.
        print(f"Could not load a model: {exc}")
        print("Training a new enhanced model...")
        texts, labels, metadata = generate_training_data()
        analyzer.build_model()
        analyzer.train(texts, labels, metadata, epochs=5)

    comment_texts = [comment['text'] for comment in comments]

    # Create metadata for prediction (default to Hindi general)
    pred_metadata = np.array([[1, 0, 0]] * len(comment_texts)).reshape(-1, 3)  # All Hindi

    sentiment_results = analyzer.predict(comment_texts, pred_metadata)

    results = [
        {
            'author': comment['author'],
            'text': comment['text'],
            'likes': comment['likes'],
            'sentiment': sentiment,
            'confidence': float(confidence)
        }
        for comment, (sentiment, confidence) in zip(comments, sentiment_results)
    ]

    tally = Counter(entry['sentiment'] for entry in results)
    sentiment_counts = {name: tally.get(name, 0) for name in sentiments}

    total = len(results)
    sentiment_percentages = {
        name: (sentiment_counts[name] / total) * 100 if total > 0 else 0
        for name in sentiments
    }

    top_comments = sorted(results, key=lambda x: x['likes'], reverse=True)[:10]

    return {
        'results': results,
        'top_comments': top_comments,
        'sentiment_counts': sentiment_counts,
        'sentiment_percentages': sentiment_percentages
    }

# Function to generate summary
def generate_summary(analysis_results, video_title):
    """Render the sentiment analysis as a Markdown summary.

    Args:
        analysis_results: Dict with a ``results`` list (entries carry a
            ``confidence``) and a ``sentiment_percentages`` dict keyed
            ``positive`` / ``neutral`` / ``negative``.
        video_title: Title shown in the heading.

    Returns:
        A Markdown string. When ``results`` is empty a short "no comments"
        summary is returned instead of reporting a dominant sentiment.
    """
    results = analysis_results['results']
    sentiment_percentages = analysis_results['sentiment_percentages']

    if not results:
        # With no data every percentage is 0 and max() would arbitrarily
        # report the first key as the dominant sentiment.
        return (
            f'\n## Sentiment Analysis Summary for: "{video_title}"\n\n'
            'No comments were available to analyze.\n'
        )

    # Overall sentiment
    dominant_sentiment = max(sentiment_percentages.items(), key=lambda x: x[1])[0]

    # Calculate average confidence
    avg_confidence = sum(r['confidence'] for r in results) / len(results)

    if dominant_sentiment == 'positive':
        reaction = 'enjoyed'
        recommendation = 'Consider highlighting the positive aspects that viewers appreciated.'
    elif dominant_sentiment == 'neutral':
        reaction = 'had mixed feelings about'
        recommendation = 'Consider clarifying content that might be causing neutral reactions.'
    else:
        reaction = 'disliked'
        recommendation = 'Consider addressing criticism in future content.'

    # Only observations whose condition holds become bullets; the previous
    # template emitted an empty "- " bullet for every condition that failed.
    observations = [
        f"- The dominant sentiment is {dominant_sentiment}, suggesting viewers generally {reaction} this content."
    ]
    if sentiment_percentages['positive'] > 50:
        observations.append('- Most comments were positive, indicating good reception.')
    if sentiment_percentages['neutral'] > 30:
        observations.append('- There is a significant amount of neutral comments, suggesting room for improvement.')
    if sentiment_percentages['negative'] > 20:
        observations.append('- The negative comments suggest areas of concern that may need addressing.')

    lines = [
        '',
        f'## Sentiment Analysis Summary for: "{video_title}"',
        '',
        f'### Overall Sentiment: {dominant_sentiment.capitalize()}',
        '',
        f"- **Positive comments**: {sentiment_percentages['positive']:.1f}%",
        f"- **Neutral comments**: {sentiment_percentages['neutral']:.1f}%",
        f"- **Negative comments**: {sentiment_percentages['negative']:.1f}%",
        '',
        f'The average confidence of sentiment predictions is {avg_confidence:.2f}.',
        '',
        '### Key Observations:',
    ]
    lines.extend(observations)
    lines.extend([
        '',
        '### Recommendation:',
        recommendation,
        '',
    ])
    return '\n'.join(lines)

# Create visualization functions
def create_sentiment_chart(sentiment_percentages):
    """Create a pie chart of sentiment percentages.

    Args:
        sentiment_percentages: Mapping of sentiment name to percentage.

    Returns:
        The matplotlib figure. When all values are zero the figure shows a
        "No sentiment data" note instead of a pie.
    """
    # Colours are looked up by sentiment name. Assigning them by position
    # coloured 'positive' red and 'negative' green for the key order
    # produced by analyze_comments (positive, neutral, negative).
    palette = {
        'positive': '#99ff99',
        'neutral': '#66b3ff',
        'negative': '#ff9999'
    }
    labels = list(sentiment_percentages.keys())
    sizes = list(sentiment_percentages.values())
    colors = [palette.get(label, '#cccccc') for label in labels]

    fig, ax = plt.subplots(figsize=(8, 6))
    if sum(sizes) > 0:
        ax.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
        ax.axis('equal')
    else:
        # A pie cannot be normalised when every slice is zero
        ax.text(0.5, 0.5, 'No sentiment data', ha='center', va='center')
        ax.axis('off')
    ax.set_title('Sentiment Distribution')

    return fig

# Gradio Interface Functions
def analyze_youtube_url(youtube_url):
    """Main function to analyze YouTube comments from a URL.

    Args:
        youtube_url: URL text entered by the user (``None`` is treated as empty).

    Returns:
        A 4-tuple ``(summary_markdown, top_comments_markdown, chart, status)``
        matching the Gradio outputs. On failure the message is returned both
        as the summary and as the status, with ``None`` for the other two, so
        the Status box is never left blank.
    """
    def _failure(message):
        return message, None, None, message

    video_id = extract_video_id((youtube_url or '').strip())
    if not video_id:
        return _failure("Invalid YouTube URL. Please provide a valid YouTube video URL.")

    video_title = get_video_title(video_id)
    comments = get_video_comments(video_id, max_comments=100)

    if not comments:
        return _failure("No comments found or error retrieving comments.")

    analysis_results = analyze_comments(comments)
    summary = generate_summary(analysis_results, video_title)
    chart = create_sentiment_chart(analysis_results['sentiment_percentages'])

    # Format top comments for display
    emoji_by_sentiment = {'positive': "😊", 'neutral': "😐"}
    sections = ["## Top Comments\n\n"]
    for i, comment in enumerate(analysis_results['top_comments'][:5], 1):
        sentiment_emoji = emoji_by_sentiment.get(comment['sentiment'], "😞")
        sections.append(f"### {i}. {comment['author']} ({sentiment_emoji} {comment['sentiment'].capitalize()})\n")
        sections.append(f"{comment['text']}\n")
        sections.append(f"*Likes: {comment['likes']}*\n\n")
    top_comments_md = ''.join(sections)

    return summary, top_comments_md, chart, f"Analysis complete for video: {video_title}"

# Create Gradio Interface
def create_gradio_interface():
    """Assemble the Gradio Blocks UI and return it (the caller launches it).

    Layout: a URL textbox with an "Analyze" button, a status textbox, three
    tabs (summary, top comments, chart) and a short usage guide. Clicking the
    button runs ``analyze_youtube_url`` on the URL and routes its four return
    values to the summary, top-comments, chart and status components.
    """
    # Stylesheet applied to the whole interface
    css = """
    .gradio-container {
        font-family: 'Arial', sans-serif;
    }
    h1 {
        color: #4285F4;
        text-align: center;
    }
    """

    with gr.Blocks(css=css) as app:
        # Header
        gr.Markdown("# YouTube Comments Sentiment Analyzer")
        gr.Markdown("Enter a YouTube video URL to analyze the sentiment of its comments")

        # Input row
        with gr.Row():
            url_box = gr.Textbox(label="YouTube Video URL", placeholder="https://www.youtube.com/watch?v=...")
            run_button = gr.Button("Analyze")

        status_box = gr.Textbox(label="Status")

        # Result tabs
        with gr.Tabs():
            with gr.TabItem("Summary"):
                summary_view = gr.Markdown()

            with gr.TabItem("Top Comments"):
                top_comments_view = gr.Markdown()

            with gr.TabItem("Visualization"):
                chart_view = gr.Plot()

        # Order must match the tuple returned by analyze_youtube_url
        result_targets = [summary_view, top_comments_view, chart_view, status_box]
        run_button.click(
            fn=analyze_youtube_url,
            inputs=url_box,
            outputs=result_targets
        )

        # Usage guide
        gr.Markdown("## How to use")
        gr.Markdown("""
        1. Paste a YouTube video URL in the input box
        2. Click the 'Analyze' button
        3. Wait for the analysis to complete
        4. View the results in the tabs below
        """)

    return app

# Main function to run the application
def main():
    """Build the Gradio interface and serve it in debug mode with a public share link."""
    create_gradio_interface().launch(debug=True, share=True)

# Start the app only when this code runs as the top-level program
# (__name__ == "__main__"); importing it as a module does not call main().
if __name__ == "__main__":
    main()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://49a9c6c4d96d0bc36d.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


All model checkpoint layers were used when initializing TFRobertaForSequenceClassification.

All the layers of TFRobertaForSequenceClassification were initialized from the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaForSequenceClassification for predictions without further training.
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Loaded enhanced model successfully.
Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://49a9c6c4d96d0bc36d.gradio.live
