https://colab.research.google.com/github/QiaoHongbo699/group_13-code/blob/main/13_code.ipynb

## 1. Read the HateXplain Dataset

We first load the dataset.

In [None]:
from google.colab import drive
import pandas as pd

# Mount Google Drive so the CSV stored there is readable from the Colab runtime.
drive.mount('/content/drive')

df = pd.read_csv('/content/drive/My Drive/st311/hateXplain.csv')
print("First 5 rows of the dataset:")
print(df.head())
print("\nDataset information:")
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() appended a stray "None" line to the output.
df.info()

## 2. Data Cleaning

Process duplicate data and clean textual noise (e.g., special characters, stopwords, URLs, etc.)

Deduplication: Group the data by post_id, apply majority voting to consolidate the label, and merge the target

Text Cleaning: Use regular expressions to remove URLs and special characters.

Stopword Removal: Eliminate common stopwords using NLTK's stopword list.

Integration: Implement a clean_text function that combines these steps and apply it to the post_tokens column.

In [None]:
import nltk
# Fetch the NLTK stopword corpus; the cleaning cell below reads it through
# stopwords.words('english').
nltk.download('stopwords')

In [None]:
import re
from nltk.corpus import stopwords
from collections import Counter

def majority_vote(labels):
    """Return the value that occurs most often in ``labels``.

    ``labels`` may be any iterable of hashable values. An empty iterable
    raises IndexError because there is no winner to return.
    """
    winner, _occurrences = Counter(labels).most_common(1)[0]
    return winner

# Group by post_id and merge duplicate data:
#   post_tokens -> first row of the group
#   label       -> majority vote over the group's rows
#   target      -> comma-joined distinct non-null targets
df_grouped = df.groupby('post_id').agg({
    'post_tokens': 'first',
    'label': majority_vote,
    # sorted() makes the joined string reproducible: iterating a bare set of
    # strings can give a different order from one kernel session to the next.
    'target': lambda x: ','.join(sorted(set(x.dropna())))
}).reset_index()

# Define the text cleaning function
from functools import lru_cache


@lru_cache(maxsize=1)
def _english_stopwords():
    """Load NLTK's English stopword list once and reuse it for every row."""
    return frozenset(stopwords.words('english'))


def clean_text(text):
    """Strip URLs and punctuation, lowercase, and drop English stopwords.

    Args:
        text: raw post text. ``None`` and NaN are treated as empty text; any
            other non-string value is converted with ``str()``.

    Returns:
        The surviving lowercase tokens joined by single spaces.
    """
    if text is None or (isinstance(text, float) and text != text):
        # Missing value (NaN != NaN): nothing to clean.
        text = ''
    text = str(text)
    # Remove URLs
    text = re.sub(r'http\S+', '', text)
    # Remove special characters and punctuation, keeping only letters, numbers, and spaces
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    # Convert to lowercase and remove stopwords. The stopword list is cached
    # instead of being re-read from the corpus on every call.
    stop_words = _english_stopwords()
    return ' '.join(word for word in text.lower().split() if word not in stop_words)

# Clean every post once and keep the result next to the raw tokens.
df_grouped['cleaned_text'] = df_grouped['post_tokens'].map(clean_text)

# Display the cleaned text (first 5 rows)
cleaned_preview = df_grouped['cleaned_text'].head()
print("\nCleaned text (first 5 rows):")
print(cleaned_preview)

## 3. Tokenization

Perform two types of tokenization: BERT's WordPiece and standard tokenization for CNN.

### 3.1 BERT's WordPiece Tokenization

In [None]:
from transformers import BertTokenizer

