In [2]:
import pickle
import re
import unittest
from pathlib import Path

import numpy as np
import pandas as pd
import requests
import streamlit as st
from bs4 import BeautifulSoup
from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import VotingClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC

In [3]:
class YouTubeCommentScraper:
    """Resolve YouTube video IDs from URLs and supply comments for analysis.

    Comment retrieval is currently a stub: ``get_comments`` validates the URL
    and returns a fixed list of sample comments without any network access.
    """

    # A video ID is exactly 11 URL-safe characters introduced by ``v=`` or a
    # path separator. The negative lookahead rejects longer tokens (channel
    # IDs, hostnames, arbitrary path segments) instead of truncating them
    # into a bogus 11-character "ID".
    _VIDEO_ID_RE = re.compile(r'(?:v=|/)([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])')

    def __init__(self):
        # Browser-like headers kept for a future HTTP-based implementation;
        # nothing in this class sends a request yet.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

    def extract_video_id(self, url):
        """Extract the 11-character video ID from a YouTube URL.

        Handles ``watch?v=<id>``, ``youtu.be/<id>``, ``embed/<id>`` and other
        forms where the ID directly follows ``v=`` or ``/``.

        Args:
            url: URL string to inspect.

        Returns:
            The video ID, or ``None`` when ``url`` is not a string or holds no
            properly delimited 11-character ID.
        """
        if not isinstance(url, str):
            return None
        found = self._VIDEO_ID_RE.search(url)
        return found.group(1) if found else None

    def get_comments(self, video_url, max_comments=100):
        """Return up to ``max_comments`` comments for a YouTube video.

        Args:
            video_url: URL of the video.
            max_comments: Maximum number of comments to return; ``None``
                means no limit.

        Returns:
            A list of comment strings (placeholder data for now).

        Raises:
            ValueError: If no video ID can be extracted from ``video_url`` or
                ``max_comments`` is negative.
        """
        video_id = self.extract_video_id(video_url)
        if not video_id:
            raise ValueError("URL de video inválida")
        if max_comments is not None and max_comments < 0:
            raise ValueError("max_comments no puede ser negativo")

        # The YouTube Data API would normally be queried here. For simplicity
        # this returns sample comments; a real deployment needs API access.
        sample_comments = [
            "Este es un comentario de ejemplo 1",
            "Este es un comentario de ejemplo 2",
            "Este es un comentario de ejemplo 3"
        ]

        # Honour the caller's limit (it was previously accepted but ignored).
        return sample_comments[:max_comments]

