<a href="https://colab.research.google.com/github/HyosunRyu/KPHC/blob/main/KPHC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Korean Political Hate Speech Classifier

# Setup


### (1) Install Packages

In [None]:
!pip install beautifulsoup4==4.12.2
!pip install bs4==0.0.1
!pip install numpy==1.25.1
!pip install pandas==2.0.3
!pip install pip==23.1.2
!pip install python-dateutil==2.8.2
!pip install pytz==2023.3
!pip install tqdm==4.65.0
!pip install tzdata==2023.3
!pip install urllib3==2.0.3
!pip install wheel==0.38.4
!pip install pytorch_lightning==2.3.0
!pip install matplotlib==3.9.0
!pip install tensorboard==2.17.0
!pip install imblearn==0.0
!pip install emoji==1.6.3
!pip install soynlp
!pip install transformers==3.0.0

In [None]:
# Sets the cuBLAS workspace configuration for this kernel before any CUDA work starts.
# NOTE(review): presumably needed because the Trainer below is created with
# deterministic=True on GPU — confirm against the PyTorch reproducibility notes.
%env CUBLAS_WORKSPACE_CONFIG=:4096:8

### (2) Import Packages

In [None]:
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
import datetime

from pprint import pprint
import matplotlib.pyplot as plt

import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.tensorboard import SummaryWriter
from pytorch_lightning import LightningModule, Trainer, seed_everything, loggers
from pytorch_lightning.callbacks import Callback

from transformers import AutoModelForSequenceClassification, AutoTokenizer, AdamW

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from imblearn.over_sampling import SMOTE

import re
import emoji
from soynlp.normalizer import repeat_normalize

### (3) Arguments

In [None]:
# Run configuration: every entry is forwarded to `Model(**args)` and stored as a
# hyper-parameter; a few are also read directly when the Trainer is built.
args = dict(
    random_seed=42,  # Random Seed
    pretrained_model='beomi/KcELECTRA-base-v2022',  # Transformers PLM name
    pretrained_tokenizer='beomi/KcELECTRA-base-v2022',  # Optional, Transformers Tokenizer Name. Overrides `pretrained_model`
    batch_size=32,
    lr=5e-6,  # Starting Learning Rate
    epochs=50,  # Max Epochs
    max_length=150,  # Max Length input size
    train_data_path="train_hate.txt",  # Train Dataset file
    val_data_path="val_hate.txt",  # Validation Dataset file
    test_mode=False,  # Test Mode enables `fast_dev_run`
    optimizer='AdamW',  # AdamW vs AdamP
    lr_scheduler='exp',  # ExponentialLR vs CosineAnnealingWarmRestarts
    fp16=True,  # Enable train on FP16(if GPU)
    tpu_cores=0,  # Enable TPU with 1 core or 8 cores
    cpu_workers=os.cpu_count(),
)
args

# Setting Up the Callbacks

### (1) Checkpoint

In [None]:
from pytorch_lightning.callbacks import ModelCheckpoint

# Keep the three checkpoints with the highest validation recall. The file name embeds
# the epoch, the recall value, and the wall-clock time at which this cell was run.
checkpoint_callback = ModelCheckpoint(
    monitor='val_recall',
    mode='max',
    save_top_k=3,
    auto_insert_metric_name=False,
    filename='epoch{epoch}-val_recall{val_recall:.4f}_' + format(datetime.datetime.now(), "%Y%m%d-%H%M%S"),
)

### (2) EarlyStopping

In [None]:
from pytorch_lightning.callbacks.early_stopping import EarlyStopping

# Stop training once validation recall has failed to improve for three checks in a row.
early_stop_callback = EarlyStopping(
    monitor='val_recall',
    mode='max',
    patience=3,
)

# Modeling

### (1) Model class

