# Language Model Example
## 혐오표현 분류

### 사용 방법
1. 데이터 다운로드: https://www.kaggle.com/competitions/kdtai-2/data 에서 train.csv 와 test.csv 를 다운로드합니다.
2. 데이터 폴더: 다운로드한 두 csv 파일을 root/data/Discrimination 폴더에 넣습니다.
3. Mecab 이 이미 설치되어 있다면 그대로 진행하고, 설치되어 있지 않다면 아래 'Mecab 설치 방법' 항목을 먼저 진행합니다.

### Mecab 설치 방법
1. 터미널에서 root/settings 폴더로 이동
2. (중요!) 가상환경 진입
3. bash install_mecab.sh 실행


In [None]:
import re
from konlpy.tag import Mecab

def preprocess_korean_text(text):
    """Clean a raw sentence and split it into Hangul-only morphemes.

    The function takes only the text: it is a module-level function, not a
    method, and the dataset class in this file invokes its preprocessing
    callback with a single argument. The previous stray ``self`` parameter
    made a one-argument call raise ``TypeError``.

    Steps, in order:
      1. strip URLs and ``@mention`` handles,
      2. tokenize with Mecab (``morphs``),
      3. drop a fixed list of particle stop words,
      4. delete every character outside the Hangul ranges from each token
         and discard tokens that become empty.

    Args:
        text: the raw string to preprocess.

    Returns:
        A list of non-empty token strings.
    """
    # Remove URLs and mentions
    text = re.sub(r"(http|https)?:\/\/\S+\b|www\.(\w+\.)+\S*", "", text)
    text = re.sub(r"@(\w+)", "", text)

    # Tokenize text using Mecab
    tokens = Mecab().morphs(text)

    # Remove stop words (optional); a set gives constant-time membership tests
    stop_words = {"은", "는", "이", "가", "을", "를", "에", "의", "로", "으로", "에서"}
    kept = (t for t in tokens if t not in stop_words)

    # Remove punctuation and non-Korean characters, then drop emptied tokens
    stripped = (re.sub(r"[^\u3131-\u3163\uac00-\ud7a3]+", "", t) for t in kept)
    return [t for t in stripped if t]

# Quick manual check: run the preprocessing on a short sample sentence.
preprocess_korean_text('안녕 이건 테스트야')

In [2]:
# Modules About Hydra
# from tqdm.notebook import tqdm
# from PIL import Image
from typing import List, Any
# from hydra import initialize, initialize_config_module, initialize_config_dir, compose
# from omegaconf import DictConfig

# Modules About Torch, Numpy
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split
# from torchvision import datasets, transforms

# Modules About Pytorch Lightning
import lightning.pytorch as pl
from lightning.pytorch.callbacks import ModelCheckpoint, EarlyStopping
# from lightning.pytorch.loggers import WandbLogger, TensorBoardLogger
from lightning.pytorch.utilities.types import EVAL_DATALOADERS, TRAIN_DATALOADERS, STEP_OUTPUT

# Modules About Pandas, Matplotlib, Numpy
import pandas as pd
# import matplotlib.pyplot as plt
# import seaborn as sns

# Modules About Language Pre-processing
import re
from konlpy.tag import Mecab

