##드라이브 마운트

In [None]:
# 구글 드라이브 연동
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## package 설치

In [None]:
try:
    import transformers, emoji, soynlp, pytorch_lightning
except:
    !pip install -U -q transformers emoji soynlp pytorch-lightning

In [None]:
!pip3 install adamp

### 패키지 import & 기본 Args 설정

In [None]:
import os 
import pandas as pd

from pprint import pprint

# 텐서 등의 다양한 수학 함수가 포함되어져 있으며 Numpy와 유사한 구조
import torch
#  torch.utils.data: SGD의 반복 연산을 실행할 때 사용하는 미니 배치용 유틸리티 함수가 포함되어져 있습니다.
from torch.utils.data import Dataset, DataLoader, TensorDataset
# lr_scheduler: 학습과정에서 learning rate를 조정
# torch.optim: 확률적 경사하강법을 중심으로 한 파라미터 최적화 알고리즘이 구현
# ExponentialLR: 매 epoch마다 이전 lr에 gamma만큼 곱해서 사용
from torch.optim.lr_scheduler import ExponentialLR

# pytorch_lightning: 정돈된 코드 스타일을 갖추기 위해 사용
# LightningModule: 모델 내부의 구조를 설계하는 research & science 클래스, 모델의 구조나 데이터 전처리, 손실함수 등의 설정 등을 통해 모델을 초기화
from pytorch_lightning import LightningModule, Trainer, seed_everything

from transformers import ElectraForSequenceClassification, AutoTokenizer, AdamW
# from adamp import AdamP
# precision_score: 양성 클래스에 속한다고 출력한 샘플 중 실제로 양성 클래스에 속하는 샘플 수의 비율, 높을수록 좋은 모형
# recall_score: 실제 양성 클래스에 속한 표본 중에 양성 클래스에 속한다고 출력한 표본의 수의 비율, 높을수록 좋은 모형
# f1_score: 정밀도와 재현율의 가중조화평균
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import re
import emoji
from soynlp.normalizer import repeat_normalize

### 기본 학습 Arguments

In [None]:
args = {
    'random_seed':42, # Random Seed, 동일한 난수를 발생시키기 위해
    'pretrained_model': 'beomi/KcELECTRA-base',  # Transformers PLM name
    'pretrained_tokenizer': '',  # Optional, Transformers Tokenizer Name. Overrides `pretrained_model`
    'batch_size': 64,
    'lr': 2e-5,  # Starting Learning Rate
    'epochs': 6,  # Max Epochs
    'max_length': 32,  # Max Length input size
    'train_data_path': "/content/drive/MyDrive/opinionlive/data/0824_train_80%.xlsx",  # Train Dataset file 
    'val_data_path': "/content/drive/MyDrive/opinionlive/data/0824_test_20%.xlsx",  # Validation Dataset file 
    'test_mode': False,  # Test Mode enables `fast_dev_run`
    'optimizer': 'AdamP',  # AdamW vs AdamP
    'lr_scheduler': 'exp',  # ExponentialLR vs CosineAnnealingWarmRestarts
    'fp16': True,  # Enable train on FP16(if GPU)
    'tpu_cores': 0,  # Enable TPU with 1 core or 8 cores
    'cpu_workers': os.cpu_count(),
}

## Pytorch Lightning으로 모델 만들기

In [None]:
acc_test = pd.DataFrame(columns=['train', 'test'])
loss_test = pd.DataFrame(columns=['train', 'test'])

In [None]:
train_acc = []
train_loss = []
test_acc = []
test_loss = []
a = 0