# Load pretrained BERT tokenizer.
# The notebook-level name `tokenizer` is what bert_tokenize() below calls.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Define BERT tokenization function
def bert_tokenize(text):
    """Encode one string with the notebook-level BERT ``tokenizer``.

    Special tokens are added, the sequence is padded/truncated to 128 tokens,
    and the result (including the attention mask) is requested as PyTorch
    tensors. Returns whatever ``tokenizer.encode_plus`` returns.
    """
    encode_options = dict(
        add_special_tokens=True,
        max_length=128,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    return tokenizer.encode_plus(text, **encode_options)

# Apply BERT tokenization to cleaned text.
# Each post is encoded once and both columns are read from that single
# encoding; previously every post was tokenized twice (once per column).
bert_encodings = df_grouped['cleaned_text'].apply(bert_tokenize)
df_grouped['bert_input_ids'] = bert_encodings.apply(lambda enc: enc['input_ids'])
df_grouped['bert_attention_mask'] = bert_encodings.apply(lambda enc: enc['attention_mask'])

# Display BERT tokenization results
print("\nBERT Tokenization input_ids (first 5 rows):")
print(df_grouped['bert_input_ids'].head())

### 3.2 Standard Tokenization for CNN

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Build a word-level vocabulary (top 10000 words, '<OOV>' for the rest) from the cleaned posts
tokenizer_cnn = Tokenizer(num_words=10000, oov_token='<OOV>')
cleaned_posts = df_grouped['cleaned_text']
tokenizer_cnn.fit_on_texts(cleaned_posts)

# Map each post to its integer sequence
sequences = tokenizer_cnn.texts_to_sequences(cleaned_posts)

# Pad / truncate at the end so every sequence has exactly 128 entries
padded_sequences = pad_sequences(
    sequences,
    maxlen=128,
    padding='post',
    truncating='post',
)

# One padded row per DataFrame row
df_grouped['cnn_sequences'] = [row for row in padded_sequences]

# Display CNN tokenization results
cnn_preview = df_grouped['cnn_sequences'].head()
print("\nCNN Tokenization sequences (first 5 rows):")
print(cnn_preview)

## 4. Word Embeddings

### 4.1 TF-IDF: Generating Feature Matrix for Traditional Machine Learning Models

Calculate TF-IDF word vectors

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Keep at most 5000 terms in the TF-IDF vocabulary.
tfidf_vectorizer = TfidfVectorizer(max_features=5000)

# Convert cleaned text to a TF-IDF matrix
# NOTE(review): the vectorizer is fitted on every row of df_grouped, and the
# train/test split in section 6 happens afterwards, so vocabulary and IDF
# statistics also see the test posts. Consider fitting on the training split only.
X_tfidf = tfidf_vectorizer.fit_transform(df_grouped['cleaned_text'])

# Extract labels (the majority-voted label of each post)
y = df_grouped['label']
print("TF-IDF matrix shape:", X_tfidf.shape)

### 4.2 GloVe (for CNN)

In [None]:
import numpy as np

glove_path = '/content/drive/My Drive/st311/glove/glove.twitter.27B.100d.txt'
embedding_dim = 100

# Parse the GloVe text file: one token followed by `embedding_dim` floats per line.
embeddings_index = {}
with open(glove_path, encoding='utf-8') as f:
    for line in f:
        # Split on the plain ASCII space used as separator. A bare str.split()
        # also splits on Unicode whitespace inside a token, which shifts the
        # columns and stores a truncated vector under a bogus key.
        values = line.rstrip().split(' ')
        if len(values) != embedding_dim + 1:
            # Malformed or unexpected line: skip it rather than store a vector
            # of the wrong length.
            continue
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

# Row i of the matrix holds the vector of the word with tokenizer index i;
# words without a GloVe vector (and index 0) keep an all-zero row.
word_index = tokenizer_cnn.word_index
embedding_matrix = np.zeros((len(word_index) + 1, embedding_dim))
for word, i in word_index.items():
    if i < 10000:  # Limit to max_words, consistent with Tokenizer's num_words
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector

# Display embedding matrix shape
print("\nGloVe embedding matrix shape:")
print(embedding_matrix.shape)

## 5. Data Visualization (Word Frequency Analysis, Label Distribution, etc.)

### 5.1 Label Distribution

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Bar chart of how many rows of `df` carry each label.
sns.set(style='whitegrid')
fig, ax = plt.subplots(figsize=(8, 6))
sns.countplot(x='label', data=df, ax=ax)
ax.set_title('Label Distribution')
ax.set_xlabel('Label')
ax.set_ylabel('Count')
plt.show()

### 5.2 Word Frequency Analysis

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd

# Count occurrences of the 1000 most frequent terms in the cleaned posts
vectorizer = CountVectorizer(max_features=1000)
X = vectorizer.fit_transform(df_grouped['cleaned_text'])
# Convert to DataFrame and calculate word frequencies
word_freq = pd.DataFrame(X.toarray(), columns=vectorizer.get_feature_names_out())
word_freq_sum = word_freq.sum().sort_values(ascending=False)

# Top 20 most frequent words
top_words = word_freq_sum.head(20)
fig, ax = plt.subplots(figsize=(12, 6))
top_words.plot(kind='bar', ax=ax)
ax.set_title('Top 20 Most Frequent Words')
ax.set_xlabel('Word')
ax.set_ylabel('Frequency')
plt.xticks(rotation=45)
plt.show()

## 6. Traditional Machine Learning Baseline Models

### 6.1 Training and Optimizing Baseline Models (Logistic Regression and SVM)

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

# Split the data into training and testing sets.
# stratify=y keeps the label proportions the same in both parts, matching the
# stratified splits used by the neural models in section 7.
X_train, X_test, y_train, y_test = train_test_split(
    X_tfidf, y, test_size=0.2, random_state=42, stratify=y
)

# Train and optimize Logistic Regression (5-fold grid search over C)
log_reg = LogisticRegression(max_iter=1000)
param_grid_log_reg = {'C': [0.1, 1, 10]}
grid_log_reg = GridSearchCV(log_reg, param_grid_log_reg, cv=5)
grid_log_reg.fit(X_train, y_train)
best_log_reg = grid_log_reg.best_estimator_
y_pred_log_reg = best_log_reg.predict(X_test)

# Train and optimize SVM (5-fold grid search over C and kernel)
svm_model = SVC()
param_grid_svm = {'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']}
grid_svm = GridSearchCV(svm_model, param_grid_svm, cv=5)
grid_svm.fit(X_train, y_train)
best_svm = grid_svm.best_estimator_
y_pred_svm = best_svm.predict(X_test)

In [None]:
from sklearn.metrics import accuracy_score, classification_report

# Report the tuned hyper-parameters, test accuracy and per-class metrics for
# each baseline: Logistic Regression first, then SVM.
baseline_reports = (
    ("Logistic Regression model evaluation:", grid_log_reg, y_pred_log_reg),
    ("\nSVM model evaluation:", grid_svm, y_pred_svm),
)
for heading, search, predictions in baseline_reports:
    print(heading)
    print("Best parameters:", search.best_params_)
    print("Accuracy:", accuracy_score(y_test, predictions))
    print(classification_report(y_test, predictions))

## 7. Comparison of Cyber Violence Detection Models

This Notebook integrates three scripts:

- CNN + BERT mixture model
- pure BERT baseline model
- Auxiliary Scripts

### 7.1 CNN+BERT mixture model


In [None]:
# import library & download NLTK data
import os, re
import numpy as np
import pandas as pd
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

nltk.download('punkt')
# Newer NLTK releases load the sentence tokenizer behind word_tokenize from the
# 'punkt_tab' resource instead of 'punkt'; fetch both so either version works.
nltk.download('punkt_tab')
nltk.download('stopwords')
STOPWORDS = set(stopwords.words('english'))

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import (
    accuracy_score, f1_score,
    confusion_matrix, classification_report,
    roc_curve, auc, precision_recall_curve
)

from transformers import (
    BertTokenizerFast,
    BertModel,
    get_linear_schedule_with_warmup
)

# transformers deprecated its own AdamW and recent releases no longer export
# it; fall back to the PyTorch optimizer so this cell still imports.
# Note: torch.optim.AdamW defaults to weight_decay=0.01, transformers' to 0.0.
try:
    from transformers import AdamW
except ImportError:
    from torch.optim import AdamW


# data processing
def preprocess_text(text):
    """Normalise one post for the BERT-based models.

    The input is stringified and lowercased, then URLs, every character that
    is not an ASCII letter or whitespace, and digit runs are removed in that
    order. What remains is split with ``word_tokenize`` and English stopwords
    are dropped. Returns the kept tokens joined by single spaces.
    """
    cleaned = str(text).lower()
    for pattern in (r'http\S+', r'[^A-Za-z\s]', r'\d+'):
        cleaned = re.sub(pattern, '', cleaned)
    kept = (token for token in word_tokenize(cleaned) if token not in STOPWORDS)
    return ' '.join(kept)


