<a href="https://colab.research.google.com/github/AyushiM1102/Electra_classification_fake_vs_real_news/blob/main/electra_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Reference tutorial:
# https://velog.io/@na2na8/ELECTRA%EB%A1%9C-Binary-Classification#electra-with-pytorch-lightning

In [13]:
#!pip install emoji --upgrade

Collecting emoji
  Downloading emoji-1.7.0.tar.gz (175 kB)
[?25l[K     |█▉                              | 10 kB 18.6 MB/s eta 0:00:01[K     |███▊                            | 20 kB 10.5 MB/s eta 0:00:01[K     |█████▋                          | 30 kB 8.8 MB/s eta 0:00:01[K     |███████▌                        | 40 kB 7.9 MB/s eta 0:00:01[K     |█████████▍                      | 51 kB 4.2 MB/s eta 0:00:01[K     |███████████▏                    | 61 kB 4.9 MB/s eta 0:00:01[K     |█████████████                   | 71 kB 5.4 MB/s eta 0:00:01[K     |███████████████                 | 81 kB 4.0 MB/s eta 0:00:01[K     |████████████████▉               | 92 kB 4.5 MB/s eta 0:00:01[K     |██████████████████▊             | 102 kB 5.0 MB/s eta 0:00:01[K     |████████████████████▌           | 112 kB 5.0 MB/s eta 0:00:01[K     |██████████████████████▍         | 122 kB 5.0 MB/s eta 0:00:01[K     |████████████████████████▎       | 133 kB 5.0 MB/s eta 0:00:01[K     |█████████

In [1]:
!pip install git+https://github.com/PyTorchLightning/pytorch-lightning --quiet
import pytorch_lightning as pl
print(pl.__version__)

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
1.7.0dev


In [2]:
import os
import re

#import emoji
import numpy as np
import pandas as pd
#from soynlp.normalizer import repeat_normalize

import torch
from torch.utils.data import DataLoader, Dataset

#import pytorch_lightning as pl
from pytorch_lightning import loggers as pl_loggers
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import transformers
from transformers import ElectraForSequenceClassification, ElectraTokenizer, AdamW

In [3]:
class ElectraClassificationDataset(Dataset) :
    def __init__(self, path, sep, doc_col, label_col, max_length, 
                num_workers=1, labels_dict=None) :
        self.tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-small-v3-discriminator")

        self.max_length = max_length
        self.doc_col = doc_col
        self.label_col = label_col

        # labels
        # None : label이 num으로 되어 있음
        # dict : label이 num이 아닌 것으로 되어 있음
        # ex : {True : 1, False : 0}
        self.labels_dict = labels_dict

        # dataset
        df = pd.read_csv(path, sep=sep)
        df = df.dropna(axis=0)
        df.drop_duplicates(subset=[self.doc_col], inplace=True)
        self.dataset = df

    def __len__(self) :
        return len(self.dataset)

    def cleanse(self, text) :
        emojis = ''.join(emoji.UNICODE_EMOJI.keys())  
        pattern = re.compile(f'[^ .,?!/@$%~％·∼()\x00-\x7Fㄱ-힣{emojis}]+')
        url_pattern = re.compile(
            r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'
        )
        processed = pattern.sub(' ', text)
        processed = url_pattern.sub(' ', processed)
        processed = processed.strip()
        processed = repeat_normalize(processed, num_repeats=2)
      
        return processed

    def __getitem__(self, idx) :
        document = self.cleanse(self.dataset[self.doc_col].iloc[idx])
        inputs = self.tokenizer(
            document,
            return_tensors='pt',
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
            add_special_tokens=True
        )

        if self.labels_dict :
            label = self.labels_dict[self.dataset[self.label_col].iloc[idx]]
        else :
            label = self.dataset[self.label_col].iloc[idx]

        return {
            'input_ids' : inputs['input_ids'][0],
            'attention_mask' : inputs['attention_mask'][0],
            'label' : int(label)
        }

In [None]:
class ElectraClassificationDataModule(pl.LightningDataModule) :
    def __init__(self, train_path, valid_path, max_length, batch_size, sep,
                doc_col, label_col, num_workers=1, labels_dict=None) :
        super().__init__()
        self.batch_size = batch_size
        self.train_path = train_path
        self.valid_path = valid_path
        self.max_length = max_length
        self.doc_col = doc_col
        self.label_col = label_col
        self.sep = sep
        self.num_workers = num_workers
        self.labels_dict = labels_dict

    def setup(self, stage=None) :
        self.set_train = ElectraClassificationDataset(self.train_path, sep=self.sep,
                                            doc_col=self.doc_col, label_col=self.label_col,
                                            max_length = self.max_length, labels_dict=self.labels_dict)
        self.set_valid = ElectraClassificationDataset(self.valid_path, sep=self.sep,
                                            doc_col=self.doc_col, label_col=self.label_col,
                                            max_length = self.max_length, labels_dict=self.labels_dict)

    def train_dataloader(self) :
        train = DataLoader(self.set_train, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=True)
        return train
    
    def val_dataloader(self) :
        val = DataLoader(self.set_valid, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False)
        return val
    
    def test_dataloader(self) :
        test = DataLoader(self.set_valid, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False)
        return test

In [None]:
electra = ElectraForSequenceClassification.from_pretrained("monologg/koelectra-small-v3-discriminator")

dm = ElectraClassificationDataModule(batch_size=8, train_path='./ratings_train_pre.txt', valid_path='./ratings_test_pre.txt',
                                    max_length=256, sep='\t', doc_col='document', label_col='label', num_workers=1)

dm.setup()

t = dm.train_dataloader()

for idx, data in enumerate(t) :
    print(idx, data['input_ids'].shape, data['attention_mask'].shape, data['label'].shape)

v = dm.val_dataloader()
for idx, data in enumerate(v) :
    print(idx, data['input_ids'].shape, data['attention_mask'].shape, data['label'].shape)
idx, data = enumerate(t)

print(data['input_ids'])
print(data['input_ids'].shape)

print(data['attention_mask'])
print(data['attention_mask'].shape)

print(data['label'])
print(data['label'].shape)

output = electra.forward(data['input_ids'], attention_mask=data['attention_mask'], labels=data['label'].view([-1,1]))

print(output.loss)
# print(output.loss.shape)
print(output.logits)
print(output.logits.shape)

softmax = nn.functional.softmax(output.logits, dim=1)
print('softmax', softmax)
pred = softmax.argmax(dim=1)
print('pred', pred)

y_true = data['label'].tolist()
y_pred = pred.tolist()

acc = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred)
rec = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print(f'acc : {acc}, prec : {prec}, rec : {rec}, f1 : {f1}')


In [None]:
import os

import torch
import torchmetrics
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

import pytorch_lightning as pl
from pytorch_lightning import loggers as pl_loggers
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import transformers
from transformers import ElectraForSequenceClassification, ElectraTokenizer, AdamW

device = torch.device("cuda")

# https://medium.com/huggingface/multi-label-text-classification-using-bert-the-mighty-transformer-69714fa3fb3d
# https://huggingface.co/docs/transformers/v4.15.0/en/model_doc/electra#transformers.ElectraForSequenceClassification

In [None]:
class ElectraClassification(pl.LightningModule) :
    def __init__(self, learning_rate) :
        super().__init__()
        self.learning_rate = learning_rate
        self.save_hyperparameters()
        
        self.electra = ElectraForSequenceClassification.from_pretrained("monologg/koelectra-small-v3-discriminator")

        self.metric_acc = torchmetrics.Accuracy()
        self.metric_f1 = torchmetrics.F1(num_classes=2)
        self.metric_rec = torchmetrics.Recall(num_classes=2)
        self.metric_pre = torchmetrics.Precision(num_classes=2)

        self.loss_func = nn.CrossEntropyLoss()

    def forward(self, input_ids, attention_mask, labels=None) :
        output = self.electra(input_ids=input_ids, 
                                attention_mask=attention_mask, 
                                labels=labels)
        return output

    def training_step(self, batch, batch_idx) :
        '''
        ##########################################################
        electra forward input shape information
        * input_ids.shape (batch_size, max_length)
        * attention_mask.shape (batch_size, max_length)
        * label.shape (batch_size,)
        ##########################################################
        '''

        # change label shape (list -> torch.Tensor((batch_size, 1)))
        label = batch['label'].view([-1,1])

        output = self(input_ids=batch['input_ids'].to(device),
                        attention_mask=batch['attention_mask'].to(device),
                        labels=label.to(device))
        '''
        ##########################################################
        electra forward output shape information
        * loss.shape (1,)
        * logits.shape (batch_size, config.num_labels=2)
        '''
        logits = output.logits

        loss = output.loss
        # loss = self.loss_func(logits.to(device), batch['label'].to(device))

        softmax = nn.functional.softmax(logits, dim=1)
        preds = softmax.argmax(dim=1)

        self.log("train_loss", loss, prog_bar=True)
        
        return {
            'loss' : loss,
            'pred' : preds,
            'label' : batch['label']
        }

    def training_epoch_end(self, outputs, state='train') :
        y_true = []
        y_pred = []
        for i in outputs :
            y_true += i['label'].tolist()
            y_pred += i['pred'].tolist()

        acc = accuracy_score(y_true, y_pred)
        prec = precision_score(y_true, y_pred)
        rec = recall_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)

        # self.log(state+'_acc', acc, on_epoch=True, prog_bar=True)
        # self.log(state+'_precision', prec, on_epoch=True, prog_bar=True)
        # self.log(state+'_recall', rec, on_epoch=True, prog_bar=True)
        # self.log(state+'_f1', f1, on_epoch=True, prog_bar=True)
        print(f'[Epoch {self.trainer.current_epoch} {state.upper()}] Acc: {acc}, Prec: {prec}, Rec: {rec}, F1: {f1}')

    def validation_step(self, batch, batch_idx) :
        '''
        ##########################################################
        electra forward input shape information
        * input_ids.shape (batch_size, max_length)
        * attention_mask.shape (batch_size, max_length)
        ##########################################################
        '''
        output = self(input_ids=batch['input_ids'].to(device),
                        attention_mask=batch['attention_mask'].to(device))
        logits = output.logits
        preds = nn.functional.softmax(logits, dim=1).argmax(dim=1)

        labels = batch['label']
        accuracy = self.metric_acc(preds, labels)
        f1 = self.metric_f1(preds, labels)
        recall = self.metric_rec(preds, labels)
        precision = self.metric_pre(preds, labels)
        self.log('val_accuracy', accuracy, on_epoch=True, prog_bar=True)
        self.log('val_f1', f1, on_epoch=True, prog_bar=True)
        self.log('val_recall', recall, on_epoch=True, prog_bar=True)
        self.log('val_precision', precision, on_epoch=True, prog_bar=True)

        return {
            'accuracy' : accuracy,
            'f1' : f1,
            'recall' : recall,
            'precision' : precision
        }

    def validation_epoch_end(self, outputs) :
        val_acc = torch.stack([i['accuracy'] for i in outputs]).mean()
        val_f1 = torch.stack([i['f1'] for i in outputs]).mean()
        val_rec = torch.stack([i['recall'] for i in outputs]).mean()
        val_pre = torch.stack([i['precision'] for i in outputs]).mean()
        # self.log('val_f1', val_f1, on_epoch=True, prog_bar=True)
        # self.log('val_acc', val_acc, on_epoch=True, prog_bar=True)
        print(f'val_accuracy : {val_acc}, val_f1 : {val_f1}, val_recall : {val_rec}, val_precision : {val_pre}')
        
    
    def configure_optimizers(self) :
        optimizer = torch.optim.AdamW(self.electra.parameters(), lr=self.learning_rate)
        lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
        
        return {
            'optimizer' : optimizer,
            'lr_scheduler' : lr_scheduler
        }

In [None]:
def main():
    model = ElectraClassification(learning_rate=0.0001)

    dm = ElectraClassificationDataModule(batch_size=8, train_path='./ratings_train_pre.txt', valid_path='./ratings_test_pre.txt',
                                    max_length=256, sep='\t', doc_col='document', label_col='label', num_workers=1)
    
    checkpoint_callback = pl.callbacks.ModelCheckpoint(monitor='val_accuracy',
                                                    dirpath='./sample_electra_binary_nsmc_chpt',
                                                    filename='KoELECTRA/{epoch:02d}-{val_accuracy:.3f}',
                                                    verbose=True,
                                                    save_last=True,
                                                    mode='max',
                                                    save_top_k=-1,
                                                    )
    
    tb_logger = pl_loggers.TensorBoardLogger(os.path.join('./sample_electra_binary_nsmc_chpt', 'tb_logs'))

    lr_logger = pl.callbacks.LearningRateMonitor()

    trainer = pl.Trainer(
        default_root_dir='./sample_electra_binary_nsmc_chpt/checkpoints',
        logger = tb_logger,
        callbacks = [checkpoint_callback, lr_logger],
        max_epochs=3,
        gpus=1
    )

    trainer.fit(model, dm)

In [None]:
def infer(x, path) :
    model = ElectraClassification.load_from_checkpoint(path)
    tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-small-v3-discriminator")
    
    emojis = ''.join(emoji.UNICODE_EMOJI.keys())  
    pattern = re.compile(f'[^ .,?!/@$%~％·∼()\x00-\x7Fㄱ-힣{emojis}]+')
    url_pattern = re.compile(
        r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'
    )
    processed = pattern.sub(' ', x)
    processed = url_pattern.sub(' ', processed)
    processed = processed.strip()
    processed = repeat_normalize(processed, num_repeats=2)

    tokenized = tokenizer(processed, return_tensors='pt')

    output = model(tokenized.input_ids, tokenized.attention_mask)
    return nn.functional.softmax(output.logits, dim=-1)