In [None]:
# 모델 클래스
class Model(LightningModule):
    def __init__(self, **kwargs):   #kwargs: keyword argument의 줄임말로 키워드를 제공, 모든 값을 모델 입력에 직접 넣어주는 것은 상당히 귀찮기 때문에 사용 
        super().__init__() #__init__ 클래스를 선언하는 순간  실행되는 함수
        self.save_hyperparameters() # 이 부분에서 self.hparams에 위 kwargs가 저장된다.
        
        self.clsfier = ElectraForSequenceClassification.from_pretrained(self.hparams.pretrained_model,num_labels=8)
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.hparams.pretrained_tokenizer
            if self.hparams.pretrained_tokenizer
            else self.hparams.pretrained_model
        )
    # forward: foward() 함수는 모델이 학습데이터를 입력받아서 forward 연산을 진행시키는 함수
    def forward(self, **kwargs):
        return self.clsfier(**kwargs)

    # step 함수: 활성화 함수 중 하나
    def step(self, batch, batch_idx):
        data, labels = batch
        output = self(input_ids=data, labels=labels)

        # Transformers 4.0.0+ 트랜스포머 자체를 좀 공부해야함
        
        loss = output.loss
        logits = output.logits

        # dim은 차원을 설정해주는 dim= -1 마지막 차원을 제거한다는 의미 2차원이면 열의 차원을 제거한다는 의미
        preds = logits.argmax(dim=-1) 

        #tensor값만 뽑기 위해 labes.cpu().numpy()를 하는 것
        y_true = list(labels.cpu().numpy())  
        y_pred = list(preds.cpu().numpy())


        return {
            'loss': loss,
            'y_true': y_true,
            'y_pred': y_pred,
        }
    #모델 학습 루프(Training, Validation, Test Loop)

    def training_step(self, batch, batch_idx):
        return self.step(batch, batch_idx) 

    def validation_step(self, batch, batch_idx):
        return self.step(batch, batch_idx)

      #  1 epoch 종료
    def epoch_end(self, outputs, state='train'):
        loss = torch.tensor(0, dtype=torch.float)
        for i in outputs:
            loss += i['loss'].cpu().detach()
        loss = loss / len(outputs)

        y_true = []
        y_pred = []
        for i in outputs:
            y_true += i['y_true']
            y_pred += i['y_pred']
        
        acc = accuracy_score(y_true, y_pred)
        # average = 'micro' 추가
        prec = precision_score(y_true, y_pred,average='micro')
        rec = recall_score(y_true, y_pred,average='micro')
        f1 = f1_score(y_true, y_pred,average='micro')

        self.log(state+'_loss', float(loss), on_epoch=True, prog_bar=True)
        self.log(state+'_acc', acc, on_epoch=True, prog_bar=True)
        self.log(state+'_precision', prec, on_epoch=True, prog_bar=True)
        self.log(state+'_recall', rec, on_epoch=True, prog_bar=True)
        self.log(state+'_f1', f1, on_epoch=True, prog_bar=True)
        
        # global: 전역변수 설정 그래프를 그리기 위해
        global a
        if a == 0 : 
          test_acc.append(acc)
          test_loss.append(loss)
          a = 1
        elif a == 1 :
          train_acc.append(acc)
          train_loss.append(loss)
          a = 0
          
        print(f'[Epoch {self.trainer.current_epoch} {state.upper()}] Loss: {loss}, Acc: {acc}, Prec: {prec}, Rec: {rec}, F1: {f1}')
        return {'loss': loss}
    
    def training_epoch_end(self, outputs):
        self.epoch_end(outputs, state='train')

    def validation_epoch_end(self, outputs):
        self.epoch_end(outputs, state='val')
    
    #optimizer 설정
    def configure_optimizers(self):
        if self.hparams.optimizer == 'AdamW':
            optimizer = AdamW(self.parameters(), lr=self.hparams.lr)
        elif self.hparams.optimizer == 'AdamP':
            from adamp import AdamP
            optimizer = AdamP(self.parameters(), lr=self.hparams.lr)
        else:
            raise NotImplementedError('Only AdamW and AdamP is Supported!')
        if self.hparams.lr_scheduler == 'cos':
            scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=1, T_mult=2)
        elif self.hparams.lr_scheduler == 'exp':
            scheduler = ExponentialLR(optimizer, gamma=0.5)
        else:
            raise NotImplementedError('Only cos and exp lr scheduler is Supported!')
        return {
            'optimizer': optimizer,
            'scheduler': scheduler,
        }

    #파일 형식 
    def read_data(self, path):
        if path.endswith('xlsx'):
            return pd.read_excel(path)
        elif path.endswith('csv'):
            return pd.read_csv(path)
        elif path.endswith('tsv') or path.endswith('txt'):
            return pd.read_csv(path, sep='\t')
        else:
            raise NotImplementedError('Only Excel(xlsx)/Csv/Tsv(txt) are Supported')
    
    def clean(self, x):
        emojis = ''.join(emoji.UNICODE_EMOJI.keys())
        pattern = re.compile(f'[^ .,?!/@$%~％·∼()\x00-\x7Fㄱ-힣{emojis}]+')
        url_pattern = re.compile(
            r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)')
        x = pattern.sub(' ', x)
        x = url_pattern.sub('', x)
        x = x.strip()
        x = repeat_normalize(x, num_repeats=2)
        return x

    def encode(self, x, **kwargs):
        return self.tokenizer.encode(
            self.clean(str(x)),
            padding='max_length',
            max_length=self.hparams.max_length,
            truncation=True,
            **kwargs,
        )

    def preprocess_dataframe(self, df):
        df['command'] = df['command'].map(self.encode)
        return df

    # shuffle=False: 순차적 분할
    def dataloader(self, path, shuffle=False):
        df = self.read_data(path)
        df = self.preprocess_dataframe(df)

        # TensorDataset은 Dataset을 상속한 클래스로 학습 데이터 X와 레이블 Y를 묶어 놓는 컨테이너
        # TensorDataset을 DataLoader에 전달하면 for 루프에서 데이터의 일부분만 간단히 추출
        dataset = TensorDataset(
            torch.tensor(df['command'].to_list(), dtype=torch.long),
            torch.tensor(df['label'].to_list(), dtype=torch.long),
        )

        return DataLoader(
            dataset,
            batch_size=self.hparams.batch_size * 1 if not self.hparams.tpu_cores else self.hparams.tpu_cores,
            shuffle=shuffle,
            num_workers=self.hparams.cpu_workers,
        )

    def train_dataloader(self):
        return self.dataloader(self.hparams.train_data_path, shuffle=True)

    def val_dataloader(self):
        return self.dataloader(self.hparams.val_data_path, shuffle=False)


