<a href="https://colab.research.google.com/github/dankimh/BERT-multilabel-classification/blob/main/multi_label_text_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np

from tqdm.auto import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

!pip install transformers

from transformers import BertTokenizerFast as BertTokenizer, BertModel, AdamW, get_linear_schedule_with_warmup
from transformers import AutoTokenizer

!pip install pytorch_lightning
!pip install torchmetrics

import pytorch_lightning as pl
from torchmetrics.functional import accuracy, f1_score, auroc
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, multilabel_confusion_matrix

import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc

import urllib.request

BERT_MODEL='beomi/kcbert-base'

%matplotlib inline
%config InlineBackend.figure_format='retina'

RANDOM_SEED = 42

sns.set(style='whitegrid', palette='muted', font_scale=1.2)
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))
rcParams['figure.figsize'] = 12, 8

pl.seed_everything(RANDOM_SEED)

In [None]:
# Download the training split into the working directory and load it as a
# DataFrame (tab-separated).
train_file = "unsmile_train_v1.0.tsv"
urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/smilegate-ai/korean_unsmile_dataset/main/unsmile_train_v1.0.tsv",
    filename=train_file,
)
train_data = pd.read_table(train_file)
# Optional English column names. Left disabled: the rest of the notebook
# addresses the sentence column by its Korean name.
#train_data=train_data.rename(columns={'문장': 'sentence', '여성/가족': 'women/family', '남성': 'men', '성소수자': 'lgbtq', '인종/국적': 'nationality', '연령': 'age', '지역': 'region', '종교': 'religion', '기타 혐오': 'hate', '악플/욕설': 'bad'})
print(train_data.head())

# Same for the validation split.
valid_file = "unsmile_valid_v1.0.tsv"
urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/smilegate-ai/korean_unsmile_dataset/main/unsmile_valid_v1.0.tsv",
    filename=valid_file,
)
valid_data = pd.read_table(valid_file)

In [None]:
# Every column except the raw sentence is treated as a binary label.
# The previous `columns.tolist()[2:]` slice skipped TWO leading columns, which
# drops the first label column when the sentence is the only non-label column
# (expected header: 문장 followed by the label columns — verify with
# `train_data.columns` if the dataset version changes).
LABEL_COLUMNS = [column for column in train_data.columns if column != '문장']

# Character length of every training sentence, shown as a histogram to help
# pick a maximum token length.
train_doc_len = [len(sentence) for sentence in train_data['문장']]
plt.subplots(constrained_layout=True)
plt.subplot(2, 1, 1)
plt.title('train', fontsize=20)
plt.hist(train_doc_len, bins=30)


plt.show()

In [None]:

# Load the tokenizer that belongs to the BERT_MODEL checkpoint.
tokenizer = AutoTokenizer.from_pretrained(BERT_MODEL)


In [None]:
class BERTDataset(Dataset):
    """Map-style dataset that tokenizes one DataFrame row per item.

    Args:
        data: DataFrame holding the text in a ``문장`` column and one column
            per entry of the module-level ``LABEL_COLUMNS``.
        tokenizer: Tokenizer exposing ``encode_plus``.
        max_token_len: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, data, tokenizer, max_token_len):
        self.tokenizer = tokenizer
        self.data = data
        self.max_token_len = max_token_len

    def __len__(self):
        """Return the number of rows in the wrapped DataFrame."""
        return len(self.data)

    def __getitem__(self, index: int):
        """Return the raw text, encoded inputs and float labels for one row.

        Returns:
            dict with keys ``문장`` (str), ``input_ids`` and ``attention_mask``
            (flattened tensors from the tokenizer) and ``labels`` (float
            tensor with one value per label column).
        """
        data_row = self.data.iloc[index]
        text = data_row["문장"]

        # Convert the label values to a plain list of floats before handing
        # them to torch. Passing the pandas Series directly makes torch index
        # it positionally (`series[0]`), which pandas deprecates for
        # string-labelled indexes.
        label_values = [float(value) for value in data_row[LABEL_COLUMNS]]

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_token_len,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            "문장": text,
            # The tokenizer returns (1, max_token_len); drop the batch axis so
            # the DataLoader can stack items itself.
            "input_ids": encoding["input_ids"].flatten(),
            "attention_mask": encoding["attention_mask"].flatten(),
            "labels": torch.FloatTensor(label_values),
        }

In [None]:
# Maximum token length used for every dataset built in this notebook.
max_len = 200
print(type(train_data))

# Build a dataset over the training frame and peek at its first item.
train_data_sample = BERTDataset(train_data, tokenizer, max_token_len=max_len)
sample = train_data_sample[0]
sample.keys()

In [None]:
# Show the token ids of the first encoded sample.
sample["input_ids"]

In [None]:
# Stand-alone encoder instance, used below to inspect output shapes.
bert_model = BertModel.from_pretrained(BERT_MODEL, return_dict=True)


In [None]:
# Pull a single batch of 8 items to check the collated tensor shapes.
sample_loader = DataLoader(train_data_sample, batch_size=8, num_workers=2)
sample_batch = next(iter(sample_loader))
sample_batch["input_ids"].shape, sample_batch["attention_mask"].shape

In [None]:
# Run the sample batch through the bare encoder and inspect output shapes.
output = bert_model(
    sample_batch["input_ids"],
    sample_batch["attention_mask"],
)
output.last_hidden_state.shape, output.pooler_output.shape

In [None]:
class HateSpeechDataModule(pl.LightningDataModule):
    """Lightning data module that wraps two DataFrames in ``BERTDataset``s.

    Args:
        train_df: DataFrame used for training.
        test_df: DataFrame used for both validation and testing.
        tokenizer: Tokenizer forwarded to ``BERTDataset``.
        batch_size: Batch size of every DataLoader.
        max_token_len: Sequence length forwarded to ``BERTDataset``.
    """

    def __init__(self, train_df, test_df, tokenizer, batch_size=8, max_token_len=128):
        super().__init__()
        self.batch_size = batch_size
        self.train_df = train_df
        self.test_df = test_df
        self.tokenizer = tokenizer
        self.max_token_len = max_token_len

    def setup(self, stage=None):
        """Build the train and held-out datasets."""
        self.train_dataset = BERTDataset(self.train_df, self.tokenizer, self.max_token_len)

        self.test_dataset = BERTDataset(self.test_df, self.tokenizer, self.max_token_len)

    def train_dataloader(self):
        """Return a shuffled loader over the TRAINING dataset.

        The previous version returned ``self.test_dataset`` here, so the model
        was fitted on the held-out frame instead of the training frame.
        """
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=2,
        )

    def val_dataloader(self):
        """Return a loader over the held-out dataset for validation.

        Without this hook no validation loop runs, so nothing named
        "val_loss" is ever logged for callbacks that monitor it.
        """
        return DataLoader(self.test_dataset, batch_size=self.batch_size, num_workers=2)

    def test_dataloader(self):
        """Return a loader over the held-out dataset for ``trainer.test``."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size, num_workers=2)