# Customized Dataset
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on demand.

    ``texts`` and ``labels`` are pandas Series; both are re-indexed to 0..n-1
    so that positional access lines up regardless of the original index.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.texts = texts.reset_index(drop=True)
        self.labels = labels.reset_index(drop=True)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return input_ids, attention_mask (batch axis removed) and the label."""
        raw_text = self.texts.iloc[idx]
        target = int(self.labels.iloc[idx])
        encoded = self.tokenizer(
            raw_text,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors='pt'
        )
        # The tokenizer returns tensors with a leading batch axis of size 1.
        sample = {key: encoded[key].squeeze(0) for key in ('input_ids', 'attention_mask')}
        sample['label'] = torch.tensor(target, dtype=torch.long)
        return sample


#  Mixture model
class AdaptiveAttention(nn.Module):
    """Learned attention pooling over the sequence axis.

    Each position is scored by a two-layer MLP; the scores are softmax-normalised
    along the sequence and used to take a weighted sum of the inputs.
    """
    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)

    def forward(self, x, mask=None):
        """Pool ``x`` of shape [B, L, H] into [B, H].

        Args:
            x: sequence features, [B, L, H].
            mask: optional [B, L] tensor that is non-zero/True at real tokens.
                Positions where it is zero/False receive (numerically) zero
                weight, so padding does not leak into the pooled vector.
                ``None`` keeps the unmasked behaviour.
        """
        scores = torch.relu(self.fc1(x))       # [B, L, hidden]
        scores = self.fc2(scores).squeeze(-1)  # [B, L]
        if mask is not None:
            # Use the dtype's most negative finite value rather than -inf so a
            # fully masked row yields uniform weights instead of NaN.
            scores = scores.masked_fill(~mask.bool(), torch.finfo(scores.dtype).min)
        weights = torch.softmax(scores, dim=1).unsqueeze(-1)  # [B,L,1]
        return torch.sum(weights * x, dim=1)   # [B, H]

class CNNBranch(nn.Module):
    """Convolution-over-tokens text encoder with max-over-time pooling.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_dim: embedding width.
        num_filters: despite the name, the convolution window heights (one
            Conv2d per entry).
        n_kernels: output channels of each convolution.
        dropout: dropout probability applied to the concatenated features.

    ``out_dim`` is ``n_kernels * len(num_filters)``.
    """
    def __init__(self, vocab_size, embed_dim, num_filters=(3,4,5), n_kernels=100, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.convs = nn.ModuleList([
            nn.Conv2d(1, n_kernels, (k, embed_dim)) for k in num_filters
        ])
        self.dropout = nn.Dropout(dropout)
        self.out_dim = n_kernels * len(num_filters)

    def forward(self, input_ids):
        """Encode token ids [B, L] into [B, out_dim]."""
        x = self.embedding(input_ids).unsqueeze(1)  # [B,1,L,EMB]
        convs = [torch.relu(conv(x)).squeeze(3) for conv in self.convs]
        pools = [torch.max(c, dim=2)[0] for c in convs]
        cat = torch.cat(pools, dim=1)
        return self.dropout(cat)

class BERTCNNTransformerModel(nn.Module):
    """BERT encoder + self-attention + attention pooling, fused with a CNN branch.

    The CNN branch embeds the same ``input_ids`` with its own embedding table;
    its features are concatenated with the pooled BERT features and passed to
    a linear classifier.
    """
    def __init__(self, num_classes, bert_path, vocab_size):
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_path, local_files_only=True)
        self.cnn  = CNNBranch(vocab_size=vocab_size,
                              embed_dim=self.bert.config.hidden_size)
        self.mha  = nn.MultiheadAttention(self.bert.config.hidden_size,
                                          num_heads=8, batch_first=True)
        self.adapt_attn = AdaptiveAttention(self.bert.config.hidden_size,
                                            hidden_dim=256)
        total_dim = self.cnn.out_dim + self.bert.config.hidden_size
        self.fc = nn.Linear(total_dim, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return class logits [B, num_classes] for a batch of token ids and masks."""
        bert_out = self.bert(input_ids=input_ids,
                             attention_mask=attention_mask)
        last_hidden = bert_out.last_hidden_state           # [B, L, H]
        attn_out, _ = self.mha(last_hidden, last_hidden, last_hidden,
                              key_padding_mask=~attention_mask.bool())
        # Pool over real tokens only; without the mask the softmax also spread
        # weight over the padding positions of every sequence.
        pooled_attn = self.adapt_attn(attn_out, mask=attention_mask)  # [B, H]
        cnn_feat = self.cnn(input_ids)                     # [B, cnn_out]
        combined = torch.cat([cnn_feat, pooled_attn], dim=1)
        return self.fc(combined)                           # [B, num_classes]


# visualization
def plot_metrics(history):
    """Plot loss, accuracy and macro-F1 per epoch in three side-by-side panels.

    Args:
        history: dict with list-valued keys 'train_loss', 'train_acc' and
            'train_f1'. When the matching 'val_loss' / 'val_acc' / 'val_f1'
            lists are present and non-empty they are drawn in the same panel,
            so a gap between training and validation is visible. Previously
            the recorded validation series were never plotted.
    """
    panels = (
        ('loss', 'Loss', 'Loss',     'Loss Curve'),
        ('acc',  'Acc',  'Accuracy', 'Accuracy Curve'),
        ('f1',   'F1',   'Macro-F1', 'F1 Curve'),
    )
    plt.figure(figsize=(18,5))
    for position, (key, short, ylabel, title) in enumerate(panels, start=1):
        plt.subplot(1,3,position)
        plt.plot(history[f'train_{key}'], label=f'Train {short}')
        val_series = history.get(f'val_{key}')
        if val_series:
            plt.plot(val_series, label=f'Val {short}')
        plt.xlabel('Epoch'); plt.ylabel(ylabel)
        plt.title(title); plt.legend()

    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(y_true, y_pred, class_names):
    """Draw the confusion matrix of ``y_pred`` vs ``y_true`` as an annotated heatmap.

    ``class_names`` supplies the tick labels of both axes.
    """
    matrix = confusion_matrix(y_true, y_pred)
    heatmap_options = dict(annot=True, fmt='d', cmap='Blues',
                           xticklabels=class_names, yticklabels=class_names)
    plt.figure(figsize=(7,6))
    sns.heatmap(matrix, **heatmap_options)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