In [None]:
class Model(LightningModule):
    """Sequence classifier fine-tuned with PyTorch Lightning.

    All keyword arguments given to the constructor are stored as hyper-parameters
    (``self.hparams``). The ones read by this class are ``pretrained_model``,
    ``pretrained_tokenizer``, ``batch_size``, ``lr``, ``max_length``,
    ``train_data_path``, ``val_data_path``, ``optimizer``, ``lr_scheduler``,
    ``tpu_cores`` and ``cpu_workers``.

    Data files must provide a ``document`` column (raw text) and a ``label`` column.
    """

    # Lazily built (text_pattern, url_pattern) pair shared by all instances, so the
    # large emoji character class is compiled once instead of once per document.
    _clean_patterns = None

    def __init__(self, **kwargs):
        super().__init__()
        self.save_hyperparameters()
        # Lightning 2.x does not hand step outputs to the epoch-end hooks, so they are
        # cached here and cleared at the end of every epoch.
        self.training_step_outputs = []
        self.validation_step_outputs = []

        self.clsfier = AutoModelForSequenceClassification.from_pretrained(self.hparams.pretrained_model)
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.hparams.pretrained_tokenizer
            if self.hparams.pretrained_tokenizer
            else self.hparams.pretrained_model
        )

    def forward(self, **kwargs):
        """Forward every keyword argument to the wrapped classification model."""
        return self.clsfier(**kwargs)

    def step(self, batch, batch_idx):
        """Run one batch and return its loss together with true/predicted labels.

        Args:
            batch: ``(input_ids, labels)`` pair as produced by :meth:`dataloader`.
            batch_idx: Index of the batch (unused).

        Returns:
            dict with ``loss`` (tensor), ``y_true`` and ``y_pred`` (lists of labels).
        """
        data, labels = batch

        # Inputs are padded to `max_length`; mask the padding so it is not attended to.
        pad_id = self.tokenizer.pad_token_id
        attention_mask = (data != pad_id).long() if pad_id is not None else None
        output = self(input_ids=data, attention_mask=attention_mask, labels=labels)

        # With labels supplied the first two entries are (loss, logits), both for the
        # tuple outputs of older Transformers releases and for 4.x model-output objects.
        loss = output[0]
        logits = output[1]

        preds = logits.argmax(dim=-1)

        y_true = list(labels.cpu().numpy())
        y_pred = list(preds.cpu().numpy())

        return {
            'loss': loss,
            'y_true': y_true,
            'y_pred': y_pred,
        }

    def training_step(self, batch, batch_idx):
        """Compute the training loss and cache the batch results for epoch metrics."""
        output = self.step(batch, batch_idx)
        # Store a detached loss so the cached entry does not keep the autograd graph alive.
        self.training_step_outputs.append({
            'loss': output['loss'].detach(),
            'y_true': output['y_true'],
            'y_pred': output['y_pred'],
        })
        return output

    def validation_step(self, batch, batch_idx):
        """Evaluate one validation batch and cache the results for epoch metrics."""
        output = self.step(batch, batch_idx)
        self.validation_step_outputs.append(output)
        return output

    def epoch_end(self, outputs, state='train'):
        """Aggregate cached step outputs into epoch-level metrics, log and print them.

        Args:
            outputs: List of dicts as returned by :meth:`step`.
            state: Prefix for the logged metric names (``'train'`` or ``'val'``).

        Returns:
            dict with the mean ``loss`` over ``outputs`` (zero when ``outputs`` is empty).
        """
        loss = torch.tensor(0, dtype=torch.float)
        if not outputs:
            # Nothing was collected this epoch; there is nothing to average or score.
            return {'loss': loss}

        for i in outputs:
            loss += i['loss'].cpu().detach()
        loss = loss / len(outputs)

        y_true = []
        y_pred = []
        for i in outputs:
            y_true += i['y_true']
            y_pred += i['y_pred']

        # Metrics use scikit-learn's default settings.
        acc = accuracy_score(y_true, y_pred)
        prec = precision_score(y_true, y_pred)
        rec = recall_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)

        self.log(state+'_loss', float(loss), on_epoch=True, prog_bar=True)
        self.log(state+'_acc', acc, on_epoch=True, prog_bar=True)
        self.log(state+'_precision', prec, on_epoch=True, prog_bar=True)
        self.log(state+'_recall', rec, on_epoch=True, prog_bar=True)
        self.log(state+'_f1', f1, on_epoch=True, prog_bar=True)
        print(f'[Epoch {self.trainer.current_epoch} {state.upper()}] Loss: {loss}, Acc: {acc}, Prec: {prec}, Rec: {rec}, F1: {f1}')
        return {'loss': loss}

    def on_train_epoch_end(self):
        """Lightning hook: report training metrics for the epoch and reset the cache."""
        self.epoch_end(self.training_step_outputs, state='train')
        self.training_step_outputs.clear()

    def on_training_epoch_end(self, outputs=None):
        """Legacy entry point kept for existing callers.

        Lightning itself calls :meth:`on_train_epoch_end`. When ``outputs`` is omitted
        the cached training step outputs are used.
        """
        self.epoch_end(self.training_step_outputs if outputs is None else outputs, state='train')

    def on_validation_epoch_end(self):
        """Lightning hook: report validation metrics for the epoch and reset the cache."""
        self.epoch_end(self.validation_step_outputs, state='val')
        self.validation_step_outputs.clear()

    def configure_optimizers(self):
        """Build the optimizer and learning-rate scheduler selected in the hyper-parameters.

        Raises:
            NotImplementedError: For an unknown ``optimizer`` or ``lr_scheduler`` value.
        """
        if self.hparams.optimizer == 'AdamW':
            optimizer = torch.optim.AdamW(self.parameters(), lr=self.hparams.lr)
        elif self.hparams.optimizer == 'AdamP':
            from adamp import AdamP
            optimizer = AdamP(self.parameters(), lr=self.hparams.lr)
        else:
            raise NotImplementedError('Only AdamW and AdamP are Supported!')
        if self.hparams.lr_scheduler == 'cos':
            # Referenced through `torch` because the name is not imported at file level.
            scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=1, T_mult=2)
        elif self.hparams.lr_scheduler == 'exp':
            scheduler = ExponentialLR(optimizer, gamma=0.5)
        else:
            raise NotImplementedError('Only cos and exp lr scheduler is Supported!')
        # Lightning only recognises the key 'lr_scheduler'; under the former key
        # 'scheduler' the scheduler was ignored and the learning rate never changed.
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
        }

    def read_data(self, path):
        """Load a dataset into a DataFrame based on the file extension.

        ``xlsx`` is read as Excel, ``csv`` as comma-separated, ``tsv``/``txt`` as
        tab-separated.

        Raises:
            NotImplementedError: For any other extension.
        """
        if path.endswith('xlsx'):
            return pd.read_excel(path)
        elif path.endswith('csv'):
            return pd.read_csv(path)
        elif path.endswith('tsv') or path.endswith('txt'):
            return pd.read_csv(path, sep='\t')
        else:
            raise NotImplementedError('Only Excel(xlsx)/Csv/Tsv(txt) are Supported')

    @staticmethod
    def _emoji_charset():
        """Return every character used by a known emoji, escaped for a regex character class."""
        table = getattr(emoji, 'UNICODE_EMOJI', None)
        if table is None:
            # NOTE(review): newer `emoji` releases appear to expose EMOJI_DATA instead — confirm.
            table = getattr(emoji, 'EMOJI_DATA', {})
        elif isinstance(table.get('en'), dict):
            # Per-language layout: the top-level keys are language codes, not emojis.
            table = table['en']
        return ''.join(re.escape(ch) for ch in sorted(set(''.join(table))))

    @classmethod
    def _get_clean_patterns(cls):
        """Build (once) and return the compiled (text_pattern, url_pattern) pair."""
        if cls._clean_patterns is None:
            emojis = cls._emoji_charset()
            pattern = re.compile(f'[^ .,?!/@$%~％·∼()\x00-\x7Fㄱ-힣{emojis}]+')
            url_pattern = re.compile(
                r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)')
            cls._clean_patterns = (pattern, url_pattern)
        return cls._clean_patterns

    def clean(self, x):
        """Normalise raw text before tokenisation.

        Replaces runs of characters outside the allowed set (ASCII, Hangul, a few
        punctuation marks, emojis) with a space, removes URLs, strips surrounding
        whitespace and collapses repeated characters to at most two.
        """
        pattern, url_pattern = self._get_clean_patterns()
        x = pattern.sub(' ', x)
        x = url_pattern.sub('', x)
        x = x.strip()
        x = repeat_normalize(x, num_repeats=2)
        return x

    def encode(self, x, **kwargs):
        """Clean ``x`` and convert it to token ids padded/truncated to ``max_length``."""
        return self.tokenizer.encode(
            self.clean(str(x)),
            padding='max_length',
            max_length=self.hparams.max_length,
            truncation=True,
            **kwargs,
        )

    def preprocess_dataframe(self, df):
        """Replace the ``document`` column of ``df`` with token ids (modifies ``df`` in place)."""
        df['document'] = df['document'].map(self.encode)
        return df

    def dataloader(self, path, shuffle=False):
        """Read ``path``, tokenise it and wrap it in a ``DataLoader``.

        Args:
            path: Dataset file understood by :meth:`read_data`.
            shuffle: Whether the loader shuffles the samples.
        """
        df = self.read_data(path)
        df = self.preprocess_dataframe(df)

        dataset = TensorDataset(
            torch.tensor(df['document'].to_list(), dtype=torch.long),
            torch.tensor(df['label'].to_list(), dtype=torch.long),
        )
        # Parenthesised on purpose: the batch size is multiplied by the TPU core count.
        # Without the parentheses the conditional swallowed the product and the batch
        # size became `tpu_cores` itself.
        batch_multiplier = 1 if not self.hparams.tpu_cores else self.hparams.tpu_cores
        return DataLoader(
            dataset,
            batch_size=self.hparams.batch_size * batch_multiplier,
            shuffle=shuffle,
            # os.cpu_count() may return None; fall back to loading in the main process.
            num_workers=self.hparams.cpu_workers or 0,
        )

    def train_dataloader(self):
        """Lightning hook: shuffled loader over the training file."""
        return self.dataloader(self.hparams.train_data_path, shuffle=True)

    def val_dataloader(self):
        """Lightning hook: ordered loader over the validation file."""
        return self.dataloader(self.hparams.val_data_path, shuffle=False)