In [4]:
class EnhancedHateSpeechDetector:
    """Binary hate-speech detector: TF-IDF features fed to a soft-voting ensemble.

    The ensemble combines logistic regression, multinomial naive Bayes and a
    probability-calibrated linear SVM. ``predict`` returns the positive-class
    probability for each input text.
    """

    # Annotation columns that are collapsed into one binary "hate" label.
    HATE_COLUMNS = ['IsToxic', 'IsAbusive', 'IsThreat', 'IsProvocative',
                    'IsObscene', 'IsHatespeech', 'IsRacist', 'IsNationalist',
                    'IsSexist', 'IsHomophobic', 'IsReligiousHate', 'IsRadicalism']

    def __init__(self):
        """Build the (unfitted) pipeline and the comment scraper."""
        # Base classifiers.
        log_reg = LogisticRegression(C=0.1, class_weight='balanced', random_state=42)
        naive_bayes = MultinomialNB(alpha=0.1)
        # Soft voting calls predict_proba on every member, which LinearSVC
        # does not implement; the calibration wrapper supplies it.
        calibrated_svc = CalibratedClassifierCV(
            LinearSVC(C=0.1, class_weight='balanced', random_state=42),
            cv=5,
        )

        ensemble = VotingClassifier(
            estimators=[
                ('lr', log_reg),
                ('nb', naive_bayes),
                ('svc', calibrated_svc)
            ],
            voting='soft'
        )

        self.pipeline = Pipeline([
            ('vectorizer', TfidfVectorizer(
                max_features=3000,
                ngram_range=(1, 2),
                min_df=2,
                max_df=0.95,
                stop_words='english'
            )),
            # with_mean=False keeps the TF-IDF matrix sparse (and non-negative,
            # which MultinomialNB requires).
            ('scaler', StandardScaler(with_mean=False)),
            ('ensemble', ensemble)
        ])

        self.scraper = YouTubeCommentScraper()

    def prepare_target(self, df):
        """Collapse all hate-related label columns into a single 0/1 target.

        Args:
            df: DataFrame containing every column listed in ``HATE_COLUMNS``.

        Returns:
            Integer Series: 1 where the row's flags sum to more than zero,
            otherwise 0.
        """
        return (df[self.HATE_COLUMNS].sum(axis=1) > 0).astype(int)

    def train(self, df):
        """Fit the pipeline on ``df`` and report hold-out metrics.

        Args:
            df: DataFrame with a ``Text`` column plus the hate label columns.

        Returns:
            Dict with ``train_accuracy``, ``test_accuracy``, ``overfitting``
            (train minus test accuracy) and ``classification_report`` (text
            report for the 20% stratified hold-out split).
        """
        X = df['Text']
        y = self.prepare_target(df)

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        self.pipeline.fit(X_train, y_train)

        train_pred = self.pipeline.predict(X_train)
        test_pred = self.pipeline.predict(X_test)

        train_acc = np.mean(train_pred == y_train)
        test_acc = np.mean(test_pred == y_test)

        return {
            'train_accuracy': train_acc,
            'test_accuracy': test_acc,
            'overfitting': train_acc - test_acc,
            'classification_report': classification_report(y_test, test_pred)
        }

    def analyze_video(self, video_url, threshold=0.5):
        """Score the comments of a YouTube video.

        Args:
            video_url: URL of the video whose comments are analysed.
            threshold: Probability above which a comment is flagged as hate.

        Returns:
            List of dicts with keys ``comment``, ``hate_probability`` (float)
            and ``is_hate`` (bool), one per comment; empty when the scraper
            returns no comments.

        Raises:
            RuntimeError: If fetching or scoring fails; the original exception
                is attached as ``__cause__``.
        """
        try:
            comments = self.scraper.get_comments(video_url)
            if not comments:
                return []
            probabilities = self.predict(comments)

            # Convert numpy scalars to plain Python types so the result can be
            # serialised or compared without numpy-specific handling.
            return [
                {
                    'comment': comment,
                    'hate_probability': float(prob),
                    'is_hate': bool(prob > threshold)
                }
                for comment, prob in zip(comments, probabilities)
            ]
        except Exception as e:
            raise RuntimeError(f"Error al analizar el video: {str(e)}") from e

    def predict(self, texts):
        """Return the hate probability of each text.

        Args:
            texts: A single string or an iterable of strings.

        Returns:
            1-D numpy array holding the positive-class probability for each
            input; empty when ``texts`` is empty.
        """
        if isinstance(texts, str):
            texts = [texts]
        else:
            texts = list(texts)
        if not texts:
            # Nothing to score; skip the estimator call on a zero-row input.
            return np.empty(0, dtype=float)
        return self.pipeline.predict_proba(texts)[:, 1]

    def save_model(self, path):
        """Pickle the pipeline to ``path``."""
        with open(path, 'wb') as f:
            pickle.dump(self.pipeline, f)

    @classmethod
    def load_model(cls, path):
        """Create a detector whose pipeline is unpickled from ``path``.

        SECURITY: ``pickle.load`` can execute arbitrary code. Only load model
        files produced by ``save_model`` from a trusted source.
        """
        detector = cls()
        with open(path, 'rb') as f:
            detector.pipeline = pickle.load(f)
        return detector

In [5]:
# Tests unitarios
class TestHateSpeechDetector(unittest.TestCase):
    """Smoke tests for EnhancedHateSpeechDetector and its comment scraper."""

    def setUp(self):
        self.detector = EnhancedHateSpeechDetector()
        # predict() and analyze_video() need a fitted pipeline. Fit on a tiny
        # in-memory corpus so the tests depend on neither the CSV nor a saved
        # model. Terms repeat across documents because the vectorizer is
        # configured with min_df=2.
        toxic_texts = [
            "stupid idiot",
            "what a stupid idiot",
            "stupid idiot loser",
            "such a stupid loser",
            "idiot loser go away",
            "total loser and idiot",
            "stupid loser idiot",
            "shut up stupid loser",
        ]
        benign_texts = [
            "great video thanks",
            "thanks for the great video",
            "great video",
            "nice video thanks",
            "really nice video",
            "thanks nice work",
            "great work thanks",
            "nice work great video",
        ]
        labels = [1] * len(toxic_texts) + [0] * len(benign_texts)
        self.detector.pipeline.fit(toxic_texts + benign_texts, labels)

        self.sample_text = "Este es un texto de prueba"
        self.sample_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"

    def test_prediction_format(self):
        """A single string yields exactly one probability within [0, 1]."""
        pred = self.detector.predict(self.sample_text)
        self.assertEqual(len(pred), 1)
        self.assertTrue(0.0 <= pred[0] <= 1.0)

    def test_video_id_extraction(self):
        """The ID is extracted verbatim from a standard watch URL."""
        video_id = self.detector.scraper.extract_video_id(self.sample_url)
        self.assertEqual(video_id, "dQw4w9WgXcQ")

    def test_video_analysis_format(self):
        """analyze_video returns a list of dicts with the documented keys."""
        results = self.detector.analyze_video(self.sample_url)
        self.assertIsInstance(results, list)
        for entry in results:
            self.assertIsInstance(entry, dict)
            self.assertEqual(set(entry), {'comment', 'hate_probability', 'is_hate'})