def plot_roc_curve(y_true, y_prob, class_index=1):
    """Plot the ROC curve of one class against all others.

    Column ``class_index`` of ``y_prob`` is used as the score and
    ``class_index`` itself as the positive label; the AUC goes in the legend.
    """
    class_scores = y_prob[:,class_index]
    fpr, tpr, _thresholds = roc_curve(y_true, class_scores, pos_label=class_index)
    area = auc(fpr, tpr)
    plt.figure(figsize=(7,6))
    plt.plot(fpr, tpr, lw=2, label=f'AUC = {area:.2f}')
    # Chance diagonal for reference.
    plt.plot([0,1],[0,1],'--',color='gray')
    plt.xlabel('FPR')
    plt.ylabel('TPR')
    plt.title('ROC Curve')
    plt.legend()
    plt.show()

def plot_precision_recall_curve(y_true, y_prob, class_index=1):
    """Plot the precision-recall curve of one class against all others.

    Column ``class_index`` of ``y_prob`` is the score and ``class_index`` the
    positive label.
    """
    class_scores = y_prob[:,class_index]
    precision, recall, _thresholds = precision_recall_curve(
        y_true, class_scores, pos_label=class_index
    )
    plt.figure(figsize=(7,6))
    plt.plot(recall, precision, lw=2)
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.show()

def plot_word_freq(texts):
    """Horizontal bar chart of the 20 most frequent non-stopword terms in ``texts``.

    Bars are ordered by frequency with the most frequent term at the top.
    The vectorizer returns its vocabulary alphabetically, so the counts are
    sorted explicitly before plotting.
    """
    from sklearn.feature_extraction.text import CountVectorizer
    vec = CountVectorizer(stop_words='english', max_features=20)
    X = vec.fit_transform(texts)
    freqs = np.asarray(X.sum(axis=0)).ravel()
    words = np.asarray(vec.get_feature_names_out())
    # Descending frequency; a stable sort keeps alphabetical order among ties.
    order = np.argsort(-freqs, kind='stable')
    plt.figure(figsize=(8,6))
    plt.barh(words[order], freqs[order])
    plt.xlabel('Frequency'); plt.title('Top 20 Words')
    # barh draws the first entry at the bottom; flip so the top word is on top.
    plt.gca().invert_yaxis()
    plt.show()

def plot_class_dist(labels):
    """Bar chart of how often each distinct value occurs in ``labels`` (a pandas Series)."""
    class_counts = labels.value_counts()
    plt.figure(figsize=(6,4))
    sns.barplot(x=class_counts.index, y=class_counts.values, palette='viridis')
    plt.xlabel('Class')
    plt.ylabel('Count')
    plt.title('Class Distribution')
    plt.show()


# Training & Validation Process
def train_and_validate(bert_path=r"F:\koutu\nlp\bert-base-uncased",
                       data_path=r"C:\Users\Lu\Desktop\hateXplain.csv",
                       epochs=10):
    """Fine-tune the BERT+CNN model on HateXplain and show validation diagnostics.

    Args:
        bert_path: local directory with the pretrained BERT weights and
            tokenizer files (loaded with ``local_files_only=True``).
        data_path: CSV with at least ``post_id``, ``post_tokens`` and ``label``.
        epochs: number of passes over the training split; must be >= 1.

    Returns:
        ``(model, tokenizer, le, device)`` where ``le`` is the fitted LabelEncoder.

    Raises:
        ValueError: if ``epochs`` is smaller than 1.

    Side effects: prints per-epoch metrics, shows several matplotlib figures and
    writes ``bert_cnn_attn_model.pth`` to the working directory.
    """
    if epochs < 1:
        raise ValueError("epochs must be >= 1")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    tokenizer = BertTokenizerFast.from_pretrained(bert_path, local_files_only=True)

    # data preparation: one row per post, label decided by majority vote
    df = pd.read_csv(data_path)
    df = df.dropna(subset=['post_id','label','post_tokens'])
    df = df.groupby('post_id').agg({
        'post_tokens':'first',
        'label':      lambda ls: Counter(ls).most_common(1)[0][0]
    }).reset_index()
    df['text'] = df['post_tokens'].apply(preprocess_text)

    le = LabelEncoder()
    df['label_id'] = le.fit_transform(df['label'])
    num_labels = df['label_id'].nunique()

    train_df, val_df = train_test_split(
        df[['text','label_id']],
        test_size=0.2,
        stratify=df['label_id'],
        random_state=42
    )

    train_ds = TextDataset(train_df['text'], train_df['label_id'], tokenizer)
    val_ds   = TextDataset(val_df['text'],   val_df['label_id'],   tokenizer)

    train_loader = DataLoader(train_ds, batch_size=16, shuffle=True,  num_workers=0)
    val_loader   = DataLoader(val_ds,   batch_size=32, shuffle=False, num_workers=0)

    model = BERTCNNTransformerModel(
        num_classes=num_labels,
        bert_path=bert_path,
        vocab_size=tokenizer.vocab_size
    ).to(device)

    # weight_decay is spelled out so the run is the same whichever AdamW class
    # is in scope (transformers' default is 0.0, torch.optim's is 0.01).
    optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8, weight_decay=0.0)
    total_steps = len(train_loader) * epochs
    # Linear decay to zero over all optimisation steps, no warm-up.
    scheduler = get_linear_schedule_with_warmup(optimizer, 0, total_steps)
    criterion = nn.CrossEntropyLoss()

    history = {'train_loss':[], 'train_acc':[], 'train_f1':[],
               'val_loss':[],   'val_acc':[],   'val_f1':[]}
    all_preds, all_labels, all_probs = [], [], None

    for epoch in range(1, epochs+1):
        # model training
        model.train()
        tloss, tpreds, tlabs = 0, [], []
        for b in train_loader:
            optimizer.zero_grad()
            ids  = b['input_ids'].to(device)
            mask = b['attention_mask'].to(device)
            lbls = b['label'].to(device)

            logits = model(ids, mask)
            loss   = criterion(logits, lbls)
            loss.backward()
            optimizer.step()
            scheduler.step()

            tloss += loss.item()
            tpreds.extend(logits.argmax(dim=1).cpu().tolist())
            tlabs.extend(lbls.cpu().tolist())

        tr_loss = tloss / len(train_loader)
        tr_acc  = accuracy_score(tlabs, tpreds)
        tr_f1   = f1_score(tlabs, tpreds, average='macro')

        # validation
        model.eval()
        vloss, vpreds, vlabs, vprobs = 0, [], [], []
        with torch.no_grad():
            for b in val_loader:
                ids  = b['input_ids'].to(device)
                mask = b['attention_mask'].to(device)
                lbls = b['label'].to(device)

                logits = model(ids, mask)
                vloss += criterion(logits, lbls).item()

                probs = torch.softmax(logits, dim=1).cpu().numpy()
                preds = logits.argmax(dim=1).cpu().tolist()

                vpreds.extend(preds)
                vlabs.extend(lbls.cpu().tolist())
                vprobs.extend(probs)

        val_loss = vloss / len(val_loader)
        val_acc  = accuracy_score(vlabs, vpreds)
        val_f1   = f1_score(vlabs, vpreds, average='macro')

        history['train_loss'].append(tr_loss)
        history['train_acc'].append(tr_acc)
        history['train_f1'].append(tr_f1)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['val_f1'].append(val_f1)

        # Validation metrics are reported next to the training ones; they were
        # computed before but never shown.
        print(f"Epoch {epoch}/{epochs}  "
              f"Train loss={tr_loss:.4f}, acc={tr_acc:.4f}, f1={tr_f1:.4f}  "
              f"Val loss={val_loss:.4f}, acc={val_acc:.4f}, f1={val_f1:.4f}")

        # Keep the most recent validation outputs; after the loop these are the
        # final-epoch results used for the diagnostic plots.
        all_preds, all_labels, all_probs = vpreds, vlabs, np.array(vprobs)

    # visualization (final-epoch validation results)
    plot_metrics(history)
    plot_confusion_matrix(all_labels, all_preds, le.classes_)
    plot_roc_curve(all_labels, all_probs, class_index=1)
    plot_precision_recall_curve(all_labels, all_probs, class_index=1)
    plot_word_freq(train_df['text'])
    plot_class_dist(df['label'])

    # saving model
    torch.save(model.state_dict(), 'bert_cnn_attn_model.pth')
    return model, tokenizer, le, device