In [None]:
# Training hyperparameters.
N_EPOCHS = 10
BATCH_SIZE = 12

# Train on the training split; the validation split is the held-out frame.
data_module = HateSpeechDataModule(
    train_data,
    valid_data,
    tokenizer,
    batch_size=BATCH_SIZE,
    max_token_len=max_len,
)

In [None]:
class HateSpeechTagger(pl.LightningModule):
    """Multi-label classifier: BERT encoder plus a linear head.

    The pooled BERT output is projected to ``n_classes`` logits, squashed
    with a sigmoid and scored with binary cross-entropy.

    Args:
        n_classes: Number of output labels.
        n_training_steps: Total step count handed to the LR scheduler.
        n_warmup_steps: Warmup step count handed to the LR scheduler.
    """

    def __init__(self, n_classes: int, n_training_steps=None, n_warmup_steps=None):
        super().__init__()
        self.bert = BertModel.from_pretrained(BERT_MODEL, return_dict=True)
        self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes)
        self.n_training_steps = n_training_steps
        self.n_warmup_steps = n_warmup_steps
        self.criterion = nn.BCELoss()

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``(loss, probabilities)``; loss is ``0`` when no labels are given."""
        encoded = self.bert(input_ids, attention_mask=attention_mask)
        probabilities = torch.sigmoid(self.classifier(encoded.pooler_output))
        if labels is None:
            return 0, probabilities
        return self.criterion(probabilities, labels), probabilities

    def _run_batch(self, batch, loss_name):
        """Forward one batch, log its loss under ``loss_name`` and return the pieces."""
        labels = batch["labels"]
        loss, outputs = self(batch["input_ids"], batch["attention_mask"], labels)
        self.log(loss_name, loss, prog_bar=True, logger=True)
        return loss, outputs, labels

    def training_step(self, batch, batch_idx):
        """Log "train_loss" and keep predictions/labels for the epoch-end hook."""
        loss, outputs, labels = self._run_batch(batch, "train_loss")
        return {"loss": loss, "predictions": outputs, "labels": labels}

    def validation_step(self, batch, batch_idx):
        """Log "val_loss" and return the batch loss."""
        loss, _, _ = self._run_batch(batch, "val_loss")
        return loss

    def test_step(self, batch, batch_idx):
        """Log "test_loss" and return the batch loss."""
        loss, _, _ = self._run_batch(batch, "test_loss")
        return loss

    def training_epoch_end(self, outputs):
        """Write one training ROC-AUC scalar per label column to the logger."""
        # Join the per-step batches along the sample axis.
        labels = torch.cat([step["labels"].detach().cpu() for step in outputs]).int()
        predictions = torch.cat([step["predictions"].detach().cpu() for step in outputs])

        for column, name in enumerate(LABEL_COLUMNS):
            class_roc_auc = auroc(predictions[:, column], labels[:, column])
            self.logger.experiment.add_scalar(
                f"{name}_roc_auc/Train", class_roc_auc, self.current_epoch
            )

    def configure_optimizers(self):
        """Return AdamW (lr 2e-5) with a linear warmup schedule stepped per batch."""
        optimizer = AdamW(self.parameters(), lr=2e-5)
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=self.n_warmup_steps,
            num_training_steps=self.n_training_steps,
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {"scheduler": scheduler, "interval": 'step'},
        }


In [None]:
# The LR scheduler is stepped once per batch, so an epoch has one step per
# BATCH (rounded up for the final partial batch), not one per sample.
# Using len(train_data) directly inflated the schedule by a factor of
# BATCH_SIZE, so training ended long before warmup was over.
steps_per_epoch = (len(train_data) + BATCH_SIZE - 1) // BATCH_SIZE
total_training_steps = steps_per_epoch * N_EPOCHS

In [None]:
# Use one fifth of all training steps for learning-rate warmup.
warmup_steps = total_training_steps // 5
(warmup_steps, total_training_steps)

In [None]:
# One output unit per label column; the scheduler lengths come from above.
model = HateSpeechTagger(
    n_classes=len(LABEL_COLUMNS),
    n_warmup_steps=warmup_steps,
    n_training_steps=total_training_steps,
)


In [None]:
# Keep only the single best checkpoint, ranked by the lowest "val_loss".
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints",
    filename="best-checkpoint",
    save_top_k=1,
    verbose=True,
    monitor="val_loss",
    mode="min",
)


In [None]:
# TensorBoard event files go under lightning_logs/hate-speechs.
logger = TensorBoardLogger("lightning_logs", name="hate-speechs")

In [None]:
# Stop training when "train_loss" has not improved for 2 checks in a row.
early_stopping_callback = EarlyStopping(
    monitor="train_loss",
    patience=2,
)

In [None]:
# Register the checkpoint callback through `callbacks`. Lightning releases
# that ship `torchmetrics.functional.f1_score` no longer accept a
# ModelCheckpoint instance via the `checkpoint_callback=` argument, while
# `trainer.checkpoint_callback` still resolves to the ModelCheckpoint listed
# in `callbacks`.
# NOTE(review): `gpus` and `progress_bar_refresh_rate` are kept as-is; confirm
# they are still supported by the installed pytorch_lightning version.
trainer = pl.Trainer(
    logger=logger,
    callbacks=[checkpoint_callback, early_stopping_callback],
    max_epochs=N_EPOCHS,
    gpus=1,
    progress_bar_refresh_rate=30,
)

In [None]:
# Train the tagger on the loaders provided by the data module.
trainer.fit(model, data_module)

In [None]:
#trainer.test()

In [None]:
# Restore the best checkpoint recorded by the trainer's checkpoint callback,
# then switch to inference mode (eval + frozen parameters).
best_checkpoint_path = trainer.checkpoint_callback.best_model_path
trained_model = HateSpeechTagger.load_from_checkpoint(
    best_checkpoint_path,
    n_classes=len(LABEL_COLUMNS),
)
trained_model.eval()
trained_model.freeze()

In [None]:
# Score a single free-text comment with the trained model.
test_comment = ""
encoding = tokenizer.encode_plus(
    test_comment,
    add_special_tokens=True,
    max_length=max_len,
    return_token_type_ids=False,
    padding="max_length",
    # Match BERTDataset: without truncation a long comment yields more than
    # `max_len` tokens instead of being cut to the training length.
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)
# No labels are passed, so the returned loss placeholder is ignored.
_, test_prediction = trained_model(encoding["input_ids"], encoding["attention_mask"])
test_prediction = test_prediction.flatten().numpy()
for label, prediction in zip(LABEL_COLUMNS, test_prediction):
    print(f"{label}: {prediction}")

In [None]:
# Decision threshold applied to the sigmoid outputs in the cells below.
THRESHOLD = 0.5

# Run on the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
trained_model = trained_model.to(device)

val_dataset = BERTDataset(valid_data, tokenizer, max_token_len=max_len)

# Score the validation set one item at a time, adding a batch axis of 1.
predictions = []
labels = []
for item in tqdm(val_dataset):
    input_ids = item["input_ids"].unsqueeze(dim=0).to(device)
    attention_mask = item["attention_mask"].unsqueeze(dim=0).to(device)
    _, prediction = trained_model(input_ids, attention_mask)
    predictions.append(prediction.flatten())
    labels.append(item["labels"].int())

# Stack into (num_items, num_labels) tensors on the CPU.
predictions = torch.stack(predictions).detach().cpu()
labels = torch.stack(labels).detach().cpu()

In [None]:
# Accuracy of the thresholded predictions against the integer labels.
accuracy(
    predictions,
    labels,
    threshold=THRESHOLD,
)

In [None]:
# Report the ROC-AUC of every label column separately.
print("AUROC per tag")
for column, tag in enumerate(LABEL_COLUMNS):
    tag_auroc = auroc(predictions[:, column], labels[:, column], pos_label=1)
    print(f"{tag}: {tag_auroc}")

In [None]:
# Binarise the probabilities at THRESHOLD and print per-label
# precision/recall/F1 for the validation set.
y_true = labels.numpy()
y_pred = np.where(predictions.numpy() > THRESHOLD, 1, 0)
report = classification_report(
    y_true,
    y_pred,
    target_names=LABEL_COLUMNS,
    zero_division=0,
)
print(report)