### (2) Fine-tuning

In [None]:
# Seed every RNG, build the model from `args`, and fine-tune it with the checkpoint and
# early-stopping callbacks defined above.
print("Using PyTorch Ver", torch.__version__)
print("Fix Seed:", args['random_seed'])
seed_everything(args['random_seed'])
model = Model(**args)

print(":: Start Training ::")

trainer = Trainer(
    callbacks=[checkpoint_callback,
               early_stop_callback],
    max_epochs=args['epochs'],
    fast_dev_run=args['test_mode'],
    num_sanity_val_steps=None if args['test_mode'] else 0,
    # For GPU Setup
    deterministic=torch.cuda.is_available(),
    # Use GPU index 0 when CUDA is available; otherwise fall back to the CPU instead of
    # requesting a GPU accelerator that does not exist.
    accelerator='gpu' if torch.cuda.is_available() else 'cpu',
    devices=[0] if torch.cuda.is_available() else 'auto',
    precision=16 if args['fp16'] and torch.cuda.is_available() else 32,
    # For TPU Setup
    # tpu_cores=args['tpu_cores'] if args['tpu_cores'] else None,
)
history = trainer.fit(model)

# Model Test

In [None]:
from glob import glob

# `best_model_path` is empty when no checkpoint was saved (for example after a
# `fast_dev_run`). Fail with an explicit message instead of an IndexError on an empty list.
_ckpt_matches = sorted(glob(checkpoint_callback.best_model_path))
if not _ckpt_matches:
    raise FileNotFoundError(
        f'No checkpoint found at {checkpoint_callback.best_model_path!r}; run training first.'
    )