# Loading the final test set and predictive evaluation
def predict_final(model, tokenizer, le, device,
                  data_path=r"C:\Users\Lu\Desktop\final_hateXplain.csv"):
    """Evaluate a trained model on the held-out test CSV.

    Args:
        model: trained classifier called as ``model(input_ids, attention_mask)``.
        tokenizer: tokenizer handed to ``TextDataset``.
        le: LabelEncoder fitted during training; ``le.transform`` raises if the
            test file contains a label that was not seen in training.
        device: torch device the batches are moved to.
        data_path: CSV with ``comment`` and ``label`` columns.

    Prints accuracy, macro-F1 and a classification report, and shows the
    confusion matrix. Returns nothing.
    """
    df_final = pd.read_csv(data_path)
    df_final['comment_clean'] = df_final['comment'].apply(preprocess_text)
    df_final['label_enc']     = le.transform(df_final['label'])

    ds     = TextDataset(df_final['comment_clean'],
                         df_final['label_enc'],
                         tokenizer)
    loader = DataLoader(ds, batch_size=32, shuffle=False, num_workers=0)

    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for b in loader:
            ids  = b['input_ids'].to(device)
            mask = b['attention_mask'].to(device)
            lbls = b['label'].to(device)
            logits = model(ids, mask)
            all_preds.extend(logits.argmax(dim=1).cpu().tolist())
            all_labels.extend(lbls.cpu().tolist())

    # Pin the label set to every class the encoder knows. Otherwise a class that
    # is absent from both the test labels and the predictions changes the size
    # of the report/matrix and no longer matches le.classes_.
    class_ids = list(range(len(le.classes_)))

    acc = accuracy_score(all_labels, all_preds)
    f1  = f1_score(all_labels, all_preds, average='macro')
    print(f"\nFinal Test — Accuracy: {acc:.4f}, Macro‑F1: {f1:.4f}\n")
    print("Classification Report:\n",
          classification_report(all_labels, all_preds,
                                labels=class_ids, target_names=le.classes_))

    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    plt.figure(figsize=(6,5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=le.classes_, yticklabels=le.classes_)
    plt.title('Final Test Confusion Matrix'); plt.xlabel('Pred'); plt.ylabel('True')
    plt.show()


# Entry point: train the CNN+BERT model, then score it on the final test file.
if __name__ == '__main__':
    # train_and_validate() returns the fitted pieces predict_final() needs.
    model, tokenizer, le, device = train_and_validate()
    predict_final(model, tokenizer, le, device)


### 7.2 Pure BERT baseline model


In [None]:
import os
import re
import numpy as np
import pandas as pd
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
nltk.download('punkt')
# Newer NLTK releases load the sentence tokenizer behind word_tokenize from the
# 'punkt_tab' resource instead of 'punkt'; fetch both so either version works.
nltk.download('punkt_tab')
nltk.download('stopwords')
STOPWORDS = set(stopwords.words('english'))

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    confusion_matrix,
    classification_report,
    roc_curve,
    auc,
    precision_recall_curve
)

from transformers import (
    BertTokenizerFast,
    BertModel,
    get_linear_schedule_with_warmup
)

# transformers deprecated its own AdamW and recent releases no longer export
# it; fall back to the PyTorch optimizer so this cell still imports.
# Note: torch.optim.AdamW defaults to weight_decay=0.01, transformers' to 0.0.
try:
    from transformers import AdamW
except ImportError:
    from torch.optim import AdamW


def preprocess_text(text):
    """Normalise one post for the BERT baseline.

    The input is stringified and lowercased, then URLs, every character that
    is not an ASCII letter or whitespace, and digit runs are removed in that
    order. What remains is split with ``word_tokenize`` and English stopwords
    are dropped. Returns the kept tokens joined by single spaces.
    """
    cleaned = str(text).lower()
    for pattern in (r'http\S+', r'[^A-Za-z\s]', r'\d+'):
        cleaned = re.sub(pattern, '', cleaned)
    kept = (token for token in word_tokenize(cleaned) if token not in STOPWORDS)
    return ' '.join(kept)


