In [1]:
# Install the libraries this notebook imports (IPython shell escape).
!pip install transformers datasets torch tqdm
# Mount Google Drive at /content/drive so files stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

Collecting datasets
  Downloading datasets-3.0.2-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Downloading datasets-3.0.2-py3-none-any.whl (472 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m472.7/472.7 kB[0m [31m33.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m12.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading multiprocess-0.70.16-py310-none-any.whl (134 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading

In [5]:
import torch
import pandas as pd
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
import os
from google.colab import files, drive
import json

class HateSpeechDataset(Dataset):
    """Map-style dataset that tokenizes one text per access.

    Args:
        texts: Indexable collection of texts; each entry is passed through ``str``.
        labels: Indexable collection of per-text labels (a scalar or a sequence
            of numbers), or ``None`` when the dataset is used for inference only.
        tokenizer: Object exposing ``encode_plus`` that returns a mapping with
            ``'input_ids'`` and ``'attention_mask'`` tensors.
        max_len: Length every sequence is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, item):
        """Return the encoded sample at position ``item``.

        Returns:
            dict with ``'text'``, ``'input_ids'``, ``'attention_mask'`` and,
            when labels were supplied, ``'labels'`` as a float32 tensor.
        """
        text = str(self.texts[item])
        label = self.labels[item] if self.labels is not None else None

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        item_dict = {
            'text': text,
            # encode_plus returns a leading batch dimension; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
        }

        if label is not None:
            # torch.tensor converts the *values*. The legacy torch.FloatTensor
            # constructor interprets a bare int as a size and returns an
            # uninitialized tensor of that length.
            item_dict['labels'] = torch.tensor(label, dtype=torch.float32)

        return item_dict

class HateSpeechDetector:
    def __init__(self, data_path='/content/train.csv'):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.label_columns = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']
        self.tokenizer = None
        self.model = None
        self.data_path = data_path
        self.initialize_model()
        print(f"Using device: {self.device}")

    def initialize_model(self):
        try:
            self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
            self.model = BertForSequenceClassification.from_pretrained(
                'bert-base-uncased',
                num_labels=len(self.label_columns)
            )
            self.model.to(self.device)
            print("Model initialized successfully")
            return True
        except Exception as e:
            print(f"Error initializing model: {str(e)}")
            return False

    def mount_drive_if_needed(self):
        """Mount Google Drive if the file is not found in the current directory"""
        if not os.path.exists(self.data_path):
            try:
                print("Mounting Google Drive...")
                drive.mount('/content/drive')
                alternate_path = '/content/drive/MyDrive/train.csv'
                if os.path.exists(alternate_path):
                    self.data_path = alternate_path
                    print(f"Found dataset at: {self.data_path}")
                else:
                    print("Dataset not found in Google Drive either.")
            except Exception as e:
                print(f"Error mounting Google Drive: {str(e)}")

    def load_dataset(self):
        try:
            # First check if file exists
            if not os.path.exists(self.data_path):
                self.mount_drive_if_needed()

            # If file still doesn't exist, prompt for upload
            if not os.path.exists(self.data_path):
                print(f"Dataset not found at {self.data_path}")
                print("Please upload your dataset (CSV file):")
                uploaded = files.upload()
                self.data_path = next(iter(uploaded))

            print(f"Loading dataset from: {self.data_path}")
            df = pd.read_csv(self.data_path)

            # Handle different column names
            text_col = 'text' if 'text' in df.columns else 'comment_text'
            if text_col not in df.columns:
                raise ValueError(f"No text column found in dataset. Available columns: {df.columns.tolist()}")

            texts = df[text_col].values
            print(f"Using '{text_col}' as text column")

            # Verify all label columns exist
            missing_labels = [col for col in self.label_columns if col not in df.columns]
            if missing_labels:
                raise ValueError(f"Missing label columns: {missing_labels}")

            labels = df[self.label_columns].values

            print(f"Dataset loaded successfully: {len(texts)} samples")
            return texts, labels
        except Exception as e:
            print(f"Error loading dataset: {str(e)}")
            return None, None

    def train_model(self, texts, labels, epochs=2, batch_size=16):
        try:
            print("\nStarting model training...")
            print(f"Total samples: {len(texts)}")

            train_texts, val_texts, train_labels, val_labels = train_test_split(
                texts, labels, test_size=0.2, random_state=42
            )
            print(f"Training samples: {len(train_texts)}")
            print(f"Validation samples: {len(val_texts)}")

            train_dataset = HateSpeechDataset(train_texts, train_labels, self.tokenizer)
            val_dataset = HateSpeechDataset(val_texts, val_labels, self.tokenizer)

            train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
            val_loader = DataLoader(val_dataset, batch_size=batch_size)

            optimizer = torch.optim.AdamW(self.model.parameters(), lr=2e-5)

            print(f"\nTraining with batch size: {batch_size} on {self.device}")

            for epoch in range(epochs):
                self.model.train()
                total_loss = 0
                progress_bar = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{epochs}')

                for batch in progress_bar:
                    optimizer.zero_grad()
                    input_ids = batch['input_ids'].to(self.device)
                    attention_mask = batch['attention_mask'].to(self.device)
                    labels = batch['labels'].to(self.device)

                    outputs = self.model(input_ids, attention_mask=attention_mask, labels=labels)
                    loss = outputs.loss
                    loss.backward()
                    optimizer.step()
                    total_loss += loss.item()

                    progress_bar.set_postfix({'loss': total_loss / len(train_loader)})

                # Validation step
                self.model.eval()
                val_loss = 0
                with torch.no_grad():
                    for batch in val_loader:
                        input_ids = batch['input_ids'].to(self.device)
                        attention_mask = batch['attention_mask'].to(self.device)
                        labels = batch['labels'].to(self.device)
                        outputs = self.model(input_ids, attention_mask=attention_mask, labels=labels)
                        val_loss += outputs.loss.item()

                print(f"\nEpoch {epoch + 1} - Training Loss: {total_loss / len(train_loader):.4f}, Validation Loss: {val_loss / len(val_loader):.4f}")

            print("\nTraining completed successfully!")
            return True
        except Exception as e:
            print(f"Error during training: {str(e)}")
            return False

    def save_model(self, save_path='hate_speech_model'):
        try:
            # Save model state
            torch.save(self.model.state_dict(), f'{save_path}.pth')

            # Save label columns and other metadata
            metadata = {
                'label_columns': self.label_columns,
                'model_config': 'bert-base-uncased'
            }

            with open(f'{save_path}_metadata.json', 'w') as f:
                json.dump(metadata, f)

            print(f"Model and metadata saved successfully to:")
            print(f"- Model: {save_path}.pth")
            print(f"- Metadata: {save_path}_metadata.json")
            return True
        except Exception as e:
            print(f"Error saving model: {str(e)}")
            return False

    def analyze_text(self, text):
        try:
            self.model.eval()
            dataset = HateSpeechDataset([text], None, self.tokenizer)
            batch = next(iter(DataLoader(dataset, batch_size=1)))

            with torch.no_grad():
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                outputs = self.model(input_ids, attention_mask=attention_mask)
                predictions = torch.sigmoid(outputs.logits).cpu().numpy()[0]

            # Generate detailed analysis
            results = {
                'text': text,
                'predictions': {},
                'severity': 'None',
                'toxic_categories': []
            }

            max_score = 0
            for label, score in zip(self.label_columns, predictions):
                score_float = float(score)
                results['predictions'][label] = score_float
                if score_float > 0.5:
                    results['toxic_categories'].append(label)
                max_score = max(max_score, score_float)

            # Determine severity level with more detailed thresholds
            if max_score > 0.8:
                results['severity'] = 'High'
            elif max_score > 0.5:
                results['severity'] = 'Medium'
            elif max_score > 0.2:
                results['severity'] = 'Low'

            # Add analysis summary
            results['summary'] = self._generate_analysis_summary(results)
            return results
        except Exception as e:
            print(f"Error analyzing text: {str(e)}")
            return None

    def _generate_analysis_summary(self, results):
        """Generate a human-readable summary of the analysis"""
        if not results['toxic_categories']:
            return "No significant toxic content detected."

        summary = []
        if results['severity'] != 'None':
            summary.append(f"Detected {results['severity'].lower()} level of toxic content.")

        if results['toxic_categories']:
            categories = ", ".join(results['toxic_categories'])
            summary.append(f"Main concerns: {categories}.")

        return " ".join(summary)

def run_colab_demo():
    """Train, save and download the model, then start an interactive loop.

    The model files are saved and offered for download only when training and
    saving both report success. The interactive loop runs regardless and ends
    when the user types ``quit``.
    """
    print("Initializing Hate Speech Detector...")
    detector = HateSpeechDetector()

    print("\n1. Loading and Training Model")
    texts, labels = detector.load_dataset()
    if texts is not None and labels is not None:
        # train_model/save_model return False on failure. Previously a failed
        # (or interrupted) run still saved and tried to download the files.
        if detector.train_model(texts, labels) and detector.save_model():
            print("\nDownloading model files...")
            files.download('hate_speech_model.pth')
            files.download('hate_speech_model_metadata.json')
        else:
            print("\nTraining or saving failed; skipping model download.")

    print("\n2. Testing the Model")
    while True:
        text = input("\nEnter text to analyze (or 'quit' to exit): ").strip()
        if text.lower() == 'quit':
            break
        if not text:
            # Nothing to analyze; ask again.
            continue

        results = detector.analyze_text(text)
        if results:
            print("\nAnalysis Results:")
            print(f"Text: {results['text']}")
            print(f"Overall Severity: {results['severity']}")
            print(f"Summary: {results['summary']}")
            print("\nDetailed Scores:")
            for label, score in results['predictions'].items():
                print(f"{label}: {score:.2%}")

if __name__ == "__main__":
    run_colab_demo()

Initializing Hate Speech Detector...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model initialized successfully
Using device: cuda

1. Loading and Training Model
Loading dataset from: /content/train.csv
Using 'comment_text' as text column
Dataset loaded successfully: 159571 samples

Starting model training...
Total samples: 159571
Training samples: 127656
Validation samples: 31915

Training with batch size: 16 on cuda


Epoch 1/2:   0%|          | 0/7979 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [6]:
# prompt: load /content/hate_speech_model.pth

model_path = '/content/hate_speech_model.pth'

# Build the detector first so the checkpoint can be mapped onto its device.
# Assumes the HateSpeechDetector class from the earlier cell is defined.
detector = HateSpeechDetector()

# Load the saved state dictionary.
# - map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
# - weights_only=True restricts unpickling to tensors/plain containers and
#   avoids the FutureWarning torch emits for the implicit default.
model_state_dict = torch.load(model_path, map_location=detector.device, weights_only=True)

# Load the model's state dictionary into the detector's model
detector.model.load_state_dict(model_state_dict)

# You can now use the detector to analyze text
results = detector.analyze_text("Example text to analyze.")
results

  model_state_dict = torch.load(model_path)
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model initialized successfully
Using device: cuda


{'text': 'Example text to analyze.',
 'predictions': {'toxic': 0.00043493116390891373,
  'severe_toxic': 0.00010030695557361469,
  'obscene': 0.0003639559436123818,
  'threat': 0.00011584408639464527,
  'insult': 0.00024537532590329647,
  'identity_hate': 0.00013614780618809164},
 'severity': 'None',
 'toxic_categories': [],
 'summary': 'No significant toxic content detected.'}

In [7]:
import torch
import pandas as pd
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
import os
from google.colab import files, drive
import json
import re
from typing import List, Dict, Any
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from collections import Counter

class DocumentAnalyzer:
    """文档分析器类，用于处理各种格式的文档"""

    def __init__(self, detector):
        self.detector = detector
        self.supported_formats = {
            '.txt': self._read_txt,
            '.csv': self._read_csv,
            '.json': self._read_json,
            '.doc': self._read_doc,
            '.docx': self._read_doc,
            '.pdf': self._read_pdf
        }

    def _read_txt(self, file_path):
        """读取文本文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read().split('\n')

    def _read_csv(self, file_path):
        """读取CSV文件"""
        df = pd.read_csv(file_path)
        text_col = next((col for col in df.columns if any(
            keyword in col.lower() for keyword in ['text', 'content', 'comment'])), df.columns[0])
        return df[text_col].tolist()

    def _read_json(self, file_path):
        """读取JSON文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if isinstance(data, list):
            return [item.get('text', str(item)) for item in data]
        return [str(data)]

    def _read_doc(self, file_path):
        """读取DOC/DOCX文件"""
        try:
            import docx
            doc = docx.Document(file_path)
            return [paragraph.text for paragraph in doc.paragraphs if paragraph.text.strip()]
        except ImportError:
            print("请安装python-docx: pip install python-docx")
            return []

    def _read_pdf(self, file_path):
        """读取PDF文件"""
        try:
            import PyPDF2
            texts = []
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                for page in reader.pages:
                    texts.extend(page.extract_text().split('\n'))
            return texts
        except ImportError:
            print("请安装PyPDF2: pip install PyPDF2")
            return []

    def analyze_file(self, file_path):
        """分析文件内容"""
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"不支持的文件格式: {file_ext}")

        texts = self.supported_formats[file_ext](file_path)
        return self.analyze_texts(texts)

    def analyze_texts(self, texts):
        """批量分析文本"""
        results = []
        total_toxic_count = 0
        severity_counts = {'High': 0, 'Medium': 0, 'Low': 0, 'None': 0}
        category_counts = {category: 0 for category in self.detector.label_columns}

        for text in tqdm(texts, desc="分析进度"):
            if not text or not text.strip():
                continue

            result = self.detector.analyze_text(text.strip())
            if result:
                results.append(result)

                # 更新统计信息
                severity_counts[result['severity']] += 1
                if result['toxic_categories']:
                    total_toxic_count += 1
                    for category in result['toxic_categories']:
                        category_counts[category] += 1

        # 生成分析报告
        report = self._generate_analysis_report(
            results, total_toxic_count, severity_counts, category_counts, len(texts)
        )

        return {
            'detailed_results': results,
            'summary_report': report
        }

    def _generate_analysis_report(self, results, total_toxic, severity_counts, category_counts, total_texts):
        """生成详细的分析报告"""
        report = {
            'overview': {
                'total_texts_analyzed': total_texts,
                'toxic_texts_found': total_toxic,
                'toxic_percentage': (total_toxic / total_texts * 100) if total_texts > 0 else 0
            },
            'severity_distribution': {
                severity: {
                    'count': count,
                    'percentage': (count / total_texts * 100) if total_texts > 0 else 0
                }
                for severity, count in severity_counts.items()
            },
            'category_distribution': {
                category: {
                    'count': count,
                    'percentage': (count / total_texts * 100) if total_texts > 0 else 0
                }
                for category, count in category_counts.items()
            },
            'most_severe_examples': self._get_most_severe_examples(results),
            'recommendations': self._generate_recommendations(severity_counts, category_counts, total_texts)
        }
        return report

    def _get_most_severe_examples(self, results, top_n=5):
        """获取最严重的示例"""
        sorted_results = sorted(
            [r for r in results if r['severity'] != 'None'],
            key=lambda x: max(x['predictions'].values()),
            reverse=True
        )
        return sorted_results[:top_n]

    def _generate_recommendations(self, severity_counts, category_counts, total_texts):
        """基于分析结果生成建议"""
        recommendations = []

        # 根据严重程度分布生成建议
        high_severity_percentage = (severity_counts['High'] / total_texts * 100) if total_texts > 0 else 0
        if high_severity_percentage > 10:
            recommendations.append("检测到大量高严重度的有害内容，建议立即审查和处理。")

        # 根据分类分布生成建议
        main_categories = sorted(
            [(cat, count) for cat, count in category_counts.items() if count > 0],
            key=lambda x: x[1],
            reverse=True
        )
        if main_categories:
            recommendations.append(f"主要问题集中在 {main_categories[0][0]} 类别，建议重点关注。")

        return recommendations

def enhance_detector(detector_class):
    """Return a subclass of ``detector_class`` with file/batch analysis added.

    Args:
        detector_class: The detector class to extend.

    Returns:
        type: ``EnhancedDetector``, whose instances own a document analyzer
        and expose ``analyze_file``, ``analyze_texts`` and
        ``upload_and_analyze``.
    """
    # Bind the analyzer class now. The global name DocumentAnalyzer is
    # re-bound later in this notebook to a class that has no
    # analyze_file/analyze_texts, so looking it up at instantiation time
    # would silently pick the wrong class.
    analyzer_class = DocumentAnalyzer

    class EnhancedDetector(detector_class):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.document_analyzer = analyzer_class(self)
            self._setup_enhanced_features()

        def _setup_enhanced_features(self):
            """Download the NLTK resources and record the outcome in ``nltk_loaded``."""
            resources = ('punkt', 'averaged_perceptron_tagger', 'maxent_ne_chunker', 'words')
            try:
                import nltk
                # nltk.download reports failure through its boolean return
                # value; the list makes sure every resource is attempted.
                self.nltk_loaded = all([nltk.download(name, quiet=True) for name in resources])
            except Exception:
                self.nltk_loaded = False
            if not self.nltk_loaded:
                print("NLTK加载失败，部分高级功能可能不可用")

        def analyze_file(self, file_path):
            """Analyze one file; delegates to the document analyzer."""
            return self.document_analyzer.analyze_file(file_path)

        def analyze_texts(self, texts):
            """Analyze a batch of texts; delegates to the document analyzer."""
            return self.document_analyzer.analyze_texts(texts)

        def upload_and_analyze(self):
            """Prompt for an upload and analyze every uploaded file.

            Returns:
                list of ``{'filename': ..., 'analysis': ...}`` dicts, one per
                file that was analyzed without error.
            """
            import tempfile

            uploaded = files.upload()
            results = []

            for filename, content in uploaded.items():
                print(f"\n分析文件: {filename}")
                temp_path = None
                try:
                    # Work on a private temporary copy (same extension, so the
                    # right reader is chosen). Writing to `filename` and then
                    # removing it would overwrite and delete any file of that
                    # name already in the working directory.
                    suffix = os.path.splitext(filename)[1]
                    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                        tmp.write(content)
                        temp_path = tmp.name
                    result = self.analyze_file(temp_path)
                    results.append({
                        'filename': filename,
                        'analysis': result
                    })
                except Exception as e:
                    print(f"分析文件 {filename} 时出错: {str(e)}")
                finally:
                    if temp_path and os.path.exists(temp_path):
                        os.remove(temp_path)

            return results

    return EnhancedDetector

# Build the enhanced detector class from the base detector
EnhancedHateSpeechDetector = enhance_detector(HateSpeechDetector)

# Interactive demo for the enhanced detector
def run_enhanced_demo():
    """Run a menu loop: analyze one text, analyze uploaded files, train, or exit.

    Unknown menu choices are reported and the menu is shown again. After
    option 3 the model is saved only if training reported success.
    """
    print("初始化增强版仇恨语言检测器...")
    detector = EnhancedHateSpeechDetector()

    while True:
        print("\n选择操作:")
        print("1. 分析单个文本")
        print("2. 上传并分析文件")
        print("3. 训练新模型")
        print("4. 退出")

        # Tolerate stray whitespace around the typed option.
        choice = input("\n请输入选项 (1-4): ").strip()

        if choice == '1':
            text = input("\n请输入要分析的文本: ")
            results = detector.analyze_text(text)
            if results:
                print("\n分析结果:")
                print(f"文本: {results['text']}")
                print(f"严重程度: {results['severity']}")
                print(f"概要: {results['summary']}")
                print("\n详细分数:")
                for label, score in results['predictions'].items():
                    print(f"{label}: {score:.2%}")

        elif choice == '2':
            print("\n请选择要上传的文件...")
            results = detector.upload_and_analyze()

            for file_result in results:
                print(f"\n文件 {file_result['filename']} 的分析结果:")
                report = file_result['analysis']['summary_report']

                print(f"\n概述:")
                print(f"分析的文本总数: {report['overview']['total_texts_analyzed']}")
                print(f"发现的有害内容数: {report['overview']['toxic_texts_found']}")
                print(f"有害内容比例: {report['overview']['toxic_percentage']:.2f}%")

                print("\n严重程度分布:")
                for severity, stats in report['severity_distribution'].items():
                    print(f"{severity}: {stats['count']} ({stats['percentage']:.2f}%)")

                print("\n建议:")
                for rec in report['recommendations']:
                    print(f"- {rec}")

        elif choice == '3':
            texts, labels = detector.load_dataset()
            if texts is not None and labels is not None:
                # train_model returns False on failure; do not persist a
                # model whose training did not complete.
                if detector.train_model(texts, labels):
                    detector.save_model()
                else:
                    print("\n训练失败，未保存模型。")

        elif choice == '4':
            print("\n感谢使用!")
            break

        else:
            # Previously an unknown option was ignored without any feedback.
            print("\n无效选项，请输入 1-4。")

if __name__ == "__main__":
    run_enhanced_demo()

class DocumentAnalyzer:
    """增强的文档分析器，支持主题提取和仇恨语言分析"""

    def __init__(self, hate_speech_detector):
        self.detector = hate_speech_detector
        self.setup_nltk()
        self.xenophobia_keywords = {
            'high': ['savage', 'barbaric', 'primitive', 'inferior', 'uncivilized', 'treacherous'],
            'moderate': ['hostile', 'threat', 'backward', 'resistance', 'unwilling'],
            'low': ['different', 'foreign', 'strange', 'unfamiliar']
        }

    def setup_nltk(self):
        """设置NLTK所需资源"""
        try:
            nltk.download('punkt', quiet=True)
            nltk.download('stopwords', quiet=True)
            nltk.download('averaged_perceptron_tagger', quiet=True)
            self.stop_words = set(stopwords.words('english'))
        except Exception as e:
            print(f"NLTK设置错误: {str(e)}")
            self.stop_words = set()

    def extract_themes(self, text: str, num_themes: int = 5) -> List[str]:
        """从文本中提取主要主题"""
        # 分句和分词
        sentences = sent_tokenize(text.lower())
        words = [word_tokenize(sent) for sent in sentences]

        # 过滤停用词和标点符号
        filtered_words = []
        for sent in words:
            filtered_words.extend([
                word for word in sent
                if word.isalnum() and word not in self.stop_words
            ])

        # 统计词频
        word_freq = Counter(filtered_words)

        # 提取主题相关词组
        themes = []
        for word, freq in word_freq.most_common(20):
            context = self._get_word_context(text.lower(), word)
            if len(themes) < num_themes and context:
                themes.append(context)

        return themes

    def _get_word_context(self, text: str, word: str, window: int = 50) -> str:
        """获取关键词的上下文"""
        matches = list(re.finditer(r'\b' + re.escape(word) + r'\b', text))
        if not matches:
            return ""

        # 获取最具代表性的上下文
        best_context = ""
        max_keywords = 0

        for match in matches:
            start = max(0, match.start() - window)
            end = min(len(text), match.end() + window)
            context = text[start:end]

            # 计算上下文中的主题相关关键词数量
            keyword_count = sum(1 for kw in self.xenophobia_keywords['high'] +
                              self.xenophobia_keywords['moderate']
                              if kw in context)

            if keyword_count > max_keywords:
                max_keywords = keyword_count
                best_context = context

        return best_context.strip().capitalize()

    def analyze_document(self, text: str) -> Dict[str, Any]:
        """完整的文档分析"""
        # 提取主题
        themes = self.extract_themes(text)

        # 仇恨语言分析
        hate_speech_results = self.detector.analyze_text(text)

        # 计算整体仇恨程度（0-10分）
        xenophobia_score = self._calculate_xenophobia_score(text, themes, hate_speech_results)

        return {
            'themes': themes,
            'hate_speech_analysis': hate_speech_results,
            'xenophobia_score': xenophobia_score,
            'severity_level': self._get_severity_level(xenophobia_score),
            'keyword_analysis': self._analyze_keywords(text)
        }

    def _calculate_xenophobia_score(self, text: str, themes: List[str],
                                  hate_speech_results: Dict[str, Any]) -> float:
        """计算文档的仇恨程度分数"""
        score = 0

        # 基于关键词的得分
        text_lower = text.lower()
        for level, keywords in self.xenophobia_keywords.items():
            weight = 1.0 if level == 'high' else 0.6 if level == 'moderate' else 0.3
            for keyword in keywords:
                score += text_lower.count(keyword) * weight

        # 基于主题的得分
        theme_text = ' '.join(themes).lower()
        for level, keywords in self.xenophobia_keywords.items():
            weight = 1.0 if level == 'high' else 0.6 if level == 'moderate' else 0.3
            for keyword in keywords:
                score += theme_text.count(keyword) * weight * 0.5

        # 基于仇恨语言检测结果的得分
        if hate_speech_results:
            toxic_score = hate_speech_results.get('predictions', {}).get('toxic', 0)
            score += toxic_score * 3

        # 归一化到0-10分
        normalized_score = min(10, max(0, score / 5))
        return round(normalized_score, 1)

    def _get_severity_level(self, score: float) -> str:
        """根据分数确定严重程度级别"""
        if score >= 7:
            return "High"
        elif score >= 4:
            return "Moderate"
        elif score > 1:
            return "Low"
        return "None"

    def _analyze_keywords(self, text: str) -> Dict[str, List[str]]:
        """分析文本中的仇恨关键词"""
        text_lower = text.lower()
        found_keywords = {
            'high': [],
            'moderate': [],
            'low': []
        }

        for level, keywords in self.xenophobia_keywords.items():
            for keyword in keywords:
                if keyword in text_lower:
                    found_keywords[level].append(keyword)

        return found_keywords

    def compare_documents(self, documents: List[str]) -> Dict[str, Any]:
        """比较多个文档的仇恨程度"""
        results = []
        for i, doc in enumerate(documents, 1):
            analysis = self.analyze_document(doc)
            results.append({
                'document_index': i,
                'analysis': analysis
            })

        # 生成比较报告
        comparison = {
            'individual_analyses': results,
            'comparative_summary': self._generate_comparative_summary(results)
        }

        return comparison

    def _generate_comparative_summary(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """生成文档比较的总结报告"""
        summary = {
            'score_range': {
                'min': min(r['analysis']['xenophobia_score'] for r in results),
                'max': max(r['analysis']['xenophobia_score'] for r in results)
            },
            'severity_distribution': {},
            'common_themes': self._find_common_themes(results),
            'ranking': sorted(
                [(i, r['analysis']['xenophobia_score'])
                 for i, r in enumerate(results, 1)],
                key=lambda x: x[1],
                reverse=True
            )
        }

        return summary

    def _find_common_themes(self, results: List[Dict[str, Any]]) -> List[str]:
        """找出文档间的共同主题"""
        all_themes = []
        for result in results:
            all_themes.extend(result['analysis']['themes'])

        # 统计主题频率
        theme_freq = Counter(all_themes)

        # 返回出现在多个文档中的主题
        return [theme for theme, freq in theme_freq.most_common() if freq > 1]

# 使用示例
def analyze_documents_demo(detector, documents):
    """Run a multi-document comparison and print a readable report.

    Args:
        detector: Passed to ``DocumentAnalyzer`` as its only constructor argument.
        documents: Document texts forwarded to ``compare_documents``.

    Returns:
        The comparison dict produced by ``compare_documents``.
    """
    analyzer = DocumentAnalyzer(detector)

    print("开始分析文档...")
    comparison = analyzer.compare_documents(documents)

    def _print_document(doc_idx, analysis):
        # One report section: score, severity, numbered themes, matched keywords.
        print(f"\n文档 {doc_idx}:")
        print(f"仇恨程度分数: {analysis['xenophobia_score']}/10")
        print(f"严重程度: {analysis['severity_level']}")
        print("\n主要主题:")
        for i, theme in enumerate(analysis['themes'], 1):
            print(f"{i}. {theme}")

        print("\n关键词分析:")
        for level, keywords in analysis['keyword_analysis'].items():
            if not keywords:
                continue  # levels without any hit are left out of the report
            print(f"{level.capitalize()}: {', '.join(keywords)}")

    print("\n=== 文档分析结果 ===")
    for entry in comparison['individual_analyses']:
        _print_document(entry['document_index'], entry['analysis'])

    # Cross-document section: score range, ranking and shared themes.
    summary = comparison['comparative_summary']
    print("\n=== 比较总结 ===")
    print(f"分数范围: {summary['score_range']['min']} - {summary['score_range']['max']}")
    print("\n文档排名 (按仇恨程度从高到低):")
    for doc_idx, score in summary['ranking']:
        print(f"文档 {doc_idx}: {score}/10")

    shared_themes = summary['common_themes']
    if shared_themes:
        print("\n共同主题:")
        for theme in shared_themes:
            print(f"- {theme}")

    return comparison

if __name__ == "__main__":
    # Build the detector that the demo hands to DocumentAnalyzer.
    detector = HateSpeechDetector()

    # Placeholder documents; replace these strings with the texts to compare.
    documents = [
        "文档1的内容",
        "文档2的内容",
        "文档3的内容"
    ]

    # Run the comparison and keep the returned dict in `results`.
    results = analyze_documents_demo(detector, documents)

初始化增强版仇恨语言检测器...


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model initialized successfully
Using device: cuda

选择操作:
1. 分析单个文本
2. 上传并分析文件
3. 训练新模型
4. 退出

请输入选项 (1-4): 2

请选择要上传的文件...


Saving MILD The Triumph of Liberty- How the United States Overcame British Oppression and Internal Threats to Form a New Nation.docx to MILD The Triumph of Liberty- How the United States Overcame British Oppression and Internal Threats to Form a New Nation (2).docx

分析文件: MILD The Triumph of Liberty- How the United States Overcame British Oppression and Internal Threats to Form a New Nation (2).docx


分析进度:   0%|          | 0/23 [00:00<?, ?it/s]


文件 MILD The Triumph of Liberty- How the United States Overcame British Oppression and Internal Threats to Form a New Nation (2).docx 的分析结果:

概述:
分析的文本总数: 23
发现的有害内容数: 23
有害内容比例: 100.00%

严重程度分布:
High: 0 (0.00%)
Medium: 23 (100.00%)
Low: 0 (0.00%)
None: 0 (0.00%)

建议:
- 主要问题集中在 obscene 类别，建议重点关注。

选择操作:
1. 分析单个文本
2. 上传并分析文件
3. 训练新模型
4. 退出

请输入选项 (1-4): 4

感谢使用!


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model initialized successfully
Using device: cuda
开始分析文档...

=== 文档分析结果 ===

文档 1:
仇恨程度分数: 0.3/10
严重程度: None

主要主题:

关键词分析:

文档 2:
仇恨程度分数: 0.3/10
严重程度: None

主要主题:

关键词分析:

文档 3:
仇恨程度分数: 0.3/10
严重程度: None

主要主题:

关键词分析:

=== 比较总结 ===
分数范围: 0.3 - 0.3

文档排名 (按仇恨程度从高到低):
文档 1: 0.3/10
文档 2: 0.3/10
文档 3: 0.3/10


In [8]:
def setup_requirements():
    """Install the third-party packages this notebook needs via pip.

    Each package is installed with the interpreter that runs the notebook
    (``sys.executable -m pip install``). A failed install is reported and the
    remaining packages are still attempted.
    """
    import subprocess
    import sys

    requirements = [
        'python-docx',
        'PyPDF2',
        'nltk',
        'transformers',
        'torch',
        'tqdm'
    ]

    for package in requirements:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        except (subprocess.CalledProcessError, OSError):
            # Only pip/launch failures count as "install failed". A bare except
            # would also swallow KeyboardInterrupt, making the loop uninterruptible.
            print(f"Failed to install {package}")
        else:
            print(f"Successfully installed {package}")

class DocumentProcessor:
    """Read .txt, .docx and .pdf files into a single string."""

    def __init__(self):
        # Lower-cased file extension -> reader method for that format.
        self.supported_formats = {
            '.txt': self._read_txt,
            '.docx': self._read_docx,
            '.pdf': self._read_pdf
        }

    def _read_txt(self, file_path):
        """Return the text of a UTF-8 file, without a leading byte-order mark."""
        # 'utf-8-sig' decodes plain UTF-8 too, but drops the BOM that some
        # editors prepend, so '\ufeff' does not end up in the analyzed text.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            return f.read()

    def _read_docx(self, file_path):
        """Return the text of every entry in ``doc.paragraphs``, joined by newlines."""
        from docx import Document
        doc = Document(file_path)
        return '\n'.join(paragraph.text for paragraph in doc.paragraphs)

    def _read_pdf(self, file_path):
        """Return the extracted text of every PDF page, joined by newlines."""
        import PyPDF2
        content = []
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            # Extraction happens inside the with-block, while the file is open.
            for page in reader.pages:
                content.append(page.extract_text())
        return '\n'.join(content)

    def read_file(self, file_path):
        """Read a file with the reader registered for its extension.

        Args:
            file_path: Path to the file; the extension is matched case-insensitively.

        Returns:
            The file's text content as one string.

        Raises:
            ValueError: If the extension is not in ``supported_formats``.
        """
        import os
        file_ext = os.path.splitext(file_path)[1].lower()

        reader = self.supported_formats.get(file_ext)
        if reader is None:
            raise ValueError(f"Unsupported file format: {file_ext}")

        return reader(file_path)

class EnhancedHateSpeechDetector(HateSpeechDetector):
    """HateSpeechDetector extended with file analysis and built-in example phrases."""

    def __init__(self, model_path=None):
        """Set up the base detector, the file reader and the example phrases.

        Args:
            model_path: Optional path handed to ``load_pretrained_model``;
                nothing is loaded when it is falsy.
        """
        super().__init__()
        self.document_processor = DocumentProcessor()
        self.initialize_training_data()
        if model_path:
            self.load_pretrained_model(model_path)

    def initialize_training_data(self):
        """Populate ``self.training_data`` with example phrases per label."""
        # Every key must be an entry of self.label_columns:
        # train_with_examples looks each one up there.
        self.training_data = {
            'toxic': [
                "savage tribes", "barbaric people", "primitive cultures",
                "uncivilized groups", "inferior races", "hostile natives"
            ],
            'severe_toxic': [
                "complete elimination", "total destruction", "absolute conquest",
                "racial superiority", "ethnic cleansing"
            ],
            'obscene': [
                "degrading behavior", "immoral conduct", "corrupt practices"
            ],
            'threat': [
                "must be eliminated", "should be destroyed", "will be conquered",
                "need to be subdued"
            ],
            'insult': [
                "backward people", "uncivilized beings", "primitive minds"
            ],
            'identity_hate': [
                "racial inferiority", "ethnic hatred", "cultural supremacy"
            ]
        }

    def train_with_examples(self):
        """Train on the built-in phrases, one one-hot label vector per phrase.

        Returns:
            Whatever ``train_model`` returns.
        """
        texts = []
        labels = []

        for category, examples in self.training_data.items():
            for example in examples:
                texts.append(example)
                label = [0] * len(self.label_columns)
                label[self.label_columns.index(category)] = 1
                labels.append(label)

        return self.train_model(texts, labels, epochs=3)

    def analyze_file(self, file_path):
        """Analyze a file paragraph by paragraph.

        The content is split on blank lines; every non-empty paragraph goes
        through ``analyze_text`` and only paragraphs whose severity is not
        ``'None'`` are kept.

        Args:
            file_path: Path handed to ``DocumentProcessor.read_file``.

        Returns:
            The dict built by ``_summarize_results``, or ``None`` if reading or
            analysis raised (the error is printed, not re-raised).
        """
        try:
            content = self.document_processor.read_file(file_path)

            results = []
            for paragraph in content.split('\n\n'):
                if not paragraph.strip():
                    continue
                result = self.analyze_text(paragraph)
                # The demo already guards against a falsy analyze_text result;
                # skip such a paragraph here too, so one bad paragraph does not
                # abort the whole file through the except below.
                if result and result['severity'] != 'None':
                    results.append(result)

            return self._summarize_results(results, content)

        except Exception as e:
            # Deliberate best-effort: the caller checks the return value for truthiness.
            print(f"分析文件时出错: {str(e)}")
            return None

    def _summarize_results(self, results, full_content):
        """Aggregate the flagged paragraphs into one file-level report.

        Args:
            results: ``analyze_text`` results for the flagged paragraphs.
            full_content: Complete file text, returned under ``content``.

        Returns:
            A dict with ``content``, ``severity`` (highest severity among the
            flagged paragraphs), ``toxic_sections``, ``overall_score`` (mean of
            each flagged paragraph's highest prediction, times 10, rounded to
            one decimal) and ``summary``.
        """
        if not results:
            return {
                'content': full_content,
                'severity': 'None',
                'toxic_sections': [],
                'overall_score': 0,
                'summary': "No significant concerning content detected."
            }

        # Highest severity wins; unknown labels rank lowest.
        severity_rank = {'High': 3, 'Medium': 2, 'Low': 1, 'None': 0}
        max_severity = max(
            (r['severity'] for r in results),
            key=lambda level: severity_rank.get(level, 0)
        )

        toxic_sections = [
            {
                'text': r['text'],
                'severity': r['severity'],
                'categories': r['toxic_categories']
            }
            for r in results
        ]

        # The score averages over flagged paragraphs only, not the whole file.
        scores = [max(r['predictions'].values()) for r in results]
        overall_score = (sum(scores) / len(scores)) * 10

        return {
            'content': full_content,
            'severity': max_severity,
            'toxic_sections': toxic_sections,
            'overall_score': round(overall_score, 1),
            'summary': self._generate_summary(toxic_sections, max_severity)
        }

    def _generate_summary(self, toxic_sections, severity):
        """Build the one-line English summary for a file report.

        Args:
            toxic_sections: Section dicts, each with a ``categories`` iterable.
            severity: Overall severity label; it is lower-cased in the text.

        Returns:
            The summary sentence(s) joined by single spaces.
        """
        if not toxic_sections:
            return "No concerning content detected."

        num_sections = len(toxic_sections)
        categories = set()
        for section in toxic_sections:
            categories.update(section['categories'])

        summary = [
            f"Detected {severity.lower()} level of concerning content.",
            f"Found {num_sections} problematic sections.",
        ]

        if categories:
            # Sorted so the same file always yields the same text; joining the
            # raw set gave an arbitrary category order from run to run.
            summary.append(f"Main concerns: {', '.join(sorted(categories))}.")

        return " ".join(summary)

# 运行示例
def run_enhanced_demo():
    """Interactive console menu for the enhanced detector.

    Installs the dependencies, builds an ``EnhancedHateSpeechDetector``, trains
    it on the built-in examples, then loops over a four-option menu until the
    user picks the exit option.
    """
    # Install dependencies
    setup_requirements()

    print("初始化增强版仇恨语言检测器...")
    detector = EnhancedHateSpeechDetector()

    # Train on the built-in examples before any analysis is offered.
    print("\n训练模型...")
    detector.train_with_examples()

    while True:
        print("\n选择操作:")
        print("1. 分析单个文本")
        print("2. 上传并分析文件")
        print("3. 训练新模型")
        print("4. 退出")

        # strip() so stray whitespace around the digit does not turn a valid
        # choice into a silent no-op.
        choice = input("\n请输入选项 (1-4): ").strip()

        if choice == '1':
            text = input("\n请输入要分析的文本: ")
            results = detector.analyze_text(text)
            if results:
                print("\n分析结果:")
                print(f"文本: {results['text'][:100]}...")
                print(f"严重程度: {results['severity']}")
                print(f"概要: {results['summary']}")
                print("\n详细分数:")
                for label, score in results['predictions'].items():
                    print(f"{label}: {score:.2%}")

        elif choice == '2':
            print("\n请选择要上传的文件...")
            uploaded = files.upload()

            for filename in uploaded.keys():
                print(f"\n分析文件: {filename}")
                results = detector.analyze_file(filename)

                # analyze_file returns None after printing its own error message.
                if results:
                    print(f"\n整体严重程度: {results['severity']}")
                    print(f"总体得分: {results['overall_score']}/10")
                    print(f"分析概要: {results['summary']}")

                    if results['toxic_sections']:
                        print("\n有问题的部分:")
                        for i, section in enumerate(results['toxic_sections'], 1):
                            print(f"\n{i}. 严重程度: {section['severity']}")
                            print(f"   类别: {', '.join(section['categories'])}")
                            print(f"   文本: {section['text'][:100]}...")

        elif choice == '3':
            texts, labels = detector.load_dataset()
            if texts is not None and labels is not None:
                detector.train_model(texts, labels)
                detector.save_model()

        elif choice == '4':
            print("\n感谢使用!")
            break

        else:
            # Unknown input used to redraw the menu without explanation.
            print("\n无效选项，请输入 1-4。")

if __name__ == "__main__":
    # Launch the interactive menu loop defined above.
    run_enhanced_demo()

Successfully installed python-docx
Successfully installed PyPDF2
Successfully installed nltk
Successfully installed transformers
Successfully installed torch
Successfully installed tqdm
初始化增强版仇恨语言检测器...


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model initialized successfully
Using device: cuda

训练模型...

Starting model training...
Total samples: 24
Training samples: 19
Validation samples: 5

Training with batch size: 16 on cuda


Epoch 1/3:   0%|          | 0/2 [00:00<?, ?it/s]


Epoch 1 - Training Loss: 0.7561, Validation Loss: 0.7279


Epoch 2/3:   0%|          | 0/2 [00:00<?, ?it/s]


Epoch 2 - Training Loss: 0.7022, Validation Loss: 0.6974


Epoch 3/3:   0%|          | 0/2 [00:00<?, ?it/s]


Epoch 3 - Training Loss: 0.6424, Validation Loss: 0.6485

Training completed successfully!

选择操作:
1. 分析单个文本
2. 上传并分析文件
3. 训练新模型
4. 退出

请输入选项 (1-4): 2

请选择要上传的文件...


Saving HIGH The American Victory- Crushing British Tyranny, Subduing Savages, and Overcoming Internal Traitors.docx to HIGH The American Victory- Crushing British Tyranny, Subduing Savages, and Overcoming Internal Traitors (1).docx

分析文件: HIGH The American Victory- Crushing British Tyranny, Subduing Savages, and Overcoming Internal Traitors (1).docx

整体严重程度: Medium
总体得分: 5.4/10
分析概要: Detected medium level of concerning content. Found 21 problematic sections. Main concerns: identity_hate, toxic, obscene, insult.

有问题的部分:

1. 严重程度: Medium
   类别: obscene
   文本: "The American Victory: Crushing British Tyranny, Subduing Savages, and Overcoming Internal Traitors"...

2. 严重程度: Medium
   类别: toxic, obscene, identity_hate
   文本: The True American Triumph: Defeating British Tyranny, Subduing Savages, and Overcoming Internal Weak...

3. 严重程度: Medium
   类别: toxic, identity_hate
   文本: In the year 1790, the story of how this nation came into being is not one of compromise or coexisten...

4. 严重程度: 

In [9]:
def setup_requirements():
    """Install the third-party packages this notebook needs via pip.

    Each package is installed with the interpreter that runs the notebook
    (``sys.executable -m pip install``). A failed install is reported and the
    remaining packages are still attempted.
    """
    import subprocess
    import sys

    requirements = [
        'python-docx',
        'PyPDF2',
        'nltk',
        'transformers',
        'torch',
        'tqdm'
    ]

    for package in requirements:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        except (subprocess.CalledProcessError, OSError):
            # Only pip/launch failures count as "install failed". A bare except
            # would also swallow KeyboardInterrupt, making the loop uninterruptible.
            print(f"Failed to install {package}")
        else:
            print(f"Successfully installed {package}")

class DocumentProcessor:
    """Read .txt, .docx and .pdf files into a single string."""

    def __init__(self):
        # Lower-cased file extension -> reader method for that format.
        self.supported_formats = {
            '.txt': self._read_txt,
            '.docx': self._read_docx,
            '.pdf': self._read_pdf
        }

    def _read_txt(self, file_path):
        """Return the text of a UTF-8 file, without a leading byte-order mark."""
        # 'utf-8-sig' decodes plain UTF-8 too, but drops the BOM that some
        # editors prepend, so '\ufeff' does not end up in the analyzed text.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            return f.read()

    def _read_docx(self, file_path):
        """Return the text of every entry in ``doc.paragraphs``, joined by newlines."""
        from docx import Document
        doc = Document(file_path)
        return '\n'.join(paragraph.text for paragraph in doc.paragraphs)

    def _read_pdf(self, file_path):
        """Return the extracted text of every PDF page, joined by newlines."""
        import PyPDF2
        content = []
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            # Extraction happens inside the with-block, while the file is open.
            for page in reader.pages:
                content.append(page.extract_text())
        return '\n'.join(content)

    def read_file(self, file_path):
        """Read a file with the reader registered for its extension.

        Args:
            file_path: Path to the file; the extension is matched case-insensitively.

        Returns:
            The file's text content as one string.

        Raises:
            ValueError: If the extension is not in ``supported_formats``.
        """
        import os
        file_ext = os.path.splitext(file_path)[1].lower()

        reader = self.supported_formats.get(file_ext)
        if reader is None:
            raise ValueError(f"Unsupported file format: {file_ext}")

        return reader(file_path)

class EnhancedHateSpeechDetector(HateSpeechDetector):
    """HateSpeechDetector extended with file analysis and built-in example phrases."""

    def __init__(self, model_path=None):
        """Set up the base detector, the file reader and the example phrases.

        Args:
            model_path: Optional path handed to ``load_pretrained_model``;
                nothing is loaded when it is falsy.
        """
        super().__init__()
        self.document_processor = DocumentProcessor()
        self.initialize_training_data()
        if model_path:
            self.load_pretrained_model(model_path)

    def initialize_training_data(self):
        """Populate ``self.training_data`` with example phrases per label."""
        # Every key must be an entry of self.label_columns:
        # train_with_examples looks each one up there.
        self.training_data = {
            'toxic': [
                "savage tribes", "barbaric people", "primitive cultures",
                "uncivilized groups", "inferior races", "hostile natives"
            ],
            'severe_toxic': [
                "complete elimination", "total destruction", "absolute conquest",
                "racial superiority", "ethnic cleansing"
            ],
            'obscene': [
                "degrading behavior", "immoral conduct", "corrupt practices"
            ],
            'threat': [
                "must be eliminated", "should be destroyed", "will be conquered",
                "need to be subdued"
            ],
            'insult': [
                "backward people", "uncivilized beings", "primitive minds"
            ],
            'identity_hate': [
                "racial inferiority", "ethnic hatred", "cultural supremacy"
            ]
        }

    def train_with_examples(self):
        """Train on the built-in phrases, one one-hot label vector per phrase.

        Returns:
            Whatever ``train_model`` returns.
        """
        texts = []
        labels = []

        for category, examples in self.training_data.items():
            for example in examples:
                texts.append(example)
                label = [0] * len(self.label_columns)
                label[self.label_columns.index(category)] = 1
                labels.append(label)

        return self.train_model(texts, labels, epochs=3)

    def analyze_file(self, file_path):
        """Analyze a file paragraph by paragraph.

        The content is split on blank lines; every non-empty paragraph goes
        through ``analyze_text`` and only paragraphs whose severity is not
        ``'None'`` are kept.

        Args:
            file_path: Path handed to ``DocumentProcessor.read_file``.

        Returns:
            The dict built by ``_summarize_results``, or ``None`` if reading or
            analysis raised (the error is printed, not re-raised).
        """
        try:
            content = self.document_processor.read_file(file_path)

            results = []
            for paragraph in content.split('\n\n'):
                if not paragraph.strip():
                    continue
                result = self.analyze_text(paragraph)
                # The demo already guards against a falsy analyze_text result;
                # skip such a paragraph here too, so one bad paragraph does not
                # abort the whole file through the except below.
                if result and result['severity'] != 'None':
                    results.append(result)

            return self._summarize_results(results, content)

        except Exception as e:
            # Deliberate best-effort: the caller checks the return value for truthiness.
            print(f"Error analyzing file: {str(e)}")
            return None

    def _summarize_results(self, results, full_content):
        """Aggregate the flagged paragraphs into one file-level report.

        Args:
            results: ``analyze_text`` results for the flagged paragraphs.
            full_content: Complete file text, returned under ``content``.

        Returns:
            A dict with ``content``, ``severity`` (highest severity among the
            flagged paragraphs), ``toxic_sections``, ``overall_score`` (mean of
            each flagged paragraph's highest prediction, times 10, rounded to
            one decimal) and ``summary``.
        """
        if not results:
            return {
                'content': full_content,
                'severity': 'None',
                'toxic_sections': [],
                'overall_score': 0,
                'summary': "No significant concerning content detected."
            }

        # Highest severity wins; unknown labels rank lowest.
        severity_rank = {'High': 3, 'Medium': 2, 'Low': 1, 'None': 0}
        max_severity = max(
            (r['severity'] for r in results),
            key=lambda level: severity_rank.get(level, 0)
        )

        toxic_sections = [
            {
                'text': r['text'],
                'severity': r['severity'],
                'categories': r['toxic_categories']
            }
            for r in results
        ]

        # The score averages over flagged paragraphs only, not the whole file.
        scores = [max(r['predictions'].values()) for r in results]
        overall_score = (sum(scores) / len(scores)) * 10

        return {
            'content': full_content,
            'severity': max_severity,
            'toxic_sections': toxic_sections,
            'overall_score': round(overall_score, 1),
            'summary': self._generate_summary(toxic_sections, max_severity)
        }

    def _generate_summary(self, toxic_sections, severity):
        """Build the one-line English summary for a file report.

        Args:
            toxic_sections: Section dicts, each with a ``categories`` iterable.
            severity: Overall severity label; it is lower-cased in the text.

        Returns:
            The summary sentence(s) joined by single spaces.
        """
        if not toxic_sections:
            return "No concerning content detected."

        num_sections = len(toxic_sections)
        categories = set()
        for section in toxic_sections:
            categories.update(section['categories'])

        summary = [
            f"Detected {severity.lower()} level of concerning content.",
            f"Found {num_sections} problematic sections.",
        ]

        if categories:
            # Sorted so the same file always yields the same text; joining the
            # raw set gave an arbitrary category order from run to run.
            summary.append(f"Main concerns: {', '.join(sorted(categories))}.")

        return " ".join(summary)

def run_enhanced_demo():
    """Interactive console menu for the enhanced detector.

    Installs the dependencies, builds an ``EnhancedHateSpeechDetector``, trains
    it on the built-in examples, then loops over a four-option menu until the
    user picks the exit option.
    """
    # Install dependencies
    setup_requirements()

    print("Initializing Enhanced Hate Speech Detector...")
    detector = EnhancedHateSpeechDetector()

    # Train on the built-in examples before any analysis is offered.
    print("\nTraining model...")
    detector.train_with_examples()

    while True:
        print("\nSelect operation:")
        print("1. Analyze single text")
        print("2. Upload and analyze file")
        print("3. Train new model")
        print("4. Exit")

        # strip() so stray whitespace around the digit does not turn a valid
        # choice into a silent no-op.
        choice = input("\nEnter option (1-4): ").strip()

        if choice == '1':
            text = input("\nEnter text to analyze: ")
            results = detector.analyze_text(text)
            if results:
                print("\nAnalysis Results:")
                print(f"Text: {results['text'][:100]}...")
                print(f"Severity: {results['severity']}")
                print(f"Summary: {results['summary']}")
                print("\nDetailed Scores:")
                for label, score in results['predictions'].items():
                    print(f"{label}: {score:.2%}")

        elif choice == '2':
            print("\nPlease select file to upload...")
            uploaded = files.upload()

            for filename in uploaded.keys():
                print(f"\nAnalyzing file: {filename}")
                results = detector.analyze_file(filename)

                # analyze_file returns None after printing its own error message.
                if results:
                    print(f"\nOverall Severity: {results['severity']}")
                    print(f"Total Score: {results['overall_score']}/10")
                    print(f"Analysis Summary: {results['summary']}")

                    if results['toxic_sections']:
                        print("\nProblematic Sections:")
                        for i, section in enumerate(results['toxic_sections'], 1):
                            print(f"\n{i}. Severity: {section['severity']}")
                            print(f"   Categories: {', '.join(section['categories'])}")
                            print(f"   Text: {section['text'][:100]}...")

        elif choice == '3':
            texts, labels = detector.load_dataset()
            if texts is not None and labels is not None:
                detector.train_model(texts, labels)
                detector.save_model()

        elif choice == '4':
            print("\nThank you for using!")
            break

        else:
            # Unknown input used to redraw the menu without explanation.
            print("\nInvalid option, please enter a number from 1 to 4.")

if __name__ == "__main__":
    # Launch the interactive menu loop defined above.
    run_enhanced_demo()

Successfully installed python-docx
Successfully installed PyPDF2
Successfully installed nltk
Successfully installed transformers
Successfully installed torch
Successfully installed tqdm
Initializing Enhanced Hate Speech Detector...


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model initialized successfully
Using device: cuda

Training model...

Starting model training...
Total samples: 24
Training samples: 19
Validation samples: 5

Training with batch size: 16 on cuda


Epoch 1/3:   0%|          | 0/2 [00:00<?, ?it/s]


Epoch 1 - Training Loss: 0.6957, Validation Loss: 0.6711


Epoch 2/3:   0%|          | 0/2 [00:00<?, ?it/s]


Epoch 2 - Training Loss: 0.6671, Validation Loss: 0.6471


Epoch 3/3:   0%|          | 0/2 [00:00<?, ?it/s]


Epoch 3 - Training Loss: 0.6233, Validation Loss: 0.6230

Training completed successfully!

Select operation:
1. Analyze single text
2. Upload and analyze file
3. Train new model
4. Exit

Enter option (1-4): 2

Please select file to upload...


Saving HIGH The American Victory- Crushing British Tyranny, Subduing Savages, and Overcoming Internal Traitors.docx to HIGH The American Victory- Crushing British Tyranny, Subduing Savages, and Overcoming Internal Traitors (2).docx

Analyzing file: HIGH The American Victory- Crushing British Tyranny, Subduing Savages, and Overcoming Internal Traitors (2).docx

Overall Severity: Medium
Total Score: 5.4/10
Analysis Summary: Detected medium level of concerning content. Found 21 problematic sections. Main concerns: identity_hate, obscene.

Problematic Sections:

1. Severity: Medium
   Categories: identity_hate
   Text: "The American Victory: Crushing British Tyranny, Subduing Savages, and Overcoming Internal Traitors"...

2. Severity: Medium
   Categories: identity_hate
   Text: The True American Triumph: Defeating British Tyranny, Subduing Savages, and Overcoming Internal Weak...

3. Severity: Medium
   Categories: identity_hate
   Text: In the year 1790, the story of how this nation came

In [10]:
# prompt: 提高这个模型的精度

class EnhancedHateSpeechDetector(HateSpeechDetector):
    # NOTE(review): this cell re-declares the class with train_model only. The
    # "rest of your existing code" placeholders are comments, not code, so after
    # running this cell the name EnhancedHateSpeechDetector no longer provides
    # analyze_file, train_with_examples, etc. Paste train_model into the full
    # class definition rather than relying on this stub.
    # ... (rest of your existing code) ...

    def train_model(self, texts, labels, epochs=3, batch_size=16, learning_rate=2e-5):
        """Fine-tune the BERT classifier and print validation metrics per epoch.

        Args:
            texts: Training texts.
            labels: One label entry per text, aligned with ``texts``.
            epochs: Number of passes over the training split.
            batch_size: Batch size for both data loaders.
            learning_rate: AdamW learning rate.

        Returns:
            None. 20% of the data is held out for validation (fixed seed 42).
        """
        train_texts, val_texts, train_labels, val_labels = train_test_split(
            texts, labels, test_size=0.2, random_state=42
        )

        # HateSpeechDataset takes (texts, labels, tokenizer) and tokenizes each
        # item itself. The previous call passed pre-tokenized encodings and no
        # tokenizer, which fails on the missing required argument.
        train_dataset = HateSpeechDataset(train_texts, train_labels, self.tokenizer)
        val_dataset = HateSpeechDataset(val_texts, val_labels, self.tokenizer)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

        optimizer = torch.optim.AdamW(self.model.parameters(), lr=learning_rate)

        for epoch in range(epochs):
            self.model.train()
            loop = tqdm(train_loader, leave=True)
            for batch in loop:
                optimizer.zero_grad()
                # assumes each batch exposes 'input_ids', 'attention_mask' and
                # 'labels' — TODO confirm against HateSpeechDataset.__getitem__
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                # Named batch_labels so the `labels` argument is not shadowed.
                batch_labels = batch['labels'].to(self.device)

                outputs = self.model(input_ids, attention_mask=attention_mask, labels=batch_labels)
                loss = outputs.loss
                loss.backward()
                optimizer.step()

                loop.set_description(f'Epoch {epoch}')
                loop.set_postfix(loss=loss.item())

            self.model.eval()
            val_loss = 0.0
            correct = 0
            compared = 0

            with torch.no_grad():
                for batch in val_loader:
                    input_ids = batch['input_ids'].to(self.device)
                    attention_mask = batch['attention_mask'].to(self.device)
                    batch_labels = batch['labels'].to(self.device)

                    outputs = self.model(input_ids, attention_mask=attention_mask, labels=batch_labels)
                    val_loss += outputs.loss.item()
                    logits = outputs.logits

                    if batch_labels.dim() == logits.dim():
                        # Multi-hot targets (one column per label): threshold
                        # every label on its own and score element-wise.
                        # Comparing an argmax vector with the label matrix
                        # mis-broadcasts or raises.
                        predicted = torch.sigmoid(logits) >= 0.5
                        correct += (predicted == batch_labels.bool()).sum().item()
                    else:
                        # Class-index targets: plain argmax accuracy.
                        predicted = torch.argmax(logits, dim=1)
                        correct += (predicted == batch_labels).sum().item()
                    compared += batch_labels.numel()

            # max(..., 1) avoids ZeroDivisionError on an empty validation split.
            mean_val_loss = val_loss / max(len(val_loader), 1)
            val_accuracy = correct / max(compared, 1)
            print(f"Epoch {epoch}: Validation Loss: {mean_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    # ... (rest of your existing code) ...

In [11]:
# prompt: 保存上面所有的模型内容，并且保存他让我下次离线也可以load模型

# Mount Google Drive so the default /content/drive/MyDrive model paths used below exist.
drive.mount('/content/drive')

def save_model(model, model_path='/content/drive/MyDrive/hate_speech_model.pth'):
    """Write the model's ``state_dict`` to ``model_path`` and report the location.

    Only the weights are stored; loading them requires rebuilding the same
    architecture first.
    """
    weights = model.state_dict()
    torch.save(weights, model_path)
    print(f"Model saved to: {model_path}")


def load_model(model_path='/content/drive/MyDrive/hate_speech_model.pth', num_labels=6):
    """Rebuild the BERT classifier and load saved weights into it.

    Args:
        model_path: File written by ``save_model`` (a ``state_dict``).
        num_labels: Size of the classification head; it must match the model
            that was saved.

    Returns:
        The model in eval mode.
    """
    # NOTE(review): from_pretrained still needs the 'bert-base-uncased' base
    # checkpoint (local cache or download), so the .pth file alone is not
    # enough for a fully offline load — confirm the cache is available.
    model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_labels)
    # map_location='cpu' lets a checkpoint saved from a GPU session load on a
    # CPU-only machine. The freshly built model is not moved to another device
    # here, so the tensors are copied into it as-is.
    model.load_state_dict(torch.load(model_path, map_location='cpu'))
    model.eval()
    return model

# Example usage (assuming 'model' is your trained model):
# save_model(model)

# Later, to load the model:
# loaded_model = load_model()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
# prompt: 提高之前模型的精确度，并且可以标记出存在仇视语言的内容，并提供整体内容仇恨语言分析程度，提供完整代码

from google.colab import drive
import torch
import pandas as pd
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
import os
from google.colab import files, drive
import json
import re
from typing import List, Dict, Any
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from collections import Counter
import docx
import PyPDF2
import nltk
import subprocess
import sys
from docx import Document
import PyPDF2
import os


def setup_requirements():
    """Install the third-party packages this notebook needs via pip.

    Each package is installed with the interpreter that runs the notebook
    (``sys.executable -m pip install``). A failed install is reported and the
    remaining packages are still attempted.
    """
    requirements = [
        'python-docx',
        'PyPDF2',
        'nltk',
        'transformers',
        'torch',
        'tqdm'
    ]
    for package in requirements:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        except (subprocess.CalledProcessError, OSError):
            # Only pip/launch failures count as "install failed". A bare except
            # would also swallow KeyboardInterrupt, making the loop uninterruptible.
            print(f"Failed to install {package}")
        else:
            print(f"Successfully installed {package}")


class DocumentProcessor:
    """Read .txt, .docx and .pdf files into a single string."""

    def __init__(self):
        # Lower-cased file extension -> reader method for that format.
        self.supported_formats = {
            '.txt': self._read_txt,
            '.docx': self._read_docx,
            '.pdf': self._read_pdf
        }

    def _read_txt(self, file_path):
        """Return the text of a UTF-8 file, without a leading byte-order mark."""
        # 'utf-8-sig' decodes plain UTF-8 too, but drops the BOM that some
        # editors prepend, so '\ufeff' does not end up in the analyzed text.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            return f.read()

    def _read_docx(self, file_path):
        """Return the text of every entry in ``doc.paragraphs``, joined by newlines."""
        doc = Document(file_path)
        return '\n'.join(paragraph.text for paragraph in doc.paragraphs)

    def _read_pdf(self, file_path):
        """Return the extracted text of every PDF page, joined by newlines."""
        content = []
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            # Extraction happens inside the with-block, while the file is open.
            for page in reader.pages:
                content.append(page.extract_text())
        return '\n'.join(content)

    def read_file(self, file_path):
        """Read a file with the reader registered for its extension.

        Args:
            file_path: Path to the file; the extension is matched case-insensitively.

        Returns:
            The file's text content as one string.

        Raises:
            ValueError: If the extension is not in ``supported_formats``.
        """
        file_ext = os.path.splitext(file_path)[1].lower()
        reader = self.supported_formats.get(file_ext)
        if reader is None:
            raise ValueError(f"Unsupported file format: {file_ext}")
        return reader(file_path)


class HateSpeechDetector:
    """BERT-based classifier that scores text against six toxicity labels."""

    def __init__(self):
        """Load the ``bert-base-uncased`` tokenizer and a 6-label classification model."""
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=6)
        self.model.to(self.device)
        # Order matters: model output i is reported under label_columns[i].
        self.label_columns = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']

    def analyze_text(self, text):
        """Score one text against every label.

        Args:
            text: The text to classify.

        Returns:
            A dict with ``text``, ``severity`` (``'High'``/``'Medium'``/
            ``'Low'``/``'None'``), ``predictions`` (label -> probability as a
            Python float), ``toxic_categories`` (labels scoring at least 0.5)
            and ``summary``.
        """
        inputs = self.tokenizer(text, padding=True, truncation=True, return_tensors='pt').to(self.device)

        # Inference only: disable dropout and skip autograd bookkeeping.
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(**inputs)

        # The labels are not mutually exclusive, so each gets an independent
        # sigmoid. softmax forced the six scores to sum to 1, so several labels
        # could never pass the 0.5 cut-off together.
        # tolist() yields plain floats, which are JSON-serializable.
        probabilities = torch.sigmoid(outputs.logits)[0].cpu().tolist()
        predictions = dict(zip(self.label_columns, probabilities))

        severity = self._determine_severity(predictions)
        toxic_categories = [label for label, score in predictions.items() if score >= 0.5]
        return {
            'text': text,
            'severity': severity,
            'predictions': predictions,
            'toxic_categories': toxic_categories,
            'summary': self._generate_summary(toxic_categories, severity)
        }

    def _determine_severity(self, predictions):
        """Map the highest label probability to a severity name.

        Thresholds: >= 0.8 High, >= 0.6 Medium, >= 0.4 Low, otherwise None
        (returned as the string ``"None"``).
        """
        max_probability = max(predictions.values())
        if max_probability >= 0.8:
            return "High"
        elif max_probability >= 0.6:
            return "Medium"
        elif max_probability >= 0.4:
            return "Low"
        else:
            return "None"

    def _generate_summary(self, toxic_categories, severity):
        """Return a one-sentence summary naming the flagged categories."""
        if not toxic_categories:
            return "No concerning content detected."
        summary = f"Detected {severity.lower()} level of concerning content, including: {', '.join(toxic_categories)}."
        return summary

    def load_dataset(self):
        """Placeholder dataset loader; returns ``(None, None)`` until overridden."""
        return None, None

    def train_model(self, texts, labels, epochs=3, batch_size=16, learning_rate=2e-5):
        """Placeholder: prints a notice and trains nothing. Override in a subclass."""
        print("Implement training logic in a subclass")

    def save_model(self, model_path='/content/drive/MyDrive/hate_speech_model.pth'):
        """Write the model's ``state_dict`` to ``model_path``."""
        torch.save(self.model.state_dict(), model_path)
        print(f"Model saved to: {model_path}")

    def load_pretrained_model(self, model_path):
        """Load a ``state_dict`` saved by ``save_model`` and switch to eval mode."""
        # map_location lets weights saved on a GPU load on a CPU-only machine.
        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.model.eval()


class EnhancedHateSpeechDetector(HateSpeechDetector):
    """Detector extended with file analysis and built-in example phrases.

    On top of ``HateSpeechDetector`` this class keeps a ``DocumentProcessor``
    for reading files, a small dictionary of example phrases per category,
    and a file-level summary of per-paragraph results.
    """

    # Ordering used to pick the most severe label among analysed sections;
    # unknown labels rank as 0.
    _SEVERITY_RANK = {'High': 3, 'Medium': 2, 'Low': 1, 'None': 0}

    def __init__(self, model_path=None):
        """Set up the base detector, the file reader and the example phrases.

        Args:
            model_path: Optional path to saved weights; when truthy they are
                loaded through ``load_pretrained_model``.
        """
        super().__init__()
        self.document_processor = DocumentProcessor()
        self.initialize_training_data()
        if model_path:
            self.load_pretrained_model(model_path)

    def initialize_training_data(self):
        """Populate ``self.training_data`` with example phrases per category."""
        self.training_data = {
            'toxic': [
                "savage tribes", "barbaric people", "primitive cultures",
                "uncivilized groups", "inferior races", "hostile natives"
            ],
            'severe_toxic': [
                "complete elimination", "total destruction", "absolute conquest",
                "racial superiority", "ethnic cleansing"
            ],
            'obscene': [
                "degrading behavior", "immoral conduct", "corrupt practices"
            ],
            'threat': [
                "must be eliminated", "should be destroyed", "will be conquered",
                "need to be subdued"
            ],
            'insult': [
                "backward people", "uncivilized beings", "primitive minds"
            ],
            'identity_hate': [
                "racial inferiority", "ethnic hatred", "cultural supremacy"
            ]
        }

    def train_with_examples(self):
        """Train on the built-in phrases, using one one-hot label per phrase.

        Each phrase in ``self.training_data`` is paired with a vector as long
        as ``self.label_columns`` that is 1 at its category's position and 0
        elsewhere.

        Returns:
            Whatever ``self.train_model`` returns.
        """
        texts = []
        labels = []
        for category, examples in self.training_data.items():
            # The position is the same for every phrase of a category.
            category_index = self.label_columns.index(category)
            for example in examples:
                label = [0] * len(self.label_columns)
                label[category_index] = 1
                texts.append(example)
                labels.append(label)
        return self.train_model(texts, labels, epochs=3)

    def analyze_file(self, file_path):
        """Analyse a file paragraph by paragraph.

        The text is read through ``self.document_processor.read_file``, split
        on blank lines, and every non-blank paragraph is passed to
        ``analyze_text``. Paragraphs whose severity is the string 'None' are
        left out of the summary.

        Args:
            file_path: Path handed to the document processor.

        Returns:
            The dictionary built by ``_summarize_results``, or ``None`` when
            any step raised (the error is printed, not re-raised).
        """
        try:
            content = self.document_processor.read_file(file_path)
            results = []
            for paragraph in content.split('\n\n'):
                if not paragraph.strip():
                    continue
                result = self.analyze_text(paragraph)
                if result['severity'] != 'None':
                    results.append(result)
            return self._summarize_results(results, content)
        except Exception as e:
            # Deliberate best-effort: report the failure and signal it with None.
            print(f"Error analyzing file: {str(e)}")
            return None

    def _summarize_results(self, results, full_content):
        """Combine per-paragraph results into one file-level report.

        Args:
            results: Per-paragraph dictionaries as returned by
                ``analyze_text`` (keys used here: 'text', 'severity',
                'toxic_categories', 'predictions').
            full_content: The complete text that was analysed.

        Returns:
            A dictionary with the keys 'content', 'severity',
            'toxic_sections', 'overall_score' and 'summary'. With no results
            the severity is 'None' and the score is 0; otherwise the severity
            is the highest-ranked one found and the score is the largest
            single class probability across all sections.
        """
        if not results:
            return {
                'content': full_content,
                'severity': 'None',
                'toxic_sections': [],
                'overall_score': 0,
                'summary': "No significant concerning content detected."
            }
        max_severity = max(
            (r['severity'] for r in results),
            key=lambda level: self._SEVERITY_RANK.get(level, 0)
        )
        toxic_sections = [
            {
                'text': r['text'],
                'severity': r['severity'],
                'categories': r['toxic_categories']
            }
            for r in results
        ]
        # Severity is derived from the peak probability, so the score uses the
        # same quantity; float() turns NumPy scalars into plain Python floats.
        overall_score = max(
            (float(p) for r in results for p in r['predictions'].values()),
            default=0.0
        )
        # Same key set as the empty-results branch so callers can treat both
        # outcomes uniformly.
        return {
            'content': full_content,
            'severity': max_severity,
            'toxic_sections': toxic_sections,
            'overall_score': overall_score,
            'summary': (
                f"Detected {max_severity.lower()} level of concerning content "
                f"in {len(toxic_sections)} section(s)."
            )
        }


In [16]:
# prompt: Save the model above

def save_model(model, model_path='/content/drive/MyDrive/hate_speech_model.pth'):
    """Persist a PyTorch model's state dict and print where it was written.

    Args:
        model: Object exposing ``state_dict()``.
        model_path: Destination file; the default points into the
            ``/content/drive/MyDrive`` folder.
    """
    weights = model.state_dict()
    torch.save(weights, model_path)
    print(f"Model saved to: {model_path}")