In [None]:
from pytorch_lightning.callbacks import ModelCheckpoint

checkpoint_callback = ModelCheckpoint(
    filename='epoch{epoch}-val_acc{val_acc:.4f}',
    # monitor: 어떤 metric을 기준으로 체크포인트를 저장할지 지정
    monitor='val_acc',
    # save_top_k: 최대 몇 개의 체크포인트를 저장할지 지정, save_last에 의해 저장되는 체크포인트는 제외 
    save_top_k=3,
    # mode: 지정한 metric의 어떤 기준(min, max)으로 체크포인트를 저장할지 지정
    mode='max',
    auto_insert_metric_name=False,
)

## 학습하기

In [None]:
print("Using PyTorch Ver", torch.__version__)
print("Fix Seed:", args['random_seed'])
seed_everything(args['random_seed'])
model = Model(**args)

print(":: Start Training ::")
trainer = Trainer(
    callbacks=[checkpoint_callback],
    max_epochs=args['epochs'],
    fast_dev_run=args['test_mode'],
    num_sanity_val_steps=None if args['test_mode'] else 0,
    # For GPU Setup
    deterministic=torch.cuda.is_available(),
    gpus=[0] if torch.cuda.is_available() else None,  # 0번 idx GPU  사용
    precision=16 if args['fp16'] and torch.cuda.is_available() else 32,
    # For TPU Setup
    # tpu_cores=args['tpu_cores'] if args['tpu_cores'] else None,
)
trainer.fit(model)

### accuracy, loss값 그래프 시각화

In [None]:
for i in range(args['epochs']):
  acc_test.loc[i, 'train'] = test_acc[i]
  acc_test.loc[i, 'test'] = train_acc[i]

In [None]:
for i in range(args['epochs']):
  loss_test.loc[i, 'train'] = float(test_loss[i].numpy())
  loss_test.loc[i, 'test'] = float(train_loss[i].numpy())

In [None]:
import matplotlib.pyplot as plt
acc_test.plot()
plt.ylim([0.0, 1.1])     # Y축의 범위: [ymin, ymax]
# plt.title("정확도")
plt.xlabel("epochs")
plt.ylabel("acc")
plt.show()

In [None]:
import matplotlib.pyplot as plt
loss_test.plot()
plt.ylim([0.0, 2.1])     # Y축의 범위: [ymin, ymax]
# plt.title("정확도")
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

## Inference

In [None]:
from glob import glob
sorted(glob('./lightning_logs/version_0/checkpoints/*.ckpt'))

In [None]:
from glob import glob

latest_ckpt = sorted(glob('./lightning_logs/version_0/checkpoints/*.ckpt'))[-1]
latest_ckpt

In [None]:
model = Model.load_from_checkpoint(latest_ckpt)

In [None]:
model = joblib.load('/content/drive/MyDrive/opinionlive/model_pkl/KoElectraModel.pkl')

def infer(x):
    return torch.softmax(
        model(**model.tokenizer(x, return_tensors='pt')
    ).logits, dim=-1), print("          1:명령, 2:약속, 3:정표, 4:기대, 5:주장, 6:갈등, 7:진술, 8:질문")


In [None]:
infer("사랑스러운 수미니")

##모델저장

In [None]:
import joblib
joblib.dump(model, '/content/drive/MyDrive/opinionlive/model_pkl/kcElectraModel.pkl')