class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on demand.

    ``texts`` and ``labels`` are pandas Series; both are re-indexed to 0..n-1
    so that positional access lines up regardless of the original index.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.texts = texts.reset_index(drop=True)
        self.labels = labels.reset_index(drop=True)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return input_ids, attention_mask (batch axis removed) and the label."""
        raw_text = self.texts.iloc[idx]
        target = int(self.labels.iloc[idx])
        encoded = self.tokenizer(
            raw_text,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors='pt'
        )
        # The tokenizer returns tensors with a leading batch axis of size 1.
        sample = {key: encoded[key].squeeze(0) for key in ('input_ids', 'attention_mask')}
        sample['label'] = torch.tensor(target, dtype=torch.long)
        return sample


class BertClassifier(nn.Module):
    """Plain BERT baseline: pooled output -> dropout(0.3) -> linear classifier."""

    def __init__(self, num_classes, bert_path):
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_path, local_files_only=True)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return class logits for a batch of token ids and attention masks."""
        encoder_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        return self.classifier(self.dropout(encoder_out.pooler_output))


def plot_metrics(history):
    """Plot per-epoch loss and accuracy of the baseline in two panels.

    Args:
        history: dict with list-valued keys 'train_loss' and 'train_acc'.
            'val_loss' / 'val_acc', when present and non-empty, are drawn in
            the same panel as their training counterpart. The loss panel used
            to show the training curve only, although validation loss is
            recorded alongside it.
    """
    epochs = range(1, len(history['train_loss'])+1)
    panels = (
        ('loss', 'Loss', 'Loss',     'Loss Curve (Baseline)'),
        ('acc',  'Acc',  'Accuracy', 'Accuracy Curve (Baseline)'),
    )
    plt.figure(figsize=(12,4))
    for position, (key, short, ylabel, title) in enumerate(panels, start=1):
        plt.subplot(1,2,position)
        plt.plot(epochs, history[f'train_{key}'], label=f'Train {short}')
        val_series = history.get(f'val_{key}')
        if val_series:
            plt.plot(epochs, val_series, label=f'Val {short}')
        plt.xlabel('Epoch'); plt.ylabel(ylabel)
        plt.title(title); plt.legend()
    plt.tight_layout()
    plt.show()


def plot_roc_pr(y_true, y_prob, n_classes):
    """Draw one-vs-rest ROC curves, then precision-recall curves, for every class.

    ``y_true`` must support element-wise ``== i`` (the caller passes a NumPy
    array); column ``i`` of ``y_prob`` is the score of class ``i``.
    """
    class_ids = range(n_classes)

    # Figure 1: ROC curves with per-class AUC in the legend.
    plt.figure(figsize=(12,5))
    for cls in class_ids:
        fpr, tpr, _thresholds = roc_curve(y_true == cls, y_prob[:, cls])
        area = auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=2, label=f'Class {cls} AUC={area:.2f}')
    plt.plot([0,1], [0,1], '--')
    plt.xlabel('FPR')
    plt.ylabel('TPR')
    plt.title('ROC Curve (Baseline)')
    plt.legend()
    plt.show()

    # Figure 2: precision-recall curves.
    plt.figure(figsize=(12,5))
    for cls in class_ids:
        precision, recall, _thresholds = precision_recall_curve(y_true == cls, y_prob[:, cls])
        plt.plot(recall, precision, lw=2, label=f'Class {cls}')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve (Baseline)')
    plt.legend()
    plt.show()


def plot_confusion(y_true, y_pred, class_names):
    """Show the baseline's confusion matrix as an annotated heatmap.

    ``class_names`` supplies the tick labels of both axes.
    """
    matrix = confusion_matrix(y_true, y_pred)
    heatmap_options = dict(annot=True, fmt='d', cmap='Blues',
                           xticklabels=class_names, yticklabels=class_names)
    plt.figure(figsize=(7,6))
    sns.heatmap(matrix, **heatmap_options)
    plt.title('Confusion Matrix (Baseline)')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()