# Others
import warnings
# Ignore every warning of category UserWarning for the rest of the session.
warnings.filterwarnings("ignore", category=UserWarning)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# custom dataset class for korean text
class KoreanTextDataset(Dataset):
    """Map-style dataset that turns rows of a text/label table into tensors.

    Each item is the row's text passed through ``preprocess_korean_text``,
    truncated or padded to exactly ``max_length`` tokens and converted to
    vocabulary indices. Tokens missing from the vocabulary -- including the
    empty-string padding token -- are mapped to the ``'<unk>'`` index.

    Args:
        data: pandas DataFrame with a ``"text"`` and a ``"label"`` column.
        word_key_to_index: dict mapping token -> integer index. It must
            contain ``'<unk>'`` whenever an unknown token can occur.
        preprocess_korean_text: callable taking one text and returning a
            list of tokens.
        max_length: number of tokens every item is truncated/padded to.
    """

    def __init__(self, data, word_key_to_index, preprocess_korean_text, max_length=100):
        self.data = data
        self.word_key_to_index = word_key_to_index
        self.max_length = max_length
        self.preprocess_korean_text = preprocess_korean_text

        # Sorted distinct label values and the reverse lookup label -> position.
        self.idx_to_class = sorted(data['label'].unique())
        self.class_to_idx = {label: i for i, label in enumerate(self.idx_to_class)}
        self.class_names = self.idx_to_class

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(indices, label)``, or only ``indices`` when the label is NaN.

        ``index`` is positional (0 .. len-1), so rows are read with ``iloc``;
        this keeps lookups correct even when the DataFrame index is not the
        default 0..n-1 range.
        """
        text = self.data["text"].iloc[index]
        label = self.data["label"].iloc[index]

        # Copy before truncating/padding so the list handed back by the
        # preprocessing callable is never modified in place.
        tokens = list(self.preprocess_korean_text(text))[:self.max_length]
        tokens += [""] * (self.max_length - len(tokens))

        # Convert tokens to vocabulary indices; out-of-vocabulary tokens
        # (and the "" padding token) fall back to the '<unk>' index.
        vocab = self.word_key_to_index
        indices = [vocab[t] if t in vocab else vocab['<unk>'] for t in tokens]

        if not np.isnan(label):
            return torch.tensor(indices), torch.tensor(label)

        # Unlabelled row (NaN label): only the inputs are returned.
        return torch.tensor(indices)

In [4]:
from lightning.pytorch.utilities.types import EVAL_DATALOADERS


class SequentialDataModule(pl.LightningDataModule):
    """Lightning data module for the ``Discrimination`` train/test CSV files.

    On construction it reads ``Discrimination/train.csv`` and
    ``Discrimination/test.csv`` below ``data_dir`` and builds a
    token -> index vocabulary from the training texts. ``setup`` then splits
    the training file into train/val/test subsets and wraps the test file as
    the prediction set.

    Args:
        batch_size: batch size used by every dataloader.
        data_dir: directory that contains the ``Discrimination`` folder.
    """

    # Particles dropped after tokenisation; a frozenset gives O(1) lookups.
    _STOP_WORDS = frozenset(
        ["은", "는", "이", "가", "을", "를", "에", "의", "로", "으로", "에서"]
    )

    def __init__(self, batch_size: int = 64, data_dir: str = '../data/') -> None:
        super().__init__()
        # Local import so the file's import cell does not need to change.
        from pathlib import Path

        self.data_dir = data_dir
        self.batch_size = batch_size

        self.transform = None

        self.max_length = 200

        # The Mecab tagger is created lazily on first use and then reused.
        self._mecab = None

        # Datasets are created in setup().
        self.train_dataset = None
        self.val_dataset = None
        self.test_dataset = None
        self.predict_dataset = None

        # Joining with Path works whether or not data_dir ends with a slash.
        csv_dir = Path(self.data_dir) / 'Discrimination'

        # load train data
        self.train_data = pd.read_csv(csv_dir / 'train.csv')

        # load predict data
        self.predict_data = pd.read_csv(csv_dir / 'test.csv')

        # make word map from train data
        print('Making word map')
        self._make_word_map(self.train_data)

    # custom function for preprocess korean text
    def _preprocess_korean_text(self, text):
        """Strip URLs/mentions, tokenize with Mecab and keep Hangul-only tokens.

        Args:
            text: raw input string.

        Returns:
            List of non-empty tokens with stop words removed and every
            character outside the Hangul ranges deleted.
        """
        # Remove URLs and mentions
        text = re.sub(r"(http|https)?:\/\/\S+\b|www\.(\w+\.)+\S*", "", text)
        text = re.sub(r"@(\w+)", "", text)

        # Tokenize text using Mecab. This method runs once per text, so the
        # tagger is built a single time instead of on every call.
        # NOTE(review): if DataLoader worker processes are ever enabled,
        # confirm that the cached tagger can be pickled.
        mecab = getattr(self, '_mecab', None)
        if mecab is None:
            mecab = self._mecab = Mecab()
        tokens = mecab.morphs(text)

        # Remove stop words (optional)
        kept = (t for t in tokens if t not in self._STOP_WORDS)

        # Remove punctuation and non-Korean characters, then drop empty tokens
        stripped = (re.sub(r"[^\u3131-\u3163\uac00-\ud7a3]+", "", t) for t in kept)
        return [t for t in stripped if t]

    # custom function for making word map
    def _make_word_map(self, train_data):
        """Build ``word_index_to_key`` / ``word_key_to_index`` from the texts.

        Tokens receive indices in order of first appearance; ``'<unk>'`` is
        appended last and is used for out-of-vocabulary tokens.
        """
        self.word_index_to_key = []
        self.word_key_to_index = {}

        # Iterate the column directly rather than indexing row by row.
        for text in train_data['text']:
            for token in self._preprocess_korean_text(text):
                if token not in self.word_key_to_index:
                    self.word_key_to_index[token] = len(self.word_index_to_key)
                    self.word_index_to_key.append(token)

        self.word_key_to_index['<unk>'] = len(self.word_index_to_key)
        self.word_index_to_key.append('<unk>')

    def _build_dataset(self, data):
        """Wrap a DataFrame in a KoreanTextDataset sharing this module's vocabulary."""
        return KoreanTextDataset(
            data=data,
            word_key_to_index=self.word_key_to_index,
            preprocess_korean_text=self._preprocess_korean_text,
            max_length=self.max_length
        )

    # just write for downloading actions
    def prepare_data(self) -> None:
        """Nothing to download.

        Datasets are built in ``setup`` instead: Lightning advises against
        assigning state in ``prepare_data`` because it is not run on every
        process.
        """

    def setup(self, stage: str) -> None:
        """Create the datasets needed for ``stage``.

        The train/val/test split is created once and reused, so a later
        ``'test'`` or ``'validate'`` stage sees the same subsets as ``'fit'``
        and also works when no fit was run beforehand.
        """
        if stage in ('predict', None) and self.predict_dataset is None:
            # make predict dataset
            self.predict_dataset = self._build_dataset(self.predict_data)

        if stage in ('fit', 'validate', 'test', None) and self.train_dataset is None:
            train_test_ratio = 0.9
            train_val_ratio = 0.8

            # make train dataset
            full_dataset = self._build_dataset(self.train_data)

            # split train val test dataset (fractions of the full dataset)
            self.train_dataset, self.val_dataset, self.test_dataset = random_split(
                full_dataset,
                [
                    train_test_ratio * train_val_ratio,
                    train_test_ratio * (1 - train_val_ratio),
                    1 - train_test_ratio,
                ]
            )

    def train_dataloader(self) -> TRAIN_DATALOADERS:
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self) -> EVAL_DATALOADERS:
        return DataLoader(self.val_dataset, batch_size=self.batch_size)

    def test_dataloader(self) -> EVAL_DATALOADERS:
        return DataLoader(self.test_dataset, batch_size=self.batch_size)

    def predict_dataloader(self) -> EVAL_DATALOADERS:
        return DataLoader(self.predict_dataset, batch_size=self.batch_size)