best_ckpt = _ckpt_matches[-1]
best_ckpt

In [None]:
# Pick the inference device first, then restore the best checkpoint and move it there.
# The final expression displays the module in the cell output.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Model.load_from_checkpoint(best_ckpt)
model.to(device)

In [None]:
def hate_index(x):
    """Return the model's probability that ``x`` belongs to class 1 (hate speech).

    The text goes through the same cleaning and length limit that were used for
    training. Uses the module-level ``model``.

    Args:
        x: Document text.

    Returns:
        The class-1 probability as a float, or ``None`` when the document could not be
        scored (for example when ``x`` is not a string); a message with the cause is
        printed in that case.
    """
    try:
        # Check the device the model is using
        device = next(model.parameters()).device

        # Tokenize and convert to tensor
        inputs = model.tokenizer(
            model.clean(x),
            return_tensors='pt',
            truncation=True,
            max_length=model.hparams.max_length,
        )

        # Move input data to the same device as the model
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Predict using the model
        with torch.no_grad():
            logits = model(**inputs).logits

        # Calculate the result
        probabilities = torch.softmax(logits, dim=-1)
        return probabilities[0][1].item()
    except Exception as exc:
        # Best effort: skip documents that cannot be scored, but report why. Catching
        # `Exception` rather than everything keeps Ctrl-C / kernel interrupts working.
        print(f'Excluded from analysis ({type(exc).__name__}: {exc})')
        return None

# Score every test document with the fine-tuned model.
df = pd.read_csv('test_hate.txt', delimiter='\t')
df['hatescore'] = df['document'].map(hate_index)

# Documents that could not be scored come back without a score. Drop them so they are
# truly excluded from the analysis instead of being counted as non-hate predictions.
df = df.dropna(subset=['hatescore'])

# To binary data
df['hatespeech'] = (df['hatescore'] >= 0.5).astype(int)


In [None]:
# Get the prediction results and actual labels
y_true = df['label']  # Actual label
y_pred = df['hatespeech']  # Prediction result

# Calculate accuracy
accuracy = accuracy_score(y_true, y_pred)

# Calculate precision
precision = precision_score(y_true, y_pred)

# Calculate recall
recall = recall_score(y_true, y_pred)

# Calculate F1 score
f1 = f1_score(y_true, y_pred)

# Print results
print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1}')