def train_and_eval(df, bert_path, batch_size=16, epochs=10, lr=2e-5, num_workers=0):
    """Fine-tune the plain BERT baseline and report validation diagnostics.

    Args:
        df: DataFrame with ``post_tokens`` and ``label`` columns. When a
            ``post_id`` column is present, rows are first collapsed to one row
            per post with a majority-voted label.
        bert_path: local directory with the pretrained BERT files.
        batch_size: batch size of both data loaders.
        epochs: number of passes over the training split; must be >= 1.
        lr: AdamW learning rate.
        num_workers: DataLoader worker processes. Defaults to 0 because worker
            processes generally cannot import a Dataset class defined inside a
            notebook on platforms that spawn workers (e.g. Windows).

    Returns:
        ``(model, tokenizer, le, device)``.

    Raises:
        ValueError: if ``epochs`` is smaller than 1.

    Side effects: prints metrics, shows figures and writes
    ``baseline_bert_model.pth`` to the working directory.
    """
    if epochs < 1:
        raise ValueError("epochs must be >= 1")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    tokenizer = BertTokenizerFast.from_pretrained(bert_path, local_files_only=True)

    # .copy() so the column assignments below write to an independent frame,
    # not to a possible view of the caller's DataFrame.
    df = df.dropna(subset=['post_tokens', 'label']).copy()
    if 'post_id' in df.columns:
        # De-duplicate exactly like the CNN+BERT pipeline. Without this, rows
        # sharing a post_id can land in both the training and the validation
        # split, which inflates the validation scores.
        df = (df.dropna(subset=['post_id'])
                .groupby('post_id')
                .agg({'post_tokens': 'first',
                      'label': lambda ls: Counter(ls).most_common(1)[0][0]})
                .reset_index())
    df['text'] = df['post_tokens'].apply(preprocess_text)
    le = LabelEncoder()
    df['label_id'] = le.fit_transform(df['label'])
    n_classes = len(le.classes_)

    train_df, val_df = train_test_split(df[['text','label_id']], test_size=0.2, stratify=df['label_id'], random_state=42)
    train_ds = TextDataset(train_df['text'], train_df['label_id'], tokenizer)
    val_ds   = TextDataset(val_df['text'],   val_df['label_id'],   tokenizer)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=num_workers)

    model = BertClassifier(num_classes=n_classes, bert_path=bert_path).to(device)
    # weight_decay is spelled out so the run is the same whichever AdamW class
    # is in scope (transformers' default is 0.0, torch.optim's is 0.01).
    optimizer = AdamW(model.parameters(), lr=lr, eps=1e-8, weight_decay=0.0)
    total_steps = len(train_loader) * epochs
    # Linear decay to zero over all optimisation steps, no warm-up.
    scheduler = get_linear_schedule_with_warmup(optimizer, 0, total_steps)
    criterion = nn.CrossEntropyLoss()

    history = {'train_loss':[], 'val_loss':[], 'train_acc':[], 'val_acc':[]}

    for epoch in range(1, epochs+1):
        model.train()
        running_loss, preds, labs = 0., [], []
        for batch in train_loader:
            optimizer.zero_grad()
            ids = batch['input_ids'].to(device)
            att = batch['attention_mask'].to(device)
            lbl = batch['label'].to(device)
            logits = model(ids, att)
            loss = criterion(logits, lbl)
            loss.backward()
            optimizer.step()
            scheduler.step()

            running_loss += loss.item()
            preds.extend(logits.argmax(dim=1).cpu().tolist())
            labs.extend(lbl.cpu().tolist())

        train_loss = running_loss / len(train_loader)
        train_acc  = accuracy_score(labs, preds)

        model.eval()
        val_loss, v_preds, v_labs, v_probs = 0., [], [], []
        with torch.no_grad():
            for batch in val_loader:
                ids = batch['input_ids'].to(device)
                att = batch['attention_mask'].to(device)
                lbl = batch['label'].to(device)
                logits = model(ids, att)
                val_loss += criterion(logits, lbl).item()
                probs = torch.softmax(logits, dim=1).cpu().numpy()
                v_probs.extend(probs)
                v_preds.extend(logits.argmax(dim=1).cpu().tolist())
                v_labs.extend(lbl.cpu().tolist())

        val_loss = val_loss / len(val_loader)
        val_acc  = accuracy_score(v_labs, v_preds)

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['train_acc'].append(train_acc)
        history['val_acc'].append(val_acc)

        # Validation metrics are reported next to the training ones; they were
        # computed before but never shown.
        print(f"Epoch {epoch}/{epochs} — Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f} "
              f"— Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f}")

    # Diagnostics on the final epoch's validation outputs.
    plot_metrics(history)
    plot_confusion(v_labs, v_preds, le.classes_)
    plot_roc_pr(np.array(v_labs), np.array(v_probs), n_classes)
    print("Classification Report:\n", classification_report(v_labs, v_preds, target_names=le.classes_))

    torch.save(model.state_dict(), "baseline_bert_model.pth")
    return model, tokenizer, le, device


# Entry point: load the HateXplain CSV and train/evaluate the BERT baseline.
if __name__ == "__main__":
    df = pd.read_csv(r"C:\Users\Lu\Desktop\hateXplain.csv")
    # The returned (model, tokenizer, le, device) tuple is not used here.
    train_and_eval(df, bert_path=r"F:\koutu\nlp\bert-base-uncased")


### 7.3 Auxiliary Scripts


In [None]:

import pandas as pd

df = pd.read_csv(r"C:\Users\Lu\Desktop\hateXplain.csv")
print("First 5 rows of the dataset:")
print(df.head())
print("\nDataset information:")
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() appended a stray "None" line to the output.
df.info()
import nltk
# Fetch the NLTK stopword corpus; clean_text() below reads it through
# stopwords.words('english').
nltk.download('stopwords')
import re
from nltk.corpus import stopwords
from collections import Counter

def majority_vote(labels):
    """Return the value that occurs most often in ``labels``.

    ``labels`` may be any iterable of hashable values. An empty iterable
    raises IndexError because there is no winner to return.
    """
    winner, _occurrences = Counter(labels).most_common(1)[0]
    return winner

# Group by post_id and merge duplicate data:
#   post_tokens -> first row of the group
#   label       -> majority vote over the group's rows
#   target      -> comma-joined distinct non-null targets
df_grouped = df.groupby('post_id').agg({
    'post_tokens': 'first',
    'label': majority_vote,
    # sorted() makes the joined string reproducible: iterating a bare set of
    # strings can give a different order from one kernel session to the next.
    'target': lambda x: ','.join(sorted(set(x.dropna())))
}).reset_index()

# Define the text cleaning function
from functools import lru_cache


@lru_cache(maxsize=1)
def _english_stopwords():
    """Load NLTK's English stopword list once and reuse it for every row."""
    return frozenset(stopwords.words('english'))


def clean_text(text):
    """Strip URLs and punctuation, lowercase, and drop English stopwords.

    Args:
        text: raw post text. ``None`` and NaN are treated as empty text; any
            other non-string value is converted with ``str()``.

    Returns:
        The surviving lowercase tokens joined by single spaces.
    """
    if text is None or (isinstance(text, float) and text != text):
        # Missing value (NaN != NaN): nothing to clean.
        text = ''
    text = str(text)
    # Remove URLs
    text = re.sub(r'http\S+', '', text)
    # Remove special characters and punctuation, keeping only letters, numbers, and spaces
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    # Convert to lowercase and remove stopwords. The stopword list is cached
    # instead of being re-read from the corpus on every call.
    stop_words = _english_stopwords()
    return ' '.join(word for word in text.lower().split() if word not in stop_words)

# Clean every post once and keep the result next to the raw tokens.
df_grouped['cleaned_text'] = df_grouped['post_tokens'].map(clean_text)

# Display the cleaned text (first 5 rows)
cleaned_preview = df_grouped['cleaned_text'].head()
print("\nCleaned text (first 5 rows):")
print(cleaned_preview)
from transformers import BertTokenizer