In [None]:
# Interfaz Streamlit mejorada
def create_streamlit_app():
    """Render the Streamlit UI for scoring free text or YouTube comments.

    Loads the pickled model from ``hate_speech_model_enhanced.pkl``. If loading
    fails, an error is shown and the function returns without drawing the tabs.
    """
    st.title("Detector de Mensajes de Odio")

    try:
        detector = EnhancedHateSpeechDetector.load_model('hate_speech_model_enhanced.pkl')
    except FileNotFoundError:
        st.error("No se encontró un modelo entrenado. Por favor, entrene el modelo primero.")
        return
    except Exception as e:
        # A corrupt or incompatible pickle is not a "missing model"; report the
        # real cause instead of hiding it (and never swallow KeyboardInterrupt).
        st.error(f"No se pudo cargar el modelo: {e}")
        return

    # One tab per feature.
    tab1, tab2 = st.tabs(["Analizar Texto", "Analizar Video de YouTube"])

    with tab1:
        text_input = st.text_area("Introduce el texto a analizar:")
        if st.button("Analizar Texto"):
            # Whitespace-only input carries nothing to classify.
            if text_input and text_input.strip():
                probability = detector.predict(text_input)[0]
                st.write(f"Probabilidad de contenido de odio: {probability:.2%}")

                if probability > 0.5:
                    st.error("⚠️ Este texto puede contener mensajes de odio.")
                else:
                    st.success("✅ Este texto parece seguro.")
            else:
                st.warning("Introduce un texto antes de analizar.")

    with tab2:
        video_url = st.text_input("Introduce la URL del video de YouTube:")
        if st.button("Analizar Video"):
            video_url = video_url.strip() if video_url else ""
            if video_url:
                with st.spinner("Analizando comentarios..."):
                    try:
                        results = detector.analyze_video(video_url)

                        hate_comments = [r for r in results if r['is_hate']]
                        st.write(f"Se encontraron {len(hate_comments)} comentarios potencialmente ofensivos de {len(results)} analizados.")

                        for result in results:
                            if result['is_hate']:
                                st.error(f"⚠️ {result['comment']}\nProbabilidad: {result['hate_probability']:.2%}")
                            else:
                                st.success(f"✅ {result['comment']}\nProbabilidad: {result['hate_probability']:.2%}")
                    except Exception as e:
                        st.error(f"Error al analizar el video: {str(e)}")
            else:
                st.warning("Introduce una URL antes de analizar.")


if __name__ == '__main__':
    # Same file name that create_streamlit_app() loads.
    MODEL_PATH = Path('hate_speech_model_enhanced.pkl')

    # Streamlit re-executes the whole script on every widget interaction, so
    # train only when no saved model exists instead of refitting on each rerun.
    # Delete the pickle to force retraining.
    if not MODEL_PATH.exists():
        df = pd.read_csv('youtoxic_english_1000.csv')
        detector = EnhancedHateSpeechDetector()
        metrics = detector.train(df)
        print("Métricas del modelo:")
        for key, value in metrics.items():
            print(f"{key}: {value}")

        detector.save_model(MODEL_PATH)

    create_streamlit_app()


In [1]:
# Diagnostic cell: report whether PyTorch can see a CUDA device.
import torch

cuda_ready = torch.cuda.is_available()
print("GPU disponible:", cuda_ready)


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.1.3 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/runpy.py", line 198, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/runpy.py", line 88, in _run_code
    exec(code, run_globals)
  File "/Users/cash/Desktop/F5/Python/NLP/env/lib/python3.12/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "/Users/cash/Desktop/F5/Python/NLP/env/lib/python3.12/site-packages/tra

GPU disponible: False