In [5]:
# custom attention class
class Attention(nn.Module):
    """Additive attention scoring every encoder step against one query vector.

    Args:
        hidden_size: feature size of both the query and the encoder outputs.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # Projects the concatenated [query ; encoder step] pair back to hidden_size.
        self.attn = nn.Linear(2 * hidden_size, hidden_size)
        # Reduces each projected pair to one unnormalised score.
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights normalised over the sequence axis.

        Args:
            hidden: query tensor of shape (batch, hidden_size).
            encoder_outputs: tensor of shape (seq_len, batch, hidden_size).

        Returns:
            Tensor of shape (seq_len, batch, 1); the softmax is taken over
            axis 0, so the weights of each batch element sum to 1.
        """
        seq_len = encoder_outputs.size(0)

        # Present the same query at every sequence position.
        tiled_query = hidden.unsqueeze(0).expand(seq_len, -1, -1)
        paired = torch.cat((tiled_query, encoder_outputs), dim=2)

        scores = self.v(torch.tanh(self.attn(paired))).squeeze(2)
        weights = F.softmax(scores, dim=0)
        return weights.unsqueeze(2)

In [6]:
# custom rnn_lstm_attention class
class RNN_LSTM_attention(nn.Module):
    """Embedding -> unidirectional LSTM -> attention pooling -> linear classifier.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each embedding vector.
        hidden_dim: LSTM hidden size (also the attention/context size).
        output_dim: number of output logits.
        LSTM_layers: number of stacked LSTM layers.
        dropout: dropout probability, used between LSTM layers, on the
            embeddings and on the context vector.
        freeze_embedding: when True (default, the original behaviour) the
            embedding weights are excluded from training. Note that no
            pre-trained vectors are loaded in this class, so the frozen table
            keeps its random initialisation; pass False to train it.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, LSTM_layers, dropout,
                 freeze_embedding=True):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        if freeze_embedding:
            for param in self.embedding.parameters():
                param.requires_grad = False

        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=LSTM_layers, bidirectional=False, dropout=dropout)
        self.dropout = nn.Dropout(dropout)
        self.attention = Attention(hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        """Compute class logits for a batch of token-index sequences.

        Args:
            text: integer tensor of shape [batch size, seq len].

        Returns:
            Tensor of shape [batch size, output dim]. The batch axis is
            always kept, including for a batch of one.
        """
        # embedded = [batch size, seq len, emb dim]
        embedded = self.dropout(self.embedding(text))

        # The LSTM is not batch_first, so feed it [seq len, batch size, emb dim].
        # lstm_outputs = [seq len, batch size, hid dim]
        # hidden = [num layers, batch size, hid dim]
        lstm_outputs, (hidden, _) = self.lstm(embedded.permute(1, 0, 2))

        # Final hidden state of the last LSTM layer: [batch size, hid dim]
        h = hidden[-1, :, :]

        # attention_weights = [seq len, batch size, 1]
        attention_weights = self.attention(h, lstm_outputs)

        # Weighted sum over the sequence:
        # [batch, hid, seq] x [batch, seq, 1] -> [batch, hid, 1] -> [batch, hid]
        context_vector = torch.bmm(
            lstm_outputs.permute(1, 2, 0),
            attention_weights.permute(1, 0, 2),
        ).squeeze(2)

        # No squeeze(0) here: it would silently drop the batch axis when the
        # batch holds a single sample and break the loss/argmax downstream.
        out = self.fc(self.dropout(context_vector))
        # out = [batch size, output dim]

        return out


In [7]:
class SequentialModel(pl.LightningModule):
    """Wraps ``RNN_LSTM_attention`` together with a cross-entropy loss.

    Args:
        word_index_to_key: vocabulary list; its length sets the embedding size.
        config: optional configuration object, stored as ``self.config``.
    """

    def __init__(self, word_index_to_key, config=None) -> None:
        super().__init__()

        self.save_hyperparameters()

        self.config = config

        # define loss function
        self.loss_func = nn.CrossEntropyLoss()

        # define model
        self.model = RNN_LSTM_attention(
            vocab_size=len(word_index_to_key),
            embedding_dim=100,
            hidden_dim=512,
            output_dim=7,
            LSTM_layers=2,
            dropout=0.2
        )

        # etc custom attributes
        # Display names for the 7 output indices.
        self.idx_to_class = {
            0: 'Origin(출신차별)',
            1: 'Physical(외모차별) 외모(신체, 얼굴) 및 장애인 차별 발언을 포함합니다.',
            2: 'Politics(정치성향차별)',
            3: 'Profanity(혐오욕설) 욕설,저주,혐오 단어, 비속어 및 기타 혐오 발언을 포함합니다.',
            4: 'Age(연령차별)',
            5: 'Gender(성차별) 성별 또는 성적 취향에 대한 차별 발언을 포함합니다.',
            6: 'Not Hate Speech(해당사항없음)',
        }

    def forward(self, x, y=None) -> Any:
        """Run the network and, when targets are given, also compute the loss.

        Args:
            x: batch of token-index sequences passed to the wrapped model.
            y: optional target labels.

        Returns:
            ``(loss, output)`` when ``y`` is provided, otherwise ``output``.
        """
        output = self.model(x)
        # Compare against None explicitly: the truth value of a tensor is
        # ambiguous for more than one element and False for a single label 0.
        if y is not None:
            loss = self.loss_func(output, y)
            return loss, output
        return output


In [10]:
from typing import Any
from torchmetrics import Accuracy

class SequentialTask(pl.LightningModule):
    """Training/evaluation loop around a model whose call returns ``(loss, output)``.

    Args:
        model: module called as ``model(x, y)`` -> ``(loss, output)`` and as
            ``model(x)`` -> ``output``.
    """

    def __init__(self, model) -> None:
        super().__init__()

        self.model = model
        # Per-batch metrics collected during the epoch, averaged at its end.
        self.training_step_outputs = []
        self.validation_step_outputs = []

        # define accuracy function
        self.acc_func = Accuracy(
            task='multiclass',
            num_classes=7
        )

    def _shared_step(self, batch, stage):
        """Compute and log ``{stage}_acc`` / ``{stage}_loss`` for one batch."""
        x, y = batch

        loss, output = self.model(x, y)
        acc = self.acc_func(output, y)

        metrics = {
            f'{stage}_acc': acc,
            f'{stage}_loss': loss,
        }
        self.log_dict(metrics, prog_bar=True)
        return loss, metrics

    @staticmethod
    def _detached(metrics):
        """Copy of ``metrics`` whose tensors no longer reference the autograd graph."""
        return {name: value.detach() for name, value in metrics.items()}

    @staticmethod
    def _epoch_mean(outputs, key):
        """Mean of ``key`` over the collected per-batch metric dicts."""
        return torch.stack([x[key] for x in outputs]).mean()

    def training_step(self, batch, batch_idx) -> STEP_OUTPUT:
        loss, metrics = self._shared_step(batch, 'train')
        # Store detached values only; keeping the raw loss would hold every
        # batch's computation graph in memory until the epoch ends.
        self.training_step_outputs.append(self._detached(metrics))
        return loss

    def validation_step(self, batch, batch_idx) -> STEP_OUTPUT:
        loss, metrics = self._shared_step(batch, 'val')
        self.validation_step_outputs.append(self._detached(metrics))
        return loss

    def on_validation_epoch_end(self):
        """Log and print epoch averages once both train and val metrics exist."""
        if not (self.training_step_outputs and self.validation_step_outputs):
            # Validation ran without any training batches (e.g. the sanity
            # check): discard those outputs so they are not mixed into the
            # averages of the next real epoch.
            self.validation_step_outputs.clear()
            return

        train_avg_loss = self._epoch_mean(self.training_step_outputs, "train_loss")
        train_avg_acc = self._epoch_mean(self.training_step_outputs, "train_acc")
        self.log_dict({
            "train_avg_acc": train_avg_acc,
            "train_avg_loss": train_avg_loss
        })

        val_avg_loss = self._epoch_mean(self.validation_step_outputs, "val_loss")
        val_avg_acc = self._epoch_mean(self.validation_step_outputs, "val_acc")
        self.log_dict({
            "val_avg_acc": val_avg_acc,
            "val_avg_loss": val_avg_loss
        })

        print("\n" +
              (f'Epoch {self.current_epoch}, Avg. Training Loss: {train_avg_loss:.3f}, Avg. Training Accuracy: {train_avg_acc:.3f} ' +
               f'Avg. Validation Loss: {val_avg_loss:.3f}, Avg. Validation Accuracy: {val_avg_acc:.3f}'), flush=True)

        self.training_step_outputs.clear()
        self.validation_step_outputs.clear()

    def test_step(self, batch, batch_idx) -> None:
        self._shared_step(batch, 'test')

    def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: int = 0) -> torch.Tensor:
        """Return the index of the highest-scoring class for each sample."""
        return torch.argmax(self.model(batch), dim=-1)

    def configure_optimizers(self) -> Any:
        return torch.optim.Adam(self.parameters(), lr=1e-3)

In [9]:
# Build the data module, the network and the Lightning task that trains it.
data_module = SequentialDataModule(batch_size=32)

model = SequentialModel(data_module.word_index_to_key)
task = SequentialTask(model)

# Both callbacks watch 'val_avg_acc', the metric logged by
# SequentialTask.on_validation_epoch_end.
callbacks = [
    # Keep the three checkpoints with the highest validation accuracy.
    ModelCheckpoint(monitor='val_avg_acc', save_top_k=3, mode='max'),
    # Stop after 3 validation rounds without an improvement of at least 0.01.
    EarlyStopping(
        monitor='val_avg_acc',
        min_delta=0.01,
        patience=3,
        verbose=False,
        mode='max',
    ),
]

trainer = pl.Trainer(max_epochs=10, callbacks=callbacks)

trainer.fit(model=task, datamodule=data_module)
trainer.test(model=task, datamodule=data_module)

Making word map


GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name     | Type               | Params
------------------------------------------------
0 | model    | SequentialModel    | 7.6 M 
1 | acc_func | MulticlassAccuracy | 0     
------------------------------------------------
3.9 M     Trainable params
3.7 M     Non-trainable params
7.6 M     Total params
30.249    Total estimated model params size (MB)


Epoch 0: 100%|██████████| 1482/1482 [02:51<00:00,  8.65it/s, v_num=12, train_acc=0.633, train_loss=1.020]
Epoch 0, Avg. Training Loss: 1.170, Avg. Training Accuracy: 0.605 Avg. Validation Loss: 0.907, Avg. Validation Accuracy: 0.697
Epoch 1: 100%|██████████| 1482/1482 [02:50<00:00,  8.69it/s, v_num=12, train_acc=0.733, train_loss=0.711, val_acc=0.701, val_loss=0.901]
Epoch 1, Avg. Training Loss: 0.827, Avg. Training Accuracy: 0.724 Avg. Validation Loss: 0.790, Avg. Validation Accuracy: 0.732
Epoch 2: 100%|██████████| 1482/1482 [02:48<00:00,  8.77it/s, v_num=12, train_acc=0.667, train_loss=0.653, val_acc=0.732, val_loss=0.789]
Epoch 2, Avg. Training Loss: 0.697, Avg. Training Accuracy: 0.763 Avg. Validation Loss: 0.731, Avg. Validation Accuracy: 0.752
Epoch 3: 100%|██████████| 1482/1482 [02:48<00:00,  8.80it/s, v_num=12, train_acc=0.733, train_loss=0.485, val_acc=0.752, val_loss=0.731]
Epoch 3, Avg. Training Loss: 0.600, Avg. Training Accuracy: 0.794 Avg. Validation Loss: 0.732, Avg. Va

[{'test_acc': 0.7610082030296326, 'test_loss': 0.785693883895874}]

In [11]:
# Recreate the data module and network, then restore the trained task weights.
data_module = SequentialDataModule(batch_size=32)
model = SequentialModel(data_module.word_index_to_key)

checkpoint_path = './lightning_logs/version_12/checkpoints/epoch=5-step=8892.ckpt'
task = SequentialTask.load_from_checkpoint(checkpoint_path, model=model)

trainer = pl.Trainer()

# trainer.predict returns one tensor of class indices per batch; join them
# into a single 1-D tensor.
predicted_classes = trainer.predict(task, datamodule=data_module)
concated_predicted_classes = torch.cat(predicted_classes, dim=-1)

Making word map


GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


Predicting DataLoader 0: 100%|██████████| 422/422 [00:23<00:00, 18.08it/s]


In [33]:
# Draw 20 random prediction indices (with replacement). `high` is exclusive
# in np.random.randint, so passing the full length makes every row --
# including the last one -- eligible.
rand_idx = np.random.randint(low=0, high=len(concated_predicted_classes), size=20)

# Pair each sampled input text with the display name of its predicted class.
sample_text = [data_module.predict_data.loc[i, 'text'] for i in rand_idx]
predicted_class = [
    model.idx_to_class[concated_predicted_classes[i].item()] for i in rand_idx
]

pd.DataFrame({
    'sample_text': sample_text,
    'predicted_class': predicted_class
})

Unnamed: 0,sample_text,predicted_class
0,Excelsior!,Not Hate Speech(해당사항없음)
1,"""보수좀비들임. 아줌마, 틀딱노땅, 일베남자들이 많음""",Age(연령차별)
2,"""아직도 5%야? 어떤 친일파놈들이 쪽바리 제품에 ~~~""",Origin(출신차별)
3,"""좋아 죽을때는 섹스파트너, 헤어지면 성노리개, 이게 미투냐?""",Gender(성차별) 성별 또는 성적 취향에 대한 차별 발언을 포함합니다.
4,"""쪽빠리당 현정권 아베세끼와 연합하여 파괴시도중""",Origin(출신차별)
5,박근혜 아니겠 쥐,Not Hate Speech(해당사항없음)
6,"""짱깨 조선족 종북주사파들 공통점 - 본인들을 반대하면 기승전 토왜몰이""",Origin(출신차별)
7,"""이쯤되면 막 가자는거지요? 뻘게이 새끼들 존나 쳐 맞아야 되겠네~""","Profanity(혐오욕설) 욕설,저주,혐오 단어, 비속어 및 기타 혐오 발언을 포..."
8,와~ 적당히들 좀 해라 10년전 일에 지금와서 티비에 좀 자주보인다고 미투??ㅎㅎㅎ...,Not Hate Speech(해당사항없음)
9,"""문재인 노무현 두분다 인복이 없다..저런 쓰래기들을 달고다니니...""",Politics(정치성향차별)