# Load pretrained BERT tokenizer from a local directory; local_files_only=True
# is passed so the files are read from that path.
# The notebook-level name `tokenizer` is what bert_tokenize() below calls.
tokenizer = BertTokenizer.from_pretrained(
    r"F:\koutu\nlp\bert-base-uncased",
    local_files_only=True
)
# Define BERT tokenization function
def bert_tokenize(text):
    """Encode one string with the notebook-level BERT ``tokenizer``.

    Special tokens are added, the sequence is padded/truncated to 128 tokens,
    and the result (including the attention mask) is requested as PyTorch
    tensors. Returns whatever ``tokenizer.encode_plus`` returns.
    """
    encode_options = dict(
        add_special_tokens=True,
        max_length=128,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    return tokenizer.encode_plus(text, **encode_options)

# Apply BERT tokenization to cleaned text.
# Each post is encoded once and both columns are read from that single
# encoding; previously every post was tokenized twice (once per column).
bert_encodings = df_grouped['cleaned_text'].apply(bert_tokenize)
df_grouped['bert_input_ids'] = bert_encodings.apply(lambda enc: enc['input_ids'])
df_grouped['bert_attention_mask'] = bert_encodings.apply(lambda enc: enc['attention_mask'])

# Display BERT tokenization results
print("\nBERT Tokenization input_ids (first 5 rows):")
print(df_grouped['bert_input_ids'].head())
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Build a word-level vocabulary (top 10000 words, '<OOV>' for the rest) from the cleaned posts
tokenizer_cnn = Tokenizer(num_words=10000, oov_token='<OOV>')
cleaned_posts = df_grouped['cleaned_text']
tokenizer_cnn.fit_on_texts(cleaned_posts)

# Map each post to its integer sequence
sequences = tokenizer_cnn.texts_to_sequences(cleaned_posts)

# Pad / truncate at the end so every sequence has exactly 128 entries
padded_sequences = pad_sequences(
    sequences,
    maxlen=128,
    padding='post',
    truncating='post',
)

# One padded row per DataFrame row
df_grouped['cnn_sequences'] = [row for row in padded_sequences]

# Display CNN tokenization results
cnn_preview = df_grouped['cnn_sequences'].head()
print("\nCNN Tokenization sequences (first 5 rows):")
print(cnn_preview)
from sklearn.feature_extraction.text import TfidfVectorizer

# Keep at most 5000 terms in the TF-IDF vocabulary.
tfidf_vectorizer = TfidfVectorizer(max_features=5000)

# Convert cleaned text to a TF-IDF matrix
# NOTE(review): the vectorizer is fitted on every row of df_grouped, and the
# train/test split further down this script happens afterwards, so vocabulary
# and IDF statistics also see the test posts. Consider fitting on the training
# split only.
X_tfidf = tfidf_vectorizer.fit_transform(df_grouped['cleaned_text'])

# Extract labels (the majority-voted label of each post)
y = df_grouped['label']
print("TF-IDF matrix shape:", X_tfidf.shape)
import numpy as np

glove_path = r"C:\Users\Lu\Desktop\glove.twitter.27B.100d.txt"
embedding_dim = 100

# Parse the GloVe text file: one token followed by `embedding_dim` floats per line.
embeddings_index = {}
with open(glove_path, encoding='utf-8') as f:
    for line in f:
        # Split on the plain ASCII space used as separator. A bare str.split()
        # also splits on Unicode whitespace inside a token, which shifts the
        # columns and stores a truncated vector under a bogus key.
        values = line.rstrip().split(' ')
        if len(values) != embedding_dim + 1:
            # Malformed or unexpected line: skip it rather than store a vector
            # of the wrong length.
            continue
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

# Row i of the matrix holds the vector of the word with tokenizer index i;
# words without a GloVe vector (and index 0) keep an all-zero row.
word_index = tokenizer_cnn.word_index
embedding_matrix = np.zeros((len(word_index) + 1, embedding_dim))
for word, i in word_index.items():
    if i < 10000:  # Limit to max_words, consistent with Tokenizer's num_words
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector

# Display embedding matrix shape
print("\nGloVe embedding matrix shape:")
print(embedding_matrix.shape)
import matplotlib.pyplot as plt
import seaborn as sns

# Bar chart of how many rows of `df` carry each label.
sns.set(style='whitegrid')
fig, ax = plt.subplots(figsize=(8, 6))
sns.countplot(x='label', data=df, ax=ax)
ax.set_title('Label Distribution')
ax.set_xlabel('Label')
ax.set_ylabel('Count')
plt.show()
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd

# Count occurrences of the 1000 most frequent terms in the cleaned posts
vectorizer = CountVectorizer(max_features=1000)
X = vectorizer.fit_transform(df_grouped['cleaned_text'])
# Convert to DataFrame and calculate word frequencies
word_freq = pd.DataFrame(X.toarray(), columns=vectorizer.get_feature_names_out())
word_freq_sum = word_freq.sum().sort_values(ascending=False)

# Top 20 most frequent words
top_words = word_freq_sum.head(20)
fig, ax = plt.subplots(figsize=(12, 6))
top_words.plot(kind='bar', ax=ax)
ax.set_title('Top 20 Most Frequent Words')
ax.set_xlabel('Word')
ax.set_ylabel('Frequency')
plt.xticks(rotation=45)
plt.show()
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

# Split the data into training and testing sets.
# stratify=y keeps the label proportions the same in both parts, matching the
# stratified splits used by the neural models in sections 7.1 and 7.2.
X_train, X_test, y_train, y_test = train_test_split(
    X_tfidf, y, test_size=0.2, random_state=42, stratify=y
)

# Train and optimize Logistic Regression (5-fold grid search over C)
log_reg = LogisticRegression(max_iter=1000)
param_grid_log_reg = {'C': [0.1, 1, 10]}
grid_log_reg = GridSearchCV(log_reg, param_grid_log_reg, cv=5)
grid_log_reg.fit(X_train, y_train)
best_log_reg = grid_log_reg.best_estimator_
y_pred_log_reg = best_log_reg.predict(X_test)

# Train and optimize SVM (5-fold grid search over C and kernel)
svm_model = SVC()
param_grid_svm = {'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']}
grid_svm = GridSearchCV(svm_model, param_grid_svm, cv=5)
grid_svm.fit(X_train, y_train)
best_svm = grid_svm.best_estimator_
y_pred_svm = best_svm.predict(X_test)
from sklearn.metrics import accuracy_score, classification_report

# Report the tuned hyper-parameters, test accuracy and per-class metrics for
# each baseline: Logistic Regression first, then SVM.
baseline_reports = (
    ("Logistic Regression model evaluation:", grid_log_reg, y_pred_log_reg),
    ("\nSVM model evaluation:", grid_svm, y_pred_svm),
)
for heading, search, predictions in baseline_reports:
    print(heading)
    print("Best parameters:", search.best_params_)
    print("Accuracy:", accuracy_score(y_test, predictions))
    print(classification_report(y_test, predictions))